ksys: Add MessageBroker and finish MessageDispatcher

This commit is contained in:
Léo Lam
2021-02-02 19:15:11 +01:00
parent 6e2e22cca5
commit df58679dda
12 changed files with 240 additions and 64 deletions
+2
View File
@@ -11,6 +11,8 @@ target_sources(uking PRIVATE
Thread/Message.h
Thread/MessageAck.cpp
Thread/MessageAck.h
Thread/MessageBroker.cpp
Thread/MessageBroker.h
Thread/MessageDispatcher.cpp
Thread/MessageDispatcher.h
Thread/MessageDispatcherBase.cpp
+2 -2
View File
@@ -34,7 +34,7 @@ void* Message::getUserData() const {
}
u32 Message::getField48() const {
return _48;
return mBrokerId;
}
bool Message::shouldBeProcessed() const {
@@ -56,7 +56,7 @@ void Message::setDestination(const MesTransceiverId& dest) {
}
void Message::setField48(const u32& v) {
_48 = v;
mBrokerId = v;
}
} // namespace ksys
+5 -3
View File
@@ -50,7 +50,7 @@ public:
mDestination = other.getDestination();
mType = other.getType();
mUserData = other.getUserData();
_48 = other.getField48();
mBrokerId = other.getField48();
mDelayParams = other.mDelayParams;
mShouldAck = other.shouldAck();
return *this;
@@ -76,7 +76,7 @@ public:
void reset() {
mType = {};
mUserData = {};
_48 = 0xffffffff;
mBrokerId = 0xffffffff;
mDelayParams = {};
mShouldAck = {};
mSource.reset();
@@ -90,12 +90,14 @@ public:
bool isValid() const { return mDestination.isRegistered(); }
void setBrokerId_(u32 id) { mBrokerId = id; }
private:
MesTransceiverId mSource{};
MesTransceiverId mDestination{};
MessageType mType{};
void* mUserData{};
u32 _48 = 0xffffffff;
u32 mBrokerId = 0xffffffff;
DelayParams mDelayParams{};
bool mShouldAck = true;
};
@@ -0,0 +1,22 @@
#include "KingSystem/Utils/Thread/MessageBroker.h"
#include "KingSystem/Utils/Thread/MessageTransceiverBase.h"
namespace ksys {
IMessageBroker::IMessageBroker() = default;
IMessageBroker::~IMessageBroker() = default;
bool IMessageBroker::registerTransceiver(const MessageTransceiverBase& transceiver) {
return getRegister()->registerTransceiver(*transceiver.getId());
}
void IMessageBroker::deregisterTransceiver(const MessageTransceiverBase& transceiver) {
return getRegister()->deregisterTransceiver(*transceiver.getId());
}
int IMessageBroker::countTransceivers() {
return getRegister()->countTransceivers();
}
} // namespace ksys
@@ -0,0 +1,53 @@
#pragma once
#include <basis/seadTypes.h>
#include <prim/seadRuntimeTypeInfo.h>
namespace ksys {
class Message;
class MessageTransceiverBase;
class MessageQueue;
struct MesTransceiverId;
class IMessageBrokerRegister {
public:
struct IForEachContext {
virtual ~IForEachContext() = default;
virtual void process(const MesTransceiverId& id) = 0;
};
virtual ~IMessageBrokerRegister() = default;
virtual bool registerTransceiver(const MesTransceiverId& id) = 0;
virtual void deregisterTransceiver(const MesTransceiverId& id) = 0;
virtual void forEachRegistered(IForEachContext& context) = 0;
virtual void setId(const u32& id) = 0;
virtual const u32& getId() const = 0;
virtual int countTransceivers() const = 0;
};
class IMessageBroker {
public:
class SetIdArg {
SEAD_RTTI_BASE(SetIdArg)
public:
SetIdArg() = default;
explicit SetIdArg(u32 id) { setId(id); }
virtual ~SetIdArg() = default;
u32 getId() const { return mId; }
void setId(u32 id) { mId = id; }
private:
u32 mId{};
};
IMessageBroker();
virtual ~IMessageBroker();
virtual void setId(const SetIdArg& arg) = 0;
virtual IMessageBrokerRegister* getRegister() = 0;
bool registerTransceiver(const MessageTransceiverBase& transceiver);
void deregisterTransceiver(const MessageTransceiverBase& transceiver);
int countTransceivers();
};
} // namespace ksys
@@ -7,17 +7,18 @@
#include "KingSystem/Utils/HeapUtil.h"
#include "KingSystem/Utils/SafeDelete.h"
#include "KingSystem/Utils/Thread/Message.h"
#include "KingSystem/Utils/Thread/MessageBroker.h"
#include "KingSystem/Utils/Thread/MessageReceiverEx.h"
namespace ksys {
MessageDispatcher::Queue::Queue() = default;
MessageQueue::MessageQueue() = default;
MessageDispatcher::Queue::~Queue() {
Queue::clear();
MessageQueue::~MessageQueue() {
MessageQueue::clear();
}
Message* MessageDispatcher::Queue::findUnusedEntry() const {
Message* MessageQueue::findUnusedEntry() const {
for (Message& entry : mMessages) {
if (!entry.isValid())
return &entry;
@@ -25,7 +26,7 @@ Message* MessageDispatcher::Queue::findUnusedEntry() const {
return nullptr;
}
bool MessageDispatcher::Queue::addMessage(const Message& message) {
bool MessageQueue::addMessage(const Message& message) {
if (!message.getSource().isRegistered())
return false;
@@ -40,7 +41,7 @@ bool MessageDispatcher::Queue::addMessage(const Message& message) {
return true;
}
void MessageDispatcher::Queue::processQueue(MessageProcessor& processor) {
void MessageQueue::processQueue(MessageProcessor& processor) {
for (auto& message : mMessages) {
if (!message.isValid())
break;
@@ -50,7 +51,7 @@ void MessageDispatcher::Queue::processQueue(MessageProcessor& processor) {
}
}
void MessageDispatcher::Queue::clear() {
void MessageQueue::clear() {
for (auto it = mMessages.begin(); it != mMessages.end(); ++it)
it->resetIfValid();
}
@@ -228,6 +229,16 @@ bool MessageDispatcher::sendMessage(const MesTransceiverId& src, const MesTransc
return queues->getQueue().addMessage(message);
}
bool MessageDispatcher::Queues::sendMessageOnProcessingThread(const MesTransceiverId& src,
const MesTransceiverId& dest,
const MessageType& type,
void* user_data, bool ack) {
const auto message = Message{src, dest, type, user_data, {}, ack};
if (!isProcessing())
return false;
return mMainQueue.addMessage(message);
}
// NON_MATCHING: branching: deduplicated Message destructor call
bool MessageDispatcher::sendMessageOnProcessingThread(const MesTransceiverId& src,
const MesTransceiverId& dest,
@@ -235,12 +246,78 @@ bool MessageDispatcher::sendMessageOnProcessingThread(const MesTransceiverId& sr
bool ack) {
if (!isProcessingOnCurrentThread())
return false;
return mQueues->sendMessageOnProcessingThread(src, dest, type, user_data, ack);
}
auto* queues = mQueues;
const auto message = Message{src, dest, type, user_data, {}, ack};
if (!queues->isProcessing())
struct AddMessageContext : IMessageBrokerRegister::IForEachContext {
AddMessageContext(MessageQueue* queue, Message* message) : queue(queue), message(message) {}
void process(const MesTransceiverId& id) override {
if (!id.isRegistered())
return;
message->setDestination(id);
result = queue->addMessage(*message);
}
MessageQueue* queue;
Message* message;
bool result = false;
};
bool MessageDispatcher::sendMessage(const MesTransceiverId& src, IMessageBrokerRegister& reg,
const MessageType& type, void* user_data, bool ack) {
auto queues = mQueues;
Message::DelayParams delay_params;
// This should probably be a Queues member function, but putting this here removes
// the need to include Message.h in the header.
return [&] {
auto message = Message{src, type, user_data, delay_params, ack};
message.setBrokerId_(reg.getId());
queues->getCritSection().lock();
AddMessageContext ctx{queues->getQueue().getQueue(), &message};
reg.forEachRegistered(ctx);
queues->getCritSection().unlock();
return ctx.result;
}();
}
struct AddMessageMainContext : IMessageBrokerRegister::IForEachContext {
AddMessageMainContext(MessageDispatcher::MainQueue* queue, Message* message)
: queue(queue), message(message) {}
void process(const MesTransceiverId& id) override {
if (!id.isRegistered())
return;
message->setDestination(id);
result = queue->addMessage(*message);
}
MessageDispatcher::MainQueue* queue;
Message* message;
bool result = false;
};
bool MessageDispatcher::sendMessageOnProcessingThread(const MesTransceiverId& src,
IMessageBrokerRegister& reg,
const MessageType& type, void* user_data,
bool ack) {
if (!isProcessingOnCurrentThread())
return false;
return queues->getMainQueue().addMessage(message);
auto queues = mQueues;
Message::DelayParams delay_params;
return [&] {
if (!queues->isProcessing())
return false;
auto message = Message{src, type, user_data, delay_params, ack};
message.setBrokerId_(reg.getId());
AddMessageMainContext ctx{&queues->getMainQueue(), &message};
reg.forEachRegistered(ctx);
return ctx.result;
}();
}
void MessageDispatcher::Queues::process() {
+25 -17
View File
@@ -22,6 +22,20 @@ class Message;
class MessageProcessor;
struct MesTransceiverId;
class MessageQueue {
public:
MessageQueue();
virtual ~MessageQueue();
virtual bool addMessage(const Message& message);
virtual void processQueue(MessageProcessor& processor);
virtual void clear();
private:
Message* findUnusedEntry() const;
util::UniqueArrayPtr<Message, 3000> mMessages;
};
class MessageDispatcher : public MessageDispatcherBase {
SEAD_SINGLETON_DISPOSER(MessageDispatcher)
SEAD_RTTI_OVERRIDE(MessageDispatcher, MessageDispatcherBase)
@@ -47,24 +61,14 @@ public:
const MessageType& type, void* user_data, bool ack) override;
bool sendMessageOnProcessingThread(const MesTransceiverId& src, const MesTransceiverId& dest,
const MessageType& type, void* user_data, bool ack) override;
void m_8() override;
void m_9() override;
bool sendMessage(const MesTransceiverId& src, IMessageBrokerRegister& reg,
const MessageType& type, void* user_data, bool ack) override;
bool sendMessageOnProcessingThread(const MesTransceiverId& src, IMessageBrokerRegister& reg,
const MessageType& type, void* user_data, bool ack) override;
void update() override;
private:
class Queue {
public:
Queue();
virtual ~Queue();
virtual bool addMessage(const Message& message);
virtual void processQueue(MessageProcessor& processor);
virtual void clear();
private:
Message* findUnusedEntry() const;
util::UniqueArrayPtr<Message, 3000> mMessages;
};
friend struct AddMessageMainContext;
class DoubleBufferedQueue {
public:
@@ -74,13 +78,14 @@ private:
void clear();
void processQueue(MessageProcessor& processor);
void swapBuffer() { mActiveIdx ^= 1; }
MessageQueue* getQueue() { return &mBuffer[mActiveIdx ^ 1]; }
private:
u32 mActiveIdx = 1;
Queue mBuffer[2];
MessageQueue mBuffer[2];
};
class MainQueue final {
class MainQueue {
public:
MainQueue();
virtual ~MainQueue();
@@ -104,6 +109,9 @@ private:
MainQueue& getMainQueue() { return mMainQueue; }
bool isProcessing() const { return mIsProcessing; }
void process();
bool sendMessageOnProcessingThread(const MesTransceiverId& src,
const MesTransceiverId& dest, const MessageType& type,
void* user_data, bool ack);
private:
struct DummyLogger : public MessageProcessor::Logger {
@@ -4,6 +4,7 @@
namespace ksys {
class IMessageBrokerRegister;
class MessageReceiverEx;
struct MesTransceiverId;
struct MessageType;
@@ -22,9 +23,11 @@ public:
const MesTransceiverId& dest,
const MessageType& type, void* user_data,
bool ack) = 0;
// TODO
virtual void m_8() = 0;
virtual void m_9() = 0;
virtual bool sendMessage(const MesTransceiverId& src, IMessageBrokerRegister& reg,
const MessageType& type, void* user_data, bool ack) = 0;
virtual bool sendMessageOnProcessingThread(const MesTransceiverId& src,
IMessageBrokerRegister& reg, const MessageType& type,
void* user_data, bool ack) = 0;
virtual void update() = 0;
protected:
@@ -1,4 +1,5 @@
#include "KingSystem/Utils/Thread/MessageTransceiverBase.h"
#include "KingSystem/Utils/Thread/MessageBroker.h"
#include "KingSystem/Utils/Thread/MessageReceiverEx.h"
namespace ksys {
@@ -9,12 +10,12 @@ MessageTransceiverBase::MessageTransceiverBase() = default;
MessageTransceiverBase::~MessageTransceiverBase() = default;
bool MessageTransceiverBase::checkGeneratorFlag() const {
return getGenerator()->checkFlag();
bool MessageTransceiverBase::checkReceiverFlag() const {
return getReceiver()->checkFlag();
}
bool MessageTransceiverBase::checkGeneratorCounter() const {
return getGenerator()->checkCounter();
bool MessageTransceiverBase::checkReceiverCounter() const {
return getReceiver()->checkCounter();
}
bool MessageTransceiverBase::m2() {
@@ -41,4 +42,8 @@ void MessageTransceiverBase::setGlobalDispatcher(MessageDispatcherBase* dispatch
sDispatcher = dispatcher;
}
IMessageBrokerRegister* MessageTransceiverBase::getRegister(IMessageBroker& broker) const {
return broker.getRegister();
}
} // namespace ksys
@@ -4,6 +4,8 @@
namespace ksys {
class IMessageBroker;
class IMessageBrokerRegister;
class MessageDispatcherBase;
struct MesTransceiverId;
class MessageReceiverEx;
@@ -12,14 +14,16 @@ class MessageTransceiverBase {
public:
MessageTransceiverBase();
virtual ~MessageTransceiverBase();
bool checkGeneratorFlag() const;
bool checkGeneratorCounter() const;
bool checkReceiverFlag() const;
bool checkReceiverCounter() const;
virtual bool m2();
virtual bool m3();
virtual bool m4();
virtual bool m5();
virtual MessageReceiverEx* getGenerator() const = 0;
virtual MessageReceiverEx* getReceiver() const = 0;
MessageDispatcherBase* getDispatcher();
MesTransceiverId* getId() const { return mId; }
IMessageBrokerRegister* getRegister(IMessageBroker& broker) const;
static void setGlobalDispatcher(MessageDispatcherBase* dispatcher);