diff --git a/data/uking_functions.csv b/data/uking_functions.csv index 60c04e5d..6fe3f552 100644 --- a/data/uking_functions.csv +++ b/data/uking_functions.csv @@ -92621,16 +92621,16 @@ 0x00000071011f4394,uking::MethodTreeMgr::rtti1,288, 0x00000071011f44b4,uking::MethodTreeMgr::rtti2,92, 0x00000071011f4510,sub_71011F4510,140, -0x00000071011f459c,GlobalMessage::sb::ctor,424, -0x00000071011f4744,sub_71011F4744,268, -0x00000071011f4850,sub_71011F4850,52, -0x00000071011f4884,sub_71011F4884,20, -0x00000071011f4898,sub_71011F4898,440, -0x00000071011f4a50,nullsub_4667,4, +0x00000071011f459c,GlobalMessage::sb::ctor,424,_ZN4ksys17MessageDispatcher6QueuesC1EPNS_16MessageProcessor6LoggerE +0x00000071011f4744,sub_71011F4744,268,_ZN4ksys17MessageDispatcher6QueuesD1Ev +0x00000071011f4850,sub_71011F4850,52,_ZN4ksys17MessageDispatcher9MainQueue5clearEv +0x00000071011f4884,sub_71011F4884,20,_ZN4ksys17MessageDispatcher9MainQueueD1Ev +0x00000071011f4898,sub_71011F4898,440,_ZN4ksys17MessageDispatcher19DoubleBufferedQueueD1Ev +0x00000071011f4a50,nullsub_4667,4,_ZN4ksys17MessageDispatcher6Queues11DummyLoggerD1Ev 0x00000071011f4a54,nullsub_4668,4, -0x00000071011f4a58,sub_71011F4A58,72, -0x00000071011f4aa0,sub_71011F4AA0,112, -0x00000071011f4b10,nullsub_4669,4, +0x00000071011f4a58,sub_71011F4A58,72,_ZN4ksys17MessageDispatcher9MainQueue10addMessageERKNS_7MessageE +0x00000071011f4aa0,sub_71011F4AA0,112,_ZN4ksys17MessageDispatcher9MainQueue12processQueueERNS_16MessageProcessorE +0x00000071011f4b10,nullsub_4669,4,_ZN4ksys17MessageDispatcher6Queues11DummyLogger3logERKNS_7MessageEb 0x00000071011f4b14,nullsub_4670,4, 0x00000071011f4b18,j__ZdlPv_1220,4, 0x00000071011f4b1c,nullsub_4671,4, @@ -92650,14 +92650,14 @@ 0x00000071011f5668,GlobalMessage::x_4,248, 0x00000071011f5760,GlobalMessage::rtti1,204, 0x00000071011f582c,GlobalMessage::rtti2,92, -0x00000071011f5888,j__ZdlPv_1221,4, +0x00000071011f5888,j__ZdlPv_1221,4,_ZN4ksys17MessageDispatcher6Queues11DummyLoggerD0Ev 0x00000071011f588c,sub_71011F588C,140, -0x00000071011f5918,sub_71011F5918,240, -0x00000071011f5a08,sub_71011F5A08,240, -0x00000071011f5af8,sub_71011F5AF8,460, -0x00000071011f5cc4,sub_71011F5CC4,228, -0x00000071011f5da8,sub_71011F5DA8,132, -0x00000071011f5e2c,sub_71011F5E2C,52, +0x00000071011f5918,sub_71011F5918,240,_ZN4ksys17MessageDispatcher5QueueD1Ev +0x00000071011f5a08,sub_71011F5A08,240,_ZN4ksys17MessageDispatcher5QueueD0Ev +0x00000071011f5af8,sub_71011F5AF8,460,_ZN4ksys17MessageDispatcher5Queue10addMessageERKNS_7MessageE +0x00000071011f5cc4,sub_71011F5CC4,228,_ZN4ksys17MessageDispatcher5Queue12processQueueERNS_16MessageProcessorE +0x00000071011f5da8,sub_71011F5DA8,132,_ZN4ksys17MessageDispatcher5Queue5clearEv +0x00000071011f5e2c,sub_71011F5E2C,52,_ZN4ksys17MessageDispatcher9MainQueueD0Ev 0x00000071011f5e60,j__ZdlPv_1222,4, 0x00000071011f5e64,sub_71011F5E64,116, 0x00000071011f5ed8,j__ZdlPv_1223,4, diff --git a/src/KingSystem/Utils/CMakeLists.txt b/src/KingSystem/Utils/CMakeLists.txt index e071707f..c7123d7b 100644 --- a/src/KingSystem/Utils/CMakeLists.txt +++ b/src/KingSystem/Utils/CMakeLists.txt @@ -11,6 +11,8 @@ target_sources(uking PRIVATE Thread/Message.h Thread/MessageAck.cpp Thread/MessageAck.h + Thread/MessageDispatcher.cpp + Thread/MessageDispatcher.h Thread/MessageDispatcherBase.cpp Thread/MessageDispatcherBase.h Thread/MessageProcessor.cpp @@ -64,4 +66,5 @@ target_sources(uking PRIVATE StrTreeMap.h TypeTraits.h Types.h + UniqueArrayPtr.h ) diff --git a/src/KingSystem/Utils/Thread/Message.h b/src/KingSystem/Utils/Thread/Message.h index c534ffbd..07da260d 100644 --- a/src/KingSystem/Utils/Thread/Message.h +++ b/src/KingSystem/Utils/Thread/Message.h @@ -1,7 +1,7 @@ #pragma once #include - +#include #include "KingSystem/Utils/Thread/MessageTransceiverId.h" namespace ksys { @@ -37,6 +37,7 @@ public: }; Message(); + Message(const Message& message) { *this = message; } Message(const MesTransceiverId& source, const MesTransceiverId& destination, const MessageType& type, void* user_data, const DelayParams& delay_params, bool ack); Message(const MesTransceiverId& source, const MessageType& type, void* user_data, @@ -44,6 +45,17 @@ public: virtual ~Message(); + Message& operator=(const Message& other) { + mSource = other.getSource(); + mDestination = other.getDestination(); + mType = other.getType(); + mUserData = other.getUserData(); + _48 = other.getField48(); + mDelayParams = other.mDelayParams; + mShouldAck = other.shouldAck(); + return *this; + } + virtual const MesTransceiverId& getSource() const; virtual const MesTransceiverId& getDestination() const; virtual const MessageType& getType() const; @@ -61,6 +73,35 @@ public: --mDelayParams.delay_ticks; } + void reset() { + mType = {}; + mUserData = {}; + _48 = 0xffffffff; + mDelayParams = {}; + mShouldAck = {}; + mSource.reset(); + mDestination.reset(); + } + + void resetIfValid() { + if (isValid()) + reset(); + } + + bool isValid() const { return checkTransceiver(mDestination); } + + static bool checkTransceiver(const MesTransceiverId& id) { + if (!id.next) + return false; + + MesTransceiverId* next = *id.next; + if (!next) + return false; + + const auto& fields = [](const MesTransceiverId& i) { return std::tie(i.queue_id, i.id); }; + return fields(id) == fields(*next); + } + private: MesTransceiverId mSource{}; MesTransceiverId mDestination{}; diff --git a/src/KingSystem/Utils/Thread/MessageDispatcher.cpp b/src/KingSystem/Utils/Thread/MessageDispatcher.cpp new file mode 100644 index 00000000..a43a1596 --- /dev/null +++ b/src/KingSystem/Utils/Thread/MessageDispatcher.cpp @@ -0,0 +1,114 @@ +#include "KingSystem/Utils/Thread/MessageDispatcher.h" +#include "KingSystem/Utils/Thread/Message.h" + +namespace ksys { + +SEAD_SINGLETON_DISPOSER_IMPL(MessageDispatcher) + +MessageDispatcher::Queue::Queue() = default; + +MessageDispatcher::Queue::~Queue() { + Queue::clear(); +} + +Message* MessageDispatcher::Queue::findUnusedEntry() const { + for (Message& entry : mMessages) { + if (!entry.isValid()) + return &entry; + } + return nullptr; +} + +bool MessageDispatcher::Queue::addMessage(const Message& message) { + if (!Message::checkTransceiver(message.getSource())) + return false; + + if (!Message::checkTransceiver(message.getDestination())) + return false; + + auto* entry = findUnusedEntry(); + if (!entry) + return false; + + *entry = message; + return true; +} + +void MessageDispatcher::Queue::processQueue(MessageProcessor& processor) { + for (auto& message : mMessages) { + if (!message.isValid()) + break; + + if (processor.process(&message)) + message.resetIfValid(); + } +} + +void MessageDispatcher::Queue::clear() { + for (auto it = mMessages.begin(); it != mMessages.end(); ++it) + it->resetIfValid(); +} + +MessageDispatcher::DoubleBufferedQueue::DoubleBufferedQueue() = default; + +MessageDispatcher::DoubleBufferedQueue::~DoubleBufferedQueue() = default; + +bool MessageDispatcher::DoubleBufferedQueue::addMessage(const Message& message) { + return mBuffer[mActiveIdx ^ 1].addMessage(message); +} + +void MessageDispatcher::DoubleBufferedQueue::clear() { + mBuffer[0].clear(); + mBuffer[1].clear(); +} + +void MessageDispatcher::DoubleBufferedQueue::processQueue(MessageProcessor& processor) { + mActiveIdx ^= 1; + mBuffer[mActiveIdx].processQueue(processor); +} + +MessageDispatcher::Queues::DummyLogger::~DummyLogger() = default; + +MessageDispatcher::MainQueue::MainQueue() = default; + +MessageDispatcher::MainQueue::~MainQueue() = default; + +bool MessageDispatcher::MainQueue::addMessage(const Message& message) { + const bool ret = mQueue.addMessage(message); + if (ret) + mHasMessageToProcess = true; + return ret; +} + +void MessageDispatcher::MainQueue::clear() { + mQueue.clear(); +} + +void MessageDispatcher::MainQueue::processQueue(MessageProcessor& processor) { + for (u32 i = 0; mHasMessageToProcess && i < 1000; ++i) { + mHasMessageToProcess = false; + mQueue.processQueue(processor); + } +} + +MessageDispatcher::Queues::TransceiverIdBuffer::TransceiverIdBuffer() { + for (auto it = mBuffer.begin(); it != mBuffer.end(); ++it) + *it = nullptr; +} + +MessageDispatcher::Queues::TransceiverIdBuffer::~TransceiverIdBuffer() { + for (auto it = mBuffer.begin(); it != mBuffer.end(); ++it) { + if (auto* id = *it; id && Message::checkTransceiver(*id)) + id->reset(); + } +} + +MessageDispatcher::Queues::Queues(MessageProcessor::Logger* logger) + : mProcessor(logger == nullptr ? &mDummyLogger : logger) {} + +MessageDispatcher::Queues::~Queues() { + mQueue.clear(); + mMainQueue.clear(); +} + +} // namespace ksys diff --git a/src/KingSystem/Utils/Thread/MessageDispatcher.h b/src/KingSystem/Utils/Thread/MessageDispatcher.h new file mode 100644 index 00000000..5ad36708 --- /dev/null +++ b/src/KingSystem/Utils/Thread/MessageDispatcher.h @@ -0,0 +1,102 @@ +#pragma once + +#include +#include +#include +#include "KingSystem/Utils/Thread/MessageDispatcherBase.h" +#include "KingSystem/Utils/Thread/MessageProcessor.h" +#include "KingSystem/Utils/UniqueArrayPtr.h" + +namespace ksys { + +class Message; +class MessageProcessor; + +class MessageDispatcher : public MessageDispatcherBase { + SEAD_SINGLETON_DISPOSER(MessageDispatcher) + SEAD_RTTI_OVERRIDE(MessageDispatcher, MessageDispatcherBase) + MessageDispatcher() = default; + ~MessageDispatcher() override; + +public: + void registerTransceiver(MessageReceiverEx& receiver) override; + void deregisterTransceiver(MessageReceiverEx& receiver) override; + bool sendMessage(const MesTransceiverId& src, const MesTransceiverId& dest, + const MessageType& type, void* user_data, bool ack) override; + bool sendMessageOnProcessingThread(const MesTransceiverId& src, const MesTransceiverId& dest, + const MessageType& type, void* user_data, bool ack) override; + void m_8() override; + void m_9() override; + void update() override; + +private: + class Queue { + public: + Queue(); + virtual ~Queue(); + virtual bool addMessage(const Message& message); + virtual void processQueue(MessageProcessor& processor); + virtual void clear(); + + private: + Message* findUnusedEntry() const; + + util::UniqueArrayPtr mMessages; + }; + + class DoubleBufferedQueue { + public: + DoubleBufferedQueue(); + ~DoubleBufferedQueue(); + bool addMessage(const Message& message); + void clear(); + void processQueue(MessageProcessor& processor); + + private: + u32 mActiveIdx = 1; + Queue mBuffer[2]; + }; + + class MainQueue { + public: + MainQueue(); + virtual ~MainQueue(); + virtual bool addMessage(const Message& message); + virtual void clear(); + virtual void processQueue(MessageProcessor& processor); + + private: + DoubleBufferedQueue mQueue; + bool mHasMessageToProcess = false; + }; + + class Queues { + public: + explicit Queues(MessageProcessor::Logger* logger); + ~Queues(); + + private: + struct DummyLogger : public MessageProcessor::Logger { + ~DummyLogger() override; + void log(const Message& message, bool success) override {} + }; + + struct TransceiverIdBuffer { + TransceiverIdBuffer(); + ~TransceiverIdBuffer(); + + util::UniqueArrayPtr mBuffer; + }; + + sead::CriticalSection mCritSection; + u32 mId = 0xffffffff; + DummyLogger mDummyLogger; + TransceiverIdBuffer mTransceiverIds; + DoubleBufferedQueue mQueue; + MainQueue mMainQueue; + MessageProcessor mProcessor; + bool mIsProcessing = false; + }; +}; + +} // namespace ksys diff --git a/src/KingSystem/Utils/Thread/MessageDispatcherBase.h b/src/KingSystem/Utils/Thread/MessageDispatcherBase.h index c33b472c..bcdf16e5 100644 --- a/src/KingSystem/Utils/Thread/MessageDispatcherBase.h +++ b/src/KingSystem/Utils/Thread/MessageDispatcherBase.h @@ -4,20 +4,28 @@ namespace ksys { +class MessageReceiverEx; +struct MesTransceiverId; +struct MessageType; + class MessageDispatcherBase { SEAD_RTTI_BASE(MessageDispatcherBase) public: MessageDispatcherBase(); virtual ~MessageDispatcherBase(); + virtual void registerTransceiver(MessageReceiverEx& receiver) = 0; + virtual void deregisterTransceiver(MessageReceiverEx& receiver) = 0; + virtual bool sendMessage(const MesTransceiverId& src, const MesTransceiverId& dest, + const MessageType& type, void* user_data, bool ack) = 0; + virtual bool sendMessageOnProcessingThread(const MesTransceiverId& src, + const MesTransceiverId& dest, + const MessageType& type, void* user_data, + bool ack) = 0; // TODO - virtual void m_4() = 0; - virtual void m_5() = 0; - virtual void m_6() = 0; - virtual void m_7() = 0; virtual void m_8() = 0; virtual void m_9() = 0; - virtual void m_10() = 0; + virtual void update() = 0; protected: void setAsGlobalInstance(); diff --git a/src/KingSystem/Utils/Thread/MessageProcessor.cpp b/src/KingSystem/Utils/Thread/MessageProcessor.cpp index 296d24a8..88ce1b93 100644 --- a/src/KingSystem/Utils/Thread/MessageProcessor.cpp +++ b/src/KingSystem/Utils/Thread/MessageProcessor.cpp @@ -1,5 +1,4 @@ #include "KingSystem/Utils/Thread/MessageProcessor.h" -#include #include "KingSystem/Utils/Thread/Message.h" #include "KingSystem/Utils/Thread/MessageAck.h" #include "KingSystem/Utils/Thread/MessageReceiver.h" @@ -10,18 +9,6 @@ MessageProcessor::MessageProcessor(Logger* logger) : mLogger(logger) {} MessageProcessor::~MessageProcessor() = default; -static bool checkTransceiver(const MesTransceiverId& id) { - if (!id.next) - return false; - - MesTransceiverId* next = *id.next; - if (!next) - return false; - - const auto& fields = [](const MesTransceiverId& i) { return std::tie(i.queue_id, i.id); }; - return fields(id) == fields(*next); -} - bool MessageProcessor::process(Message* message) { message->decrementDelay(); @@ -32,17 +19,17 @@ bool MessageProcessor::process(Message* message) { bool dest_valid = false; const auto& dest = message->getDestination(); - if (checkTransceiver(dest)) { + if (Message::checkTransceiver(dest)) { success = dest.receiver->receive(*message) & 1; mLogger->log(*message, success); dest_valid = true; } const auto& src = message->getSource(); - if (!message->hasDelayer() || checkTransceiver(src)) { + if (!message->hasDelayer() || Message::checkTransceiver(src)) { if (message->shouldAck()) { const auto& source = message->getSource(); - if (checkTransceiver(source)) { + if (Message::checkTransceiver(source)) { auto* receiver = source.receiver; const MessageAck ack{dest_valid, success, message->getDestination(), message->getType(), message->getUserData()}; diff --git a/src/KingSystem/Utils/Thread/MessageProcessor.h b/src/KingSystem/Utils/Thread/MessageProcessor.h index c934e37e..a53775b8 100644 --- a/src/KingSystem/Utils/Thread/MessageProcessor.h +++ b/src/KingSystem/Utils/Thread/MessageProcessor.h @@ -13,6 +13,7 @@ public: virtual void log(const Message& message, bool success) = 0; }; + /// @param logger Must be non null. explicit MessageProcessor(Logger* logger); virtual ~MessageProcessor(); virtual bool process(Message* message); diff --git a/src/KingSystem/Utils/UniqueArrayPtr.h b/src/KingSystem/Utils/UniqueArrayPtr.h new file mode 100644 index 00000000..c8adadc7 --- /dev/null +++ b/src/KingSystem/Utils/UniqueArrayPtr.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include "KingSystem/Utils/SafeDelete.h" + +namespace ksys::util { + +template +class UniqueArrayPtr { +public: + UniqueArrayPtr() = default; + UniqueArrayPtr(const UniqueArrayPtr&) = delete; + // Not movable either for now. + UniqueArrayPtr(UniqueArrayPtr&&) = delete; + ~UniqueArrayPtr() { safeDeleteArray(mData); } + + auto& operator=(const UniqueArrayPtr&) = delete; + auto& operator=(UniqueArrayPtr&&) = delete; + + T* data() const { return mData; } + std::size_t size() const { return N; } + + T* begin() const { return mData; } + T* end() const { return mData + N; } + T*& operator[](std::size_t i) const { return mData[i]; } + +private: + T* mData = new T[N]; +}; + +} // namespace ksys::util