YARP
Yet Another Robot Platform
PortCoreAdapter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
12 #include <yarp/os/PortReader.h>
13 #include <yarp/os/Time.h>
15 
16 namespace {
17 YARP_OS_LOG_COMPONENT(PORTCOREADAPTER, "yarp.os.impl.PortCoreAdapter")
18 } // namespace
19 
// Constructor body of PortCoreAdapter(Port& owner), as named in the
// cross-reference list at the end of this listing; the signature line
// (listing line 20) is not part of this extract.
// Hands the address of the owning Port to PortCore::setContactable().
21 {
22  setContactable(&owner);
23 }
24 
// Body of `void openable()` (name taken from the cross-reference list; the
// signature on listing line 25 is absent from this extract).
// Under stateMutex, clears `closed` and sets `opened`.
26 {
27  stateMutex.lock();
28  closed = false;
29  opened = true;
30  stateMutex.unlock();
31 }
32 
// Body of `void alertOnRead()` (name taken from the cross-reference list;
// signature on listing line 33 is absent). Sets the usedForRead flag.
34 {
35  usedForRead = true;
36 }
37 
// Body of `void alertOnWrite()` (name taken from the cross-reference list;
// signature on listing line 38 is absent). Sets the usedForWrite flag.
39 {
40  usedForWrite = true;
41 }
42 
// Body of `void alertOnRpc()` (name taken from the cross-reference list;
// signature on listing line 43 is absent). Sets the usedForRpc flag.
44 {
45  usedForRpc = true;
46 }
47 
// Body of `void setReadOnly()` (name taken from the cross-reference list;
// signature on listing line 48 is absent). Sets the commitToRead flag.
49 {
50  commitToRead = true;
51 }
52 
// Body of `void setWriteOnly()` (name taken from the cross-reference list;
// signature on listing line 53 is absent). Sets the commitToWrite flag.
54 {
55  commitToWrite = true;
56 }
57 
// Body of `void setRpc()` (name taken from the cross-reference list;
// signature on listing line 58 is absent). Sets the commitToRpc flag.
59 {
60  commitToRpc = true;
61 }
62 
// Body of `void finishReading()` (name taken from the cross-reference list;
// signature on listing line 63 is absent).
// When the port is not in background-read mode, marks it closed under
// stateMutex and posts `consume` twice.
64 {
65  if (!readBackground) {
66  stateMutex.lock();
67  closed = true;
    // Two posts: the comms-side read() waits on `consume` at two points
    // (before consuming data and before writing a reply).
    // NOTE(review): presumably one post per wait - confirm.
68  consume.post();
69  consume.post();
70  stateMutex.unlock();
71  }
72 }
73 
75 {
76  if (isWriting()) {
77  double start = SystemClock::nowSystem();
78  double pause = 0.01;
79  do {
81  pause *= 2;
82  } while (isWriting() && (SystemClock::nowSystem() - start < 3));
83  if (isWriting()) {
84  yCError(PORTCOREADAPTER, "Closing port that was sending data (slowly)");
85  }
86  }
87 }
88 
89 
// Body of `void resumeFull()` (name taken from the cross-reference list;
// signature on listing line 90 is absent).
// Repeatedly calls check() on `produce` and `readBlock` until each returns
// false, then calls resume() and posts `readBlock` once.
// NOTE(review): check() is presumably a non-blocking decrement, so the two
// loops drain any pending posts - confirm against the semaphore type.
91 {
92  while (produce.check()) {
93  }
94  while (readBlock.check()) {
95  }
96  resume();
97  readBlock.post();
98 }
99 
// Body of `bool read(ConnectionReader& reader) override` - "Callback for
// data" per the cross-reference list; the signature on listing line 100 is
// absent from this extract.
//
// Comms-side half of the read handshake with the user-side read()/reply():
//  - returns the permanent delegate's result directly when one is set;
//  - returns false when `reader` is invalid (interrupt) or the port is closed;
//  - otherwise returns the delegate's read result, replaced by the write
//    result when a reply is actually written.
101 {
    // A permanent read delegate handles the data directly; none of the
    // semaphores below are touched on this path.
102  if (permanentReadDelegate != nullptr) {
103  bool result = permanentReadDelegate->read(reader);
104  return result;
105  }
106 
107  // called by comms code
    // readBlock is taken here and posted again on every exit path below.
108  readBlock.wait();
109 
110  if (!reader.isValid()) {
111  // interrupt
    // Let the current delegate (if any) see the invalid reader, then post
    // `produce`, which the user-side read() waits on.
112  stateMutex.lock();
113  if (readDelegate != nullptr) {
114  readResult = readDelegate->read(reader);
115  }
116  stateMutex.unlock();
117  produce.post();
118  readBlock.post();
119  return false;
120  }
121 
122  if (closed) {
123  yCDebug(PORTCOREADAPTER, "Port::read shutting down");
124  readBlock.post();
125  return false;
126  }
127 
128  // wait for happy consumer - don't want to miss a packet
    // `consume` is posted by the user-side read() once a delegate is set
    // (and by finishReading() on shutdown).
129  if (!readBackground) {
130  consume.wait();
131  }
132 
133  stateMutex.lock();
134  readResult = false;
135  if (readDelegate != nullptr) {
136  readResult = readDelegate->read(reader);
137  } else {
138  // read and ignore
139  yCDebug(PORTCOREADAPTER, "data received in Port, no reader for it");
140  Bottle b;
141  b.read(reader);
142  }
    // Outside background mode the delegates are cleared after each message.
143  if (!readBackground) {
144  readDelegate = nullptr;
145  writeDelegate = nullptr;
146  }
147  bool result = readResult;
148  stateMutex.unlock();
    // Release the user-side read(), which is blocked on `produce`.
149  if (!readBackground) {
150  produce.post();
151  }
152  if (result && willReply) {
    // Wait for reply() to post `consume` after setting writeDelegate
    // (finishReading() also posts it, hence the `closed` re-check).
153  consume.wait();
154  if (closed) {
155  yCDebug(PORTCOREADAPTER, "Port::read shutting down");
156  readBlock.post();
157  return false;
158  }
159  if (writeDelegate != nullptr) {
160  stateMutex.lock();
161  ConnectionWriter* writer = reader.getWriter();
    // A null writer leaves `result` as the read result.
162  if (writer != nullptr) {
163  result = readResult = writeDelegate->write(*writer);
164  }
165  stateMutex.unlock();
166  }
167  if (dropDue) {
168  reader.requestDrop();
169  }
    // Release reply(), which is blocked on `produce`.
170  produce.post();
171  }
172  readBlock.post();
173  return result;
174 }
175 
// Body of the user-facing read overload. Its signature (listing line 176) is
// absent from this extract and it is not in the cross-reference list; the
// body uses a `reader` whose address is stored as readDelegate and a boolean
// `willReply`, and returns a bool.
//
// Installs `reader` as the read delegate, posts `consume`, then blocks on
// `produce` until the comms-side read() has run. Returns readResult.
177 {
178  // called by user
179 
180  // user claimed they would reply to last read, but then
181  // decided not to.
    // Send an empty Bottle as the outstanding reply before starting a new read.
182  if (replyDue) {
183  Bottle emptyMessage;
184  reply(emptyMessage, false, false);
185  replyDue = false;
186  dropDue = false;
187  }
188  if (willReply) {
189  replyDue = true;
190  }
191 
192  stateMutex.lock();
193  readActive = true;
194  readDelegate = &reader;
195  checkType(reader);
196  writeDelegate = nullptr;
197  this->willReply = willReply;
198  consume.post(); // happy consumer
199  stateMutex.unlock();
200 
    // Blocks until the comms-side read() posts `produce`.
201  produce.wait();
202  stateMutex.lock();
203  if (!readBackground) {
204  readDelegate = nullptr;
205  }
206  bool result = readResult;
    // A failed read leaves no reply owed.
207  if (!result) {
208  replyDue = false;
209  }
210  stateMutex.unlock();
211  return result;
212 }
213 
214 bool yarp::os::impl::PortCoreAdapter::reply(PortWriter& writer, bool drop, bool /*interrupted*/)
215 {
216  // send reply even if interrupt has happened in interim
217  if (!replyDue) {
218  return false;
219  }
220 
221  replyDue = false;
222  dropDue = drop;
223  writeDelegate = &writer;
224  consume.post();
225  produce.wait();
226  bool result = readResult;
227  return result;
228 }
229 
230 /*
231  Configuration of a port that should be remembered
232  between opens and closes
233 */
234 
// Body of `void configReader(PortReader& reader)` (name and signature taken
// from the cross-reference list; listing line 235 is absent).
// Under stateMutex: switches the adapter to background-read mode and stores
// `reader` as both the current and the permanent read delegate.
236 {
237  stateMutex.lock();
238  readActive = true;
239  readBackground = true;
240  readDelegate = &reader;
241  permanentReadDelegate = &reader;
242  checkType(reader);
243  consume.post(); // just do this once
244  stateMutex.unlock();
245 }
246 
// Body of `void configAdminReader(PortReader& reader)` (name and signature
// taken from the cross-reference list; listing line 247 is absent).
// Under stateMutex: remembers `reader` in adminReadDelegate and passes it
// to setAdminReadHandler().
248 {
249  stateMutex.lock();
250  adminReadDelegate = &reader;
251  setAdminReadHandler(reader);
252  stateMutex.unlock();
253 }
254 
// Body of `void configReadCreator(PortReaderCreator& creator)` (name and
// signature taken from the cross-reference list; listing line 255 is absent).
// Remembers `creator` in recReadCreator and passes it to setReadCreator().
// Unlike the neighbouring config* bodies, stateMutex is not taken here.
256 {
257  recReadCreator = &creator;
258  setReadCreator(creator);
259 }
260 
// Body of `void configWaitAfterSend(bool waitAfterSend)` (name and signature
// taken from the cross-reference list; listing line 261 is absent).
// Records the setting as 1/0 in recWaitAfterSend and passes it to
// setWaitAfterSend().
262 {
    // Only logs an error; the setting is still recorded and applied below.
263  if (waitAfterSend && isManual()) {
264  yCError(PORTCOREADAPTER, "Cannot use background-mode writes on a fake port");
265  }
266  recWaitAfterSend = waitAfterSend ? 1 : 0;
267  setWaitAfterSend(waitAfterSend);
268 }
269 
// Deprecated overload, compiled only when YARP_NO_DEPRECATED is not defined.
// Per the cross-reference list this is `bool configCallbackLock(Mutex* lock)`
// (definition at listing line 273); listing lines 271-273 and 280 are absent
// from this extract.
// NOTE(review): the missing lines presumably hold the YARP_WARNING_PUSH /
// YARP_DISABLE_DEPRECATED_WARNING / YARP_WARNING_POP macros listed in the
// cross-reference - confirm.
// Stores `lock` in old_recCallbackLock, clears recCallbackLock, and returns
// the result of setCallbackLock(lock).
270 #ifndef YARP_NO_DEPRECATED // Since YARP 3.3
274 {
275  recCallbackLock = nullptr;
276  old_recCallbackLock = lock;
277  haveCallbackLock = true;
278  return setCallbackLock(lock);
279 }
281 #endif
282 
// Body of the non-deprecated callback-lock overload. Its signature (listing
// line 283) is absent from this extract and it is not in the cross-reference
// list. NOTE(review): presumably configCallbackLock taking a different mutex
// pointer type than the deprecated overload - confirm.
// Stores `lock` in recCallbackLock, clears the deprecated slot when it
// exists, and returns the result of setCallbackLock(lock).
284 {
285  recCallbackLock = lock;
286 #ifndef YARP_NO_DEPRECATED // Since YARP 3.3
287  old_recCallbackLock = nullptr;
288 #endif
289  haveCallbackLock = true;
290  return setCallbackLock(lock);
291 }
292 
// Body of `bool unconfigCallbackLock()` (name and signature taken from the
// cross-reference list; listing line 293 is absent).
// Clears both remembered lock pointers and haveCallbackLock, then returns
// the result of removeCallbackLock().
294 {
295  recCallbackLock = nullptr;
296 #ifndef YARP_NO_DEPRECATED // Since YARP 3.3
297  old_recCallbackLock = nullptr;
298 #endif
299  haveCallbackLock = false;
300  return removeCallbackLock();
301 }
302 
// Body of `PortReader* checkPortReader()` (name and signature taken from
// the cross-reference list; listing line 303 is absent).
// Returns the current readDelegate, which may be nullptr.
304 {
305  return readDelegate;
306 }
307 
// Body of `PortReader* checkAdminPortReader()` (name and signature taken
// from the cross-reference list; listing line 308 is absent).
// Returns adminReadDelegate as stored by configAdminReader().
309 {
310  return adminReadDelegate;
311 }
312 
// Body of `PortReaderCreator* checkReadCreator()` (name and signature taken
// from the cross-reference list; listing line 313 is absent).
// Returns recReadCreator as stored by configReadCreator().
314 {
315  return recReadCreator;
316 }
317 
// Body of `int checkWaitAfterSend()` (name and signature taken from the
// cross-reference list; listing line 318 is absent).
// Returns recWaitAfterSend; configWaitAfterSend() stores 1 or 0 there.
319 {
320  return recWaitAfterSend;
321 }
322 
323 
// Body of `bool isOpened()` (name and signature taken from the
// cross-reference list; listing line 324 is absent).
// Returns the `opened` flag; stateMutex is not taken here.
325 {
326  return opened;
327 }
328 
// Body of `void setOpen(bool opened)` (name and signature taken from the
// cross-reference list; listing line 329 is absent).
// Copies the argument into the `opened` member; `this->` is needed because
// the parameter has the same name. stateMutex is not taken here.
330 {
331  this->opened = opened;
332 }
333 
// Body of `void includeNodeInName(bool flag)` (name and signature taken
// from the cross-reference list; listing line 334 is absent).
// Stores `flag` in includeNode.
335 {
336  includeNode = flag;
337 }
YARP_WARNING_PUSH
#define YARP_WARNING_PUSH
Definition: system.h:334
yarp::os::Bottle
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:72
yarp::os::impl::PortCoreAdapter::setWriteOnly
void setWriteOnly()
Definition: PortCoreAdapter.cpp:53
yarp::os::impl::PortCoreAdapter::PortCoreAdapter
PortCoreAdapter(Port &owner)
Definition: PortCoreAdapter.cpp:20
yarp::os::impl::PortCore::setContactable
void setContactable(Contactable *contactable)
Definition: PortCore.h:300
yarp::os::impl::PortCoreAdapter::openable
void openable()
Definition: PortCoreAdapter.cpp:25
yarp::os::impl::PortCoreAdapter::configAdminReader
void configAdminReader(PortReader &reader)
Definition: PortCoreAdapter.cpp:247
yarp::os::impl::PortCoreAdapter::checkReadCreator
PortReaderCreator * checkReadCreator()
Definition: PortCoreAdapter.cpp:313
yarp::os::impl::PortCoreAdapter::configReadCreator
void configReadCreator(PortReaderCreator &creator)
Definition: PortCoreAdapter.cpp:255
yarp::os::impl::PortCoreAdapter::setOpen
void setOpen(bool opened)
Definition: PortCoreAdapter.cpp:329
yarp::os::ConnectionReader::isValid
virtual bool isValid() const =0
yarp::os::impl::PortCoreAdapter::finishWriting
void finishWriting()
Definition: PortCoreAdapter.cpp:74
LogComponent.h
yarp::os::PortWriter
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:26
yarp::os::impl::PortCoreAdapter::isOpened
bool isOpened()
Definition: PortCoreAdapter.cpp:324
yarp::os::SystemClock::nowSystem
static double nowSystem()
Definition: SystemClock.cpp:37
yarp::os::Port
Definition: Port.h:49
yarp::os::impl::PortCoreAdapter::alertOnWrite
void alertOnWrite()
Definition: PortCoreAdapter.cpp:38
yarp::os::impl::PortCoreAdapter::alertOnRpc
void alertOnRpc()
Definition: PortCoreAdapter.cpp:43
yarp::os::PortReader
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:27
yarp::os::impl::PortCoreAdapter::alertOnRead
void alertOnRead()
Definition: PortCoreAdapter.cpp:33
yarp::os::ConnectionWriter
An interface for writing to a network connection.
Definition: ConnectionWriter.h:39
yarp::os::SystemClock::delaySystem
static void delaySystem(double seconds)
Definition: SystemClock.cpp:32
yarp::os::PortReaderCreator
A creator for readers.
Definition: PortReaderCreator.h:33
yarp::os::impl::PortCoreAdapter::setReadOnly
void setReadOnly()
Definition: PortCoreAdapter.cpp:48
yarp::os::impl::PortCoreAdapter::finishReading
void finishReading()
Definition: PortCoreAdapter.cpp:63
yarp::os::impl::PortCoreAdapter::checkWaitAfterSend
int checkWaitAfterSend()
Definition: PortCoreAdapter.cpp:318
yarp::os::ConnectionReader::getWriter
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
yarp::os::impl::PortCoreAdapter::read
bool read(ConnectionReader &reader) override
Callback for data.
Definition: PortCoreAdapter.cpp:100
YARP_WARNING_POP
#define YARP_WARNING_POP
Definition: system.h:335
yarp::os::impl::PortCoreAdapter::resumeFull
void resumeFull()
Definition: PortCoreAdapter.cpp:90
yarp::os::impl::PortCoreAdapter::unconfigCallbackLock
bool unconfigCallbackLock()
Definition: PortCoreAdapter.cpp:293
yarp::os::ConnectionReader
An interface for reading from a network connection.
Definition: ConnectionReader.h:39
yarp::os::impl::PortCoreAdapter::includeNodeInName
void includeNodeInName(bool flag)
Definition: PortCoreAdapter.cpp:334
yarp::os::impl::PortCoreAdapter::checkAdminPortReader
PortReader * checkAdminPortReader()
Definition: PortCoreAdapter.cpp:308
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::Bottle::read
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition: Bottle.cpp:243
PortReader.h
Time.h
PortCoreAdapter.h
yarp::os::Mutex
Basic wrapper for mutual exclusion.
Definition: Mutex.h:34
yarp::os::impl::PortCoreAdapter::setRpc
void setRpc()
Definition: PortCoreAdapter.cpp:58
yarp::os::impl::PortCoreAdapter::checkPortReader
PortReader * checkPortReader()
Definition: PortCoreAdapter.cpp:303
yarp::os::impl::PortCoreAdapter::configCallbackLock
bool configCallbackLock(Mutex *lock)
Definition: PortCoreAdapter.cpp:273
YARP_OS_LOG_COMPONENT
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
YARP_DISABLE_DEPRECATED_WARNING
#define YARP_DISABLE_DEPRECATED_WARNING
Definition: system.h:336
yarp::os::impl::PortCoreAdapter::configWaitAfterSend
void configWaitAfterSend(bool waitAfterSend)
Definition: PortCoreAdapter.cpp:261
yarp::os::ConnectionReader::requestDrop
virtual void requestDrop()=0
Tag the connection to be dropped after the current message.
yarp::os::impl::PortCoreAdapter::configReader
void configReader(PortReader &reader)
Definition: PortCoreAdapter.cpp:235
yarp::os::impl::PortCoreAdapter::reply
bool reply(PortWriter &writer, bool drop, bool interrupted)
Definition: PortCoreAdapter.cpp:214