YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
PortCoreOutputUnit.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
9#include <yarp/os/Name.h>
10#include <yarp/os/PortInfo.h>
11#include <yarp/os/PortReport.h>
12#include <yarp/os/Portable.h>
13#include <yarp/os/Time.h>
17
18namespace {
19YARP_OS_LOG_COMPONENT(PORTCOREOUTPUTUNIT, "yarp.os.impl.PortCoreOutputUnit")
20} // namespace
21
22using namespace yarp::os::impl;
23using namespace yarp::os;
24
26 PortCoreUnit(owner, index),
27 op(op),
28 closing(false),
29 finished(false),
30 running(false),
31 threaded(false),
32 sending(false),
33 name(owner.getName()),
34 phase(1),
35 activate(0),
36 trackerMutex(),
37 cachedWriter(nullptr),
38 cachedReader(nullptr),
39 cachedCallback(nullptr),
40 cachedTracker(nullptr)
41{
42 yCIAssert(PORTCOREOUTPUTUNIT, getName(), op != nullptr);
43}
44
49
50
52{
53 phase.wait();
54
55 if (!threaded) {
56 running = false;
57 sending = false;
59 phase.post();
60 return true;
61 }
62
63 bool result = PortCoreUnit::start();
64 if (result) {
65 phase.wait();
66 phase.post();
67 } else {
68 phase.post();
69 }
70
71 return result;
72}
73
74
76{
77 running = true;
78 sending = false;
79
80 // By default, we don't start up a thread for outputs.
81
82 if (!threaded) {
84 phase.post();
85 } else {
86 phase.post();
87 Route r = getRoute();
88 while (!closing) {
89 yCIDebug(PORTCOREOUTPUTUNIT, getName(), "waiting");
90 activate.wait();
92 if (!closing) {
93 if (sending) {
94 yCIDebug(PORTCOREOUTPUTUNIT, getName(), "write something in background");
95 sendHelper();
96 yCIDebug(PORTCOREOUTPUTUNIT, getName(), "wrote something in background");
97 trackerMutex.lock();
98 if (cachedTracker != nullptr) {
99 void* t = cachedTracker;
100 cachedTracker = nullptr;
101 sending = false;
103 } else {
104 sending = false;
105 }
106 trackerMutex.unlock();
107 }
108 }
109 yCIDebug(PORTCOREOUTPUTUNIT, getName(), "wrote something in background");
110 }
111 yCIDebug(PORTCOREOUTPUTUNIT, getName(), "thread closing");
112 sending = false;
113 }
114}
115
116
118{
119 if (op != nullptr) {
120 Route route = op->getRoute();
121 setMode();
122 getOwner().reportUnit(this, true);
123
124 std::string msg = std::string("Sending output from ") + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
125 if (Name(route.getToName()).isRooted()) {
126 if (Name(route.getFromName()).isRooted()) {
127 yCIInfo(PORTCOREOUTPUTUNIT, getName(), "%s", msg.c_str());
128 }
129 }
130
131 // Report the new connection
133 info.message = msg;
135 info.incoming = false;
136 info.created = true;
137 info.sourceName = route.getFromName();
138 info.targetName = route.getToName();
139 info.portName = info.sourceName;
140 info.carrierName = route.getCarrierName();
142 }
143
144 // no thread component
145 running = false;
146}
147
148void PortCoreOutputUnit::closeBasic()
149{
150 bool waitForOther = false;
151 if (op != nullptr) {
152 op->getConnection().prepareDisconnect();
153 Route route = op->getRoute();
154 if (op->getConnection().isConnectionless() || op->getConnection().isBroadcast()) {
155 yCIInfo(PORTCOREOUTPUTUNIT, getName(), "output for route %s asking other side to close by out-of-band means",
156 route.toString().c_str());
158 route.getFromName(),
159 true);
160 } else {
161 if (op->getConnection().canEscape()) {
162 BufferedConnectionWriter buf(op->getConnection().isTextMode(),
163 op->getConnection().isBareMode());
164 PortCommand pc('\0', std::string("q"));
165 pc.write(buf);
166 //printf("Asked for %s to close...\n",
167 // op->getRoute().toString().c_str());
168 waitForOther = op->write(buf);
169 }
170 }
171
172 std::string msg = std::string("Removing output from ") + route.getFromName() + " to " + route.getToName();
173
174 if (Name(route.getToName()).isRooted()) {
175 if (Name(route.getFromName()).isRooted()) {
176 yCIInfo(PORTCOREOUTPUTUNIT, getName(), "%s", msg.c_str());
177 }
178 }
179
180 getOwner().reportUnit(this, false);
181
182 // Report the disappearing connection
184 info.message = msg;
186 info.incoming = false;
187 info.created = false;
188 info.sourceName = route.getFromName();
189 info.targetName = route.getToName();
190 info.portName = info.sourceName;
191 info.carrierName = route.getCarrierName();
193 }
194
195
196 if (op != nullptr) {
197 if (waitForOther) {
198 // quit is only acknowledged in certain conditions
199 if (op->getConnection().isTextMode() && op->getConnection().supportReply()) {
200 InputStream& is = op->getInputStream();
201 ManagedBytes dummy(1);
202 is.read(dummy.bytes());
203 }
204 }
205 op->close();
206 delete op;
207 op = nullptr;
208 }
209}
210
211void PortCoreOutputUnit::closeMain()
212{
213 if (finished) {
214 return;
215 }
216
217 yCIDebug(PORTCOREOUTPUTUNIT, getName(), "closing");
218
219 if (running) {
220 // give a kick (unfortunately unavoidable)
221
222 if (op != nullptr) {
223 op->interrupt();
224 }
225
226 closing = true;
227 phase.post();
228 activate.post();
229 join();
230 }
231
232 yCIDebug(PORTCOREOUTPUTUNIT, getName(), "internal join");
233
234 closeBasic();
235 running = false;
236 closing = false;
237 finished = true;
238
239 yCIDebug(PORTCOREOUTPUTUNIT, getName(), "closed");
240}
241
242
244{
245 if (op != nullptr) {
246 Route r = op->getRoute();
247 op->beginWrite();
248 return r;
249 }
250 return PortCoreUnit::getRoute();
251}
252
253bool PortCoreOutputUnit::sendHelper()
254{
255 bool replied = false;
256 if (op != nullptr) {
257 bool done = false;
258 BufferedConnectionWriter buf(op->getConnection().isTextMode(),
259 op->getConnection().isBareMode());
260 if (cachedReader != nullptr) {
261 buf.setReplyHandler(*cachedReader);
262 }
263
264 if (op->getSender().modifiesOutgoingData()) {
265 if (op->getSender().acceptOutgoingData(*cachedWriter)) {
266 cachedWriter = &op->getSender().modifyOutgoingData(*cachedWriter);
267 } else {
268 return (done = true);
269 }
270 }
271
272 if (op->getConnection().isLocal()) {
273 // WARNING Cast away const qualifier.
274 // This may actually cause bugs when using the local carrier
275 // with something that is actually const (i.e. that is using
276 // some parts of memory that cannot be written.
277 auto* pw = const_cast<yarp::os::PortWriter*>(cachedWriter);
278 auto* p = dynamic_cast<yarp::os::Portable*>(pw);
279 if (p == nullptr) {
280 yCIError(PORTCOREOUTPUTUNIT, getName(), "cast failed.");
281 return false;
282 }
283 buf.setReference(p);
284 } else {
285 yCIAssert(PORTCOREOUTPUTUNIT, getName(), cachedWriter != nullptr);
286 bool ok = cachedWriter->write(buf);
287 if (!ok) {
288 done = true;
289 }
290
291 bool suppressReply = (buf.getReplyHandler() == nullptr);
292
293 if (!done) {
294 if (!op->getConnection().canEscape()) {
295 if (!cachedEnvelope.empty()) {
296 op->getConnection().handleEnvelope(cachedEnvelope);
297 }
298 } else {
299 buf.addToHeader();
300
301 if (!cachedEnvelope.empty()) {
302 if (cachedEnvelope == "__ADMIN") {
303 PortCommand pc('a', "");
304 pc.write(buf);
305 } else {
306 PortCommand pc('\0', std::string(suppressReply ? "D " : "d ") + cachedEnvelope);
307 pc.write(buf);
308 }
309 } else {
310 PortCommand pc(suppressReply ? 'D' : 'd', "");
311 pc.write(buf);
312 }
313 }
314 }
315 }
316
317 if (!done) {
318 if (op->getConnection().isActive()) {
319 replied = op->write(buf);
320 if (replied && op->getSender().modifiesReply() && cachedReader != nullptr) {
321 cachedReader = &op->getSender().modifyReply(*cachedReader);
322 }
323 }
324 if (!op->isOk()) {
325 done = true;
326 }
327 }
328
329 if (buf.dropRequested()) {
330 done = true;
331 }
332 if (done) {
333 closeBasic();
334 closing = true;
335 finished = true;
336 setDoomed();
337 }
338 }
339
340
341 return replied;
342}
343
345 yarp::os::PortReader* reader,
346 const yarp::os::PortWriter* callback,
347 void* tracker,
348 const std::string& envelopeString,
349 bool waitAfter,
350 bool waitBefore,
351 bool* gotReply)
352{
353 bool replied = false;
354
355 if (op != nullptr) {
356 if (!op->getConnection().isActive()) {
357 return tracker;
358 }
359 }
360
361 if (!waitBefore || !waitAfter) {
362 if (!running) {
363 // we must have a thread if we're going to be skipping waits
364 threaded = true;
365 yCIDebug(PORTCOREOUTPUTUNIT, getName(), "starting a thread for output");
366 start();
367 yCIDebug(PORTCOREOUTPUTUNIT, getName(), "started a thread for output");
368 }
369 }
370
371 if ((!waitBefore) && waitAfter) {
372 yCIError(PORTCOREOUTPUTUNIT, getName(), "chosen port wait combination not yet implemented");
373 }
374 if (!sending) {
375 cachedWriter = &writer;
376 cachedReader = reader;
377 cachedCallback = callback;
378 cachedEnvelope = envelopeString;
379
380 sending = true;
381 if (waitAfter) {
382 replied = sendHelper();
383 sending = false;
384 } else {
385 trackerMutex.lock();
386 void* nextTracker = tracker;
387 tracker = cachedTracker;
388 cachedTracker = nextTracker;
389 activate.post();
390 trackerMutex.unlock();
391 }
392 } else {
393 yCIDebug(PORTCOREOUTPUTUNIT, getName(), "skipping connection tagged as sending something");
394 }
395
396 if (waitAfter) {
397 if (gotReply != nullptr) {
398 *gotReply = replied;
399 }
400 }
401
402 // return tracker that we no longer need
403 return tracker;
404}
405
406
408{
409 void* tracker = nullptr;
410 trackerMutex.lock();
411 if (!sending) {
412 tracker = cachedTracker;
413 cachedTracker = nullptr;
414 }
415 trackerMutex.unlock();
416 return tracker;
417}
418
420{
421 return sending;
422}
423
425{
426 if (op != nullptr) {
427 op->getConnection().setCarrierParams(params);
428 }
429}
430
432{
433 if (op != nullptr) {
434 op->getConnection().getCarrierParams(params);
435 }
436}
437
A mini-server for performing network communication in the background.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
Simple specification of the minimum functions needed from input streams.
Definition InputStream.h:25
virtual int read()
Read and return a single byte.
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
Simple abstraction for a YARP port name.
Definition Name.h:18
bool isRooted() const
Check if port name begins with "/".
Definition Name.cpp:16
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
Definition Network.cpp:1511
The output side of an active connection between two ports.
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
virtual const Route & getRoute() const =0
virtual void close()=0
Negotiate an end to operations.
virtual void interrupt()=0
virtual InputStream & getInputStream()=0
Access the input stream associated with the connection.
virtual Connection & getSender()=0
It is possible to chain a basic connection with a modifier.
virtual void beginWrite()=0
Notify connection that we intend to write to it.
virtual bool isOk() const =0
Check if the connection is valid and can be used.
virtual bool write(SizedWriter &writer)=0
Write a message on the connection.
Information about a port connection or event.
Definition PortInfo.h:25
std::string message
A human-readable description of contents.
Definition PortInfo.h:68
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Definition PortInfo.h:39
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition PortReader.h:24
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition PortWriter.h:23
This is a base class for objects that can be both read from and be written to the YARP network.
Definition Portable.h:25
A class for storing options and configuration information.
Definition Property.h:33
Information about a connection between two ports.
Definition Route.h:28
const std::string & getToName() const
Get the destination of the route.
Definition Route.cpp:103
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition Route.cpp:123
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition Route.cpp:138
const std::string & getFromName() const
Get the source of the route.
Definition Route.cpp:93
void wait()
Decrement the counter, even if we must wait to do that.
Definition Semaphore.cpp:96
void post()
Increment the counter.
A helper for creating cached object descriptions.
Simple Readable and Writable object representing a command to a YARP port.
Definition PortCommand.h:24
void getCarrierParams(yarp::os::Property &params) override
void * takeTracker() override
Reacquire a tracker previously passed via send().
void setCarrierParams(const yarp::os::Property &params) override
Set arbitrary parameters for this connection.
void * send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader, const yarp::os::PortWriter *callback, void *tracker, const std::string &envelopeString, bool waitAfter, bool waitBefore, bool *gotReply) override
Send a message on the connection.
PortCoreOutputUnit(PortCore &owner, int index, OutputProtocol *op)
Constructor.
bool start() override
Prepare to serve this output.
void run() override
The body of a thread managing background sends.
virtual void runSingleThreaded()
Perform send operations without a separate thread.
This manages a single threaded resource related to a single input or output connection.
void setMode()
Check the carrier used for the connection, and see if it has a "log" modifier.
void setDoomed()
Request that this connection be shut down as soon as possible.
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc.). Generate a description of the connections associ...
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
int join(double seconds=-1)
#define yCIAssert(component, id, x)
#define yCIError(component, id,...)
#define yCIInfo(component, id,...)
#define yCIDebug(component, id,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.