coming
天行健 君子以自强不息
记录我的成长
该类提供订阅-发布模式的消息队列机制,供其他需要消息队列机制的类使用。定义于src/musikcore/runtime/MessageQueue.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| class MessageQueue : public IMessageQueue { public: MessageQueue(const MessageQueue&) = delete; MessageQueue(const MessageQueue&&) = delete; MessageQueue& operator=(const MessageQueue&) = delete; MessageQueue& operator=(const MessageQueue&&) = delete; MessageQueue() noexcept; void Post(IMessagePtr message, int64_t delayMs = 0) override; void Broadcast(IMessagePtr message, int64_t messageMs = 0) override; int Remove(IMessageTarget *target, int type = -1) override; bool Contains(IMessageTarget *target, int type = -1) override; void Debounce(IMessagePtr message, int64_t delayMs = 0) override; void Register(IMessageTarget* target) override; void Unregister(IMessageTarget* target) override; void RegisterForBroadcasts(IMessageTargetPtr target) override; void UnregisterForBroadcasts(IMessageTarget *target) override; void WaitAndDispatch(int64_t timeoutMillis = -1) override; void Dispatch() override;
protected: int64_t GetNextMessageTime() noexcept { return nextMessageTime.load(); }
private: typedef std::weak_ptr<IMessageTarget> IWeakMessageTarget; void Enqueue(IMessagePtr message, int64_t delayMs); void Dispatch(IMessage* message); struct EnqueuedMessage { IMessagePtr message; std::chrono::milliseconds time; };
struct WeakPtrLess { template <typename T> bool operator() (const std::weak_ptr<T>& l, const std::weak_ptr<T>& r) const { return l.lock().get() < r.lock().get(); } }; std::mutex queueMutex; std::list<EnqueuedMessage*> queue; std::list<EnqueuedMessage*> dispatch;
std::set<IWeakMessageTarget, WeakPtrLess> receivers; std::set<IMessageTarget*> targets; std::condition_variable_any waitForDispatch; std::atomic<int64_t> nextMessageTime; };
|
注:在WaitAndDispatch函数对queueMutex加锁,并未看到对queueMutex进行解锁,就调用了Dispatch函数,而Dispatch函数又对queueMutex加锁,而queueMutex是非递归锁,不可被同一线程多次获取;事实上,这里的{}指明了变量的作用域,在{}内声明的局部变量其作用域自变量声明开始,到{}之后终结。因此在调用Dispatch函数之前queueMutex就已被解锁。
总结一下消息队列的运行机制。
EvMessageQueue
在main.cpp中,使用的是继承自MessageQueue类的EvMessageQueue类;其定义如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| class EvMessageQueue: public MessageQueue { public: void Post(IMessagePtr message, int64_t delayMs) { MessageQueue::Post(message, delayMs);
if (delayMs <= 0) { write(pipeFd[1], &EVENT_DISPATCH, sizeof(EVENT_DISPATCH)); } else { double delayTs = (double) delayMs / 1000.0; loop.once< EvMessageQueue, &EvMessageQueue::DelayedDispatch >(-1, ev::TIMER, (ev::tstamp) delayTs, this); } }
void DelayedDispatch(int revents) { this->Dispatch(); }
static void SignalQuit(ev::sig& signal, int revents) { write(pipeFd[1], &EVENT_QUIT, sizeof(EVENT_QUIT)); }
void ReadCallback(ev::io& watcher, int revents) { short type; if (read(pipeFd[0], &type, sizeof(type)) == 0) { std::cerr << "read() failed.\n"; exit(EXIT_FAILURE); } switch (type) { case EVENT_DISPATCH: this->Dispatch(); break; case EVENT_QUIT: loop.break_loop(ev::ALL); break; } }
void Run() { io.set(loop); io.set(pipeFd[0], ev::READ); io.set<EvMessageQueue, &EvMessageQueue::ReadCallback>(this); io.start();
sio.set(loop); sio.set<&EvMessageQueue::SignalQuit>(); sio.start(SIGTERM);
write(pipeFd[1], &EVENT_DISPATCH, sizeof(EVENT_DISPATCH));
loop.run(0); }
private: ev::dynamic_loop loop; ev::io io; ev::sig sio; };
|
可见EvMessageQueue类只是对MessageQueue类的Post与DelayedDispatch的重写,并增加了SignalQuit、ReadCallback与Run函数,以及新增三个变量loop(类型)、io(ev::io类型)与sio(ev::sig类型)。
本文代表个人观点,内容仅供参考。若有不恰当之处,望不吝赐教!