forked from zeromq/libzmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathio_thread.cpp
94 lines (76 loc) · 1.94 KB
/
io_thread.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/* SPDX-License-Identifier: MPL-2.0 */
#include "precompiled.hpp"
#include <new>
#include "macros.hpp"
#include "io_thread.hpp"
#include "err.hpp"
#include "ctx.hpp"
// Constructs the I/O thread object.
//
// ctx_ and tid_ are forwarded to the object_t base. A poller is allocated
// for this thread and, when the mailbox reports a usable descriptor, that
// descriptor is registered with the poller for input events.
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
    object_t (ctx_, tid_),
    _mailbox_handle (static_cast<poller_t::handle_t> (NULL))
{
    // Non-throwing allocation; the result is checked by alloc_assert.
    _poller = new (std::nothrow) poller_t (*ctx_);
    alloc_assert (_poller);

    // Nothing to register when the mailbox has no descriptor; in that case
    // _mailbox_handle keeps its NULL initial value.
    if (_mailbox.get_fd () == retired_fd)
        return;

    // Register the mailbox descriptor, passing this object as the second
    // argument (presumably the receiver of poll events -- see in_event),
    // and ask the poller to watch it for input.
    _mailbox_handle = _poller->add_fd (_mailbox.get_fd (), this);
    _poller->set_pollin (_mailbox_handle);
}
// Releases the poller that was allocated in the constructor.
zmq::io_thread_t::~io_thread_t ()
{
    LIBZMQ_DELETE (_poller);
}
// Starts the poller, handing it a name of the form "IO/<index>".
//
// The index is this object's tid minus ctx_t::reaper_tid minus one.
void zmq::io_thread_t::start ()
{
    // Zero-initialised so the buffer is always a valid C string.
    char thread_name[16] = "";

    const unsigned int io_index = get_tid () - zmq::ctx_t::reaper_tid - 1;
    snprintf (thread_name, sizeof (thread_name), "IO/%u", io_index);

    // Start the underlying I/O thread.
    _poller->start (thread_name);
}
// Requests termination of the I/O thread by calling send_stop ().
// NOTE(review): presumably this results in process_stop () being invoked
// on the I/O thread itself -- confirm against object_t.
void zmq::io_thread_t::stop ()
{
    send_stop ();
}
// Returns a pointer to the mailbox owned by this object.
zmq::mailbox_t *zmq::io_thread_t::get_mailbox ()
{
    zmq::mailbox_t *const mailbox = &_mailbox;
    return mailbox;
}
// Returns the load value reported by this thread's poller.
int zmq::io_thread_t::get_load () const
{
    const int load = _poller->get_load ();
    return load;
}
// Input-event handler: drains the mailbox and dispatches each command to
// its destination object.
//
// recv is called with 0 as its second argument (presumably a zero timeout,
// i.e. non-blocking -- confirm against mailbox_t). The loop keeps reading
// while commands arrive or the call fails with EINTR; any other outcome
// must be EAGAIN, which is enforced by the final assertion.
void zmq::io_thread_t::in_event ()
{
    // TODO: Do we want to limit number of commands I/O thread can
    // process in a single go?

    command_t cmd;
    for (;;) {
        const int rc = _mailbox.recv (&cmd, 0);

        // A command was received: hand it to its destination and read on.
        if (rc == 0) {
            cmd.destination->process_command (cmd);
            continue;
        }

        // The call was interrupted: simply retry.
        if (errno == EINTR)
            continue;

        // Any remaining failure is expected to be EAGAIN.
        errno_assert (rc != 0 && errno == EAGAIN);
        break;
    }
}
// Output-event handler. The mailbox descriptor is only registered for
// input (see the constructor), so POLLOUT is never polled for and this
// function is never expected to run; reaching it trips the assertion.
void zmq::io_thread_t::out_event ()
{
    zmq_assert (false);
}
// Timer-event handler. No timers are used here, so this function is never
// expected to run; reaching it trips the assertion. The timer id argument
// is unused.
void zmq::io_thread_t::timer_event (int)
{
    zmq_assert (false);
}
// Returns the poller associated with this I/O thread, after asserting
// that the pointer is non-null.
zmq::poller_t *zmq::io_thread_t::get_poller () const
{
    zmq_assert (_poller);
    return _poller;
}
// Handles the stop request: unregisters the mailbox descriptor from the
// poller and then stops the poller.
void zmq::io_thread_t::process_stop ()
{
    // The mailbox must have been registered in the constructor.
    zmq_assert (_mailbox_handle);

    _poller->rm_fd (_mailbox_handle);
    _poller->stop ();
}