文章目录
  1. EvMessageQueue

该类提供订阅-发布模式的消息队列机制,供其他需要消息队列机制的类使用。定义于src/musikcore/runtime/MessageQueue.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class MessageQueue : public IMessageQueue {
public:
MessageQueue(const MessageQueue&) = delete;
MessageQueue(const MessageQueue&&) = delete;
MessageQueue& operator=(const MessageQueue&) = delete;
MessageQueue& operator=(const MessageQueue&&) = delete;
/* nextMessageTime设置为1 */
MessageQueue() noexcept;
/* 投递消息:先判断消息的目标是否注册到本消息队列的targets中,若注册了才将此消息加入队列queue */
void Post(IMessagePtr message, int64_t delayMs = 0) override;
/* 判断message是否设置了target,若设置了则报错(因为这是广播消息),否则调用Enqueue将其加入消息队列 */
void Broadcast(IMessagePtr message, int64_t messageMs = 0) override;
/* 将消息队列中目标为target、类型为type(若type=-1,则匹配任意类型)的消息删除(消息本身也删除),然后重新设置nextMessageTime */
int Remove(IMessageTarget *target, int type = -1) override;
/* 判断消息队列中是否存在目标为target、类型为type(若type=-1,则匹配任意类型)的消息 */
bool Contains(IMessageTarget *target, int type = -1) override;
/* 删除消息队列中目标与类型与message一致的消息,然后投递消息message,延时为delayMs */
void Debounce(IMessagePtr message, int64_t delayMs = 0) override;
/* target加入到消息队列的目标targets中,如同target订阅消息 */
void Register(IMessageTarget* target) override;
/* target从消息队列的目标中移除,如同target取订消息;然后调用Remove函数 */
void Unregister(IMessageTarget* target) override;
/* target加入到消息队列的广播目标receivers中,如同target订阅广播消息 */
void RegisterForBroadcasts(IMessageTargetPtr target) override;
/* target从消息队列的广播目标receivers中,如同target取订广播消息 */
void UnregisterForBroadcasts(IMessageTarget *target) override;
/* “等待”并执行Dispatch函数(若queue非空且消息队列中已有消息超时未投递,则调用执行Dispatch函数;若queue非空且消息队列中还未有消息到时且timeoutMillis大于等于0,则wait_for min(this->queue.front()->time-system_clock::now().time_since_epoch(), timeoutMillis)再调用Dispatch函数;若queue非空且消息队列中还未有消息到时且timeoutMillis小于0,则wait_for直到queue头部的消息设置的投递时间;若queue为空且timeoutMillis大于等于0,则wait_for timeoutMillis;若queue为空且timeoutMillis小于0,则一直wait,但是如果有消息插入到queue中,就会被唤醒) */
void WaitAndDispatch(int64_t timeoutMillis = -1) override;
/* 在类中定义了两个名为Dispatch的函数,此处的参数表为空,用于处理所有超时消息,具体实现如下:判断是否有消息到期(即nextMessageTime是否大于0且小于等于now),若无消息到期则退出;若有消息到期且其目标在targets中或其类型为广播消息,则将其存入dispatch,然后只要消息到期就将其从queue中移除,将所有到期的消息收集完成后,调用Dispatch(IMessage*)依次进行处理;这里试着分析一下为什么先收集到期且仍然有效的消息:处理消息的时间是不可控的,可能很短,也可能很长,当处理完再收集下一个到期消息时,也许时间过了很久,那么这时仅根据函数开头得到的now判断消息是否到期是不准确的 */
void Dispatch() override;

protected:
int64_t GetNextMessageTime() noexcept {
return nextMessageTime.load();
}

private:
typedef std::weak_ptr<IMessageTarget> IWeakMessageTarget;
/* 根据延迟时间delayMs计算出消息的投递时间,再将其插入到以投递时间为序的队列queue中,若message在queue的开头,那么需要重新设置nextMessageTime为message的投递时间,并通知所有等待在waitForDispatch上的线程(关于此处waitForDispatch.notify_all的一点看法:所有的成员函数中,只有Enqueue成员函数会在一些条件下执行waitForDispatch.notify_all,因此它是必不可少的;在WaitAndDispatch中,只有waitForDispatch.wait(lock)必须由它唤醒,而只有在queue为空且timeoutMillis<0时才会执行至这条语句,而当queue为空时插入的消息message一定会在queue的头部,从而恰好执行waitForDispatch.notify_all将等待在waitForDispatch上的线程唤醒;另一种执行至waitForDispatch.notify_all的情形是queue非空,message插入到了queue的头部,等待在条件变量waitForDispatch上的线程只会阻塞在waitForDispatch.wait_for,此时执行Dispatch不一定有消息到期,因此Dispatch函数在最开始进行了判断) */
void Enqueue(IMessagePtr message, int64_t delayMs);
/* 对于设置了target的消息,调用其target的ProcessMessage函数处理message;对于未设置target的消息,先拷贝receivers的一个副本copy,这样可以避免在receivers中的target依次处理message过程中,一直将receivers加锁;然后遍历copy,每个target要处理message时,先对自身加锁,若加锁成功,则调用其ProcessMessage函数处理message,否则,设定一个标志表示至少有一个weak_ptr对应的对象已经被销毁,则依次遍历receivers中的weak_ptr,若已失效,则将其从receivers中删除 */
void Dispatch(IMessage* message);
/* 被插入队列的消息结构:消息主体与设置的投递时间 */
struct EnqueuedMessage {
IMessagePtr message;
std::chrono::milliseconds time;
};
/* 自定义类(结构体)比较函数less:重载()运算符
* http://www.javashuo.com/article/p-zufmcrln-eg.html
*/
struct WeakPtrLess { /* https://stackoverflow.com/a/12875729 */
template <typename T>
bool operator() (const std::weak_ptr<T>& l, const std::weak_ptr<T>& r) const {
return l.lock().get() < r.lock().get();
}
};
/* 关于queue的互斥量 */
std::mutex queueMutex;
/* 消息队列,按照消息设置的投递时间排序 */
std::list<EnqueuedMessage*> queue;
/* 保存超时要处理的消息 */
std::list<EnqueuedMessage*> dispatch;
/* set类型定义如下:
* template <
* class Key,
* class Traits=less<Key>,
* class Allocator=allocator<Key> >
* class set
* https://blog.csdn.net/zy2317878/article/details/78981726
*/
std::set<IWeakMessageTarget, WeakPtrLess> receivers;
/* 消息订阅者 */
std::set<IMessageTarget*> targets;
/* 标志Dispatch函数所需的条件是否准备好 */
std::condition_variable_any waitForDispatch;
/* 始终存储消息队列中最先到期的消息的投递时间 */
std::atomic<int64_t> nextMessageTime;
};

注:在WaitAndDispatch函数对queueMutex加锁,并未看到对queueMutex进行解锁,就调用了Dispatch函数,而Dispatch函数又对queueMutex加锁,而queueMutex是非递归锁,不可被同一线程多次获取;事实上,这里的{}指明了变量的作用域,在{}内声明的局部变量其作用域自变量声明开始,到{}之后终结。因此在调用Dispatch函数之前queueMutex就已被解锁。

总结一下消息队列的运行机制。

EvMessageQueue

在main.cpp中,使用的是继承自MessageQueue类的EvMessageQueue类;其定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class EvMessageQueue: public MessageQueue {
public:
void Post(IMessagePtr message, int64_t delayMs) {
MessageQueue::Post(message, delayMs);

if (delayMs <= 0) {
write(pipeFd[1], &EVENT_DISPATCH, sizeof(EVENT_DISPATCH));
}
else {
double delayTs = (double) delayMs / 1000.0;
loop.once<
EvMessageQueue,
&EvMessageQueue::DelayedDispatch
>(-1, ev::TIMER, (ev::tstamp) delayTs, this);
}
}

void DelayedDispatch(int revents) {
this->Dispatch();
}

static void SignalQuit(ev::sig& signal, int revents) {
write(pipeFd[1], &EVENT_QUIT, sizeof(EVENT_QUIT));
}

void ReadCallback(ev::io& watcher, int revents) {
short type;
if (read(pipeFd[0], &type, sizeof(type)) == 0) {
std::cerr << "read() failed.\n";
exit(EXIT_FAILURE);
}
switch (type) {
case EVENT_DISPATCH: this->Dispatch(); break;
case EVENT_QUIT: loop.break_loop(ev::ALL); break;
}
}

void Run() {
io.set(loop);
io.set(pipeFd[0], ev::READ);
io.set<EvMessageQueue, &EvMessageQueue::ReadCallback>(this);
io.start();

sio.set(loop);
sio.set<&EvMessageQueue::SignalQuit>();
sio.start(SIGTERM);

write(pipeFd[1], &EVENT_DISPATCH, sizeof(EVENT_DISPATCH));

loop.run(0);
}

private:
ev::dynamic_loop loop;
ev::io io;
ev::sig sio;
};

可见EvMessageQueue类只是对MessageQueue类的Post与DelayedDispatch的重写,并增加了SignalQuit、ReadCallback与Run函数,以及新增三个变量loop(类型)、io(ev::io类型)与sio(ev::sig类型)。