YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
Node.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
5
6#include <yarp/os/Node.h>
7
10
12#include <yarp/os/Network.h>
13#include <yarp/os/Os.h>
14#include <yarp/os/Port.h>
15#include <yarp/os/PortInfo.h>
16#include <yarp/os/PortReport.h>
18#include <yarp/os/Type.h>
21
22#include <algorithm>
23#include <cstdlib>
24#include <list>
25#include <map>
26#include <vector>
27#include <mutex>
28
29using namespace yarp::os;
30using namespace yarp::os::impl;
31
32namespace {
33YARP_OS_LOG_COMPONENT(NODE, "yarp.os.Node")
34}
35
36
37class ROSReport : public PortReport
38{
39public:
40 std::multimap<std::string, std::string> outgoingURIs;
41 std::multimap<std::string, std::string> incomingURIs;
42
43 ROSReport() = default;
44
45 void report(const PortInfo& info) override
46 {
49 Contact c;
50 if (info.incoming) {
51 c = RosNameSpace::rosify(nic.queryName(info.sourceName));
52 incomingURIs.insert(std::make_pair(info.portName, c.toURI()));
53 } else {
54 c = RosNameSpace::rosify(nic.queryName(info.targetName));
55 outgoingURIs.insert(std::make_pair(info.portName, c.toURI()));
56 }
57 }
58 }
59};
60
61static std::string toRosName(const std::string& str)
62{
63 return RosNameSpace::toRosName(str);
64}
65
66static std::string fromRosName(const std::string& str)
67{
68 return RosNameSpace::fromRosName(str);
69}
70
72{
73public:
76
77 void update()
78 {
79 if (nc.getTypeName().empty()) {
80 if (contactable == nullptr) {
81 return;
82 }
84 if (typ.isValid()) {
86 }
87 }
88 }
89
91 {
92 std::string cat = nc.getCategory();
93 return (cat.empty() || cat == "-");
94 }
95
97 {
98 std::string cat = nc.getCategory();
99 return (cat.empty() || cat == "+");
100 }
101
102 bool isTopic()
103 {
104 std::string cat = nc.getCategory();
105 return (cat.empty() || cat == "+" || cat == "-");
106 }
107
109 {
110 std::string cat = nc.getCategory();
111 return (cat.empty() || cat == "-1");
112 }
113
115 {
116 std::string cat = nc.getCategory();
117 return (cat.empty() || cat == "+1");
118 }
119
121 {
122 std::string cat = nc.getCategory();
123 return (cat.empty() || cat == "+1" || cat == "-1");
124 }
125};
126
128{
129public:
133 int code;
134 std::string msg;
136
138 {
139 code = -1;
140 should_drop = true;
141 }
142
143 void error(const char* txt)
144 {
145 msg = txt;
146 code = -1;
147 }
148
149 void fail(const char* txt)
150 {
151 msg = txt;
152 code = 0;
153 }
154
155 void success()
156 {
157 msg = "";
158 code = 1;
159 }
160
161 void drop()
162 {
163 should_drop = true;
164 }
165
166 void persist()
167 {
168 should_drop = false;
169 }
170
172 {
173 code = alt.get(0).asInt32();
174 msg = alt.get(1).asString();
175 reply = alt.get(2);
176 }
177};
178
180{
181public:
182 std::multimap<std::string, NodeItem> by_part_name;
183 std::multimap<std::string, NodeItem> by_category;
184 std::map<Contactable*, NodeItem> name_cache;
186 Node* owner{nullptr};
187
188 std::mutex mutex;
189 std::string name;
190 std::string prev_name;
191 bool has_prev_name{false};
192
194 {
195 clear();
196 port.includeNodeInName(false);
197 }
198
199 ~Helper() override
200 {
201 clear();
202 port.close();
203 }
204
205 void clear()
206 {
207 if (!mutex.try_lock()) {
208 return;
209 }
210 while (name_cache.begin() != name_cache.end()) {
211 Contactable* c = name_cache.begin()->first;
212 if (c != nullptr) {
213 mutex.unlock();
214 c->interrupt();
215 c->close();
216 mutex.lock();
217 // Close will remove the Contactable from the map only the first
218 // time that a node is found (for example if "/foo+@/node" and
219 // "/foo-@node" are registered, only the first time that "/node"
220 // is found it is removed automatically. The second time it must
221 // be removed manually.
222 if (!name_cache.empty() && name_cache.begin()->first == c) {
223 name_cache.erase(name_cache.begin());
224 }
225 }
226 }
227 mutex.unlock();
228 port.interrupt();
229 }
230
231 void add(Contactable& contactable);
232 void update(Contactable& contactable);
233 void remove(Contactable& contactable);
234 std::vector<Contact> query(const std::string& name, const std::string& category = std::string());
235
236 void prepare(const std::string& name);
237
239 {
240 port.interrupt();
241 }
242
243 bool read(ConnectionReader& reader) override;
244
246 {
247 na.reply = Value();
248 na.success();
249 }
250
252 {
253 unsigned int opaque_id = 1;
254 ROSReport report;
255 Value v;
256 Bottle* connections = v.asList();
257
258 mutex.lock();
259 for (auto& it : by_part_name) {
260 NodeItem& item = it.second;
261 if (!(item.isSubscriber() || item.isPublisher())) {
262 continue;
263 }
264 item.update();
265 item.contactable->getReport(report);
266 }
267 mutex.unlock();
268
269 for (std::multimap<std::string, std::string>::const_iterator it = report.outgoingURIs.begin(); it != report.outgoingURIs.end(); ++it) {
270 Bottle& lst = connections->addList();
271 lst.addInt32(opaque_id); // connectionId
272 lst.addString(it->second);
273 lst.addString("o");
274 lst.addString("TCPROS");
275 NestedContact nc(it->first);
277 opaque_id++;
278 }
279
280 for (std::multimap<std::string, std::string>::const_iterator it = report.incomingURIs.begin(); it != report.incomingURIs.end(); ++it) {
281 Bottle& lst = connections->addList();
282 lst.addInt32(opaque_id); // connectionId
283 lst.addString(it->second);
284 lst.addString("i");
285 lst.addString("TCPROS");
286 NestedContact nc(it->first);
288 opaque_id++;
289 }
290
291 if (connections->size() == 0) {
292 connections->addList(); // add empty list
293 }
294
295 na.reply = v;
296 na.success();
297 }
298
300 {
301 na.reply = Value(yarp::conf::environment::get_string("ROS_MASTER_URI"));
302 na.success();
303 }
304
306 {
307 na.reply = Value(std::string("hmm"));
308 }
309
311 {
312 na.reply = Value(static_cast<int>(yarp::os::getpid()));
313 na.success();
314 }
315
317 {
318 Value v;
320 mutex.lock();
321 for (auto& it : by_part_name) {
322 NodeItem& item = it.second;
323 if (!item.isSubscriber()) {
324 continue;
325 }
326 item.update();
327 Bottle& lst = subscriptions->addList();
328 lst.addString(toRosName(item.nc.getNestedName()));
329 lst.addString(item.nc.getTypeName());
330 }
331 mutex.unlock();
332 na.reply = v;
333 na.success();
334 }
335
337 {
338 Value v;
340 mutex.lock();
341 for (auto& it : by_part_name) {
342 NodeItem& item = it.second;
343 if (!item.isPublisher()) {
344 continue;
345 }
346 item.update();
347 Bottle& lst = publications->addList();
348 lst.addString(toRosName(item.nc.getNestedName()));
349 lst.addString(item.nc.getTypeName());
350 }
351 mutex.unlock();
352 na.reply = v;
353 na.success();
354 }
355
357 {
358 na.reply = Value(std::string("hmm"));
359 }
360
362 {
363 std::string topic = fromRosName(na.args.get(0).asString());
364 std::vector<Contact> contacts = query(topic, "-");
365 if (contacts.empty()) {
366 na.fail("Cannot find topic");
367 return;
368 }
369
370 for (auto& c : contacts) {
371 if (!c.isValid()) {
372 continue;
373 }
374 c.setName("");
375 // just pass the message along, YARP ports know what to do with it
376 ContactStyle style;
377 style.admin = true;
378 style.carrier = "tcp";
379 Bottle reply;
380 if (!NetworkBase::write(c, na.request, reply, style)) {
381 continue;
382 }
383 na.fromExternal(reply);
384 }
385 }
386
388 {
389 std::string topic = na.args.get(0).asString();
390 topic = fromRosName(topic);
391 std::vector<Contact> contacts = query(topic, "+");
392 if (contacts.empty()) {
393 na.fail("Cannot find topic");
394 return;
395 }
396 for (auto& c : contacts) {
397 if (!c.isValid()) {
398 continue;
399 }
400 Value v;
401 Bottle* lst = v.asList();
402 lst->addString("TCPROS");
403 lst->addString(c.getHost());
404 lst->addInt32(c.getPort());
405 na.reply = v;
406 na.success();
407 return;
408 }
409 na.fail("Cannot find topic");
410 }
411};
412
413void yarp::os::Node::Helper::prepare(const std::string& name)
414{
415 mutex.lock();
416 if (port.getName().empty()) {
417 port.setReader(*this);
418 Property* prop = port.acquireProperties(false);
419 if (prop != nullptr) {
420 prop->put("node_like", 1);
421 }
423 port.open(name);
424 this->name = port.getName();
425 }
426 mutex.unlock();
427}
428
430{
432 item.nc.fromString(contactable.getName());
433 if (name.empty()) {
434 name = item.nc.getNodeName();
435 }
436 if (name != item.nc.getNodeName()) {
437 yCError(NODE, "Node name mismatch, expected [%s] but got [%s]\n", name.c_str(), item.nc.getNodeName().c_str());
438 return;
439 }
440 prepare(name);
441 item.contactable = &contactable;
442
443 mutex.lock();
444 name_cache[&contactable] = item;
445 by_part_name.insert(std::pair<std::string, NodeItem>(item.nc.getNestedName(), item));
446 by_category.insert(std::pair<std::string, NodeItem>(item.nc.getCategory(), item));
447 mutex.unlock();
448}
449
451{
452 mutex.lock();
453 NodeItem item = name_cache[&contactable];
454 mutex.unlock();
455}
456
458{
459 mutex.lock();
460 NodeItem item = name_cache[&contactable];
461 name_cache.erase(&contactable);
462 std::string nestedName = item.nc.getNestedName();
463 for (auto it = by_part_name.begin(); it != by_part_name.end(); ++it) {
464 if (it->first == nestedName && it->second.contactable->where().toString() == contactable.where().toString()) {
465 by_part_name.erase(it);
466 break;
467 }
468 }
469 std::string category = item.nc.getCategory();
470 for (auto it = by_category.begin(); it != by_category.end(); ++it) {
471 if (it->first == category && it->second.contactable->where().toString() == contactable.where().toString()) {
472 by_category.erase(it);
473 break;
474 }
475 }
476 mutex.unlock();
477}
478
479std::vector<Contact> yarp::os::Node::Helper::query(const std::string& name, const std::string& category)
480{
481 std::vector<Contact> contacts;
482 mutex.lock();
483 for (std::multimap<std::string, NodeItem>::const_iterator it = by_part_name.begin(); it != by_part_name.end(); ++it) {
484 if (it->first == name && (category.empty() || category == it->second.nc.getCategory())) {
485 contacts.emplace_back(it->second.contactable->where());
486 }
487 }
488 mutex.unlock();
489
490 return contacts;
491}
492
494{
495 if (!reader.isValid()) {
496 return false;
497 }
498 NodeArgs na;
499 na.request.read(reader);
500 yCDebug(NODE, "NODE API for %s received %s\n", name.c_str(), na.request.toString().c_str());
501 std::string key = na.request.get(0).asString();
502 na.args = na.request.tail().tail();
503 if (key == "getBusStats") {
504 getBusStats(na);
505 } else if (key == "getBusInfo") {
506 getBusInfo(na);
507 } else if (key == "getMasterUri") {
508 getMasterUri(na);
509 } else if (key == "shutdown") {
510 shutdown(na);
511 } else if (key == "getPid") {
512 getPid(na);
513 } else if (key == "getSubscriptions") {
514 getSubscriptions(na);
515 } else if (key == "getPublications") {
516 getPublications(na);
517 } else if (key == "paramUpdate") {
518 paramUpdate(na);
519 } else if (key == "publisherUpdate") {
520 publisherUpdate(na);
521 } else if (key == "requestTopic") {
522 requestTopic(na);
523 } else {
524 na.error("I have no idea what you are talking about");
525 }
526 if (na.should_drop) {
527 reader.requestDrop(); // ROS likes to close down.
528 }
529 if (reader.getWriter() != nullptr) {
530 Bottle full;
531 full.addInt32(na.code);
532 full.addString(na.msg);
533 full.add(na.reply);
534 yCDebug(NODE, "NODE %s <<< %s\n", name.c_str(), full.toString().c_str());
535 full.write(*reader.getWriter());
536 }
537 return true;
538}
539
540
542 mPriv(new Helper)
543{
544 yCAssert(NODE, mPriv != nullptr);
545 mPriv->owner = this;
546}
547
548Node::Node(const std::string& name) :
549 mPriv(new Helper)
550{
551 yCAssert(NODE, mPriv != nullptr);
552 mPriv->owner = this;
553 Nodes& nodes = NameClient::getNameClient().getNodes();
554 mPriv->prev_name = nodes.getActiveName();
555 mPriv->has_prev_name = true;
556 mPriv->name = name;
557 prepare(name);
558 std::string rname = mPriv->port.getName();
559 nodes.addExternalNode(rname, *this);
560 nodes.setActiveName(rname);
561}
562
564{
565 if (mPriv->has_prev_name) {
566 Nodes& nodes = NameClient::getNameClient().getNodes();
567 nodes.setActiveName(mPriv->prev_name);
568 nodes.removeExternalNode(mPriv->name);
569 }
570 delete mPriv;
571}
572
573void Node::add(Contactable& contactable)
574{
575 mPriv->add(contactable);
576}
577
578void Node::update(Contactable& contactable)
579{
580 mPriv->update(contactable);
581}
582
583void Node::remove(Contactable& contactable)
584{
585 mPriv->remove(contactable);
586}
587
588Contact Node::query(const std::string& name, const std::string& category)
589{
590 std::vector<Contact> contacts = mPriv->query(name, category);
591 if (!contacts.empty()) {
592 return contacts.at(0);
593 }
594 return Contact();
595}
596
598{
599 mPriv->interrupt();
600}
601
603{
604 return mPriv->port.where();
605}
606
607void Node::prepare(const std::string& name)
608{
609 mPriv->prepare(name);
610}
void cat(Vector &a, const Vector &b)
static std::string fromRosName(const std::string &str)
Definition Node.cpp:66
static std::string toRosName(const std::string &str)
Definition Node.cpp:61
Bottle request
Definition Node.cpp:130
bool should_drop
Definition Node.cpp:135
NodeArgs()
Definition Node.cpp:137
Value reply
Definition Node.cpp:132
void fail(const char *txt)
Definition Node.cpp:149
void persist()
Definition Node.cpp:166
std::string msg
Definition Node.cpp:134
int code
Definition Node.cpp:133
Bottle args
Definition Node.cpp:131
void drop()
Definition Node.cpp:161
void error(const char *txt)
Definition Node.cpp:143
void success()
Definition Node.cpp:155
void fromExternal(const Bottle &alt)
Definition Node.cpp:171
void update()
Definition Node.cpp:77
bool isServiceClient()
Definition Node.cpp:114
bool isServiceServer()
Definition Node.cpp:108
Contactable * contactable
Definition Node.cpp:75
bool isSubscriber()
Definition Node.cpp:90
bool isTopic()
Definition Node.cpp:102
bool isService()
Definition Node.cpp:120
bool isPublisher()
Definition Node.cpp:96
NestedContact nc
Definition Node.cpp:74
std::multimap< std::string, std::string > outgoingURIs
Definition Node.cpp:40
std::multimap< std::string, std::string > incomingURIs
Definition Node.cpp:41
ROSReport()=default
void report(const PortInfo &info) override
Callback for port event/state information.
Definition Node.cpp:45
A simple collection of objects that can be described and transmitted in a portable way.
Definition Bottle.h:64
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
Definition Bottle.cpp:182
size_type size() const
Gets the number of elements in the bottle.
Definition Bottle.cpp:251
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
Definition Bottle.cpp:140
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition Bottle.cpp:170
A mini-server for performing network communication in the background.
Contact where() const override
Returns information about how this port can be reached.
std::string getName() const override
Get name of port.
void close() override
Stop port activity.
void interrupt() override
Interrupt any current reads or writes attached to the port.
void getReport(PortReport &reporter) override
Get information on the state of the port - connections etc.
T * read(bool shouldWait=true) override
Read an available object from the port.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
An interface for reading from a network connection.
virtual void requestDrop()=0
Tag the connection to be dropped after the current message.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
virtual bool isValid() const =0
Preferences for how to communicate with a contact.
std::string carrier
Request that communication be made using a particular carrier.
bool admin
Ask recipient to treat message as administrative.
Represents how to reach a part of a YARP network.
Definition Contact.h:33
std::string toString() const
Get a textual representation of the Contact.
Definition Contact.cpp:303
An abstract port.
Definition Contactable.h:28
virtual Type getType()=0
Get the type of data the port has committed to send/receive.
virtual std::string getName() const
Get name of port.
virtual Contact where() const =0
Returns information about how this port can be reached.
A placeholder for rich contact information.
std::string getNestedName() const
void setTypeName(const std::string &nWireType)
std::string getTypeName() const
std::string getCategory() const
static bool write(const Contact &contact, PortWriter &cmd, PortReader &reply, bool admin=false, bool quiet=false, double timeout=-1)
Send a single command to a port and await a single response.
Definition Network.cpp:1219
void update(Contactable &contactable)
Definition Node.cpp:450
std::vector< Contact > query(const std::string &name, const std::string &category=std::string())
Definition Node.cpp:479
void getPid(NodeArgs &na)
Definition Node.cpp:310
std::multimap< std::string, NodeItem > by_part_name
Definition Node.cpp:182
void shutdown(NodeArgs &na)
Definition Node.cpp:305
void prepare(const std::string &name)
Definition Node.cpp:413
void publisherUpdate(NodeArgs &na)
Definition Node.cpp:361
void getBusInfo(NodeArgs &na)
Definition Node.cpp:251
std::string name
Definition Node.cpp:189
std::map< Contactable *, NodeItem > name_cache
Definition Node.cpp:184
std::string prev_name
Definition Node.cpp:190
void paramUpdate(NodeArgs &na)
Definition Node.cpp:356
void getSubscriptions(NodeArgs &na)
Definition Node.cpp:316
void add(Contactable &contactable)
Definition Node.cpp:429
~Helper() override
Definition Node.cpp:199
std::multimap< std::string, NodeItem > by_category
Definition Node.cpp:183
bool read(ConnectionReader &reader) override
Read this object from a network connection.
Definition Node.cpp:493
void requestTopic(NodeArgs &na)
Definition Node.cpp:387
void getBusStats(NodeArgs &na)
Definition Node.cpp:245
void remove(Contactable &contactable)
Definition Node.cpp:457
void getPublications(NodeArgs &na)
Definition Node.cpp:336
void getMasterUri(NodeArgs &na)
Definition Node.cpp:299
The Node class.
Definition Node.h:23
void prepare(const std::string &name)
prepare if it is not already been done, opens the port of the Node.
Definition Node.cpp:607
void update(Contactable &contactable)
update should update the contactable with new information.
Definition Node.cpp:578
void remove(Contactable &contactable) override
remove specified contactable from the list of contactables associated with this Node.
Definition Node.cpp:583
virtual Contact query(const std::string &name, const std::string &category="") override
query the Node to obtain Contact information about a nested port associated with this Node.
Definition Node.cpp:588
virtual ~Node()
Definition Node.cpp:563
void add(Contactable &contactable) override
add a contactable to this node.
Definition Node.cpp:573
Contact where()
where getter fot information about the port of the Node.
Definition Node.cpp:602
void interrupt()
interrupt delegates the call to the Node port interrupt.
Definition Node.cpp:597
The Nodes class.
Definition Nodes.h:29
void removeExternalNode(const std::string &name)
removeExternalNode erase the node from the container.
Definition Nodes.cpp:340
void addExternalNode(const std::string &name, Node &node)
addExternalNode adds a Node to this container.
Definition Nodes.cpp:335
std::string getActiveName()
getActiveName getter for the currently active node's name
Definition Nodes.cpp:325
void setActiveName(const std::string &name)
setActiveName setter for the currently active node
Definition Nodes.cpp:320
Information about a port connection or event.
Definition PortInfo.h:25
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Definition PortInfo.h:39
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition PortReader.h:24
A base class for objects that want information about port status changes.
Definition PortReport.h:25
A mini-server for network communication.
Definition Port.h:46
void setReader(PortReader &reader) override
Set an external reader for port data.
Definition Port.cpp:511
Contact where() const override
Returns information about how this port can be reached.
Definition Port.cpp:405
void interrupt() override
Interrupt any current reads or writes attached to the port.
Definition Port.cpp:383
void includeNodeInName(bool flag) override
Choose whether to prepend a node name (if one is available) to the port's name.
Definition Port.cpp:672
void releaseProperties(Property *prop) override
End access unstructured port properties.
Definition Port.cpp:667
void close() override
Stop port activity.
Definition Port.cpp:363
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
Definition Port.cpp:79
Property * acquireProperties(bool readOnly) override
Access unstructured port properties.
Definition Port.cpp:662
A class for storing options and configuration information.
Definition Property.h:33
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
Definition Property.cpp:987
static Contact rosify(const Contact &contact)
static std::string fromRosName(const std::string &name)
static std::string toRosName(const std::string &name)
Possible ROS names are a subset of YARP names.
A single value (typically within a Bottle).
Definition Value.h:43
virtual Bottle * asList() const
Get list value.
Definition Value.cpp:240
Client for YARP name server.
Definition NameClient.h:33
static NameClient & getNameClient()
Get an instance of the name client.
#define yCError(component,...)
#define yCAssert(component, x)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
Definition environment.h:66
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
int getpid()
Portable wrapper for the getppid() function.
Definition Os.cpp:84