forked from zeromq/libzmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathypipe_conflate.hpp
95 lines (77 loc) · 2.41 KB
/
ypipe_conflate.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/* SPDX-License-Identifier: MPL-2.0 */
#ifndef __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__
#define __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__
#include "platform.hpp"
#include "dbuffer.hpp"
#include "ypipe_base.hpp"
namespace zmq
{
//  Adapter that lets a dbuffer stand in for a queue. It exists to
//  implement the conflate socket option: when that option is set, the
//  receiving side discards every incoming message except the last one.
//
//  The reader_awake flag mimics ypipe's delicate behaviour around a
//  sleeping reader (see the 'c' pointer being NULL in ypipe.hpp).
//  NOTE(review): within this class the flag starts out false and is only
//  ever assigned false - confirm that never raising it is intended.
template <typename T> class ypipe_conflate_t ZMQ_FINAL : public ypipe_base_t<T>
{
  public:
    //  Creates the pipe with the reader flagged as not awake.
    ypipe_conflate_t () : reader_awake (false) {}

    //  Hands value_ to the underlying dbuffer; incomplete_ is ignored.
    //
    //  This function deliberately copies uninitialised data when used
    //  with zmq_msg: initialising the VSM body for non-VSM messages would
    //  not be good for performance. On OpenVMS builds the pragmas around
    //  it disable the UNINIT message and restore the previous state after.
#ifdef ZMQ_HAVE_OPENVMS
#pragma message save
#pragma message disable(UNINIT)
#endif
    void write (const T &value_, bool incomplete_)
    {
        dbuffer.write (value_);
        (void) incomplete_;
    }
#ifdef ZMQ_HAVE_OPENVMS
#pragma message restore
#endif

    //  A conflate ypipe never holds incomplete items, so there is nothing
    //  to take back: always returns false and leaves the argument alone.
    bool unwrite (T *) { return false; }

    //  Flushing does no work for a conflate ypipe; only the reader-asleep
    //  protocol of the usual ypipe is kept. Returns the reader_awake flag:
    //  false means the reader thread is sleeping, and the caller is then
    //  obliged to wake the reader up before using the pipe again.
    bool flush () { return reader_awake; }

    //  Reports whether an item is available for reading, as answered by
    //  the dbuffer. When nothing is available the reader is recorded as
    //  not awake.
    bool check_read ()
    {
        if (dbuffer.check_read ())
            return true;

        reader_awake = false;
        return false;
    }

    //  Reads an item from the pipe into *value_. Returns false when no
    //  value is available; otherwise returns the result of the dbuffer's
    //  read.
    bool read (T *value_)
    {
        if (check_read ())
            return dbuffer.read (value_);

        return false;
    }

    //  Applies fn_ to the first element in the pipe and returns whatever
    //  fn_ returned. The pipe mustn't be empty or the function crashes.
    bool probe (bool (*fn_) (const T &))
    {
        return dbuffer.probe (fn_);
    }

  protected:
    //  Underlying buffer that holds the value written through write ().
    dbuffer_t<T> dbuffer;

    //  Value reported by flush (); cleared whenever check_read () finds
    //  nothing to read.
    bool reader_awake;

    ZMQ_NON_COPYABLE_NOR_MOVABLE (ypipe_conflate_t)
};
}
#endif