YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
Port.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#include <yarp/os/Port.h>
8
9#include <yarp/conf/system.h>
11
12#include <yarp/os/Bottle.h>
13#include <yarp/os/Contact.h>
14#include <yarp/os/Network.h>
15#include <yarp/os/Portable.h>
16#include <yarp/os/Time.h>
22
23using namespace yarp::os::impl;
24using namespace yarp::os;
25
26namespace {
27YARP_OS_LOG_COMPONENT(PORT, "yarp.os.Port")
28} // namespace
29
30void* Port::needImplementation() const
31{
32 if (implementation != nullptr) {
33 return implementation;
34 }
35 Port* self = const_cast<Port*>(this);
36 self->implementation = new yarp::os::impl::PortCoreAdapter(*self);
37 yCAssert(PORT, self->implementation != nullptr);
38 self->owned = true;
39 return self->implementation;
40}
41
// `implementation` is held as a void* but is a PortCoreAdapter.
// needImplementation() hands it back as void* (creating it on demand), and a
// void* is converted back to its object type with static_cast, matching the
// casts used elsewhere in this file when the adapter is deleted.
#define IMPL() (*static_cast<yarp::os::impl::PortCoreAdapter*>(needImplementation()))
44
46 implementation(nullptr),
47 owned(false)
48{
49}
50
52{
53 if (implementation != nullptr) {
54 close();
55 if (owned) {
56 delete (static_cast<PortCoreAdapter*>(implementation));
57 }
58 implementation = nullptr;
59 owned = false;
60 }
61}
62
64{
65 close();
66 if (owned) {
67 delete (static_cast<PortCoreAdapter*>(implementation));
68 }
69 implementation = port.implementation;
70 owned = false;
71 return true;
72}
73
74bool Port::openFake(const std::string& name)
75{
76 return open(Contact(name), false, name.c_str());
77}
78
79bool Port::open(const std::string& name)
80{
81 return open(Contact(name));
82}
83
84bool Port::open(const Contact& contact, bool registerName)
85{
86 return open(contact, registerName, nullptr);
87}
88
// Core open routine used by every other open()/openFake() overload.
//
// contact      - requested name/carrier/host/port; copied into contact2 and
//                adjusted locally, the caller's object is not modified.
// registerName - whether name registration is wanted; several conditions
//                below force it to false. The final value is handed to
//                core.setControlRegistration() and core.listen().
// fakeName     - when non-null, the listen()/start() path is skipped
//                and core.manualStart(fakeName) decides the result instead.
// Returns true when the port ended up started, false otherwise.
//
// NOTE(review): this listing is missing several statements (the embedded
// line numbers skip, e.g. 93, 109, 122, 142, 162, 199, 203). The
// declarations of `rename`, `currentCore`, `nc`, `nc2`, `core` and `newName`
// are among the lines that are not visible here.
89bool Port::open(const Contact& contact, bool registerName, const char* fakeName)
90{
91 Contact contact2 = contact;
92
// (the condition guarding this early failure is not visible in this listing)
94 yCError(PORT, "YARP not initialized; create a yarp::os::Network object before using ports");
95 return false;
96 }
97
98 std::string n = contact2.getName();
99
100 NameConfig conf;
101
// Environment override: "YARP_PORTNUMBER" + safe form of the name may supply
// a socket port number; a value of 0 leaves the contact untouched.
102 std::string nportnumber = std::string("YARP_PORTNUMBER") + conf.getSafeString(n);
103 uint16_t port = yarp::conf::environment::get_numeric<uint16_t>(nportnumber);
104 if (port != 0) {
105 contact2.setPort(port);
106 }
107
// Environment override: "YARP_RENAME" + safe form of the name. A non-empty
// `rename` replaces the port name (the line that fills `rename` is not shown).
108 std::string nenv = std::string("YARP_RENAME") + conf.getSafeString(n);
110 if (!rename.empty()) {
111 n = rename;
112 contact2.setName(n);
113 }
114
// No name and no port number: mark as local, disable registration and use
// the placeholder name "...".
115 bool local = false;
116 if (n.empty() && contact2.getPort() <= 0) {
117 local = true;
118 registerName = false;
119 n = "...";
120 }
121
// Mark an existing core inactive while (re)opening; it is set active again
// only at the very end, on success.
123 if (currentCore != nullptr) {
124 currentCore->active = false;
125 }
// Name validation: unless a fake name was given, a name must start with '/'.
// Names starting with '=' or "..." are exempt.
126 if (!n.empty() && n[0] != '/' && n[0] != '=' && n != "..." && n.substr(0, 3) != "...") {
127 if (fakeName == nullptr) {
128 yCError(PORT, "Port name '%s' needs to start with a '/' character", n.c_str());
129 return false;
130 }
131 }
// Optional global prefix from YARP_PORT_PREFIX, applied under the same
// exemptions ('=' / "..." names, or a fake name being used).
132 if (!n.empty() && n != "..." && n[0] != '=' && n.substr(0, 3) != "...") {
133 if (fakeName == nullptr) {
134 std::string prefix = yarp::conf::environment::get_string("YARP_PORT_PREFIX");
135 if (!prefix.empty()) {
136 n = prefix + n;
137 contact2.setName(n);
138 }
139 }
140 }
// Nested names without a category: derive one from what the core has
// committed to ("-" read, "+" write, with "1" appended for rpc), or fail
// with guidance when the port is committed to neither direction.
141 if (currentCore != nullptr) {
143 nc.fromString(n);
144 if (!nc.getNestedName().empty()) {
145 if (nc.getCategory().empty()) {
146 // we need to add in a category
147 std::string cat;
148 if (currentCore->commitToRead) {
149 cat = "-";
150 } else if (currentCore->commitToWrite) {
151 cat = "+";
152 }
153 if (!cat.empty()) {
154 if (currentCore->commitToRpc) {
155 cat += "1";
156 }
157 contact2.setName(nc.getNestedName() + cat + "@" + nc.getNodeName());
158 } else {
159 yCError(PORT, "Error: Port '%s' is not committed to being either an input or output port.", n.c_str());
160 yCError(PORT, "YARP does not mind, but we are trying to register with a name server that does.");
161 yCError(PORT, "You can call Port::setWriteOnly() or Port::setReadOnly(), OR rename the port.");
163 nc2.setCategoryWrite();
164 yCError(PORT, "For an output port, call it: %s (+ adds data)", nc2.toString().c_str());
165 nc2.setCategoryRead();
166 yCError(PORT, "For an input port, call it: %s (- takes data)", nc2.toString().c_str());
167 return false;
168 }
169 }
170 }
171 }
172
173 // Allow for open() to be called safely many times on the same Port
// A fresh adapter is built, the reader/creator/wait/lock configuration is
// copied over from the old one, then the old one is closed (and deleted if
// owned) before the new adapter takes its place as an owned implementation.
174 if ((currentCore != nullptr) && currentCore->isOpened()) {
175 auto* newCore = new PortCoreAdapter(*this);
176 yCAssert(PORT, newCore != nullptr);
177 // copy state that should survive in a new open()
178 if (currentCore->checkPortReader() != nullptr) {
179 newCore->configReader(*(currentCore->checkPortReader()));
180 }
181 if (currentCore->checkAdminPortReader() != nullptr) {
182 newCore->configAdminReader(*(currentCore->checkAdminPortReader()));
183 }
184 if (currentCore->checkReadCreator() != nullptr) {
185 newCore->configReadCreator(*(currentCore->checkReadCreator()));
186 }
187 if (currentCore->checkWaitAfterSend() >= 0) {
188 newCore->configWaitAfterSend(currentCore->checkWaitAfterSend() != 0);
189 }
190 if (currentCore->haveCallbackLock) {
191 newCore->configCallbackLock(currentCore->recCallbackLock);
192 }
193 close();
194 if (owned) {
195 delete (static_cast<PortCoreAdapter*>(implementation));
196 }
197 implementation = newCore;
198 owned = true;
// NOTE(review): listing line 199 is missing; presumably it re-points
// currentCore at the new adapter before this write - confirm against the
// real source.
200 currentCore->active = false;
201 }
202
204
205 core.openable();
206
207 if (NetworkBase::localNetworkAllocation() && contact2.getPort() <= 0) {
208 yCDebug(PORT, "local network allocation needed");
209 local = true;
210 }
211
212 bool success = true;
// `address` is what is eventually passed to core.listen(); it is a copy of
// contact2's name/carrier/host/port plus its nested contact.
213 Contact address(contact2.getName(),
214 contact2.getCarrier(),
215 contact2.getHost(),
216 contact2.getPort());
217 address.setNestedContact(contact2.getNested());
218
219 core.setReadHandler(core);
// An explicit port number together with a host disables registration.
220 if (contact2.getPort() > 0 && !contact2.getHost().empty()) {
221 registerName = false;
222 }
223
// Type name resolution, in order: the wire name of getType(), the type name
// parsed from the port name, then the plain name of getType().
224 std::string ntyp = getType().getNameOnWire();
225 if (ntyp.empty()) {
227 nc.fromString(n);
228 if (!nc.getTypeName().empty()) {
229 ntyp = nc.getTypeName();
230 }
231 }
232 if (ntyp.empty()) {
233 ntyp = getType().getName();
234 }
235 if (!ntyp.empty()) {
237 nc.fromString(contact2.getName());
238 nc.setTypeName(ntyp);
239 contact2.setNestedContact(nc);
// (body of the following `if` is not visible in this listing)
240 if (getType().getNameOnWire() != ntyp) {
242 }
243 }
244
// (body of the following `if` is not visible in this listing)
245 if (registerName && !local) {
247 }
248
249 core.setControlRegistration(registerName);
// The regular start-up path needs a valid address (or a local port) and is
// never taken when a fake name was supplied.
250 success = (address.isValid() || local) && (fakeName == nullptr);
251
252 // If we are a service client, go ahead and connect
253 if (success) {
255 nc.fromString(address.getName());
256 if (!nc.getNestedName().empty()) {
257 if (nc.getCategory() == "+1") {
258 addOutput(nc.getNestedName());
259 }
260 }
261 }
262
// `blame` names the stage that failed; it is updated just after each stage is
// attempted and is only reported in the failure message at the end.
263 std::string blame = "invalid address";
264 if (success) {
265 success = core.listen(address, registerName);
266 blame = "address conflict";
267 if (success) {
268 success = core.start();
269 blame = "manager did not start";
270 }
271 }
272 if (success) {
273 address = core.getAddress();
274 if (registerName && local) {
275 contact2.setSocket(address.getCarrier(),
276 address.getHost(),
277 address.getPort());
278 contact2.setName(address.getRegName());
280 core.resetPortName(newName.getName());
281 address = core.getAddress();
282 } else if (core.getAddress().getRegName().empty() && !registerName) {
// Unregistered port without a name: use its URI as the name.
283 core.resetPortName(core.getAddress().toURI(false));
284 core.setName(core.getAddress().getRegName());
285 }
286
// (the logging call heads for the two branches below are not visible here)
287 if (address.getRegName().empty()) {
289 "Anonymous port active at %s",
290 address.toURI().c_str());
291 } else {
293 "Port %s active at %s",
294 address.getRegName().c_str(),
295 address.toURI().c_str());
296 }
297 }
298
// Fake-name path: the outcome is decided solely by manualStart().
299 if (fakeName != nullptr) {
300 success = core.manualStart(fakeName);
301 blame = "unmanaged port failed to start";
302 }
303
304 if (!success) {
306 core.getName().c_str(),
307 "Port %s failed to activate%s%s (%s)",
308 (address.isValid() ? (address.getRegName().c_str()) : (contact2.getName().c_str())),
309 (address.isValid() ? " at " : ""),
310 (address.isValid() ? address.toURI().c_str() : ""),
311 blame.c_str());
312 }
313
314 if (success && currentCore != nullptr) {
315 currentCore->active = true;
316 }
317 return success;
318}
319
320bool Port::addOutput(const std::string& name)
321{
322 return addOutput(Contact(name));
323}
324
325bool Port::addOutput(const std::string& name, const std::string& carrier)
326{
327 return addOutput(Contact(name, carrier));
328}
329
331{
332 if (!owned) {
333 return;
334 }
335
337 core.finishReading();
338 core.finishWriting();
339 core.close();
340 core.join();
341 core.active = false;
342
343 // In fact, open flag means "ever opened", so don't reset it
344 // core.setOpened(false);
345}
346
348{
350 core.interrupt();
351}
352
354{
356 if (!core.isInterrupted()) {
357 // prevent resuming when the port is not interrupted
358 return;
359 }
360 core.resumeFull();
361}
362
363
365{
367 return core.getAddress();
368}
369
370
// Add an output connection to the destination described by `contact`.
// Returns false without connecting when the core is committed to reading or
// is currently interrupted.
// NOTE(review): the declaration of `core` (listing line 373) is not visible
// in this listing.
371bool Port::addOutput(const Contact& contact)
372{
374 if (core.commitToRead) {
375 return false;
376 }
377 if (core.isInterrupted()) {
378 return false;
379 }
380 core.alertOnWrite();
// Destination text: the plain string form when no port number is set,
// otherwise the URI form.
381 std::string name;
382 if (contact.getPort() <= 0) {
383 name = contact.toString();
384 } else {
385 name = contact.toURI();
386 }
// Not listening: ask the core directly. Otherwise go through
// NetworkBase::connect() using this port's own name as the source.
387 if (!core.isListening()) {
388 return core.addOutput(name, nullptr, nullptr, true);
389 }
390 Contact me = where();
391 return NetworkBase::connect(me.getName(), name);
392}
393
394
// Send `writer` through the core without expecting a reply (the reader
// argument of core.send() is nullptr).
// `callback`, when non-null, is forwarded to core.send() and is the object
// whose onCompletion() is invoked if the send fails.
// Returns false immediately when the core is interrupted; otherwise returns
// the result of core.send().
// NOTE(review): the declaration of `core` (listing line 397) is not visible
// in this listing.
395bool Port::write(const PortWriter& writer, const PortWriter* callback) const
396{
398 if (core.isInterrupted()) {
399 return false;
400 }
401 core.alertOnWrite();
402 bool result = false;
403 //WritableAdapter adapter(writer);
404 result = core.send(writer, nullptr, callback);
405 //writer.onCompletion();
// On failure, this function fires onCompletion() itself: on the callback if
// one was supplied, otherwise on the writer.
406 if (!result) {
407 if (callback != nullptr) {
408 callback->onCompletion();
409 } else {
410 writer.onCompletion();
411 }
412 // leave result false
413 }
414 return result;
415}
416
// Send `writer` through the core and pass `reader` to core.send() to receive
// the response. Raises both the rpc and the write alerts on the core first.
// `callback`, when non-null, is forwarded to core.send() and is the object
// whose onCompletion() is invoked if the send fails.
// Returns false immediately when the core is interrupted; otherwise returns
// the result of core.send().
// NOTE(review): the declaration of `core` (listing line 421) is not visible
// in this listing.
417bool Port::write(const PortWriter& writer,
418 PortReader& reader,
419 const PortWriter* callback) const
420{
422 if (core.isInterrupted()) {
423 return false;
424 }
425 core.alertOnRpc();
426 core.alertOnWrite();
427 bool result = false;
428 result = core.send(writer, &reader, callback);
// On failure, this function fires onCompletion() itself: on the callback if
// one was supplied, otherwise on the writer.
429 if (!result) {
430 if (callback != nullptr) {
431 callback->onCompletion();
432 } else {
433 writer.onCompletion();
434 }
435 // leave result false
436 }
437 return result;
438}
439
// Read one object from the port into `reader`.
// willReply - when true the core is first alerted for rpc use; the flag is
//             also forwarded to core.read().
// Returns false when the port is not open or the core is interrupted;
// otherwise returns the result of core.read().
// NOTE(review): the declaration of `core` (listing line 445) is not visible
// in this listing.
440bool Port::read(PortReader& reader, bool willReply)
441{
442 if (!isOpen()) {
443 return false;
444 }
446 if (willReply) {
447 core.alertOnRpc();
448 }
449 core.alertOnRead();
// The interruption check happens after the alerts above have been raised.
450 if (core.isInterrupted()) {
451 return false;
452 }
453 return core.read(reader, willReply);
454}
455
456
458{
460 return core.reply(writer, false, core.isInterrupted());
461}
462
464{
466 return core.reply(writer, true, core.isInterrupted());
467}
468
469
471{
473 core.alertOnRead();
474 core.configReader(reader);
475}
476
478{
480 core.configAdminReader(reader);
481}
482
483
485{
487 core.alertOnRead();
488 core.configReadCreator(creator);
489}
490
491
493{
495 core.configWaitAfterSend(!backgroundFlag);
496}
497
498
500{
502 return core.isWriting();
503}
504
505
507{
509 return core.setEnvelope(envelope);
510}
511
512
514{
516 return core.getEnvelope(envelope);
517}
518
520{
522 core.alertOnRead();
523 return core.getInputCount();
524}
525
527{
529 core.alertOnWrite();
530 return core.getOutputCount();
531}
532
534{
536 core.describe(reporter);
537}
538
539
541{
543 core.setReportCallback(&reporter);
544}
545
546
548{
550 core.resetReportCallback();
551}
552
553
555{
556 if (adminMode) {
557 Bottle b("__ADMIN");
558 setEnvelope(b);
559 } else {
560 Bottle b;
561 setEnvelope(b);
562 }
563}
564
565
// Rewrite the adapter's flag word so the bits selected by `mask` are set when
// `val` is true and cleared otherwise; all other bits are kept.
// The first macro argument is not used by the expansion, and IMPL() is
// evaluated twice.
#define SET_FLAG(implementation, mask, val) \
    IMPL().setFlags((IMPL().getFlags() & (~(mask))) | ((val) ? (mask) : 0))
568
570{
571 if (!expectInput) {
572 IMPL().setWriteOnly();
573 }
574 SET_FLAG(implementation, PORTCORE_IS_INPUT, expectInput);
575}
576
578{
579 if (!expectOutput) {
580 IMPL().setReadOnly();
581 }
582 SET_FLAG(implementation, PORTCORE_IS_OUTPUT, expectOutput);
583}
584
586{
587 if (expectRpc) {
588 IMPL().setRpc();
589 }
590 SET_FLAG(implementation, PORTCORE_IS_RPC, expectRpc);
591}
592
593bool Port::setTimeout(float timeout)
594{
595 IMPL().setTimeout(timeout);
596 return true;
597}
598
599#ifndef YARP_NO_DEPRECATED // Since YARP 3.4
// Compiled only when YARP_NO_DEPRECATED is not defined.
// Takes a verbosity level and does nothing with it other than handing it to
// YARP_UNUSED.
600void Port::setVerbosity(int level)
601{
602 YARP_UNUSED(level);
603}
604
606{
607 return 0;
608}
609#endif
610
612{
613 return IMPL().getType();
614}
615
617{
619}
620
625
627{
628 IMPL().releaseProperties(prop);
629}
630
631bool Port::isOpen() const
632{
633 if (implementation == nullptr) {
634 return false;
635 }
636 return IMPL().active;
637}
638
639bool Port::setCallbackLock(std::mutex* mutex)
640{
641 return IMPL().configCallbackLock(mutex);
642}
643
645{
646 return IMPL().unconfigCallbackLock();
647}
648
650{
651 if (!IMPL().lockCallback()) {
654 core.getName(),
655 "Cannot do lockCallback() without setCallbackLock() before opening port");
656 }
657 return true;
658}
659
661{
662 return IMPL().tryLockCallback();
663}
664
666{
668}
void cat(Vector &a, const Vector &b)
#define PORTCORE_IS_INPUT
Definition PortCore.h:40
#define PORTCORE_IS_RPC
Definition PortCore.h:39
#define PORTCORE_IS_OUTPUT
Definition PortCore.h:41
#define IMPL()
Definition Port.cpp:43
#define SET_FLAG(implementation, mask, val)
Definition Port.cpp:566
RandScalar * implementation(void *t)
A simple collection of objects that can be described and transmitted in a portable way.
Definition Bottle.h:64
A mini-server for performing network communication in the background.
Type getType() override
Get the type of data the port has committed to send/receive.
void promiseType(const Type &typ) override
Commit the port to a particular type of data.
Property * acquireProperties(bool readOnly) override
Access unstructured port properties.
std::string getName() const override
Get name of port.
bool tryLockCallback() override
Try to lock callbacks until unlockCallback() is called.
bool getEnvelope(PortReader &envelope) override
Get the envelope information (e.g., a timestamp) from the last message received on the port.
void close() override
Stop port activity.
void releaseProperties(Property *prop) override
End access unstructured port properties.
bool setEnvelope(PortWriter &envelope) override
Set an envelope (e.g., a timestamp) to the next message which will be sent.
void interrupt() override
Interrupt any current reads or writes attached to the port.
int getInputCount() override
Determine how many connections are arriving into this port.
bool isWriting() override
Report whether the port is currently writing data.
T * read(bool shouldWait=true) override
Read an available object from the port.
bool addOutput(const std::string &name) override
Add an output connection to the specified port.
int getOutputCount() override
Determine how many output connections this port has.
void unlockCallback() override
Unlock callbacks.
Represents how to reach a part of a YARP network.
Definition Contact.h:33
std::string toString() const
Get a textual representation of the Contact.
Definition Contact.cpp:303
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition Contact.cpp:313
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition Contact.cpp:239
virtual std::string getName() const
Get name of port.
void setWriteOnly()
Shorthand for setInputMode(false), setOutputMode(true), setRpcMode(false)
void setReadOnly()
Shorthand for setInputMode(true), setOutputMode(false), setRpcMode(false)
A placeholder for rich contact information.
static bool initialized()
Returns true if YARP has been fully initialized.
Definition Network.cpp:1382
static bool connect(const std::string &src, const std::string &dest, const std::string &carrier="", bool quiet=true)
Request that an output port connect to an input port.
Definition Network.cpp:682
static Contact registerContact(const Contact &contact)
Register contact information with the name server.
Definition Network.cpp:1017
static bool localNetworkAllocation()
Check where the name server in use expects processes to allocate their own network resources.
Definition Network.cpp:1912
A creator for readers.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition PortReader.h:24
A base class for objects that want information about port status changes.
Definition PortReport.h:25
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition PortWriter.h:23
virtual void onCompletion() const
This is called when the port has finished all writing operations.
A mini-server for network communication.
Definition Port.h:46
void setReaderCreator(PortReaderCreator &creator)
Set a creator for readers for port data.
Definition Port.cpp:484
void enableBackgroundWrite(bool backgroundFlag)
control whether writing from this port is done in the background.
Definition Port.cpp:492
int getOutputCount() override
Determine how many output connections this port has.
Definition Port.cpp:526
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
Definition Port.cpp:395
void setReader(PortReader &reader) override
Set an external reader for port data.
Definition Port.cpp:470
Contact where() const override
Returns information about how this port can be reached.
Definition Port.cpp:364
bool getEnvelope(PortReader &envelope) override
Get the envelope information (e.g., a timestamp) from the last message received on the port.
Definition Port.cpp:513
Type getType() override
Get the type of data the port has committed to send/receive.
Definition Port.cpp:611
void setRpcMode(bool expectRpc) override
Configure the port to be RPC only.
Definition Port.cpp:585
Port()
Constructor.
Definition Port.cpp:45
bool removeCallbackLock() override
Remove a lock on callbacks added with setCallbackLock()
Definition Port.cpp:644
bool sharedOpen(Port &port)
Open a port wrapping an existing port.
Definition Port.cpp:63
void setAdminMode(bool adminMode=true)
Turn on/off "admin" mode.
Definition Port.cpp:554
void setAdminReader(PortReader &reader) override
Set an external reader for unrecognized administrative port messages.
Definition Port.cpp:477
int getVerbosity()
Get port verbosity level.
Definition Port.cpp:605
void resume() override
Put the port back in an operative state after interrupt() has been called.
Definition Port.cpp:353
bool tryLockCallback() override
Try to lock callbacks until unlockCallback() is called.
Definition Port.cpp:660
bool addOutput(const std::string &name) override
Add an output connection to the specified port.
Definition Port.cpp:320
bool read(PortReader &reader, bool willReply=false) override
Read an object from the port.
Definition Port.cpp:440
~Port() override
Destructor.
Definition Port.cpp:51
bool replyAndDrop(PortWriter &writer) override
Same as reply(), but closes connection after reply.
Definition Port.cpp:463
void setInputMode(bool expectInput) override
Configure the port to allow or forbid inputs.
Definition Port.cpp:569
int getInputCount() override
Determine how many connections are arriving into this port.
Definition Port.cpp:519
void setReporter(PortReport &reporter) override
Set a callback to be called upon any future connections and disconnections to/from the port.
Definition Port.cpp:540
void setVerbosity(int level)
Set whether the port should issue messages about its operations.
Definition Port.cpp:600
bool setTimeout(float timeout)
Set a timeout on network operations.
Definition Port.cpp:593
void interrupt() override
Interrupt any current reads or writes attached to the port.
Definition Port.cpp:347
bool setCallbackLock(std::mutex *mutex=nullptr) override
Add a lock to use when invoking callbacks.
Definition Port.cpp:639
bool lockCallback() override
Lock callbacks until unlockCallback() is called.
Definition Port.cpp:649
bool setEnvelope(PortWriter &envelope) override
Set an envelope (e.g., a timestamp) to the next message which will be sent.
Definition Port.cpp:506
void releaseProperties(Property *prop) override
End access unstructured port properties.
Definition Port.cpp:626
void getReport(PortReport &reporter) override
Get information on the state of the port - connections etc.
Definition Port.cpp:533
bool openFake(const std::string &name)
Start port without making it accessible from the network.
Definition Port.cpp:74
void resetReporter() override
Remove the callback which is called upon any future connections and disconnections to/from the port.
Definition Port.cpp:547
void close() override
Stop port activity.
Definition Port.cpp:330
void promiseType(const Type &typ) override
Commit the port to a particular type of data.
Definition Port.cpp:616
bool reply(PortWriter &writer) override
Send an object as a reply to an object read from the port.
Definition Port.cpp:457
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
Definition Port.cpp:79
bool isWriting() override
Report whether the port is currently writing data.
Definition Port.cpp:499
Property * acquireProperties(bool readOnly) override
Access unstructured port properties.
Definition Port.cpp:621
void setOutputMode(bool expectOutput) override
Configure the port to allow or forbid outputs.
Definition Port.cpp:577
bool isOpen() const
Check if the port has been opened.
Definition Port.cpp:631
void unlockCallback() override
Unlock callbacks.
Definition Port.cpp:665
A class for storing options and configuration information.
Definition Property.h:33
static Type byNameOnWire(const char *name_on_wire)
Definition Type.cpp:186
std::string getNameOnWire() const
Definition Type.cpp:144
std::string getName() const
Definition Type.cpp:139
Small helper class to help deal with legacy YARP configuration files.
Definition NameConfig.h:23
std::string getSafeString(const std::string &txt)
#define yCError(component,...)
#define yCAssert(component, x)
#define yCIError(component, id,...)
#define yCDebug(component,...)
#define yCIInfo(component, id,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
Definition environment.h:66
bool isValid()
Check if time is valid (non-zero).
Definition Time.cpp:307
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
int rename(const char *oldname, const char *newname)
Portable wrapper for the rename() function.
Definition Os.cpp:73
#define YARP_UNUSED(var)
Definition api.h:162
#define PORT