Implemented new thread scheduling logic that doesn't need a central scheduler thread, removed all uses of atomic waits

This commit is contained in:
Mr-Wiseguy
2024-04-25 02:57:06 -04:00
parent 5382ce6fd0
commit 68ee43f5ac
13 changed files with 440 additions and 571 deletions
+130 -134
View File
@@ -1,10 +1,38 @@
#include <thread>
#include <atomic>
#include "blockingconcurrentqueue.h"
#include "ultra64.h"
#include "ultramodern.hpp"
#include "recomp.h"
struct QueuedMessage {
PTR(OSMesgQueue) mq;
OSMesg mesg;
bool jam;
};
static moodycamel::BlockingConcurrentQueue<QueuedMessage> external_messages {};
void enqueue_external_message(PTR(OSMesgQueue) mq, OSMesg msg, bool jam) {
external_messages.enqueue({mq, msg, jam});
}
bool do_send(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, bool jam, bool block);
void dequeue_external_messages(RDRAM_ARG1) {
QueuedMessage to_send;
while (external_messages.try_dequeue(to_send)) {
do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false);
}
}
void ultramodern::wait_for_external_message(RDRAM_ARG1) {
QueuedMessage to_send;
external_messages.wait_dequeue(to_send);
do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false);
}
extern "C" void osCreateMesgQueue(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg, s32 count) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
mq->blocked_on_recv = NULLPTR;
@@ -27,168 +55,136 @@ s32 MQ_IS_FULL(OSMesgQueue* mq) {
return MQ_GET_COUNT(mq) >= mq->msgCount;
}
void thread_queue_insert(RDRAM_ARG PTR(OSThread)* queue, PTR(OSThread) toadd_) {
PTR(OSThread)* cur = queue;
OSThread* toadd = TO_PTR(OSThread, toadd_);
while (*cur && TO_PTR(OSThread, *cur)->priority > toadd->priority) {
cur = &TO_PTR(OSThread, *cur)->next;
}
toadd->next = (*cur);
*cur = toadd_;
}
OSThread* thread_queue_pop(RDRAM_ARG PTR(OSThread)* queue) {
PTR(OSThread) ret = *queue;
*queue = TO_PTR(OSThread, ret)->next;
return TO_PTR(OSThread, ret);
}
bool thread_queue_empty(RDRAM_ARG PTR(OSThread)* queue) {
return *queue == NULLPTR;
}
extern "C" s32 osSendMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
// Prevent accidentally blocking anything that isn't a game thread
if (!ultramodern::is_game_thread()) {
flags = OS_MESG_NOBLOCK;
}
ultramodern::disable_preemption();
if (flags == OS_MESG_NOBLOCK) {
// If non-blocking, fail if the queue is full
bool do_send(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, bool jam, bool block) {
OSMesgQueue* mq = TO_PTR(OSMesgQueue, mq_);
if (!block) {
// If non-blocking, fail if the queue is full.
if (MQ_IS_FULL(mq)) {
ultramodern::enable_preemption();
return -1;
return false;
}
} else {
// Otherwise, yield this thread until the queue has room
}
else {
// Otherwise, yield this thread until the queue has room.
while (MQ_IS_FULL(mq)) {
debug_printf("[Message Queue] Thread %d is blocked on send\n", TO_PTR(OSThread, ultramodern::this_thread())->id);
thread_queue_insert(PASS_RDRAM &mq->blocked_on_send, ultramodern::this_thread());
ultramodern::enable_preemption();
ultramodern::pause_self(PASS_RDRAM1);
ultramodern::disable_preemption();
ultramodern::thread_queue_insert(PASS_RDRAM GET_MEMBER(OSMesgQueue, mq_, blocked_on_send), ultramodern::this_thread());
ultramodern::run_next_thread(PASS_RDRAM1);
ultramodern::wait_for_resumed(PASS_RDRAM1);
}
}
s32 last = (mq->first + mq->validCount) % mq->msgCount;
TO_PTR(OSMesg, mq->msg)[last] = msg;
mq->validCount++;
OSThread* to_run = nullptr;
if (jam) {
// Jams insert at the head of the message queue's buffer.
mq->first = (mq->first + mq->msgCount - 1) % mq->msgCount;
TO_PTR(OSMesg, mq->msg)[mq->first] = msg;
mq->validCount++;
}
else {
// Sends insert at the tail of the message queue's buffer.
s32 last = (mq->first + mq->validCount) % mq->msgCount;
TO_PTR(OSMesg, mq->msg)[last] = msg;
mq->validCount++;
}
if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_recv)) {
to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_recv);
// If any threads were blocked on receiving from this message queue, pop the first one and schedule it.
PTR(PTR(OSThread)) blocked_queue = GET_MEMBER(OSMesgQueue, mq_, blocked_on_recv);
if (!ultramodern::thread_queue_empty(PASS_RDRAM blocked_queue)) {
ultramodern::schedule_running_thread(PASS_RDRAM ultramodern::thread_queue_pop(PASS_RDRAM blocked_queue));
}
ultramodern::enable_preemption();
if (to_run) {
debug_printf("[Message Queue] Thread %d is unblocked\n", to_run->id);
if (ultramodern::is_game_thread()) {
OSThread* self = TO_PTR(OSThread, ultramodern::this_thread());
if (to_run->priority > self->priority) {
ultramodern::swap_to_thread(PASS_RDRAM to_run);
} else {
ultramodern::schedule_running_thread(to_run);
}
} else {
ultramodern::schedule_running_thread(to_run);
}
}
return 0;
return true;
}
extern "C" s32 osJamMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
ultramodern::disable_preemption();
if (flags == OS_MESG_NOBLOCK) {
// If non-blocking, fail if the queue is full
if (MQ_IS_FULL(mq)) {
ultramodern::enable_preemption();
return -1;
}
} else {
// Otherwise, yield this thread in a loop until the queue is no longer full
while (MQ_IS_FULL(mq)) {
debug_printf("[Message Queue] Thread %d is blocked on jam\n", TO_PTR(OSThread, ultramodern::this_thread())->id);
thread_queue_insert(PASS_RDRAM &mq->blocked_on_send, ultramodern::this_thread());
ultramodern::enable_preemption();
ultramodern::pause_self(PASS_RDRAM1);
ultramodern::disable_preemption();
}
}
mq->first = (mq->first + mq->msgCount - 1) % mq->msgCount;
TO_PTR(OSMesg, mq->msg)[mq->first] = msg;
mq->validCount++;
OSThread *to_run = nullptr;
if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_recv)) {
to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_recv);
}
ultramodern::enable_preemption();
if (to_run) {
debug_printf("[Message Queue] Thread %d is unblocked\n", to_run->id);
OSThread *self = TO_PTR(OSThread, ultramodern::this_thread());
if (to_run->priority > self->priority) {
ultramodern::swap_to_thread(PASS_RDRAM to_run);
} else {
ultramodern::schedule_running_thread(to_run);
}
}
return 0;
}
extern "C" s32 osRecvMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, s32 flags) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
OSMesg *msg = TO_PTR(OSMesg, msg_);
ultramodern::disable_preemption();
if (flags == OS_MESG_NOBLOCK) {
bool do_recv(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, bool block) {
OSMesgQueue* mq = TO_PTR(OSMesgQueue, mq_);
if (!block) {
// If non-blocking, fail if the queue is empty
if (MQ_IS_EMPTY(mq)) {
ultramodern::enable_preemption();
return -1;
return false;
}
} else {
// Otherwise, yield this thread in a loop until the queue is no longer full
while (MQ_IS_EMPTY(mq)) {
debug_printf("[Message Queue] Thread %d is blocked on receive\n", TO_PTR(OSThread, ultramodern::this_thread())->id);
thread_queue_insert(PASS_RDRAM &mq->blocked_on_recv, ultramodern::this_thread());
ultramodern::enable_preemption();
ultramodern::pause_self(PASS_RDRAM1);
ultramodern::disable_preemption();
ultramodern::thread_queue_insert(PASS_RDRAM GET_MEMBER(OSMesgQueue, mq_, blocked_on_recv), ultramodern::this_thread());
ultramodern::run_next_thread(PASS_RDRAM1);
ultramodern::wait_for_resumed(PASS_RDRAM1);
}
}
if (msg_ != NULLPTR) {
*msg = TO_PTR(OSMesg, mq->msg)[mq->first];
*TO_PTR(OSMesg, msg_) = TO_PTR(OSMesg, mq->msg)[mq->first];
}
mq->first = (mq->first + 1) % mq->msgCount;
mq->validCount--;
OSThread *to_run = nullptr;
// If any threads were blocked on sending to this message queue, pop the first one and schedule it.
PTR(PTR(OSThread)) blocked_queue = GET_MEMBER(OSMesgQueue, mq_, blocked_on_send);
if (!ultramodern::thread_queue_empty(PASS_RDRAM blocked_queue)) {
ultramodern::schedule_running_thread(PASS_RDRAM ultramodern::thread_queue_pop(PASS_RDRAM blocked_queue));
}
if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_send)) {
to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_send);
return true;
}
extern "C" s32 osSendMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
bool jam = false;
// Don't directly send to the message queue if this isn't a game thread to avoid contention.
if (!ultramodern::is_game_thread()) {
enqueue_external_message(mq_, msg, jam);
return 0;
}
ultramodern::enable_preemption();
if (to_run) {
debug_printf("[Message Queue] Thread %d is unblocked\n", to_run->id);
OSThread *self = TO_PTR(OSThread, ultramodern::this_thread());
if (to_run->priority > self->priority) {
ultramodern::swap_to_thread(PASS_RDRAM to_run);
} else {
ultramodern::schedule_running_thread(to_run);
}
}
return 0;
// Handle any messages that have been received from an external thread.
dequeue_external_messages(PASS_RDRAM1);
// Try to send the message.
bool sent = do_send(PASS_RDRAM mq_, msg, jam, flags == OS_MESG_BLOCK);
// Check the queue to see if this thread should swap execution to another.
ultramodern::check_running_queue(PASS_RDRAM1);
return sent ? 0 : -1;
}
extern "C" s32 osJamMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
bool jam = true;
// Don't directly send to the message queue if this isn't a game thread to avoid contention.
if (!ultramodern::is_game_thread()) {
enqueue_external_message(mq_, msg, jam);
return 0;
}
// Handle any messages that have been received from an external thread.
dequeue_external_messages(PASS_RDRAM1);
// Try to send the message.
bool sent = do_send(PASS_RDRAM mq_, msg, jam, flags == OS_MESG_BLOCK);
// Check the queue to see if this thread should swap execution to another.
ultramodern::check_running_queue(PASS_RDRAM1);
return sent ? 0 : -1;
}
extern "C" s32 osRecvMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, s32 flags) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
assert(ultramodern::is_game_thread() && "RecvMesg not allowed outside of game threads.");
// Handle any messages that have been received from an external thread.
dequeue_external_messages(PASS_RDRAM1);
// Try to receive a message.
bool received = do_recv(PASS_RDRAM mq_, msg_, flags == OS_MESG_BLOCK);
// Check the queue to see if this thread should swap execution to another.
ultramodern::check_running_queue(PASS_RDRAM1);
return received ? 0 : -1;
}