-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathtzc_object.h
217 lines (182 loc) · 4.71 KB
/
tzc_object.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#ifndef __TZC_OBJECT_HPP__
#define __TZC_OBJECT_HPP__
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/atomic/atomic.hpp>
#include <ros/ros.h>
namespace tzc_transport
{
// some typedef for short
typedef boost::atomic< int32_t > atomic_int32_t;
typedef boost::interprocess::managed_shared_memory ShmManager;
typedef boost::shared_ptr< boost::interprocess::managed_shared_memory > ShmManagerPtr;
typedef boost::interprocess::interprocess_mutex ipc_mutex;
class ShmMessage
{
public:
ShmMessage(const ShmManagerPtr & pshm, uint32_t ref0) {
long hc = pshm->get_handle_from_address(this);
next = hc;
prev = hc;
ref = ref0;
}
~ShmMessage() {
}
void addLast(ShmMessage * lc, const ShmManagerPtr & pshm) {
long hc = pshm->get_handle_from_address(lc);
long hn = pshm->get_handle_from_address(this), hp = this->prev;
ShmMessage * ln = this, * lp = (ShmMessage *)pshm->get_address_from_handle(hp);
lc->next = hn;
lc->prev = hp;
lp->next = hc;
ln->prev = hc;
}
bool releaseFirst(const ShmManagerPtr & pshm) {
long hc = next;
ShmMessage * lc = (ShmMessage *)pshm->get_address_from_handle(hc);
if (lc == this)
return false;
if (lc->ref != 0)
return false;
long hn = lc->next, hp = lc->prev;
ShmMessage * ln = (ShmMessage *)pshm->get_address_from_handle(hn), * lp = this;
lp->next = hn;
ln->prev = hp;
pshm->deallocate(lc);
return true;
}
bool releaseOne(const ShmManagerPtr & pshm) {
long hc = next;
ShmMessage * lc = (ShmMessage *)pshm->get_address_from_handle(hc);
while (lc != this) {
if (lc->ref == 0)
break;
hc = lc->next;
lc = (ShmMessage *)pshm->get_address_from_handle(hc);
}
if (lc == this)
return false;
long hn = lc->next, hp = lc->prev;
ShmMessage * ln = (ShmMessage *)pshm->get_address_from_handle(hn), * lp = this;
lp->next = hn;
ln->prev = hp;
pshm->deallocate(lc);
return true;
}
uint32_t take() {
return ref.fetch_add(1, boost::memory_order_relaxed);
}
uint32_t release() {
return ref.fetch_sub(1, boost::memory_order_relaxed);
}
void setRef(uint32_t ref0) {
ref = ref0;
}
public:
long next;
long prev;
atomic_int32_t ref;
long magic_;
};
class ShmObject
{
public:
ShmObject(ShmManager * pshm, std::string name)
: pshm_(pshm), name_(name) {
plck_ = pshm_->find_or_construct< ipc_mutex >("lck")();
pmsg_ = pshm_->find_or_construct< ShmMessage >("msg")(pshm_, 0);
pmsg_->take();
}
~ShmObject() {
if (pmsg_->release() == 1) {
boost::interprocess::shared_memory_object::remove(name_.c_str());
// printf("shm file <%s> removed\n", name_.c_str());
}
}
void addLast(ShmMessage * ptr) {
plck_->lock();
pmsg_->addLast(ptr, pshm_);
ptr->setRef(1);
plck_->unlock();
}
bool releaseFirst() {
plck_->lock();
bool res = pmsg_->releaseFirst(pshm_);
plck_->unlock();
return res;
}
bool releaseOne() {
plck_->lock();
bool res = pmsg_->releaseOne(pshm_);
plck_->unlock();
return res;
}
void * allocate(uint32_t size) {
return pshm_->allocate(size);
}
ShmMessage * convertHandle2Address(long h) {
return (ShmMessage *)pshm_->get_address_from_handle(h);
}
long convertAddress2Handle(ShmMessage * p) {
return pshm_->get_handle_from_address(p);
}
// in shm, message list and ref count (pub # + sub #)
ShmMessage * pmsg_;
// smart pointer of shm
ShmManagerPtr pshm_;
private:
// name of shm
std::string name_;
// in shm, connection lock
ipc_mutex * plck_;
};
typedef boost::shared_ptr< ShmObject > ShmObjectPtr;
class BaseMsg {
public:
BaseMsg() : shmmsg_(NULL) {
}
BaseMsg(const BaseMsg & m) {
handle_ = m.handle_;
magic_ = m.magic_;
if (m.shmmsg_)
m.shmmsg_->take();
if (shmmsg_)
shmmsg_->release();
shmmsg_ = m.shmmsg_;
}
~BaseMsg() {
if (shmmsg_)
shmmsg_->release();
}
BaseMsg & operator = (const BaseMsg & m) {
handle_ = m.handle_;
magic_ = m.magic_;
if (m.shmmsg_)
m.shmmsg_->take();
if (shmmsg_)
shmmsg_->release();
shmmsg_ = m.shmmsg_;
return *this;
}
protected:
long handle_;
long magic_;
ShmMessage * shmmsg_;
friend class ::ros::serialization::Serializer< BaseMsg >;
};
} // namespace tzc_transport
namespace ros
{
namespace serialization
{
template <> struct Serializer< ::tzc_transport::BaseMsg >
{
template<typename Stream, typename T> inline static void allInOne(Stream& stream, T m)
{
stream.next(m.handle_);
stream.next(m.magic_);
}
ROS_DECLARE_ALLINONE_SERIALIZER
};
} // namespace serialization
} // namespace ros
#endif // __TZC_OBJECT_HPP__