forked from zeromq/libzmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsignaler.cpp
387 lines (361 loc) · 10.3 KB
/
signaler.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
/* SPDX-License-Identifier: MPL-2.0 */
#include "precompiled.hpp"
#include "poller.hpp"
#include "polling_util.hpp"
#if defined ZMQ_POLL_BASED_ON_POLL
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_AIX
#include <poll.h>
#endif
#elif defined ZMQ_POLL_BASED_ON_SELECT
#if defined ZMQ_HAVE_WINDOWS
#elif defined ZMQ_HAVE_HPUX
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_VXWORKS
#include <sys/types.h>
#include <sys/time.h>
#include <sockLib.h>
#include <strings.h>
#else
#include <sys/select.h>
#endif
#endif
#include "signaler.hpp"
#include "likely.hpp"
#include "stdint.hpp"
#include "config.hpp"
#include "err.hpp"
#include "fd.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#include <netinet/tcp.h>
#include <sys/types.h>
#include <sys/socket.h>
#endif
#if !defined(ZMQ_HAVE_WINDOWS)
// Helper to sleep for specific number of milliseconds (or until signal)
//
static int sleep_ms (unsigned int ms_)
{
if (ms_ == 0)
return 0;
#if defined ZMQ_HAVE_ANDROID
usleep (ms_ * 1000);
return 0;
#elif defined ZMQ_HAVE_VXWORKS
struct timespec ns_;
ns_.tv_sec = ms_ / 1000;
ns_.tv_nsec = ms_ % 1000 * 1000000;
return nanosleep (&ns_, 0);
#else
return usleep (ms_ * 1000);
#endif
}
// Helper to wait on close(), for non-blocking sockets, until it completes
// If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
// the overall timeout is reached.
//
static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
{
unsigned int ms_so_far = 0;
const unsigned int min_step_ms = 1;
const unsigned int max_step_ms = 100;
const unsigned int step_ms =
std::min (std::max (min_step_ms, max_ms_ / 10), max_step_ms);
int rc = 0; // do not sleep on first attempt
do {
if (rc == -1 && errno == EAGAIN) {
sleep_ms (step_ms);
ms_so_far += step_ms;
}
rc = close (fd_);
} while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);
return rc;
}
#endif
zmq::signaler_t::signaler_t ()
{
// Create the socketpair for signaling.
if (make_fdpair (&_r, &_w) == 0) {
unblock_socket (_w);
unblock_socket (_r);
}
#ifdef HAVE_FORK
pid = getpid ();
#endif
}
// This might get run after some part of construction failed, leaving one or
// both of _r and _w retired_fd.
zmq::signaler_t::~signaler_t ()
{
#if defined ZMQ_HAVE_EVENTFD
if (_r == retired_fd)
return;
int rc = close_wait_ms (_r);
errno_assert (rc == 0);
#elif defined ZMQ_HAVE_WINDOWS
if (_w != retired_fd) {
const struct linger so_linger = {1, 0};
int rc = setsockopt (_w, SOL_SOCKET, SO_LINGER,
reinterpret_cast<const char *> (&so_linger),
sizeof so_linger);
// Only check shutdown if WSASTARTUP was previously done
if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
wsa_assert (rc != SOCKET_ERROR);
rc = closesocket (_w);
wsa_assert (rc != SOCKET_ERROR);
if (_r == retired_fd)
return;
rc = closesocket (_r);
wsa_assert (rc != SOCKET_ERROR);
}
}
#else
if (_w != retired_fd) {
int rc = close_wait_ms (_w);
errno_assert (rc == 0);
}
if (_r != retired_fd) {
int rc = close_wait_ms (_r);
errno_assert (rc == 0);
}
#endif
}
zmq::fd_t zmq::signaler_t::get_fd () const
{
return _r;
}
void zmq::signaler_t::send ()
{
#if defined HAVE_FORK
if (unlikely (pid != getpid ())) {
//printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
return; // do not send anything in forked child context
}
#endif
#if defined ZMQ_HAVE_EVENTFD
const uint64_t inc = 1;
ssize_t sz = write (_w, &inc, sizeof (inc));
errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS
const char dummy = 0;
int nbytes;
do {
nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
wsa_assert (nbytes != SOCKET_ERROR);
// wsa_assert does not abort on WSAEWOULDBLOCK. If we get this, we retry.
} while (nbytes == SOCKET_ERROR);
// Given the small size of dummy (should be 1) expect that send was able to send everything.
zmq_assert (nbytes == sizeof (dummy));
#elif defined ZMQ_HAVE_VXWORKS
unsigned char dummy = 0;
while (true) {
ssize_t nbytes = ::send (_w, (char *) &dummy, sizeof (dummy), 0);
if (unlikely (nbytes == -1 && errno == EINTR))
continue;
#if defined(HAVE_FORK)
if (unlikely (pid != getpid ())) {
//printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
errno = EINTR;
break;
}
#endif
zmq_assert (nbytes == sizeof dummy);
break;
}
#else
unsigned char dummy = 0;
while (true) {
ssize_t nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
if (unlikely (nbytes == -1 && errno == EINTR))
continue;
#if defined(HAVE_FORK)
if (unlikely (pid != getpid ())) {
//printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
errno = EINTR;
break;
}
#endif
zmq_assert (nbytes == sizeof dummy);
break;
}
#endif
}
int zmq::signaler_t::wait (int timeout_) const
{
#ifdef HAVE_FORK
if (unlikely (pid != getpid ())) {
// we have forked and the file descriptor is closed. Emulate an interrupt
// response.
//printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
errno = EINTR;
return -1;
}
#endif
#ifdef ZMQ_POLL_BASED_ON_POLL
struct pollfd pfd;
pfd.fd = _r;
pfd.events = POLLIN;
const int rc = poll (&pfd, 1, timeout_);
if (unlikely (rc < 0)) {
errno_assert (errno == EINTR);
return -1;
}
if (unlikely (rc == 0)) {
errno = EAGAIN;
return -1;
}
#ifdef HAVE_FORK
if (unlikely (pid != getpid ())) {
// we have forked and the file descriptor is closed. Emulate an interrupt
// response.
//printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
errno = EINTR;
return -1;
}
#endif
zmq_assert (rc == 1);
zmq_assert (pfd.revents & POLLIN);
return 0;
#elif defined ZMQ_POLL_BASED_ON_SELECT
optimized_fd_set_t fds (1);
FD_ZERO (fds.get ());
FD_SET (_r, fds.get ());
struct timeval timeout;
if (timeout_ >= 0) {
timeout.tv_sec = timeout_ / 1000;
timeout.tv_usec = timeout_ % 1000 * 1000;
}
#ifdef ZMQ_HAVE_WINDOWS
int rc =
select (0, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
wsa_assert (rc != SOCKET_ERROR);
#else
int rc =
select (_r + 1, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
if (unlikely (rc < 0)) {
errno_assert (errno == EINTR);
return -1;
}
#endif
if (unlikely (rc == 0)) {
errno = EAGAIN;
return -1;
}
zmq_assert (rc == 1);
return 0;
#else
#error
#endif
}
void zmq::signaler_t::recv ()
{
// Attempt to read a signal.
#if defined ZMQ_HAVE_EVENTFD
uint64_t dummy;
ssize_t sz = read (_r, &dummy, sizeof (dummy));
errno_assert (sz == sizeof (dummy));
// If we accidentally grabbed the next signal(s) along with the current
// one, return it back to the eventfd object.
if (unlikely (dummy > 1)) {
const uint64_t inc = dummy - 1;
ssize_t sz2 = write (_w, &inc, sizeof (inc));
errno_assert (sz2 == sizeof (inc));
return;
}
zmq_assert (dummy == 1);
#else
unsigned char dummy;
#if defined ZMQ_HAVE_WINDOWS
const int nbytes =
::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
wsa_assert (nbytes != SOCKET_ERROR);
#elif defined ZMQ_HAVE_VXWORKS
ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
errno_assert (nbytes >= 0);
#else
ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
errno_assert (nbytes >= 0);
#endif
zmq_assert (nbytes == sizeof (dummy));
zmq_assert (dummy == 0);
#endif
}
int zmq::signaler_t::recv_failable ()
{
// Attempt to read a signal.
#if defined ZMQ_HAVE_EVENTFD
uint64_t dummy;
ssize_t sz = read (_r, &dummy, sizeof (dummy));
if (sz == -1) {
errno_assert (errno == EAGAIN);
return -1;
}
errno_assert (sz == sizeof (dummy));
// If we accidentally grabbed the next signal(s) along with the current
// one, return it back to the eventfd object.
if (unlikely (dummy > 1)) {
const uint64_t inc = dummy - 1;
ssize_t sz2 = write (_w, &inc, sizeof (inc));
errno_assert (sz2 == sizeof (inc));
return 0;
}
zmq_assert (dummy == 1);
#else
unsigned char dummy;
#if defined ZMQ_HAVE_WINDOWS
const int nbytes =
::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
if (nbytes == SOCKET_ERROR) {
const int last_error = WSAGetLastError ();
if (last_error == WSAEWOULDBLOCK) {
errno = EAGAIN;
return -1;
}
wsa_assert (last_error == WSAEWOULDBLOCK);
}
#elif defined ZMQ_HAVE_VXWORKS
ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
if (nbytes == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
errno = EAGAIN;
return -1;
}
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
|| errno == EINTR);
}
#else
ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
if (nbytes == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
errno = EAGAIN;
return -1;
}
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
|| errno == EINTR);
}
#endif
zmq_assert (nbytes == sizeof (dummy));
zmq_assert (dummy == 0);
#endif
return 0;
}
bool zmq::signaler_t::valid () const
{
return _w != retired_fd;
}
#ifdef HAVE_FORK
void zmq::signaler_t::forked ()
{
// Close file descriptors created in the parent and create new pair
close (_r);
close (_w);
make_fdpair (&_r, &_w);
}
#endif