-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.cpp
117 lines (97 loc) · 2.71 KB
/
main.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include <boost/asio.hpp>
#include <fmt/core.h>
using namespace std::literals;
struct publisher {
std::chrono::milliseconds duration{};
boost::asio::awaitable<void> send(std::string_view msg)
{
std::this_thread::sleep_for(duration);
fmt::print("sent: {}\n", msg);
co_return;
}
};
struct subscriber {
std::chrono::milliseconds duration{};
template<typename T>
boost::asio::awaitable<void> recv(T& t)
{
std::this_thread::sleep_for(duration);
t = T{};
co_return;
}
};
struct other_service {
std::chrono::milliseconds duration{};
std::string msg{};
boost::asio::awaitable<std::string> request()
{
std::this_thread::sleep_for(duration);
co_return msg;
}
};
struct model {
publisher pub{};
subscriber sub1{};
subscriber sub2{};
other_service service{};
};
struct msg_1 {
};
struct msg_2 {
};
struct event_handler {
model& m;
boost::asio::awaitable<void> process(msg_1)
{
thread_local int count = 0;
fmt::print("recv: msg_1 {}\n", ++count);
auto request = co_await m.service.request();
fmt::print("recv: for msg_1: {} {}\n", request, count);
co_await m.pub.send(fmt::format("msg_1 {}", count));
}
boost::asio::awaitable<void> process(msg_2)
{
thread_local int count = 0;
fmt::print("recv: msg_2 {}\n", ++count);
auto request = co_await m.service.request();
fmt::print("recv: for msg_2: {} {}\n", request, count);
co_await m.pub.send(fmt::format("msg_2 {}", count));
}
};
template<typename T>
boost::asio::awaitable<void> receive_messages(std::stop_token stop_token, subscriber& sub, event_handler& handler)
{
while (!stop_token.stop_requested())
{
T msg;
co_await sub.recv(msg);
co_await handler.process(msg);
}
}
int main()
{
boost::asio::thread_pool pool{4};
std::stop_source stop_source;
boost::asio::signal_set signals{pool, SIGINT, SIGTERM};
signals.async_wait([&pool, &stop_source](const auto&, auto) {
fmt::print("signal caught!\n");
stop_source.request_stop();
pool.stop();
});
model m{
.pub{100ms},
.sub1{1s},
.sub2{3s},
.service{500ms, "reqested msg"},
};
event_handler handler{m};
boost::asio::co_spawn(
pool,
[&stop_source, &m, &handler]() { return receive_messages<msg_1>(stop_source.get_token(), m.sub1, handler); },
boost::asio::detached);
boost::asio::co_spawn(
pool,
[&stop_source, &m, &handler]() { return receive_messages<msg_2>(stop_source.get_token(), m.sub2, handler); },
boost::asio::detached);
pool.join();
}