YARP
Yet Another Robot Platform
PortCore.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #include <yarp/os/impl/PortCore.h>
11 
12 #include <yarp/conf/environment.h>
13 
14 #include <yarp/os/Bottle.h>
15 #include <yarp/os/DummyConnector.h>
16 #include <yarp/os/InputProtocol.h>
17 #include <yarp/os/Name.h>
18 #include <yarp/os/Network.h>
19 #include <yarp/os/PortInfo.h>
20 #include <yarp/os/RosNameSpace.h>
22 #include <yarp/os/SystemInfo.h>
23 #include <yarp/os/Time.h>
31 
32 #include <cstdio>
33 #include <functional>
34 #include <random>
35 #include <regex>
36 #include <vector>
37 
38 #ifdef YARP_HAS_ACE
39 # include <ace/INET_Addr.h>
40 # include <ace/Sched_Params.h>
41 // In one the ACE headers there is a definition of "main" for WIN32
42 # ifdef main
43 # undef main
44 # endif
45 #endif
46 
47 #if defined(__linux__) // used for configuring scheduling properties
48 # include <dirent.h>
49 # include <sys/types.h>
50 # include <unistd.h>
51 #endif
52 
53 
54 using namespace yarp::os::impl;
55 using namespace yarp::os;
56 using namespace yarp;
57 
58 namespace {
59 YARP_OS_LOG_COMPONENT(PORTCORE, "yarp.os.impl.PortCore")
60 } // namespace
61 
63  m_stateSemaphore(1),
64  m_packetMutex(),
65  m_connectionChangeSemaphore(1),
66  m_face(nullptr),
67  m_reader(nullptr),
68  m_adminReader(nullptr),
69  m_readableCreator(nullptr),
70  m_eventReporter(nullptr),
71  m_listening(false),
72  m_running(false),
73  m_starting(false),
74  m_closing(false),
75  m_finished(false),
76  m_finishing(false),
77  m_waitBeforeSend(true),
78  m_waitAfterSend(true),
79  m_controlRegistration(true),
80  m_interruptible(true),
81  m_interrupted(false),
82  m_manual(false),
83  m_events(0),
84  m_connectionListeners(0),
85  m_inputCount(0),
86  m_outputCount(0),
87  m_dataOutputCount(0),
89  m_logNeeded(false),
90  m_timeout(-1),
91  m_counter(1),
92  m_prop(nullptr),
93  m_contactable(nullptr),
94  m_mutex(nullptr),
95 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
96  m_old_mutex(nullptr),
97 #endif // YARP_NO_DEPRECATED
98  m_mutexOwned(false),
99  m_envelopeWriter(true),
100  m_typeMutex(),
101  m_checkedType(false)
102 {
103 }
104 
106 {
107  close();
109 }
110 
111 
112 bool PortCore::listen(const Contact& address, bool shouldAnnounce)
113 {
114  // If we're using ACE, we really need to have it initialized before
115  // this point.
116  if (!NetworkBase::initialized()) {
117  yCError(PORTCORE, "YARP not initialized; create a yarp::os::Network object before using ports");
118  return false;
119  }
120 
121  yCTrace(PORTCORE, "listen");
122 
123  m_stateSemaphore.wait();
124 
125  // This method assumes we are not already on the network.
126  // We can assume this because it is not a user-facing class,
127  // and we carefully never call this method again without
128  // calling close().
129  yCAssert(PORTCORE, !m_listening);
130  yCAssert(PORTCORE, !m_running);
131  yCAssert(PORTCORE, !m_closing);
132  yCAssert(PORTCORE, !m_finished);
133  yCAssert(PORTCORE, m_face == nullptr);
134 
135  // Try to put the port on the network, using the user-supplied
136  // address (which may be incomplete). You can think of
137  // this as getting a server socket.
138  m_address = address;
139  setName(address.getRegName());
140  if (m_timeout > 0) {
141  m_address.setTimeout(m_timeout);
142  }
143  m_face = Carriers::listen(m_address);
144 
145  // We failed, abort.
146  if (m_face == nullptr) {
147  m_stateSemaphore.post();
148  return false;
149  }
150 
151  // Update our address if it was incomplete.
152  if (m_address.getPort() <= 0) {
153  m_address = m_face->getLocalAddress();
154  if (m_address.getRegName() == "...") {
155  m_address.setName(std::string("/") + m_address.getHost() + "_" + NetType::toString(m_address.getPort()));
156  setName(m_address.getRegName());
157  }
158  }
159 
160  // Move into listening phase
161  m_listening = true;
162  m_stateSemaphore.post();
163 
164  // Now that we are on the network, we can let the name server know this.
165  if (shouldAnnounce) {
166  if (!(NetworkBase::getLocalMode() && NetworkBase::getQueryBypass() == nullptr)) {
167  std::string portName = address.getRegName();
168  Bottle cmd;
169  Bottle reply;
170  cmd.addString("announce");
171  cmd.addString(portName);
172  ContactStyle style;
173  NetworkBase::writeToNameServer(cmd, reply, style);
174  }
175  }
176 
177  // Success!
178  return true;
179 }
180 
181 
183 {
184  // Don't even try to do this when the port is hot, it'll burn you
185  yCAssert(PORTCORE, !m_running);
186  yCAssert(PORTCORE, m_reader == nullptr);
187  m_reader = &reader;
188 }
189 
191 {
192  // Don't even try to do this when the port is hot, it'll burn you
193  yCAssert(PORTCORE, !m_running);
194  yCAssert(PORTCORE, m_adminReader == nullptr);
195  m_adminReader = &reader;
196 }
197 
199 {
200  // Don't even try to do this when the port is hot, it'll burn you
201  yCAssert(PORTCORE, !m_running);
202  yCAssert(PORTCORE, m_readableCreator == nullptr);
203  m_readableCreator = &creator;
204 }
205 
206 
208 {
209  yCTrace(PORTCORE, "run");
210 
211  // This is the server thread for the port. We listen on
212  // the network and handle any incoming connections.
213  // We don't touch those connections, just shove them
214  // in a list and move on. It is important that this
215  // thread doesn't make a connecting client wait just
216  // because some other client is slow.
217 
218  // We assume that listen() has succeeded and that
219  // start() has been called.
220  yCAssert(PORTCORE, m_listening);
221  yCAssert(PORTCORE, !m_running);
222  yCAssert(PORTCORE, !m_closing);
223  yCAssert(PORTCORE, !m_finished);
224  yCAssert(PORTCORE, m_starting);
225 
226  // Enter running phase
227  m_running = true;
228  m_starting = false;
229 
230  // This post is matched with a wait in start()
231  m_stateSemaphore.post();
232 
233  yCTrace(PORTCORE, "run running");
234 
235  // Enter main loop, where we block on incoming connections.
236  // The loop is exited when PortCore#closing is set. One last
237  // connection will be needed to de-block this thread and ensure
238  // that it checks PortCore#closing.
239  bool shouldStop = false;
240  while (!shouldStop) {
241 
242  // Block and wait for a connection
243  InputProtocol* ip = m_face->read();
244 
245  m_stateSemaphore.wait();
246 
247  // Attach the connection to this port and update its timeout setting
248  if (ip != nullptr) {
249  ip->attachPort(m_contactable);
250  yCDebug(PORTCORE, "received something");
251  if (m_timeout > 0) {
252  ip->setTimeout(m_timeout);
253  }
254  }
255 
256  // Check whether we should shut down
257  shouldStop |= m_closing;
258 
259  // Increment a global count of connection events
260  m_events++;
261 
262  m_stateSemaphore.post();
263 
264  // It we are not shutting down, spin off the connection.
265  // It starts life as an input connection (although it
266  // may later morph into an output).
267  if (!shouldStop) {
268  if (ip != nullptr) {
269  addInput(ip);
270  }
271  yCDebug(PORTCORE, "spun off a connection");
272  ip = nullptr;
273  }
274 
275  // If the connection wasn't spun off, just shut it down.
276  if (ip != nullptr) {
277  ip->close();
278  delete ip;
279  ip = nullptr;
280  }
281 
282  // Remove any defunct connections.
283  reapUnits();
284 
285  // Notify anyone listening for connection changes.
286  // This should be using a condition variable once we have them,
287  // this is not solid TODO
288  m_stateSemaphore.wait();
289  for (int i = 0; i < m_connectionListeners; i++) {
290  m_connectionChangeSemaphore.post();
291  }
292  m_connectionListeners = 0;
293  m_stateSemaphore.post();
294  }
295 
296  yCTrace(PORTCORE, "run closing");
297 
298  // The server thread is shutting down.
299  m_stateSemaphore.wait();
300  for (int i = 0; i < m_connectionListeners; i++) {
301  m_connectionChangeSemaphore.post();
302  }
303  m_connectionListeners = 0;
304  m_finished = true;
305  m_stateSemaphore.post();
306 }
307 
308 
310 {
311  closeMain();
312 
313  if (m_prop != nullptr) {
314  delete m_prop;
315  m_prop = nullptr;
316  }
317  m_modifier.releaseOutModifier();
318  m_modifier.releaseInModifier();
319 }
320 
321 
323 {
324  yCTrace(PORTCORE, "start");
325 
326  // This wait will, on success, be matched by a post in run()
327  m_stateSemaphore.wait();
328 
329  // We assume that listen() has been called.
330  yCAssert(PORTCORE, m_listening);
331  yCAssert(PORTCORE, !m_running);
332  yCAssert(PORTCORE, !m_starting);
333  yCAssert(PORTCORE, !m_finished);
334  yCAssert(PORTCORE, !m_closing);
335  m_starting = true;
336 
337  // Start the server thread.
338  bool started = ThreadImpl::start();
339  if (!started) {
340  // run() won't be happening
341  m_stateSemaphore.post();
342  } else {
343  // run() will signal stateSema once it is active
344  m_stateSemaphore.wait();
345  yCAssert(PORTCORE, m_running);
346 
347  // release stateSema for its normal task of controlling access to state
348  m_stateSemaphore.post();
349  }
350  return started;
351 }
352 
353 
354 bool PortCore::manualStart(const char* sourceName)
355 {
356  // This variant of start() does not create a server thread.
357  // That is appropriate if we never expect to receive incoming
358  // connections for any reason. No incoming data, no requests
359  // for state information, no requests to change connections,
360  // nothing. We set the port's name to something fake, and
361  // act like nothing is wrong.
362  m_interruptible = false;
363  m_manual = true;
364  setName(sourceName);
365  return true;
366 }
367 
368 
370 {
371  // We are no longer interrupted.
372  m_interrupted = false;
373 }
374 
376 {
377  // This is a no-op if there is no server thread.
378  if (!m_listening) {
379  return;
380  }
381 
382  // Ignore any future incoming data
383  m_interrupted = true;
384 
385  // What about data that is already coming in?
386  // If interruptible is not currently set, no worries, the user
387  // did not or will not end up blocked on a read.
388  if (!m_interruptible) {
389  return;
390  }
391 
392  // Since interruptible is set, it is possible that the user
393  // may be blocked on a read. We send an empty message,
394  // which is reserved for giving blocking readers a chance to
395  // update their state.
396  m_stateSemaphore.wait();
397  if (m_reader != nullptr) {
398  yCDebug(PORTCORE, "sending update-state message to listener");
400  lockCallback();
401  m_reader->read(sbr);
402  unlockCallback();
403  }
404  m_stateSemaphore.post();
405 }
406 
407 
408 void PortCore::closeMain()
409 {
410  yCTrace(PORTCORE, "closeMain");
411 
412  m_stateSemaphore.wait();
413 
414  // We may not have anything to do.
415  if (m_finishing || !(m_running || m_manual)) {
416  yCTrace(PORTCORE, "closeMain - nothing to do");
417  m_stateSemaphore.post();
418  return;
419  }
420 
421  yCTrace(PORTCORE, "closeMain - Central");
422 
423  // Move into official "finishing" phase.
424  m_finishing = true;
425  yCDebug(PORTCORE, "now preparing to shut down port");
426  m_stateSemaphore.post();
427 
428  // Start disconnecting inputs. We ask the other side of the
429  // connection to do this, so it won't come as a surprise.
430  // The details of how disconnection works vary by carrier.
431  // While we are doing this, the server thread may be still running.
432  // This is because other ports may need to talk to the server
433  // to organize details of how a connection should be broken down.
434  bool done = false;
435  std::string prevName;
436  while (!done) {
437  done = true;
438  std::string removeName;
439  m_stateSemaphore.wait();
440  for (auto unit : m_units) {
441  if ((unit != nullptr) && unit->isInput() && !unit->isDoomed()) {
442  Route r = unit->getRoute();
443  std::string s = r.getFromName();
444  if (s.length() >= 1 && s[0] == '/' && s != getName() && s != prevName) {
445  removeName = s;
446  done = false;
447  break;
448  }
449  }
450  }
451  m_stateSemaphore.post();
452  if (!done) {
453  yCDebug(PORTCORE, "requesting removal of connection from %s", removeName.c_str());
454  bool result = NetworkBase::disconnect(removeName,
455  getName(),
456  true);
457  if (!result) {
459  removeName,
460  true);
461  }
462  prevName = removeName;
463  }
464  }
465 
466  // Start disconnecting outputs. We don't negotiate with the
467  // other side, we just break them down.
468  done = false;
469  while (!done) {
470  done = true;
471  Route removeRoute;
472  m_stateSemaphore.wait();
473  for (auto unit : m_units) {
474  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
475  removeRoute = unit->getRoute();
476  if (removeRoute.getFromName() == getName()) {
477  done = false;
478  break;
479  }
480  }
481  }
482  m_stateSemaphore.post();
483  if (!done) {
484  removeUnit(removeRoute, true);
485  }
486  }
487 
488  m_stateSemaphore.wait();
489  bool stopRunning = m_running;
490  m_stateSemaphore.post();
491 
492  // If the server thread is still running, we need to bring it down.
493  if (stopRunning) {
494  // Let the server thread know we no longer need its services.
495  m_stateSemaphore.wait();
496  m_closing = true;
497  m_stateSemaphore.post();
498 
499  // Wake up the server thread the only way we can, by sending
500  // a message to it. Then join it, it is done.
501  if (!m_manual) {
502  OutputProtocol* op = m_face->write(m_address);
503  if (op != nullptr) {
504  op->close();
505  delete op;
506  }
507  join();
508  }
509 
510  // We should be finished now.
511  m_stateSemaphore.wait();
512  yCAssert(PORTCORE, m_finished);
513  m_stateSemaphore.post();
514 
515  // Clean up our connection list. We couldn't do this earlier,
516  // because the server thread was running.
517  closeUnits();
518 
519  // Reset some state flags.
520  m_stateSemaphore.wait();
521  m_finished = false;
522  m_closing = false;
523  m_running = false;
524  m_stateSemaphore.post();
525  }
526 
527  // There should be no other threads at this point and we
528  // can stop listening on the network.
529  if (m_listening) {
530  yCAssert(PORTCORE, m_face != nullptr);
531  m_face->close();
532  delete m_face;
533  m_face = nullptr;
534  m_listening = false;
535  }
536 
537  // Check if the client is waiting for input. If so, wake them up
538  // with the bad news. An empty message signifies a request to
539  // check the port state.
540  if (m_reader != nullptr) {
541  yCDebug(PORTCORE, "sending end-of-port message to listener");
543  m_reader->read(sbr);
544  m_reader = nullptr;
545  }
546 
547  // We may need to unregister the port with the name server.
548  if (stopRunning) {
549  std::string name = getName();
550  if (name != std::string("")) {
551  if (m_controlRegistration) {
553  }
554  }
555  }
556 
557  // We are done with the finishing process.
558  m_finishing = false;
559 
560  // We are fresh as a daisy.
561  yCAssert(PORTCORE, !m_listening);
562  yCAssert(PORTCORE, !m_running);
563  yCAssert(PORTCORE, !m_starting);
564  yCAssert(PORTCORE, !m_closing);
565  yCAssert(PORTCORE, !m_finished);
566  yCAssert(PORTCORE, !m_finishing);
567  yCAssert(PORTCORE, m_face == nullptr);
568 }
569 
570 
572 {
573  // How many times has the server thread spun off a connection.
574  m_stateSemaphore.wait();
575  int ct = m_events;
576  m_stateSemaphore.post();
577  return ct;
578 }
579 
580 
581 void PortCore::closeUnits()
582 {
583  // Empty the PortCore#units list. This is only possible when
584  // the server thread is finished.
585  m_stateSemaphore.wait();
586  yCAssert(PORTCORE, m_finished);
587  m_stateSemaphore.post();
588 
589  // In the "finished" phase, nobody else touches the units,
590  // so we can go ahead and shut them down and delete them.
591  for (auto& i : m_units) {
592  PortCoreUnit* unit = i;
593  if (unit != nullptr) {
594  yCDebug(PORTCORE, "closing a unit");
595  unit->close();
596  yCDebug(PORTCORE, "joining a unit");
597  unit->join();
598  delete unit;
599  yCDebug(PORTCORE, "deleting a unit");
600  i = nullptr;
601  }
602  }
603 
604  // Get rid of all our nulls. Done!
605  m_units.clear();
606 }
607 
608 void PortCore::reapUnits()
609 {
610  // Connections that should be shut down get tagged as "doomed"
611  // but aren't otherwise touched until it is safe to do so.
612  m_stateSemaphore.wait();
613  if (!m_finished) {
614  for (auto unit : m_units) {
615  if ((unit != nullptr) && unit->isDoomed() && !unit->isFinished()) {
616  std::string s = unit->getRoute().toString();
617  yCDebug(PORTCORE, "Informing connection %s that it is doomed", s.c_str());
618  unit->close();
619  yCDebug(PORTCORE, "Closed connection %s", s.c_str());
620  unit->join();
621  yCDebug(PORTCORE, "Joined thread of connection %s", s.c_str());
622  }
623  }
624  }
625  m_stateSemaphore.post();
626  cleanUnits();
627 }
628 
629 void PortCore::cleanUnits(bool blocking)
630 {
631  // We will remove any connections that have finished operating from
632  // the PortCore#units list.
633 
634  // Depending on urgency, either wait for a safe time to do this
635  // or skip if unsafe.
636  if (blocking) {
637  m_stateSemaphore.wait();
638  } else {
639  blocking = m_stateSemaphore.check();
640  if (!blocking) {
641  return;
642  }
643  }
644 
645  // We will update our connection counts as a by-product.
646  int updatedInputCount = 0;
647  int updatedOutputCount = 0;
648  int updatedDataOutputCount = 0;
649  yCDebug(PORTCORE, "/ routine check of connections to this port begins");
650  if (!m_finished) {
651 
652  // First, we delete and null out any defunct entries in the list.
653  for (auto& i : m_units) {
654  PortCoreUnit* unit = i;
655  if (unit != nullptr) {
656  yCDebug(PORTCORE, "| checking connection %s %s", unit->getRoute().toString().c_str(), unit->getMode().c_str());
657  if (unit->isFinished()) {
658  std::string con = unit->getRoute().toString();
659  yCDebug(PORTCORE, "| removing connection %s", con.c_str());
660  unit->close();
661  unit->join();
662  delete unit;
663  i = nullptr;
664  yCDebug(PORTCORE, "| removed connection %s", con.c_str());
665  } else {
666  // No work to do except updating connection counts.
667  if (!unit->isDoomed()) {
668  if (unit->isOutput()) {
669  updatedOutputCount++;
670  if (unit->getMode().empty()) {
671  updatedDataOutputCount++;
672  }
673  }
674  if (unit->isInput()) {
675  if (unit->getRoute().getFromName() != "admin") {
676  updatedInputCount++;
677  }
678  }
679  }
680  }
681  }
682  }
683 
684  // Now we do some awkward shuffling (list class may be from ACE
685  // or STL, if ACE it is quite limited). We move the nulls to
686  // the end of the list ...
687  size_t rem = 0;
688  for (size_t i2 = 0; i2 < m_units.size(); i2++) {
689  if (m_units[i2] != nullptr) {
690  if (rem < i2) {
691  m_units[rem] = m_units[i2];
692  m_units[i2] = nullptr;
693  }
694  rem++;
695  }
696  }
697 
698  // ... Now we get rid of the null entries
699  for (size_t i3 = 0; i3 < m_units.size() - rem; i3++) {
700  m_units.pop_back();
701  }
702  }
703 
704  // Finalize the connection counts.
705  m_dataOutputCount = updatedDataOutputCount;
706  m_stateSemaphore.post();
707  m_packetMutex.lock();
708  m_inputCount = updatedInputCount;
709  m_outputCount = updatedOutputCount;
710  m_packetMutex.unlock();
711  yCDebug(PORTCORE, "\\ routine check of connections to this port ends");
712 }
713 
714 
715 void PortCore::addInput(InputProtocol* ip)
716 {
717  // This method is only called by the server thread in its running phase.
718  // It wraps up a network connection as a unit and adds it to
719  // PortCore#units. The unit will have its own thread associated
720  // with it.
721 
722  yCAssert(PORTCORE, ip != nullptr);
723  m_stateSemaphore.wait();
724  PortCoreUnit* unit = new PortCoreInputUnit(*this,
725  getNextIndex(),
726  ip,
727  false);
728  yCAssert(PORTCORE, unit != nullptr);
729  unit->start();
730  m_units.push_back(unit);
731  yCTrace(PORTCORE, "there are now %zu units", m_units.size());
732  m_stateSemaphore.post();
733 }
734 
735 
737 {
738  // This method is called from threads associated with input
739  // connections.
740  // It wraps up a network connection as a unit and adds it to
741  // PortCore#units. The unit will start with its own thread
742  // associated with it, but that thread will be very short-lived
743  // if the port is not configured to do background writes.
744 
745  yCAssert(PORTCORE, op != nullptr);
746  m_stateSemaphore.wait();
747  if (!m_finished) {
748  PortCoreUnit* unit = new PortCoreOutputUnit(*this, getNextIndex(), op);
749  yCAssert(PORTCORE, unit != nullptr);
750  unit->start();
751  m_units.push_back(unit);
752  }
753  m_stateSemaphore.post();
754 }
755 
756 
757 bool PortCore::isUnit(const Route& route, int index)
758 {
759  // Check if a connection with a specified route (and optional ID) is present
760  bool needReap = false;
761  if (!m_finished) {
762  for (auto unit : m_units) {
763  if (unit != nullptr) {
764  Route alt = unit->getRoute();
765  std::string wild = "*";
766  bool ok = true;
767  if (index >= 0) {
768  ok = ok && (unit->getIndex() == index);
769  }
770  if (ok) {
771  if (route.getFromName() != wild) {
772  ok = ok && (route.getFromName() == alt.getFromName());
773  }
774  if (route.getToName() != wild) {
775  ok = ok && (route.getToName() == alt.getToName());
776  }
777  if (route.getCarrierName() != wild) {
778  ok = ok && (route.getCarrierName() == alt.getCarrierName());
779  }
780  }
781  if (ok) {
782  needReap = true;
783  break;
784  }
785  }
786  }
787  }
788  return needReap;
789 }
790 
791 
792 bool PortCore::removeUnit(const Route& route, bool synch, bool* except)
793 {
794  // This is a request to remove a connection. It could arise from any
795  // input thread.
796 
797  if (except != nullptr) {
798  yCDebug(PORTCORE, "asked to remove connection in the way of %s", route.toString().c_str());
799  *except = false;
800  } else {
801  yCDebug(PORTCORE, "asked to remove connection %s", route.toString().c_str());
802  }
803 
804  // Scan for units that match the given route, and collect their IDs.
805  // Mark them as "doomed".
806  std::vector<int> removals;
807  m_stateSemaphore.wait();
808  bool needReap = false;
809  if (!m_finished) {
810  for (auto unit : m_units) {
811  if (unit != nullptr) {
812  Route alt = unit->getRoute();
813  std::string wild = "*";
814  bool ok = true;
815  if (route.getFromName() != wild) {
816  ok = ok && (route.getFromName() == alt.getFromName());
817  }
818  if (route.getToName() != wild) {
819  ok = ok && (route.getToName() == alt.getToName());
820  }
821  if (route.getCarrierName() != wild) {
822  if (except == nullptr) {
823  ok = ok && (route.getCarrierName() == alt.getCarrierName());
824  } else {
825  if (route.getCarrierName() == alt.getCarrierName()) {
826  *except = true;
827  ok = false;
828  }
829  }
830  }
831 
832  if (ok) {
833  yCDebug(PORTCORE, "removing connection %s", alt.toString().c_str());
834  removals.push_back(unit->getIndex());
835  unit->setDoomed();
836  needReap = true;
837  }
838  }
839  }
840  }
841  m_stateSemaphore.post();
842 
843  // If we find one or more matches, we need to do some work.
844  // We've marked those matches as "doomed" so they'll get cleared
845  // up eventually, but we can speed this up by waking up the
846  // server thread.
847  if (needReap) {
848  yCDebug(PORTCORE, "one or more connections need prodding to die");
849 
850  if (m_manual) {
851  // No server thread, no problems.
852  reapUnits();
853  } else {
854  // Send a blank message to make up server thread.
855  OutputProtocol* op = m_face->write(m_address);
856  if (op != nullptr) {
857  op->close();
858  delete op;
859  }
860  yCDebug(PORTCORE, "sent message to prod connection death");
861 
862  if (synch) {
863  // Wait for connections to be cleaned up.
864  yCDebug(PORTCORE, "synchronizing with connection death");
865  bool cont = false;
866  do {
867  m_stateSemaphore.wait();
868  for (int removal : removals) {
869  cont = isUnit(route, removal);
870  if (cont) {
871  break;
872  }
873  }
874  if (cont) {
875  m_connectionListeners++;
876  }
877  m_stateSemaphore.post();
878  if (cont) {
879  m_connectionChangeSemaphore.wait();
880  }
881  } while (cont);
882  }
883  }
884  }
885  return needReap;
886 }
887 
888 
889 bool PortCore::addOutput(const std::string& dest,
890  void* id,
891  OutputStream* os,
892  bool onlyIfNeeded)
893 {
894  YARP_UNUSED(id);
895  yCDebug(PORTCORE, "asked to add output to %s", dest.c_str());
896 
897  // Buffer to store text describing outcome (successful connection,
898  // or a failure).
899  BufferedConnectionWriter bw(true);
900 
901  // Look up the address we'll be connectioning to.
902  Contact parts = Name(dest).toAddress();
903  Contact contact = NetworkBase::queryName(parts.getRegName());
904  Contact address = contact;
905 
906  // If we can't find it, say so and abort.
907  if (!address.isValid()) {
908  bw.appendLine(std::string("Do not know how to connect to ") + dest);
909  if (os != nullptr) {
910  bw.write(*os);
911  }
912  return false;
913  }
914 
915  // We clean all existing connections to the desired destination,
916  // optionally stopping if we find one with the right carrier.
917  if (onlyIfNeeded) {
918  // Remove any existing connections between source and destination
919  // with a different carrier. If we find a connection already
920  // present with the correct carrier, then we are done.
921  bool except = false;
922  removeUnit(Route(getName(), address.getRegName(), address.getCarrier()), true, &except);
923  if (except) {
924  // Connection already present.
925  yCDebug(PORTCORE, "output already present to %s", dest.c_str());
926  bw.appendLine(std::string("Desired connection already present from ") + getName() + " to " + dest);
927  if (os != nullptr) {
928  bw.write(*os);
929  }
930  return true;
931  }
932  } else {
933  // Remove any existing connections between source and destination.
934  removeUnit(Route(getName(), address.getRegName(), "*"), true);
935  }
936 
937  // Set up a named route for this connection.
938  std::string aname = address.getRegName();
939  if (aname.empty()) {
940  aname = address.toURI(false);
941  }
942  Route r(getName(),
943  aname,
944  ((!parts.getCarrier().empty()) ? parts.getCarrier() : address.getCarrier()));
945  r.setToContact(contact);
946 
947  // Check for any restrictions on the port. Perhaps it can only
948  // read, or write.
949  bool allowed = true;
950  std::string err;
951  std::string append;
952  unsigned int f = getFlags();
953  bool allow_output = (f & PORTCORE_IS_OUTPUT) != 0;
954  bool rpc = (f & PORTCORE_IS_RPC) != 0;
955  Name name(r.getCarrierName() + std::string("://test"));
956  std::string mode = name.getCarrierModifier("log");
957  bool is_log = (!mode.empty());
958  if (is_log) {
959  if (mode != "in") {
960  err = "Logger configured as log." + mode + ", but only log.in is supported";
961  allowed = false;
962  } else {
963  append = "; " + r.getFromName() + " will forward messages and replies (if any) to " + r.getToName();
964  }
965  }
966  if (!allow_output) {
967  if (!is_log) {
968  bool push = false;
970  if (c != nullptr) {
971  push = c->isPush();
972  }
973  if (push) {
974  err = "Outputs not allowed";
975  allowed = false;
976  }
977  }
978  } else if (rpc) {
979  if (m_dataOutputCount >= 1 && !is_log) {
980  err = "RPC output already connected";
981  allowed = false;
982  }
983  }
984 
985  // If we found a relevant restriction, abort.
986  if (!allowed) {
987  bw.appendLine(err);
988  if (os != nullptr) {
989  bw.write(*os);
990  }
991  return false;
992  }
993 
994  // Ok! We can go ahead and make a connection.
995  OutputProtocol* op = nullptr;
996  if (m_timeout > 0) {
997  address.setTimeout(m_timeout);
998  }
999  op = Carriers::connect(address);
1000  if (op != nullptr) {
1001  op->attachPort(m_contactable);
1002  if (m_timeout > 0) {
1003  op->setTimeout(m_timeout);
1004  }
1005 
1006  bool ok = op->open(r);
1007  if (!ok) {
1008  yCDebug(PORTCORE, "open route error");
1009  delete op;
1010  op = nullptr;
1011  }
1012  }
1013 
1014  // No connection, abort.
1015  if (op == nullptr) {
1016  bw.appendLine(std::string("Cannot connect to ") + dest);
1017  if (os != nullptr) {
1018  bw.write(*os);
1019  }
1020  return false;
1021  }
1022 
1023  // Ok, we have a connection, now add it to PortCore#units
1024  if (op->getConnection().isPush()) {
1025  // This is the normal case
1026  addOutput(op);
1027  } else {
1028  // This is the case for connections that are initiated
1029  // in the opposite direction to native YARP connections.
1030  // Native YARP has push connections, initiated by the
1031  // sender. HTTP and ROS have pull connections, initiated
1032  // by the receiver.
1033  // We invert the route, flip the protocol direction, and add.
1034  r.swapNames();
1035  op->rename(r);
1036  InputProtocol* ip = &(op->getInput());
1037  m_stateSemaphore.wait();
1038  if (!m_finished) {
1039  PortCoreUnit* unit = new PortCoreInputUnit(*this,
1040  getNextIndex(),
1041  ip,
1042  true);
1043  yCAssert(PORTCORE, unit != nullptr);
1044  unit->start();
1045  m_units.push_back(unit);
1046  }
1047  m_stateSemaphore.post();
1048  }
1049 
1050  // Communicated the good news.
1051  bw.appendLine(std::string("Added connection from ") + getName() + " to " + dest + append);
1052  if (os != nullptr) {
1053  bw.write(*os);
1054  }
1055  cleanUnits();
1056  return true;
1057 }
1058 
1059 
1060 void PortCore::removeOutput(const std::string& dest, void* id, OutputStream* os)
1061 {
1062  YARP_UNUSED(id);
1063  // All the real work done by removeUnit().
1064  BufferedConnectionWriter bw(true);
1065  if (removeUnit(Route("*", dest, "*"), true)) {
1066  bw.appendLine(std::string("Removed connection from ") + getName() + " to " + dest);
1067  } else {
1068  bw.appendLine(std::string("Could not find an outgoing connection to ") + dest);
1069  }
1070  if (os != nullptr) {
1071  bw.write(*os);
1072  }
1073  cleanUnits();
1074 }
1075 
1076 void PortCore::removeInput(const std::string& src, void* id, OutputStream* os)
1077 {
1078  YARP_UNUSED(id);
1079  // All the real work done by removeUnit().
1080  BufferedConnectionWriter bw(true);
1081  if (removeUnit(Route(src, "*", "*"), true)) {
1082  bw.appendLine(std::string("Removing connection from ") + src + " to " + getName());
1083  } else {
1084  bw.appendLine(std::string("Could not find an incoming connection from ") + src);
1085  }
1086  if (os != nullptr) {
1087  bw.write(*os);
1088  }
1089  cleanUnits();
1090 }
1091 
// NOTE(review): the signature line of this definition is not part of this
// listing (presumably PortCore::describe(void* id, OutputStream* os) --
// confirm against the real file).
//
// Builds a line-oriented, human-readable description of the port: its
// registered name and URI, then one line per live output connection, then
// one line per live input connection. The text is written to *os when os is
// non-null, otherwise it is printed to stdout. `id` is unused.
1093 {
1094  YARP_UNUSED(id);
1095  cleanUnits();
1096 
1097  // Buffer to store a human-readable description of the port's
1098  // state.
1099  BufferedConnectionWriter bw(true);
1100 
// m_units and m_address are only read between this wait() and the matching
// post() below; the description is sent after the semaphore is released.
1101  m_stateSemaphore.wait();
1102 
1103  // Report name and address.
1104  bw.appendLine(std::string("This is ") + m_address.getRegName() + " at " + m_address.toURI());
1105 
1106  // Report outgoing connections.
1107  int oct = 0;
1108  for (auto unit : m_units) {
1109  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1110  Route route = unit->getRoute();
1111  std::string msg = "There is an output connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1112  bw.appendLine(msg);
1113  oct++;
1114  }
1115  }
1116  if (oct < 1) {
1117  bw.appendLine("There are no outgoing connections");
1118  }
1119 
1120  // Report incoming connections.
1121  int ict = 0;
1122  for (auto unit : m_units) {
1123  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1124  Route route = unit->getRoute();
// Inputs whose route has an empty carrier name are neither listed nor counted.
1125  if (!route.getCarrierName().empty()) {
1126  std::string msg = "There is an input connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1127  bw.appendLine(msg);
1128  ict++;
1129  }
1130  }
1131  }
1132  if (ict < 1) {
1133  bw.appendLine("There are no incoming connections");
1134  }
1135 
1136  m_stateSemaphore.post();
1137 
1138  // Send description across network, or print it.
1139  if (os != nullptr) {
1140  bw.write(*os);
1141  } else {
1142  StringOutputStream sos;
1143  bw.write(sos);
1144  printf("%s\n", sos.toString().c_str());
1145  }
1146 }
1147 
1148 
// NOTE(review): the signature line of this definition is not part of this
// listing (presumably PortCore::describe(PortReport& reporter) -- confirm).
// The embedded listing numbering also skips 1157, 1170, 1182, 1195 and 1207
// inside the body, so one statement may be missing at each of those points;
// verify against the real file before relying on this text.
//
// Reports the port's state to `reporter` as a sequence of PortInfo records:
// one for the port's name and URI, then one per live output connection (or a
// single "no outgoing connections" record), then the same for inputs.
// Every reporter.report() call is made while m_stateSemaphore is held.
1150 {
1151  cleanUnits();
1152 
1153  m_stateSemaphore.wait();
1154 
1155  // Report name and address of port.
1156  PortInfo baseInfo;
1158  std::string portName = m_address.getRegName();
1159  baseInfo.message = std::string("This is ") + portName + " at " + m_address.toURI();
1160  reporter.report(baseInfo);
1161 
1162  // Report outgoing connections.
1163  int oct = 0;
1164  for (auto unit : m_units) {
1165  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1166  Route route = unit->getRoute();
1167  std::string msg = "There is an output connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1168  PortInfo info;
1169  info.message = msg;
1171  info.incoming = false;
1172  info.portName = portName;
1173  info.sourceName = route.getFromName();
1174  info.targetName = route.getToName();
1175  info.carrierName = route.getCarrierName();
1176  reporter.report(info);
1177  oct++;
1178  }
1179  }
1180  if (oct < 1) {
1181  PortInfo info;
1183  info.message = "There are no outgoing connections";
1184  reporter.report(info);
1185  }
1186 
1187  // Report incoming connections.
// No carrier-name filter is applied to inputs in this loop.
1188  int ict = 0;
1189  for (auto unit : m_units) {
1190  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1191  Route route = unit->getRoute();
1192  std::string msg = "There is an input connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1193  PortInfo info;
1194  info.message = msg;
1196  info.incoming = true;
1197  info.portName = portName;
1198  info.sourceName = route.getFromName();
1199  info.targetName = route.getToName();
1200  info.carrierName = route.getCarrierName();
1201  reporter.report(info);
1202  ict++;
1203  }
1204  }
1205  if (ict < 1) {
1206  PortInfo info;
1208  info.message = "There are no incoming connections";
1209  reporter.report(info);
1210  }
1211 
1212  m_stateSemaphore.post();
1213 }
1214 
1215 
// NOTE(review): the signature line of this definition is not part of this
// listing (presumably PortCore::setReportCallback(PortReport* reporter) --
// confirm).
//
// Installs `reporter` as the port's event reporter while holding
// m_stateSemaphore. A null `reporter` is ignored: the previously installed
// reporter, if any, stays in place.
1217 {
1218  m_stateSemaphore.wait();
1219  if (reporter != nullptr) {
1220  m_eventReporter = reporter;
1221  }
1222  m_stateSemaphore.post();
1223 }
1224 
// NOTE(review): the signature line of this definition is not part of this
// listing (presumably PortCore::resetReportCallback() -- confirm).
//
// Clears the port's event reporter (sets m_eventReporter to nullptr) while
// holding m_stateSemaphore.
1226 {
1227  m_stateSemaphore.wait();
1228  m_eventReporter = nullptr;
1229  m_stateSemaphore.post();
1230 }
1231 
1232 void PortCore::report(const PortInfo& info)
1233 {
1234  // We are in the context of one of the input or output threads,
1235  // so our contact with the PortCore must be absolutely minimal.
1236  //
1237  // It is safe to pick up the address of the reporter if this is
1238  // kept constant over the lifetime of the input/output threads.
1239 
1240  if (m_eventReporter != nullptr) {
1241  m_eventReporter->report(info);
1242  }
1243 }
1244 
1245 
// NOTE(review): the signature line of this definition is not part of this
// listing (presumably PortCore::readBlock(ConnectionReader& reader, void* id,
// OutputStream* os) -- confirm).
//
// Hands one incoming message to the user's reader (m_reader) when a reader
// is set and the port is not interrupted; otherwise the message is read into
// a throwaway Bottle so that it is consumed. Returns the result of whichever
// read was performed. `id` and `os` are unused.
1247 {
1248  YARP_UNUSED(id);
1249  YARP_UNUSED(os);
1250  bool result = true;
1251 
1252  // We are in the context of one of the input threads,
1253  // so our contact with the PortCore must be absolutely minimal.
1254  //
1255  // It is safe to pick up the address of the reader since this is
1256  // constant over the lifetime of the input threads.
1257 
1258  if (m_reader != nullptr && !m_interrupted) {
// m_interruptible is false for the whole duration of the user read below
// and is set back to true afterwards.
1259  m_interruptible = false; // No mutexing; user of interrupt() has to be careful.
1260 
1261  bool haveOutputs = (m_outputCount != 0); // No mutexing, but failure modes are benign.
1262 
1263  if (m_logNeeded && haveOutputs) {
1264  // Normally, yarp doesn't pay attention to the content of
1265  // messages received by the client. Likewise, the content
1266  // of replies are not monitored. However it may sometimes
1267  // be useful to log this traffic.
1268 
// The recorder wraps `reader` for the duration of the user read; the
// recorded transaction is then sent out with PORTCORE_SEND_LOG.
1269  ConnectionRecorder recorder;
1270  recorder.init(&reader);
1271  lockCallback();
1272  result = m_reader->read(recorder);
1273  unlockCallback();
1274  recorder.fini();
1275  // send off a log of this transaction to whoever wants it
1276  sendHelper(recorder, PORTCORE_SEND_LOG);
1277  } else {
1278  // YARP is not needed as a middleman
1279  lockCallback();
1280  result = m_reader->read(reader);
1281  unlockCallback();
1282  }
1283 
1284  m_interruptible = true;
1285  } else {
1286  // Read and ignore the message, there is nowhere to send it.
1287  yCDebug(PORTCORE, "data received, no reader for it");
1288  Bottle b;
1289  result = b.read(reader);
1290  }
1291  return result;
1292 }
1293 
1294 
1295 bool PortCore::send(const PortWriter& writer,
1296  PortReader* reader,
1297  const PortWriter* callback)
1298 {
1299  // check if there is any modifier
1300  // we need to protect this part while the modifier
1301  // plugin is loading or unloading!
1302  m_modifier.outputMutex.lock();
1303  if (m_modifier.outputModifier != nullptr) {
1304  if (!m_modifier.outputModifier->acceptOutgoingData(writer)) {
1305  m_modifier.outputMutex.unlock();
1306  return false;
1307  }
1308  m_modifier.outputModifier->modifyOutgoingData(writer);
1309  }
1310  m_modifier.outputMutex.unlock();
1311  if (!m_logNeeded) {
1312  return sendHelper(writer, PORTCORE_SEND_NORMAL, reader, callback);
1313  }
1314  // logging is desired, so we need to wrap up and log this send
1315  // (and any reply it gets) -- TODO not yet supported
1316  return sendHelper(writer, PORTCORE_SEND_NORMAL, reader, callback);
1317 }
1318 
// NOTE(review): the first line of this signature (return type, name and the
// `writer` parameter) is not part of this listing -- presumably
// `bool PortCore::sendHelper(const PortWriter& writer,` -- confirm.
//
// Places `writer` on every live output unit selected by `mode`:
// PORTCORE_SEND_NORMAL selects units whose getMode() is empty, any other
// mode selects units whose getMode() is non-empty (log-only connections).
//
// Returns false straight away if the port is interrupted, finishing or
// finished. Otherwise returns all_ok, which turns false when a unit that was
// waited on is finished after its send, or when m_waitAfterSend is set, a
// `reader` was supplied and no unit reported a reply.
1320  int mode,
1321  PortReader* reader,
1322  const PortWriter* callback)
1323 {
1324  if (m_interrupted || m_finishing) {
1325  return false;
1326  }
1327 
1328  bool all_ok = true;
1329  bool gotReply = false;
1330  int logCount = 0;
1331  std::string envelopeString = m_envelope;
1332 
1333  // Pass a message to all output units for sending on. We could
1334  // be doing more here to cache the serialization of the message
1335  // and reuse it across output connections. However, one key
1336  // optimization is present: external blocks written by
1337  // yarp::os::ConnectionWriter::appendExternalBlock are never
1338  // copied. So for example the core image array in a yarp::sig::Image
1339  // is untouched by the port communications code.
1340 
1341  yCTrace(PORTCORE, "------- send in real");
1342 
1343  // Give user the chance to know that this object is about to be
1344  // written.
1345  writer.onCommencement();
1346 
1347  // All user-facing parts of this port will be blocked on this
1348  // operation, so we'll want to be snappy. How long the
1349  // operation lasts will depend on these flags:
1350  // * waitAfterSend
1351  // * waitBeforeSend
1352  // set by setWaitAfterSend() and setWaitBeforeSend().
1353  m_stateSemaphore.wait();
1354 
1355  // If the port is shutting down, abort.
// Note that writer.onCommencement() has already been called on this path.
1356  if (m_finished) {
1357  m_stateSemaphore.post();
1358  return false;
1359  }
1360 
1361  yCTrace(PORTCORE, "------- send in");
1362  // Prepare a "packet" for tracking a single message which
1363  // may travel by multiple outputs.
1364  m_packetMutex.lock();
1365  PortCorePacket* packet = m_packets.getFreePacket();
1366  yCAssert(PORTCORE, packet != nullptr);
1367  packet->setContent(&writer, false, callback);
1368  m_packetMutex.unlock();
1369 
1370  // Scan connections, placing message everywhere we can.
1371  for (auto unit : m_units) {
1372  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1373  bool log = (!unit->getMode().empty());
1374  if (log) {
1375  // Some connections are for logging only.
1376  logCount++;
1377  }
// Normal sends skip log connections; log sends use only log connections.
1378  bool ok = (mode == PORTCORE_SEND_NORMAL) ? (!log) : (log);
1379  if (!ok) {
1380  continue;
1381  }
// Log sends always wait after sending, regardless of m_waitAfterSend.
1382  bool waiter = m_waitAfterSend || (mode == PORTCORE_SEND_LOG);
1383  yCTrace(PORTCORE, "------- -- inc");
1384  m_packetMutex.lock();
1385  packet->inc(); // One more connection carrying message.
1386  m_packetMutex.unlock();
1387  yCTrace(PORTCORE, "------- -- presend");
1388  bool gotReplyOne = false;
1389  // Send the message off on this connection.
// When no explicit callback is given, the writer itself is passed as callback.
1390  void* out = unit->send(writer,
1391  reader,
1392  (callback != nullptr) ? callback : (&writer),
1393  (void*)packet,
1394  envelopeString,
1395  waiter,
1396  m_waitBeforeSend,
1397  &gotReplyOne);
1398  gotReply = gotReply || gotReplyOne;
1399  yCTrace(PORTCORE, "------- -- send");
1400  if (out != nullptr) {
1401  // We got back a report of a message already sent.
// NOTE(review): presumably checkPacket() recycles the packet once no
// connection carries it any more -- confirm in the packet pool.
1402  m_packetMutex.lock();
1403  (static_cast<PortCorePacket*>(out))->dec(); // Message on one fewer connections.
1404  m_packets.checkPacket(static_cast<PortCorePacket*>(out));
1405  m_packetMutex.unlock();
1406  }
1407  if (waiter) {
1408  if (unit->isFinished()) {
1409  all_ok = false;
1410  }
1411  }
1412  yCTrace(PORTCORE, "------- -- dec");
1413  }
1414  }
1415  yCTrace(PORTCORE, "------- pack check");
1416  m_packetMutex.lock();
1417 
1418  // We no longer concern ourselves with the message.
1419  // It may or may not be traveling on some connections.
1420  // But that is not our problem anymore.
1421  packet->dec();
1422 
1423  m_packets.checkPacket(packet);
1424  m_packetMutex.unlock();
1425  yCTrace(PORTCORE, "------- packed");
1426  yCTrace(PORTCORE, "------- send out");
// A log send that found no log connection at all switches logging off.
1427  if (mode == PORTCORE_SEND_LOG) {
1428  if (logCount == 0) {
1429  m_logNeeded = false;
1430  }
1431  }
1432  m_stateSemaphore.post();
1433  yCTrace(PORTCORE, "------- send out real");
1434 
// A reply was requested and waited for, so success also requires that at
// least one connection actually delivered one.
1435  if (m_waitAfterSend && reader != nullptr) {
1436  all_ok = all_ok && gotReply;
1437  }
1438 
1439  return all_ok;
1440 }
1441 
1442 
// NOTE(review): the signature line of this definition is not part of this
// listing (presumably PortCore::isWriting() -- confirm).
//
// Returns true if, while m_stateSemaphore is held, at least one unit that is
// not finished reports isBusy(). Always returns false once m_finished is set.
1444 {
1445  bool writing = false;
1446 
1447  m_stateSemaphore.wait();
1448 
1449  // Check if any port is currently writing. TODO optimize
1450  // this query by counting down with notifyCompletion().
1451  if (!m_finished) {
// The scan does not stop at the first busy unit; every unit is visited.
1452  for (auto unit : m_units) {
1453  if ((unit != nullptr) && !unit->isFinished() && unit->isBusy()) {
1454  writing = true;
1455  }
1456  }
1457  }
1458 
1459  m_stateSemaphore.post();
1460 
1461  return writing;
1462 }
1463 
1464 
// NOTE(review): the signature line of this definition is not part of this
// listing (presumably PortCore::getInputCount() -- confirm).
//
// Calls cleanUnits(false), then returns a copy of m_inputCount taken while
// m_packetMutex is held.
1466 {
1467  cleanUnits(false);
1468  m_packetMutex.lock();
1469  int result = m_inputCount;
1470  m_packetMutex.unlock();
1471  return result;
1472 }
1473 
// NOTE(review): the signature line of this definition is not part of this
// listing (presumably PortCore::getOutputCount() -- confirm).
//
// Calls cleanUnits(false), then returns a copy of m_outputCount taken while
// m_packetMutex is held.
1475 {
1476  cleanUnits(false);
1477  m_packetMutex.lock();
1478  int result = m_outputCount;
1479  m_packetMutex.unlock();
1480  return result;
1481 }
1482 
1483 
1484 void PortCore::notifyCompletion(void* tracker)
1485 {
1486  yCTrace(PORTCORE, "starting notifyCompletion");
1487  m_packetMutex.lock();
1488  if (tracker != nullptr) {
1489  (static_cast<PortCorePacket*>(tracker))->dec();
1490  m_packets.checkPacket(static_cast<PortCorePacket*>(tracker));
1491  }
1492  m_packetMutex.unlock();
1493  yCTrace(PORTCORE, "stopping notifyCompletion");
1494 }
1495 
1496 
// NOTE(review): the signature line of this definition is not part of this
// listing (presumably PortCore::setEnvelope(PortWriter& envelope) --
// confirm).
//
// Serializes `envelope` into m_envelopeWriter and, if that succeeds, stores
// the resulting text as the port's envelope through the string overload of
// setEnvelope(). Returns whether the serialization succeeded; on failure the
// stored envelope is left untouched.
1498 {
// Discard whatever a previous call left in the reusable writer.
1499  m_envelopeWriter.restart();
1500  bool ok = envelope.write(m_envelopeWriter);
1501  if (ok) {
1502  setEnvelope(m_envelopeWriter.toString());
1503  }
1504  return ok;
1505 }
1506 
1507 
1508 void PortCore::setEnvelope(const std::string& envelope)
1509 {
1510  m_envelope = envelope;
1511  for (size_t i = 0; i < m_envelope.length(); i++) {
1512  // It looks like envelopes are constrained to be printable ASCII?
1513  // I'm not sure why this would be. TODO check.
1514  if (m_envelope[i] < 32) {
1515  m_envelope = m_envelope.substr(0, i);
1516  break;
1517  }
1518  }
1519  yCDebug(PORTCORE, "set envelope to %s", m_envelope.c_str());
1520 }
1521 
// NOTE(review): the signature line of this definition is not part of this
// listing (presumably std::string PortCore::getEnvelope() -- confirm).
//
// Returns the currently stored envelope text (m_envelope); no lock is taken.
1523 {
1524  return m_envelope;
1525 }
1526 
// NOTE(review): the signature line of this definition is not part of this
// listing (presumably PortCore::getEnvelope(PortReader& envelope) --
// confirm). The listing numbering also skips 1532, which is presumably where
// `sbr` is declared; verify against the real file.
//
// Feeds the stored envelope text, followed by "\r\n", through an in-memory
// stream and lets `envelope` parse itself from it. Returns the result of
// envelope.read().
1528 {
1529  StringInputStream sis;
1530  sis.add(m_envelope);
1531  sis.add("\r\n");
// No output stream and an empty route are supplied: the reader is backed
// only by the in-memory input above.
1533  Route route;
1534  sbr.reset(sis, nullptr, route, 0, true);
1535  return envelope.read(sbr);
1536 }
1537 
1538 // Make an RPC connection to talk to a ROS API, send a message, get reply.
1539 // NOTE: ROS support can now be moved out of here, once all documentation
1540 // of older ways to interoperate with it are purged and people stop
1541 // doing it.
1542 static bool __pc_rpc(const Contact& c,
1543  const char* carrier,
1544  Bottle& writer,
1545  Bottle& reader,
1546  bool verbose)
1547 {
1548  ContactStyle style;
1549  style.quiet = !verbose;
1550  style.timeout = 4;
1551  style.carrier = carrier;
1552  bool ok = Network::write(c, writer, reader, style);
1553  return ok;
1554 }
1555 
1556 // ACE is sometimes confused by localhost aliases, in a ROS-incompatible
1557 // way. This method does a quick sanity check if we are using ROS.
1558 static bool __tcp_check(const Contact& c)
1559 {
1560 #ifdef YARP_HAS_ACE
1561  ACE_INET_Addr addr;
1562  int result = addr.set(c.getPort(), c.getHost().c_str());
1563  if (result != 0) {
1564  yCWarning(PORTCORE, "ACE choked on %s:%d\n", c.getHost().c_str(), c.getPort());
1565  }
1566  result = addr.set(c.getPort(), "127.0.0.1");
1567  if (result != 0) {
1568  yCWarning(PORTCORE, "ACE choked on 127.0.0.1:%d\n", c.getPort());
1569  }
1570  result = addr.set(c.getPort(), "127.0.1.1");
1571  if (result != 0) {
1572  yCWarning(PORTCORE, "ACE choked on 127.0.1.1:%d\n", c.getPort());
1573  }
1574 #endif
1575  return true;
1576 }
1577 
1578 namespace {
1579 enum class PortCoreCommand : yarp::conf::vocab32_t
1580 {
1581  Unknown = 0,
1582  Help = yarp::os::createVocab('h', 'e', 'l', 'p'),
1583  Ver = yarp::os::createVocab('v', 'e', 'r'),
1584  Pray = yarp::os::createVocab('p', 'r', 'a', 'y'),
1585  Add = yarp::os::createVocab('a', 'd', 'd'),
1586  Del = yarp::os::createVocab('d', 'e', 'l'),
1587  Atch = yarp::os::createVocab('a', 't', 'c', 'h'),
1588  Dtch = yarp::os::createVocab('d', 't', 'c', 'h'),
1589  List = yarp::os::createVocab('l', 'i', 's', 't'),
1590  Set = yarp::os::createVocab('s', 'e', 't'),
1591  Get = yarp::os::createVocab('g', 'e', 't'),
1592  Prop = yarp::os::createVocab('p', 'r', 'o', 'p'),
1593  RosPublisherUpdate = yarp::os::createVocab('r', 'p', 'u', 'p'),
1594  RosRequestTopic = yarp::os::createVocab('r', 't', 'o', 'p'),
1595  RosGetPid = yarp::os::createVocab('p', 'i', 'd'),
1596  RosGetBusInfo = yarp::os::createVocab('b', 'u', 's'),
1597 };
1598 
1599 enum class PortCoreConnectionDirection : yarp::conf::vocab32_t
1600 {
1601  Error = 0,
1602  Out = yarp::os::createVocab('o', 'u', 't'),
1603  In = yarp::os::createVocab('i', 'n'),
1604 };
1605 
1606 enum class PortCorePropertyAction : yarp::conf::vocab32_t
1607 {
1608  Error = 0,
1609  Get = yarp::os::createVocab('g', 'e', 't'),
1610  Set = yarp::os::createVocab('s', 'e', 't')
1611 };
1612 
1613 PortCoreCommand parseCommand(const yarp::os::Value& v)
1614 {
1615  if (v.isString()) {
1616  // We support ROS client API these days. Here we recode some long ROS
1617  // command names, just for convenience.
1618  std::string cmd = v.asString();
1619  if (cmd == "publisherUpdate") {
1620  return PortCoreCommand::RosPublisherUpdate;
1621  }
1622  if (cmd == "requestTopic") {
1623  return PortCoreCommand::RosRequestTopic;
1624  }
1625  if (cmd == "getPid") {
1626  return PortCoreCommand::RosGetPid;
1627  }
1628  if (cmd == "getBusInfo") {
1629  return PortCoreCommand::RosGetBusInfo;
1630  }
1631  }
1632 
1633  auto cmd = static_cast<PortCoreCommand>(v.asVocab());
1634  switch (cmd) {
1635  case PortCoreCommand::Help:
1636  case PortCoreCommand::Ver:
1637  case PortCoreCommand::Pray:
1638  case PortCoreCommand::Add:
1639  case PortCoreCommand::Del:
1640  case PortCoreCommand::Atch:
1641  case PortCoreCommand::Dtch:
1642  case PortCoreCommand::List:
1643  case PortCoreCommand::Set:
1644  case PortCoreCommand::Get:
1645  case PortCoreCommand::Prop:
1646  case PortCoreCommand::RosPublisherUpdate:
1647  case PortCoreCommand::RosRequestTopic:
1648  case PortCoreCommand::RosGetPid:
1649  case PortCoreCommand::RosGetBusInfo:
1650  return cmd;
1651  default:
1652  return PortCoreCommand::Unknown;
1653  }
1654 }
1655 
1656 PortCoreConnectionDirection parseConnectionDirection(yarp::conf::vocab32_t v, bool errorIsOut = false)
1657 {
1658  auto dir = static_cast<PortCoreConnectionDirection>(v);
1659  switch (dir) {
1660  case PortCoreConnectionDirection::In:
1661  case PortCoreConnectionDirection::Out:
1662  return dir;
1663  default:
1664  return errorIsOut ? PortCoreConnectionDirection::Out : PortCoreConnectionDirection::Error;
1665  }
1666 }
1667 
1668 PortCorePropertyAction parsePropertyAction(yarp::conf::vocab32_t v)
1669 {
1670  auto action = static_cast<PortCorePropertyAction>(v);
1671  switch (action) {
1672  case PortCorePropertyAction::Get:
1673  case PortCorePropertyAction::Set:
1674  return action;
1675  default:
1676  return PortCorePropertyAction::Error;
1677  }
1678 }
1679 
1680 void describeRoute(const Route& route, Bottle& result)
1681 {
1682  Bottle& bfrom = result.addList();
1683  bfrom.addString("from");
1684  bfrom.addString(route.getFromName());
1685 
1686  Bottle& bto = result.addList();
1687  bto.addString("to");
1688  bto.addString(route.getToName());
1689 
1690  Bottle& bcarrier = result.addList();
1691  bcarrier.addString("carrier");
1692  bcarrier.addString(route.getCarrierName());
1693 
1694  Carrier* carrier = Carriers::chooseCarrier(route.getCarrierName());
1695  if (carrier->isConnectionless()) {
1696  Bottle& bconnectionless = result.addList();
1697  bconnectionless.addString("connectionless");
1698  bconnectionless.addInt32(1);
1699  }
1700  if (!carrier->isPush()) {
1701  Bottle& breverse = result.addList();
1702  breverse.addString("push");
1703  breverse.addInt32(0);
1704  }
1705  delete carrier;
1706 }
1707 
1708 } // namespace
1709 
1711  void* id)
1712 {
1713  Bottle cmd;
1714  Bottle result;
1715 
1716  // We've received a message to the port that is marked as administrative.
1717  // That means that instead of passing it along as data to the user of the
1718  // port, the port itself is responsible for reading and responding to
1719  // it. So let's read the message and see what we're supposed to do.
1720  cmd.read(reader);
1721 
1722  yCDebug(PORTCORE, "Port %s received command %s", getName().c_str(), cmd.toString().c_str());
1723 
1724  auto handleAdminHelpCmd = []() {
1725  Bottle result;
1726  // We give a list of the most useful administrative commands.
1727  result.addVocab(yarp::os::createVocab('m', 'a', 'n', 'y'));
1728  result.addString("[help] # give this help");
1729  result.addString("[ver] # report protocol version information");
1730  result.addString("[add] $portname # add an output connection");
1731  result.addString("[add] $portname $car # add an output with a given protocol");
1732  result.addString("[del] $portname # remove an input or output connection");
1733  result.addString("[list] [in] # list input connections");
1734  result.addString("[list] [out] # list output connections");
1735  result.addString("[list] [in] $portname # give details for input");
1736  result.addString("[list] [out] $portname # give details for output");
1737  result.addString("[prop] [get] # get all user-defined port properties");
1738  result.addString("[prop] [get] $prop # get a user-defined port property (prop, val)");
1739  result.addString("[prop] [set] $prop $val # set a user-defined port property (prop, val)");
1740  result.addString("[prop] [get] $portname # get Qos properties of a connection to/from a port");
1741  result.addString("[prop] [set] $portname # set Qos properties of a connection to/from a port");
1742  result.addString("[prop] [get] $cur_port # get information about current process (e.g., scheduling priority, pid)");
1743  result.addString("[prop] [set] $cur_port # set properties of the current process (e.g., scheduling priority, pid)");
1744  result.addString("[atch] [out] $prop # attach a portmonitor plug-in to the port's output");
1745  result.addString("[atch] [in] $prop # attach a portmonitor plug-in to the port's input");
1746  result.addString("[dtch] [out] # detach portmonitor plug-in from the port's output");
1747  result.addString("[dtch] [in] # detach portmonitor plug-in from the port's input");
1748  //result.addString("[atch] $portname $prop # attach a portmonitor plug-in to the connection to/from $portname");
1749  //result.addString("[dtch] $portname # detach any portmonitor plug-in from the connection to/from $portname");
1750  return result;
1751  };
1752 
1753  auto handleAdminVerCmd = []() {
1754  // Gives a version number for the administrative commands.
1755  // It is distinct from YARP library versioning.
1756  Bottle result;
1757  result.addVocab(Vocab::encode("ver"));
1758  result.addInt32(1);
1759  result.addInt32(2);
1760  result.addInt32(3);
1761  return result;
1762  };
1763 
1764  auto handleAdminPrayCmd = [this]() {
1765  // Strongly inspired by nethack #pray command:
1766  // https://nethackwiki.com/wiki/Prayer
1767  // http://www.steelypips.org/nethack/pray.html
1768 
1769  Bottle result;
1770 
1771  bool found = false;
1772  std::string name = yarp::conf::environment::getEnvironment("YARP_ROBOT_NAME", &found);
1773  if (!found) {
1774  name = getName();
1775  // Remove initial "/"
1776  while (name[0] == '/') {
1777  name = name.substr(1);
1778  }
1779  // Keep only the first part of the port name
1780  auto i = name.find('/');
1781  if (i != std::string::npos) {
1782  name = name.substr(0, i);
1783  }
1784  }
1785 
1786  std::random_device rd;
1787  std::mt19937 mt(rd());
1788  std::uniform_int_distribution<int> dist2(0,1);
1789  auto d2 = std::bind(dist2, mt);
1790 
1791  result.addString("You begin praying to " + name + ".");
1792  result.addString("You finish your prayer.");
1793 
1794  static const char* godvoices[] = {
1795  "booms out",
1796  "thunders",
1797  "rings out",
1798  "booms",
1799  };
1800  std::uniform_int_distribution<int> godvoices_dist(0, (sizeof(godvoices) / sizeof(godvoices[0])) - 1);
1801  auto godvoice = [&]() {
1802  return std::string(godvoices[godvoices_dist(mt)]);
1803  };
1804 
1805  static const char* creatures[] = {
1806  "mortal",
1807  "creature",
1808  "robot",
1809  };
1810  std::uniform_int_distribution<int> creatures_dist(0, (sizeof(creatures) / sizeof(creatures[0])) - 1);
1811  auto creature = [&]() {
1812  return std::string(creatures[creatures_dist(mt)]);
1813  };
1814 
1815  static const char* auras[] = {
1816  "amber",
1817  "light blue",
1818  "golden",
1819  "white",
1820  "orange",
1821  "black",
1822  };
1823  std::uniform_int_distribution<int> auras_dist(0, (sizeof(auras) / sizeof(auras[0])) - 1);
1824  auto aura = [&]() {
1825  return std::string(auras[auras_dist(mt)]);
1826  };
1827 
1828  static const char* items[] = {
1829  "keyboard",
1830  "mouse",
1831  "monitor",
1832  "headphones",
1833  "smartphone",
1834  "wallet",
1835  "eyeglasses",
1836  "shirt",
1837  };
1838  std::uniform_int_distribution<int> items_dist(0, (sizeof(items) / sizeof(items[0])) - 1);
1839  auto item = [&]() {
1840  return std::string(items[items_dist(mt)]);
1841  };
1842 
1843  static const char* blessings[] = {
1844  "You feel more limber.",
1845  "The slime disappears.",
1846  "Your amulet vanishes! You can breathe again.",
1847  "You can breathe again.",
1848  "You are back on solid ground.",
1849  "Your stomach feels content.",
1850  "You feel better.",
1851  "You feel much better.",
1852  "Your surroundings change.",
1853  "Your shape becomes uncertain.",
1854  "Your chain disappears.",
1855  "There's a tiger in your tank.",
1856  "You feel in good health again.",
1857  "Your eye feels better.",
1858  "Your eyes feel better.",
1859  "Looks like you are back in Kansas.",
1860  "Your <ITEM> softly glows <AURA>.",
1861  };
1862  std::uniform_int_distribution<int> blessings_dist(0, (sizeof(blessings) / sizeof(blessings[0])) - 1);
1863  auto blessing = [&](){
1864  auto blessing = std::string(blessings[blessings_dist(mt)]);
1865  blessing = std::regex_replace(blessing, std::regex("<ITEM>"), item());
1866  blessing = std::regex_replace(blessing, std::regex("<AURA>"), aura());
1867  return blessing;
1868  };
1869 
1870  std::uniform_int_distribution<int> dist13(0,12);
1871  switch(dist13(mt)) {
1872  case 0:
1873  case 1:
1874  result.addString("You feel that " + name + " is " + (d2() ? "bummed" : "displeased") + ".");
1875  break;
1876  case 2:
1877  case 3:
1878  result.addString("The voice of " + name + " " + godvoice() +
1879  ": \"Thou " + (d2() ? "hast strayed from the path" : "art arrogant") +
1880  ", " + creature() + ". Thou must relearn thy lessons!\"");
1881  break;
1882  case 4:
1883  case 5:
1884  result.addString("The voice of " + name + " " + godvoice() +
1885  ": \"Thou hast angered me.\"");
1886  result.addString("A black glow surrounds you.");
1887  break;
1888  case 6:
1889  result.addString("The voice of " + name + " " + godvoice() +
1890  ": \"Thou hast angered me.\"");
1891  break;
1892  case 7:
1893  case 8:
1894  result.addString("The voice of " + name + " " + godvoice() +
1895  ": \"Thou durst " + (d2() ? "scorn" : "call upon") +
1896  " me? Then die, " + creature() + "!\"");
1897  break;
1898  case 9:
1899  result.addString("You feel that " + name + " is " + (d2() ? "pleased as punch" : "well-pleased") + ".");
1900  result.addString(blessing());
1901  break;
1902  case 10:
1903  result.addString("You feel that " + name + " is " + (d2() ? "ticklish" : "pleased") + ".");
1904  result.addString(blessing());
1905  break;
1906  case 11:
1907  result.addString("You feel that " + name + " is " + (d2() ? "full" : "satisfied") + ".");
1908  result.addString(blessing());
1909  break;
1910  default:
1911  result.addString("The voice of " + name + " " + godvoice() +
1912  ": \"Thou hast angered me.\"");
1913  result.addString("Suddenly, a bolt of lightning strikes you!");
1914  result.addString("You fry to a crisp!");
1915  break;
1916  }
1917 
1918  return result;
1919  };
1920 
1921  auto handleAdminAddCmd = [this, id](std::string output,
1922  const std::string& carrier) {
1923  // Add an output to the port.
1924  Bottle result;
1925  StringOutputStream cache;
1926  if (!carrier.empty()) {
1927  output = carrier + ":/" + output;
1928  }
1929  addOutput(output, id, &cache, false);
1930  std::string r = cache.toString();
1931  int v = (r[0] == 'A') ? 0 : -1;
1932  result.addInt32(v);
1933  result.addString(r);
1934  return result;
1935  };
1936 
1937  auto handleAdminDelCmd = [this, id](const std::string& dest) {
1938  // Delete any inputs or outputs involving the named port.
1939  Bottle result;
1940  StringOutputStream cache;
1941  removeOutput(dest, id, &cache);
1942  std::string r1 = cache.toString();
1943  cache.reset();
1944  removeInput(dest, id, &cache);
1945  std::string r2 = cache.toString();
1946  int v = (r1[0] == 'R' || r2[0] == 'R') ? 0 : -1;
1947  result.addInt32(v);
1948  if (r1[0] == 'R' && r2[0] != 'R') {
1949  result.addString(r1);
1950  } else if (r1[0] != 'R' && r2[0] == 'R') {
1951  result.addString(r2);
1952  } else {
1953  result.addString(r1 + r2);
1954  }
1955  return result;
1956  };
1957 
1958  auto handleAdminAtchCmd = [this](PortCoreConnectionDirection direction,
1959  Property prop) {
1960  Bottle result;
1961  switch (direction) {
1962  case PortCoreConnectionDirection::Out: {
1963  std::string errMsg;
1964  if (!attachPortMonitor(prop, true, errMsg)) {
1965  result.addVocab(Vocab::encode("fail"));
1966  result.addString(errMsg);
1967  } else {
1968  result.addVocab(Vocab::encode("ok"));
1969  }
1970  } break;
1971  case PortCoreConnectionDirection::In: {
1972  std::string errMsg;
1973  if (!attachPortMonitor(prop, false, errMsg)) {
1974  result.addVocab(Vocab::encode("fail"));
1975  result.addString(errMsg);
1976  } else {
1977  result.addVocab(Vocab::encode("ok"));
1978  }
1979  } break;
1980  case PortCoreConnectionDirection::Error:
1981  result.addVocab(Vocab::encode("fail"));
1982  result.addString("attach command must be followd by [out] or [in]");
1983  };
1984  return result;
1985  };
1986 
1987  auto handleAdminDtchCmd = [this](PortCoreConnectionDirection direction) {
1988  Bottle result;
1989  switch (direction) {
1990  case PortCoreConnectionDirection::Out: {
1991  if (dettachPortMonitor(true)) {
1992  result.addVocab(Vocab::encode("ok"));
1993  } else {
1994  result.addVocab(Vocab::encode("fail"));
1995  }
1996  } break;
1997  case PortCoreConnectionDirection::In: {
1998  if (dettachPortMonitor(false)) {
1999  result.addVocab(Vocab::encode("ok"));
2000  } else {
2001  result.addVocab(Vocab::encode("fail"));
2002  }
2003  } break;
2004  case PortCoreConnectionDirection::Error:
2005  result.addVocab(Vocab::encode("fail"));
2006  result.addString("detach command must be followd by [out] or [in]");
2007  };
2008  return result;
2009  };
2010 
2011  auto handleAdminListCmd = [this](const PortCoreConnectionDirection direction,
2012  const std::string& target) {
2013  Bottle result;
2014  switch (direction) {
2015  case PortCoreConnectionDirection::In: {
2016  // Return a list of all input connections.
2017  m_stateSemaphore.wait();
2018  for (auto unit : m_units) {
2019  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
2020  Route route = unit->getRoute();
2021  if (target.empty()) {
2022  const std::string& name = route.getFromName();
2023  if (!name.empty()) {
2024  result.addString(name);
2025  }
2026  } else if (route.getFromName() == target) {
2027  describeRoute(route, result);
2028  }
2029  }
2030  }
2031  m_stateSemaphore.post();
2032  } break;
2033  case PortCoreConnectionDirection::Out: {
2034  // Return a list of all output connections.
2035  m_stateSemaphore.wait();
2036  for (auto unit : m_units) {
2037  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2038  Route route = unit->getRoute();
2039  if (target.empty()) {
2040  result.addString(route.getToName());
2041  } else if (route.getToName() == target) {
2042  describeRoute(route, result);
2043  }
2044  }
2045  }
2046  m_stateSemaphore.post();
2047  } break;
2048  case PortCoreConnectionDirection::Error:
2049  // Should never happen
2050  yCAssert(PORTCORE, false);
2051  break;
2052  }
2053  return result;
2054  };
2055 
2056  auto handleAdminSetInCmd = [this](const std::string& target,
2057  const Property& property) {
2058  Bottle result;
2059  // Set carrier parameters on a given input connection.
2060  m_stateSemaphore.wait();
2061  if (target.empty()) {
2062  result.addInt32(-1);
2063  result.addString("target port is not specified.\r\n");
2064  } else {
2065  if (target == getName()) {
2066  std::string errMsg;
2067  if (!setParamPortMonitor(property, false, errMsg)) {
2068  result.addVocab(Vocab::encode("fail"));
2069  result.addString(errMsg);
2070  } else {
2071  result.addVocab(Vocab::encode("ok"));
2072  }
2073  } else {
2074  for (auto unit : m_units) {
2075  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
2076  Route route = unit->getRoute();
2077  if (route.getFromName() == target) {
2078  unit->setCarrierParams(property);
2079  result.addInt32(0);
2080  std::string msg = "Configured connection from ";
2081  msg += route.getFromName();
2082  msg += "\r\n";
2083  result.addString(msg);
2084  break;
2085  }
2086  }
2087  }
2088  }
2089  if (result.size() == 0) {
2090  result.addInt32(-1);
2091  std::string msg = "Could not find an incoming connection from ";
2092  msg += target;
2093  msg += "\r\n";
2094  result.addString(msg);
2095  }
2096  }
2097  m_stateSemaphore.post();
2098  return result;
2099  };
2100 
2101  auto handleAdminSetOutCmd = [this](const std::string& target,
2102  const Property& property) {
2103  Bottle result;
2104  // Set carrier parameters on a given output connection.
2105  m_stateSemaphore.wait();
2106  if (target.empty()) {
2107  result.addInt32(-1);
2108  result.addString("target port is not specified.\r\n");
2109  } else {
2110  if (target == getName()) {
2111  std::string errMsg;
2112  if (!setParamPortMonitor(property, true, errMsg)) {
2113  result.addVocab(Vocab::encode("fail"));
2114  result.addString(errMsg);
2115  } else {
2116  result.addVocab(Vocab::encode("ok"));
2117  }
2118  } else {
2119  for (auto unit : m_units) {
2120  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2121  Route route = unit->getRoute();
2122  if (route.getToName() == target) {
2123  unit->setCarrierParams(property);
2124  result.addInt32(0);
2125  std::string msg = "Configured connection to ";
2126  msg += route.getToName();
2127  msg += "\r\n";
2128  result.addString(msg);
2129  break;
2130  }
2131  }
2132  }
2133  }
2134  if (result.size() == 0) {
2135  result.addInt32(-1);
2136  std::string msg = "Could not find an incoming connection to ";
2137  msg += target;
2138  msg += "\r\n";
2139  result.addString(msg);
2140  }
2141  }
2142  m_stateSemaphore.post();
2143  return result;
2144  };
2145 
2146  auto handleAdminGetInCmd = [this](const std::string& target) {
2147  Bottle result;
2148  // Get carrier parameters for a given input connection.
2149  m_stateSemaphore.wait();
2150  if (target.empty()) {
2151  result.addInt32(-1);
2152  result.addString("target port is not specified.\r\n");
2153  } else if (target == getName()) {
2154  yarp::os::Property property;
2155  std::string errMsg;
2156  if (!getParamPortMonitor(property, false, errMsg)) {
2157  result.addVocab(Vocab::encode("fail"));
2158  result.addString(errMsg);
2159  } else {
2160  result.addDict() = property;
2161  }
2162  } else {
2163  for (auto unit : m_units) {
2164  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
2165  Route route = unit->getRoute();
2166  if (route.getFromName() == target) {
2167  yarp::os::Property property;
2168  unit->getCarrierParams(property);
2169  result.addDict() = property;
2170  break;
2171  }
2172  }
2173  }
2174  if (result.size() == 0) {
2175  result.addInt32(-1);
2176  std::string msg = "Could not find an incoming connection from ";
2177  msg += target;
2178  msg += "\r\n";
2179  result.addString(msg);
2180  }
2181  }
2182  m_stateSemaphore.post();
2183  return result;
2184  };
2185 
2186  auto handleAdminGetOutCmd = [this](const std::string& target) {
2187  Bottle result;
2188  // Get carrier parameters for a given output connection.
2189  m_stateSemaphore.wait();
2190  if (target.empty()) {
2191  result.addInt32(-1);
2192  result.addString("target port is not specified.\r\n");
2193  } else if (target == getName()) {
2194  yarp::os::Property property;
2195  std::string errMsg;
2196  if (!getParamPortMonitor(property, true, errMsg)) {
2197  result.addVocab(Vocab::encode("fail"));
2198  result.addString(errMsg);
2199  } else {
2200  result.addDict() = property;
2201  }
2202  } else {
2203  for (auto unit : m_units) {
2204  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2205  Route route = unit->getRoute();
2206  if (route.getToName() == target) {
2207  yarp::os::Property property;
2208  unit->getCarrierParams(property);
2209  result.addDict() = property;
2210  break;
2211  }
2212  }
2213  }
2214  if (result.size() == 0) {
2215  result.addInt32(-1);
2216  std::string msg = "Could not find an incoming connection to ";
2217  msg += target;
2218  msg += "\r\n";
2219  result.addString(msg);
2220  }
2221  }
2222  m_stateSemaphore.post();
2223  return result;
2224  };
2225 
2226  auto handleAdminPropGetCmd = [this](const std::string& key) {
2227  Bottle result;
2228  Property* p = acquireProperties(false);
2229  if (p != nullptr) {
2230  if (key.empty()) {
2231  result.fromString(p->toString());
2232  } else {
2233  // request: "prop get /portname"
2234  if (key[0] == '/') {
2235  bool bFound = false;
2236  // check for their own name
2237  if (key == getName()) {
2238  bFound = true;
2239  Bottle& sched = result.addList();
2240  sched.addString("sched");
2241  Property& sched_prop = sched.addDict();
2242  sched_prop.put("tid", static_cast<int>(this->getTid()));
2243  sched_prop.put("priority", this->getPriority());
2244  sched_prop.put("policy", this->getPolicy());
2245 
2247  Bottle& proc = result.addList();
2248  proc.addString("process");
2249  Property& proc_prop = proc.addDict();
2250  proc_prop.put("pid", info.pid);
2251  proc_prop.put("name", (info.pid != -1) ? info.name : "unknown");
2252  proc_prop.put("arguments", (info.pid != -1) ? info.arguments : "unknown");
2253  proc_prop.put("priority", info.schedPriority);
2254  proc_prop.put("policy", info.schedPolicy);
2255 
2257  Bottle& platform = result.addList();
2258  platform.addString("platform");
2259  Property& platform_prop = platform.addDict();
2260  platform_prop.put("os", pinfo.name);
2261  platform_prop.put("hostname", m_address.getHost());
2262 
2263  unsigned int f = getFlags();
2264  bool is_input = (f & PORTCORE_IS_INPUT) != 0;
2265  bool is_output = (f & PORTCORE_IS_OUTPUT) != 0;
2266  bool is_rpc = (f & PORTCORE_IS_RPC) != 0;
2267  Bottle& port = result.addList();
2268  port.addString("port");
2269  Property& port_prop = port.addDict();
2270  port_prop.put("is_input", is_input);
2271  port_prop.put("is_output", is_output);
2272  port_prop.put("is_rpc", is_rpc);
2273  port_prop.put("type", getType().getName());
2274  } else {
2275  for (auto unit : m_units) {
2276  if ((unit != nullptr) && !unit->isFinished()) {
2277  Route route = unit->getRoute();
2278  std::string coreName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2279  if (key == coreName) {
2280  bFound = true;
2281  int priority = unit->getPriority();
2282  int policy = unit->getPolicy();
2283  int tos = getTypeOfService(unit);
2284  int tid = static_cast<int>(unit->getTid());
2285  Bottle& sched = result.addList();
2286  sched.addString("sched");
2287  Property& sched_prop = sched.addDict();
2288  sched_prop.put("tid", tid);
2289  sched_prop.put("priority", priority);
2290  sched_prop.put("policy", policy);
2291  Bottle& qos = result.addList();
2292  qos.addString("qos");
2293  Property& qos_prop = qos.addDict();
2294  qos_prop.put("tos", tos);
2295  }
2296  } // end isFinished()
2297  } // end for loop
2298  } // end portName == getname()
2299 
2300  if (!bFound) { // cannot find any port matches the requested one
2301  result.addVocab(Vocab::encode("fail"));
2302  std::string msg = "cannot find any connection to/from ";
2303  msg = msg + key;
2304  result.addString(msg);
2305  }
2306  // end of (portName[0] == '/')
2307  } else {
2308  result.add(p->find(key));
2309  }
2310  }
2311  }
2312  releaseProperties(p);
2313  return result;
2314  };
2315 
2316  auto handleAdminPropSetCmd = [this](const std::string& key,
2317  const Value& value,
2318  const Bottle& process,
2319  const Bottle& sched,
2320  const Bottle& qos) {
2321  Bottle result;
2322  Property* p = acquireProperties(false);
2323  bool bOk = true;
2324  if (p != nullptr) {
2325  p->put(key, value);
2326  // setting scheduling properties of all threads within the process
2327  // scope through the admin port
2328  // e.g. prop set /current_port (process ((priority 30) (policy 1)))
2329  if (!process.isNull()) {
2330  std::string portName = key;
2331  if ((!portName.empty()) && (portName[0] == '/')) {
2332  // check for their own name
2333  if (portName == getName()) {
2334  bOk = false;
2335  Bottle* process_prop = process.find("process").asList();
2336  if (process_prop != nullptr) {
2337  int prio = -1;
2338  int policy = -1;
2339  if (process_prop->check("priority")) {
2340  prio = process_prop->find("priority").asInt32();
2341  }
2342  if (process_prop->check("policy")) {
2343  policy = process_prop->find("policy").asInt32();
2344  }
2345  bOk = setProcessSchedulingParam(prio, policy);
2346  }
2347  }
2348  }
2349  }
2350  // check if we need to set the PortCoreUnit scheduling policy
2351  // e.g., "prop set /portname (sched ((priority 30) (policy 1)))"
2352  // The priority and policy values on Linux are:
2353  // SCHED_OTHER : policy=0, priority=[0 .. 0]
2354  // SCHED_FIFO : policy=1, priority=[1 .. 99]
2355  // SCHED_RR : policy=2, priority=[1 .. 99]
2356  if (!sched.isNull()) {
2357  if ((!key.empty()) && (key[0] == '/')) {
2358  bOk = false;
2359  for (auto unit : m_units) {
2360  if ((unit != nullptr) && !unit->isFinished()) {
2361  Route route = unit->getRoute();
2362  std::string portName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2363 
2364  if (portName == key) {
2365  Bottle* sched_prop = sched.find("sched").asList();
2366  if (sched_prop != nullptr) {
2367  int prio = -1;
2368  int policy = -1;
2369  if (sched_prop->check("priority")) {
2370  prio = sched_prop->find("priority").asInt32();
2371  }
2372  if (sched_prop->check("policy")) {
2373  policy = sched_prop->find("policy").asInt32();
2374  }
2375  bOk = (unit->setPriority(prio, policy) != -1);
2376  } else {
2377  bOk = false;
2378  }
2379  break;
2380  }
2381  }
2382  }
2383  }
2384  }
2385  // check if we need to set the packet QOS policy
2386  // e.g., "prop set /portname (qos ((priority HIGH)))"
2387  // e.g., "prop set /portname (qos ((dscp AF12)))"
2388  // e.g., "prop set /portname (qos ((tos 12)))"
2389  if (!qos.isNull()) {
2390  if ((!key.empty()) && (key[0] == '/')) {
2391  bOk = false;
2392  for (auto unit : m_units) {
2393  if ((unit != nullptr) && !unit->isFinished()) {
2394  Route route = unit->getRoute();
2395  std::string portName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2396  if (portName == key) {
2397  Bottle* qos_prop = qos.find("qos").asList();
2398  if (qos_prop != nullptr) {
2399  int tos = -1;
2400  if (qos_prop->check("priority")) {
2401  // set the packet TOS value on the socket based on some predefined
2402  // priority levels.
2403  // the expected levels are: LOW, NORM, HIGH, CRIT
2404  NetInt32 priority = qos_prop->find("priority").asVocab();
2405  int dscp;
2406  switch (priority) {
2407  case yarp::os::createVocab('L', 'O', 'W'):
2408  dscp = 10;
2409  break;
2410  case yarp::os::createVocab('N', 'O', 'R', 'M'):
2411  dscp = 0;
2412  break;
2413  case yarp::os::createVocab('H', 'I', 'G', 'H'):
2414  dscp = 36;
2415  break;
2416  case yarp::os::createVocab('C', 'R', 'I', 'T'):
2417  dscp = 44;
2418  break;
2419  default:
2420  dscp = -1;
2421  }
2422  if (dscp >= 0) {
2423  tos = (dscp << 2);
2424  }
2425  } else if (qos_prop->check("dscp")) {
2426  // Set the packet TOS value on the socket based on the DSCP level
2427  QosStyle::PacketPriorityDSCP dscp_class = QosStyle::getDSCPByVocab(qos_prop->find("dscp").asVocab());
2428  int dscp = -1;
2429  if (dscp_class == QosStyle::DSCP_Invalid) {
2430  auto dscp_val = qos_prop->find("dscp");
2431  if (dscp_val.isInt32()) {
2432  dscp = dscp_val.asInt32();
2433  }
2434  } else {
2435  dscp = static_cast<int>(dscp_class);
2436  }
2437  if ((dscp >= 0) && (dscp < 64)) {
2438  tos = (dscp << 2);
2439  }
2440  } else if (qos_prop->check("tos")) {
2441  // Set the TOS value directly
2442  auto tos_val = qos_prop->find("tos");
2443  if (tos_val.isInt32()) {
2444  tos = tos_val.asInt32();
2445  }
2446  }
2447  if (tos >= 0) {
2448  bOk = setTypeOfService(unit, tos);
2449  }
2450  } else {
2451  bOk = false;
2452  }
2453  break;
2454  }
2455  }
2456  }
2457  }
2458  }
2459  }
2460  releaseProperties(p);
2461  result.addVocab((bOk) ? Vocab::encode("ok") : Vocab::encode("fail"));
2462  return result;
2463  };
2464 
2465  // NOTE: YARP partially supports the ROS Slave API https://wiki.ros.org/ROS/Slave_API
2466 
2467  auto handleAdminRosPublisherUpdateCmd = [this](const std::string& topic, Bottle* pubs) {
2468  // When running against a ROS name server, we need to
2469  // support ROS-style callbacks for connecting publishers
2470  // with subscribers. Note: this should not be necessary
2471  // anymore, now that a dedicated yarp::os::Node class
2472  // has been implemented, but is still needed for older
2473  // ways of interfacing with ROS without using dedicated
2474  // node ports.
2475  Bottle result;
2476  if (pubs != nullptr) {
2477  Property listed;
2478  for (size_t i = 0; i < pubs->size(); i++) {
2479  std::string pub = pubs->get(i).asString();
2480  listed.put(pub, 1);
2481  }
2482  Property present;
2483  m_stateSemaphore.wait();
2484  for (auto unit : m_units) {
2485  if ((unit != nullptr) && unit->isPupped()) {
2486  std::string me = unit->getPupString();
2487  present.put(me, 1);
2488  if (!listed.check(me)) {
2489  unit->setDoomed();
2490  }
2491  }
2492  }
2493  m_stateSemaphore.post();
2494  for (size_t i = 0; i < pubs->size(); i++) {
2495  std::string pub = pubs->get(i).asString();
2496  if (!present.check(pub)) {
2497  yCDebug(PORTCORE, "ROS ADD %s", pub.c_str());
2498  Bottle req;
2499  Bottle reply;
2500  req.addString("requestTopic");
2501  NestedContact nc(getName());
2502  req.addString(nc.getNodeName());
2503  req.addString(topic);
2504  Bottle& lst = req.addList();
2505  Bottle& sublst = lst.addList();
2506  sublst.addString("TCPROS");
2507  yCDebug(PORTCORE, "Sending [%s] to %s", req.toString().c_str(), pub.c_str());
2508  Contact c = Contact::fromString(pub);
2509  if (!__pc_rpc(c, "xmlrpc", req, reply, false)) {
2510  fprintf(stderr, "Cannot connect to ROS subscriber %s\n", pub.c_str());
2511  // show diagnosics
2512  __pc_rpc(c, "xmlrpc", req, reply, true);
2513  __tcp_check(c);
2514  } else {
2515  Bottle* pref = reply.get(2).asList();
2516  std::string hostname;
2517  std::string carrier;
2518  int portnum = 0;
2519  if (reply.get(0).asInt32() != 1) {
2520  fprintf(stderr, "Failure looking up topic %s: %s\n", topic.c_str(), reply.toString().c_str());
2521  } else if (pref == nullptr) {
2522  fprintf(stderr, "Failure looking up topic %s: expected list of protocols\n", topic.c_str());
2523  } else if (pref->get(0).asString() != "TCPROS") {
2524  fprintf(stderr, "Failure looking up topic %s: unsupported protocol %s\n", topic.c_str(), pref->get(0).asString().c_str());
2525  } else {
2526  Value hostname2 = pref->get(1);
2527  Value portnum2 = pref->get(2);
2528  hostname = hostname2.asString();
2529  portnum = portnum2.asInt32();
2530  carrier = "tcpros+role.pub+topic.";
2531  carrier += topic;
2532  yCDebug(PORTCORE, "topic %s available at %s:%d", topic.c_str(), hostname.c_str(), portnum);
2533  }
2534  if (portnum != 0) {
2535  Contact addr(hostname, portnum);
2536  OutputProtocol* op = nullptr;
2537  Route r = Route(getName(), pub, carrier);
2538  op = Carriers::connect(addr);
2539  if (op == nullptr) {
2540  fprintf(stderr, "NO CONNECTION\n");
2541  std::exit(1);
2542  } else {
2543  op->attachPort(m_contactable);
2544  op->open(r);
2545  }
2546  Route route = op->getRoute();
2547  route.swapNames();
2548  op->rename(route);
2549  InputProtocol* ip = &(op->getInput());
2550  m_stateSemaphore.wait();
2551  PortCoreUnit* unit = new PortCoreInputUnit(*this,
2552  getNextIndex(),
2553  ip,
2554  true);
2555  yCAssert(PORTCORE, unit != nullptr);
2556  unit->setPupped(pub);
2557  unit->start();
2558  m_units.push_back(unit);
2559  m_stateSemaphore.post();
2560  }
2561  }
2562  }
2563  }
2564  }
2565  result.addInt32(1);
2566  result.addString("ok");
2567  return result;
2568  };
2569 
2570  auto handleAdminRosRequestTopicCmd = [this]() {
2571  // ROS-style query for topics.
2572  Bottle result;
2573  result.addInt32(1);
2574  NestedContact nc(getName());
2575  result.addString(nc.getNodeName());
2576  Bottle& lst = result.addList();
2577  Contact addr = getAddress();
2578  lst.addString("TCPROS");
2579  lst.addString(addr.getHost());
2580  lst.addInt32(addr.getPort());
2581  return result;
2582  };
2583 
2584  auto handleAdminRosGetPidCmd = []() {
2585  // ROS-style query for PID.
2586  Bottle result;
2587  result.addInt32(1);
2588  result.addString("");
2589  result.addInt32(yarp::os::impl::getpid());
2590  return result;
2591  };
2592 
2593  auto handleAdminRosGetBusInfoCmd = []() {
2594  // ROS-style query for bus information - we support this
2595  // in yarp::os::Node but not otherwise.
2596  Bottle result;
2597  result.addInt32(1);
2598  result.addString("");
2599  result.addList().addList();
2600  return result;
2601  };
2602 
2603  auto handleAdminUnknownCmd = [this](const Bottle& cmd) {
2604  Bottle result;
2605  bool ok = false;
2606  if (m_adminReader != nullptr) {
2607  DummyConnector con;
2608  cmd.write(con.getWriter());
2609  lockCallback();
2610  ok = m_adminReader->read(con.getReader());
2611  unlockCallback();
2612  if (ok) {
2613  result.read(con.getReader());
2614  }
2615  }
2616  if (!ok) {
2617  result.addVocab(Vocab::encode("fail"));
2618  result.addString("send [help] for list of valid commands");
2619  }
2620  return result;
2621  };
2622 
2623  const PortCoreCommand command = parseCommand(cmd.get(0));
2624  switch (command) {
2625  case PortCoreCommand::Help:
2626  result = handleAdminHelpCmd();
2627  break;
2628  case PortCoreCommand::Ver:
2629  result = handleAdminVerCmd();
2630  break;
2631  case PortCoreCommand::Pray:
2632  result = handleAdminPrayCmd();
2633  break;
2634  case PortCoreCommand::Add: {
2635  std::string output = cmd.get(1).asString();
2636  std::string carrier = cmd.get(2).asString();
2637  result = handleAdminAddCmd(std::move(output), carrier);
2638  } break;
2639  case PortCoreCommand::Del: {
2640  const std::string dest = cmd.get(1).asString();
2641  result = handleAdminDelCmd(dest);
2642  } break;
2643  case PortCoreCommand::Atch: {
2644  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab());
2645  Property prop(cmd.get(2).asString().c_str());
2646  result = handleAdminAtchCmd(direction, std::move(prop));
2647  } break;
2648  case PortCoreCommand::Dtch: {
2649  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab());
2650  result = handleAdminDtchCmd(direction);
2651  } break;
2652  case PortCoreCommand::List: {
2653  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab(), true);
2654  const std::string target = cmd.get(2).asString();
2655  result = handleAdminListCmd(direction, target);
2656  } break;
2657  case PortCoreCommand::Set: {
2658  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab(), true);
2659  const std::string target = cmd.get(2).asString();
2660  yarp::os::Property property;
2661  property.fromString(cmd.toString());
2662  switch (direction) {
2663  case PortCoreConnectionDirection::In:
2664  result = handleAdminSetInCmd(target, property);
2665  break;
2666  case PortCoreConnectionDirection::Out:
2667  result = handleAdminSetOutCmd(target, property);
2668  break;
2669  case PortCoreConnectionDirection::Error:
2670  yCAssert(PORTCORE, false); // Should never happen (error is out)
2671  break;
2672  }
2673  } break;
2674  case PortCoreCommand::Get: {
2675  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab(), true);
2676  const std::string target = cmd.get(2).asString();
2677  switch (direction) {
2678  case PortCoreConnectionDirection::In:
2679  result = handleAdminGetInCmd(target);
2680  break;
2681  case PortCoreConnectionDirection::Out:
2682  result = handleAdminGetOutCmd(target);
2683  break;
2684  case PortCoreConnectionDirection::Error:
2685  yCAssert(PORTCORE, false); // Should never happen (error is out)
2686  break;
2687  }
2688  } break;
2689  case PortCoreCommand::Prop: {
2690  PortCorePropertyAction action = parsePropertyAction(cmd.get(1).asVocab());
2691  const std::string key = cmd.get(2).asString();
2692  // Set/get arbitrary properties on a port.
2693  switch (action) {
2694  case PortCorePropertyAction::Get:
2695  result = handleAdminPropGetCmd(key);
2696  break;
2697  case PortCorePropertyAction::Set: {
2698  const Value& value = cmd.get(3);
2699  const Bottle& process = cmd.findGroup("process");
2700  const Bottle& sched = cmd.findGroup("sched");
2701  const Bottle& qos = cmd.findGroup("qos");
2702  result = handleAdminPropSetCmd(key, value, process, sched, qos);
2703  } break;
2704  case PortCorePropertyAction::Error:
2705  result.addVocab(Vocab::encode("fail"));
2706  result.addString("property action not known");
2707  break;
2708  }
2709  } break;
2710  case PortCoreCommand::RosPublisherUpdate: {
2711  yCDebug(PORTCORE, "publisherUpdate! --> %s", cmd.toString().c_str());
2712  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2713  std::string topic = RosNameSpace::fromRosName(cmd.get(2).asString());
2714  Bottle* pubs = cmd.get(3).asList();
2715  result = handleAdminRosPublisherUpdateCmd(topic, pubs);
2716  reader.requestDrop(); // ROS needs us to close down.
2717  } break;
2718  case PortCoreCommand::RosRequestTopic:
2719  yCDebug(PORTCORE, "requestTopic! --> %s", cmd.toString().c_str());
2720  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2721  // std::string topic = RosNameSpace::fromRosName(cmd.get(2).asString()); // Currently unused
2722  // Bottle protocols = cmd.get(3).asList(); // Currently unused
2723  result = handleAdminRosRequestTopicCmd();
2724  reader.requestDrop(); // ROS likes to close down.
2725  break;
2726  case PortCoreCommand::RosGetPid:
2727  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2728  result = handleAdminRosGetPidCmd();
2729  reader.requestDrop(); // ROS likes to close down.
2730  break;
2731  case PortCoreCommand::RosGetBusInfo:
2732  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2733  result = handleAdminRosGetBusInfoCmd();
2734  reader.requestDrop(); // ROS likes to close down.
2735  break;
2736  case PortCoreCommand::Unknown:
2737  result = handleAdminUnknownCmd(cmd);
2738  break;
2739  }
2740 
2741  ConnectionWriter* writer = reader.getWriter();
2742  if (writer != nullptr) {
2743  result.write(*writer);
2744  }
2745 
2746  return true;
2747 }
2748 
2749 
2750 bool PortCore::setTypeOfService(PortCoreUnit* unit, int tos)
2751 {
2752  if (unit == nullptr) {
2753  return false;
2754  }
2755 
2756  yCDebug(PORTCORE, "Trying to set TOS = %d", tos);
2757 
2758  if (unit->isOutput()) {
2759  auto* outUnit = dynamic_cast<PortCoreOutputUnit*>(unit);
2760  if (outUnit != nullptr) {
2761  OutputProtocol* op = outUnit->getOutPutProtocol();
2762  if (op != nullptr) {
2763  yCDebug(PORTCORE, "Trying to set TOS = %d on output unit", tos);
2764  bool ok = op->getOutputStream().setTypeOfService(tos);
2765  if (!ok) {
2766  yCWarning(PORTCORE, "Setting TOS on output unit failed");
2767  }
2768  return ok;
2769  }
2770  }
2771  }
2772 
2773  // Some of the input units may have output stream object to write back to
2774  // the connection (e.g., tcp ack and reply). Thus the QoS preferences should be
2775  // also configured for them.
2776 
2777 
2778  if (unit->isInput()) {
2779  auto* inUnit = dynamic_cast<PortCoreInputUnit*>(unit);
2780  if (inUnit != nullptr) {
2781  InputProtocol* ip = inUnit->getInPutProtocol();
2782  if ((ip != nullptr) && ip->getOutput().isOk()) {
2783  yCDebug(PORTCORE, "Trying to set TOS = %d on input unit", tos);
2784  bool ok = ip->getOutput().getOutputStream().setTypeOfService(tos);
2785  if (!ok) {
2786  yCWarning(PORTCORE, "Setting TOS on input unit failed");
2787  }
2788  return ok;
2789  }
2790  }
2791  }
2792  // if there is nothing to be set, returns true
2793  return true;
2794 }
2795 
2796 int PortCore::getTypeOfService(PortCoreUnit* unit)
2797 {
2798  if (unit == nullptr) {
2799  return -1;
2800  }
2801 
2802  if (unit->isOutput()) {
2803  auto* outUnit = dynamic_cast<PortCoreOutputUnit*>(unit);
2804  if (outUnit != nullptr) {
2805  OutputProtocol* op = outUnit->getOutPutProtocol();
2806  if (op != nullptr) {
2807  return op->getOutputStream().getTypeOfService();
2808  }
2809  }
2810  }
2811 
2812  // Some of the input units may have output stream object to write back to
2813  // the connection (e.g., tcp ack and reply). Thus the QoS preferences should be
2814  // also configured for them.
2815 
2816 
2817  if (unit->isInput()) {
2818  auto* inUnit = dynamic_cast<PortCoreInputUnit*>(unit);
2819  if (inUnit != nullptr) {
2820  InputProtocol* ip = inUnit->getInPutProtocol();
2821  if ((ip != nullptr) && ip->getOutput().isOk()) {
2822  return ip->getOutput().getOutputStream().getTypeOfService();
2823  }
2824  }
2825  }
2826  return -1;
2827 }
2828 
2829 // attach a portmonitor plugin to the port or to a specific connection
2830 bool PortCore::attachPortMonitor(yarp::os::Property& prop, bool isOutput, std::string& errMsg)
2831 {
2832  // attach to the current port
2833  Carrier* portmonitor = Carriers::chooseCarrier("portmonitor");
2834  if (portmonitor == nullptr) {
2835  errMsg = "Portmonitor carrier modifier cannot be find or it is not enabled in Yarp!";
2836  return false;
2837  }
2838 
2839  if (isOutput) {
2840  dettachPortMonitor(true);
2841  prop.put("source", getName());
2842  prop.put("destination", "");
2843  prop.put("sender_side", 1);
2844  prop.put("receiver_side", 0);
2845  prop.put("carrier", "");
2846  m_modifier.outputMutex.lock();
2847  m_modifier.outputModifier = portmonitor;
2848  if (!m_modifier.outputModifier->configureFromProperty(prop)) {
2849  m_modifier.releaseOutModifier();
2850  errMsg = "Failed to configure the portmonitor plug-in";
2851  m_modifier.outputMutex.unlock();
2852  return false;
2853  }
2854  m_modifier.outputMutex.unlock();
2855  } else {
2856  dettachPortMonitor(false);
2857  prop.put("source", "");
2858  prop.put("destination", getName());
2859  prop.put("sender_side", 0);
2860  prop.put("receiver_side", 1);
2861  prop.put("carrier", "");
2862  m_modifier.inputMutex.lock();
2863  m_modifier.inputModifier = portmonitor;
2864  if (!m_modifier.inputModifier->configureFromProperty(prop)) {
2865  m_modifier.releaseInModifier();
2866  errMsg = "Failed to configure the portmonitor plug-in";
2867  m_modifier.inputMutex.unlock();
2868  return false;
2869  }
2870  m_modifier.inputMutex.unlock();
2871  }
2872  return true;
2873 }
2874 
2875 // detach the portmonitor from the port or specific connection
2876 bool PortCore::dettachPortMonitor(bool isOutput)
2877 {
2878  if (isOutput) {
2879  m_modifier.outputMutex.lock();
2880  m_modifier.releaseOutModifier();
2881  m_modifier.outputMutex.unlock();
2882  } else {
2883  m_modifier.inputMutex.lock();
2884  m_modifier.releaseInModifier();
2885  m_modifier.inputMutex.unlock();
2886  }
2887  return true;
2888 }
2889 
2890 bool PortCore::setParamPortMonitor(const yarp::os::Property& param,
2891  bool isOutput,
2892  std::string& errMsg)
2893 {
2894  if (isOutput) {
2895  m_modifier.outputMutex.lock();
2896  if (m_modifier.outputModifier == nullptr) {
2897  errMsg = "No port modifier is attached to the output";
2898  m_modifier.outputMutex.unlock();
2899  return false;
2900  }
2901  m_modifier.outputModifier->setCarrierParams(param);
2902  m_modifier.outputMutex.unlock();
2903  } else {
2904  m_modifier.inputMutex.lock();
2905  if (m_modifier.inputModifier == nullptr) {
2906  errMsg = "No port modifier is attached to the input";
2907  m_modifier.inputMutex.unlock();
2908  return false;
2909  }
2910  m_modifier.inputModifier->setCarrierParams(param);
2911  m_modifier.inputMutex.unlock();
2912  }
2913  return true;
2914 }
2915 
2916 bool PortCore::getParamPortMonitor(yarp::os::Property& param,
2917  bool isOutput,
2918  std::string& errMsg)
2919 {
2920  if (isOutput) {
2921  m_modifier.outputMutex.lock();
2922  if (m_modifier.outputModifier == nullptr) {
2923  errMsg = "No port modifier is attached to the output";
2924  m_modifier.outputMutex.unlock();
2925  return false;
2926  }
2927  m_modifier.outputModifier->getCarrierParams(param);
2928  m_modifier.outputMutex.unlock();
2929  } else {
2930  m_modifier.inputMutex.lock();
2931  if (m_modifier.inputModifier == nullptr) {
2932  errMsg = "No port modifier is attached to the input";
2933  m_modifier.inputMutex.unlock();
2934  return false;
2935  }
2936  m_modifier.inputModifier->getCarrierParams(param);
2937  m_modifier.inputMutex.unlock();
2938  }
2939  return true;
2940 }
2941 
2942 void PortCore::reportUnit(PortCoreUnit* unit, bool active)
2943 {
2944  YARP_UNUSED(active);
2945  if (unit != nullptr) {
2946  bool isLog = (!unit->getMode().empty());
2947  if (isLog) {
2948  m_logNeeded = true;
2949  }
2950  }
2951 }
2952 
2953 bool PortCore::setProcessSchedulingParam(int priority, int policy)
2954 {
2955 #if defined(__linux__)
2956  // set the sched properties of all threads within the process
2957  struct sched_param sch_param;
2958  sch_param.__sched_priority = priority;
2959 
2960  DIR* dir;
2961  char path[PATH_MAX];
2962  sprintf(path, "/proc/%d/task/", yarp::os::impl::getpid());
2963 
2964  dir = opendir(path);
2965  if (dir == nullptr) {
2966  return false;
2967  }
2968 
2969  struct dirent* d;
2970  char* end;
2971  long tid = 0;
2972  bool ret = true;
2973  while ((d = readdir(dir)) != nullptr) {
2974  if (isdigit(static_cast<unsigned char>(*d->d_name)) == 0) {
2975  continue;
2976  }
2977 
2978  tid = strtol(d->d_name, &end, 10);
2979  if (d->d_name == end || ((end != nullptr) && (*end != 0))) {
2980  closedir(dir);
2981  return false;
2982  }
2983  ret &= (sched_setscheduler(static_cast<pid_t>(tid), policy, &sch_param) == 0);
2984  }
2985  closedir(dir);
2986  return ret;
2987 #elif defined(YARP_HAS_ACE) // for other platforms
2988  // TODO: Check how to set the scheduling properties of all process's threads in Windows
2989  ACE_Sched_Params param(policy, (ACE_Sched_Priority)priority, ACE_SCOPE_PROCESS);
2990  int ret = ACE_OS::set_scheduling_params(param, yarp::os::impl::getpid());
2991  return (ret != -1);
2992 #else
2993  return false;
2994 #endif
2995 }
2996 
2998 {
2999  m_stateSemaphore.wait();
3000  if (!readOnly) {
3001  if (m_prop == nullptr) {
3002  m_prop = new Property();
3003  }
3004  }
3005  return m_prop;
3006 }
3007 
3009 {
3010  YARP_UNUSED(prop);
3011  m_stateSemaphore.post();
3012 }
3013 
3014 bool PortCore::removeIO(const Route& route, bool synch)
3015 {
3016  return removeUnit(route, synch);
3017 }
3018 
3019 void PortCore::setName(const std::string& name)
3020 {
3021  m_name = name;
3022 }
3023 
3024 std::string PortCore::getName()
3025 {
3026  return m_name;
3027 }
3028 
3029 int PortCore::getNextIndex()
3030 {
3031  int result = m_counter;
3032  m_counter++;
3033  if (m_counter < 0) {
3034  m_counter = 1;
3035  }
3036  return result;
3037 }
3038 
3040 {
3041  return m_address;
3042 }
3043 
3044 void PortCore::resetPortName(const std::string& str)
3045 {
3046  m_address.setName(str);
3047 }
3048 
3050 {
3051  return m_readableCreator;
3052 }
3053 
3055 {
3056  m_controlRegistration = flag;
3057 }
3058 
3060 {
3061  return m_listening;
3062 }
3063 
3065 {
3066  return m_manual;
3067 }
3068 
3070 {
3071  return m_interrupted;
3072 }
3073 
3074 void PortCore::setTimeout(float timeout)
3075 {
3076  m_timeout = timeout;
3077 }
3078 
3079 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3083 {
3085  if (mutex != nullptr) {
3086  m_old_mutex = mutex;
3087  m_mutexOwned = false;
3088  } else {
3089  m_old_mutex = new yarp::os::Mutex();
3090  m_mutexOwned = true;
3091  }
3092  return true;
3093 }
3095 #endif // YARP_NO_DEPRECATED
3096 
3097 bool PortCore::setCallbackLock(std::mutex* mutex)
3098 {
3100  if (mutex != nullptr) {
3101  m_mutex = mutex;
3102  m_mutexOwned = false;
3103  } else {
3104  m_mutex = new std::mutex;
3105  m_mutexOwned = true;
3106  }
3107  return true;
3108 }
3109 
3111 {
3112  if (m_mutexOwned && (m_mutex != nullptr)) {
3113  delete m_mutex;
3114  }
3115  m_mutex = nullptr;
3116 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3117  m_old_mutex = nullptr;
3118 #endif // YARP_NO_DEPRECATED
3119  m_mutexOwned = false;
3120  return true;
3121 }
3122 
3124 {
3125  if (m_mutex == nullptr) {
3126 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3127  if (m_old_mutex == nullptr) {
3128  return false;
3129  }
3130  m_old_mutex->lock();
3131  return true;
3132 #else // YARP_NO_DEPRECATED
3133  return false;
3134 #endif // YARP_NO_DEPRECATED
3135  }
3136  m_mutex->lock();
3137  return true;
3138 }
3139 
3141 {
3142  if (m_mutex == nullptr) {
3143 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3144  if (m_old_mutex == nullptr) {
3145  return true;
3146  }
3147  return m_old_mutex->try_lock();
3148 #else // YARP_NO_DEPRECATED
3149  return true;
3150 #endif // YARP_NO_DEPRECATED
3151  }
3152  return m_mutex->try_lock();
3153 }
3154 
3156 {
3157  if (m_mutex == nullptr) {
3158 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3159  if (m_old_mutex == nullptr) {
3160  return;
3161  }
3162  return m_old_mutex->unlock();
3163 #else // YARP_NO_DEPRECATED
3164  return;
3165 #endif // YARP_NO_DEPRECATED
3166  }
3167  m_mutex->unlock();
3168 }
3169 
3171 {
3172  return m_modifier;
3173 }
3174 
3176 {
3177  m_typeMutex.lock();
3178  if (!m_checkedType) {
3179  if (!m_type.isValid()) {
3180  m_type = reader.getReadType();
3181  }
3182  m_checkedType = true;
3183  }
3184  m_typeMutex.unlock();
3185 }
3186 
3188 {
3189  m_typeMutex.lock();
3190  Type t = m_type;
3191  m_typeMutex.unlock();
3192  return t;
3193 }
3194 
3195 void PortCore::promiseType(const Type& typ)
3196 {
3197  m_typeMutex.lock();
3198  m_type = typ;
3199  m_typeMutex.unlock();
3200 }
yarp::os::DummyConnector
A dummy connection to test yarp::os::Portable implementations.
Definition: DummyConnector.h:38
yarp::os::Route::toString
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition: Route.cpp:141
yarp::os::impl::PortCoreUnit::setDoomed
void setDoomed()
Request that this connection be shut down as soon as possible.
Definition: PortCoreUnit.h:100
yarp::os::impl::PortCore::send
bool send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a normal message.
Definition: PortCore.cpp:1295
yarp::os::impl::BufferedConnectionWriter::appendLine
virtual void appendLine(const std::string &data)
Send a string along with a carriage-return-line-feed sequence.
Definition: BufferedConnectionWriter.cpp:304
yarp::os::impl::PortCore::acquireProperties
Property * acquireProperties(bool readOnly)
Definition: PortCore.cpp:2997
YARP_WARNING_PUSH
#define YARP_WARNING_PUSH
Definition: system.h:334
yarp::os::Bottle
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:72
yarp::os::Route::swapNames
void swapNames()
Swap from and to names.
Definition: Route.cpp:136
yarp::os::Value::asVocab
virtual std::int32_t asVocab() const
Get vocabulary identifier as an integer.
Definition: Value.cpp:231
PORTCORE_SEND_LOG
#define PORTCORE_SEND_LOG
Definition: PortCore.h:45
yarp::os::Bottle::toString
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:214
RosNameSpace.h
yarp::os::Property::put
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
Definition: Property.cpp:998
yarp::os::impl::ThreadImpl::getPolicy
int getPolicy()
Definition: ThreadImpl.cpp:307
yarp::os::impl::PortCore::setAdminReadHandler
void setAdminReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming administrative messages.
Definition: PortCore.cpp:190
yarp::os::createVocab
constexpr yarp::conf::vocab32_t createVocab(char a, char b=0, char c=0, char d=0)
Definition: Vocab.h:22
yarp::os::impl::StreamConnectionReader::reset
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
Definition: StreamConnectionReader.cpp:49
Network.h
yarp::os::PortReport::report
virtual void report(const PortInfo &info)=0
Callback for port event/state information.
PORTCORE_IS_RPC
#define PORTCORE_IS_RPC
Definition: PortCore.h:49
yarp::os::impl::ConnectionRecorder::init
void init(yarp::os::ConnectionReader *wrappedReader)
Call this to wrap a specific ConnectionReader.
Definition: ConnectionRecorder.cpp:23
yarp::os::PortReader::read
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
yarp::os::Route::getCarrierName
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition: Route.cpp:126
yarp::os::impl::PortCore::resume
void resume()
Undo an interrupt()
Definition: PortCore.cpp:369
yarp::os::ContactStyle
Definition: ContactStyle.h:26
yarp::os::NetworkBase::getQueryBypass
static NameStore * getQueryBypass()
Definition: Network.cpp:1415
yarp::os::NestedContact
A placeholder for rich contact information.
Definition: NestedContact.h:26
yarp::os::impl::PortCoreUnit::isFinished
virtual bool isFinished()
Definition: PortCoreUnit.h:74
yarp::os::PortInfo::PORTINFO_CONNECTION
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Definition: PortInfo.h:43
yarp::os::impl::ThreadImpl::start
virtual bool start()
Definition: ThreadImpl.cpp:187
yarp::os::impl::PortCorePackets::checkPacket
bool checkPacket(PortCorePacket *packet)
Move a packet to the inactive state if it has finished being sent on all connections.
Definition: PortCorePackets.cpp:82
yarp::os::impl::PortCore::lockCallback
bool lockCallback()
Definition: PortCore.cpp:3123
yarp::os::Bottle::size
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:254
yarp::os::impl::BufferedConnectionWriter::write
bool write(ConnectionWriter &connection) const override
Write this object to a network connection.
Definition: BufferedConnectionWriter.cpp:344
yarp::os::impl::PortCoreUnit::setPupped
void setPupped(const std::string &pupString)
Tag this connection as having been created by a publisherUpdate message to the port's administrative ...
Definition: PortCoreUnit.h:251
yarp::os::impl::PortCoreUnit
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:28
yarp::os::Carrier
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:47
yarp::os::OutputProtocol
The output side of an active connection between two ports.
Definition: OutputProtocol.h:32
t
float t
Definition: FfmpegWriter.cpp:74
yarp::os::QosStyle::getDSCPByVocab
static PacketPriorityDSCP getDSCPByVocab(int vocab)
returns the IPV4/6 DSCP value given as DSCP code
Definition: QosStyle.cpp:158
PORTCORE_SEND_NORMAL
#define PORTCORE_SEND_NORMAL
Definition: PortCore.h:44
PORTCORE_IS_INPUT
#define PORTCORE_IS_INPUT
Definition: PortCore.h:50
yarp::os::impl::PortCore::isManual
bool isManual() const
Definition: PortCore.cpp:3064
yarp::os::Carriers::listen
static Face * listen(const Contact &address)
Create a "proto-carrier" interface object that waits for incoming connections prior to a carrier bein...
Definition: Carriers.cpp:249
yarp::os::InputProtocol::close
virtual void close()=0
Negotiate an end to operations.
yarp::os::Face::getLocalAddress
virtual Contact getLocalAddress() const
Get address after open(), if more specific than the address provided to open() - otherwise an invalid...
Definition: Face.h:86
yCWarning
#define yCWarning(component,...)
Definition: LogComponent.h:146
yarp::os::OutputProtocol::getOutputStream
virtual OutputStream & getOutputStream()=0
Access the output stream associated with the connection.
yarp::os::Type
Definition: Type.h:23
yarp::os::impl::PortCoreUnit::getRoute
virtual Route getRoute()
Definition: PortCoreUnit.h:83
yarp::os::impl::BufferedConnectionWriter::restart
void restart()
Tell the writer that we will be serializing a new object, but to keep any cached buffers that already...
Definition: BufferedConnectionWriter.cpp:66
YARP_UNUSED
#define YARP_UNUSED(var)
Definition: api.h:159
yarp::conf::environment::getEnvironment
std::string getEnvironment(const char *key, bool *found=nullptr)
Read a variable from the environment.
Definition: environment.h:31
yarp::os::SystemInfo::getPlatformInfo
static PlatformInfo getPlatformInfo()
getPlatformInfo
Definition: SystemInfo.cpp:591
yarp::os::NetworkBase::write
static bool write(const Contact &contact, PortWriter &cmd, PortReader &reply, bool admin=false, bool quiet=false, double timeout=-1)
Send a single command to a port and await a single response.
Definition: Network.cpp:1229
yarp::os::SystemInfo::ProcessInfo::pid
int pid
Definition: SystemInfo.h:121
yarp::os::Property::fromString
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
Definition: Property.cpp:1046
yarp::os::impl::PortDataModifier::outputModifier
yarp::os::Carrier * outputModifier
Definition: PortCore.h:145
yarp::os::impl::PortCore::getEnvelope
std::string getEnvelope()
Definition: PortCore.cpp:1522
yarp::os::impl::PortDataModifier
This is the heart of a yarp port.
Definition: PortCore.h:111
yarp::os::StringOutputStream::toString
std::string toString() const
Definition: StringOutputStream.h:33
yarp::os::impl::PortDataModifier::outputMutex
std::mutex outputMutex
Definition: PortCore.h:147
yarp::os::Carrier::modifyOutgoingData
const PortWriter & modifyOutgoingData(const PortWriter &writer) override
Modify outgoing payload data, if appropriate.
Definition: Carrier.cpp:73
yarp::os::Name::toAddress
Contact toAddress() const
Create an address from the name.
Definition: Name.cpp:30
yarp::os::impl::PortCore::start
bool start() override
Begin main thread.
Definition: PortCore.cpp:322
yarp::os::OutputStream
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:24
yarp::os::impl::PortCore::unlockCallback
void unlockCallback()
Definition: PortCore.cpp:3155
yarp::os::NetworkBase::writeToNameServer
static bool writeToNameServer(PortWriter &cmd, PortReader &reply, const ContactStyle &style)
Variant write method specialized to name server.
Definition: Network.cpp:1986
yarp::os::impl::PortCore::getFlags
unsigned int getFlags()
Check current configuration of port.
Definition: PortCore.h:308
yarp::os::DummyConnector::getReader
ConnectionReader & getReader()
Get the dummy ConnectionReader loaded with whatever was written to the ConnectionWriter since it was las...
Definition: DummyConnector.cpp:113
yarp::os::Contact::setName
void setName(const std::string &name)
Set the name associated with this Contact.
Definition: Contact.cpp:225
yarp::os::PortInfo::PORTINFO_MISC
@ PORTINFO_MISC
Unspecified information.
Definition: PortInfo.h:46
yarp::os::Carrier::isConnectionless
bool isConnectionless() const override=0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
yarp::os::Carrier::configureFromProperty
virtual bool configureFromProperty(yarp::os::Property &options)
Definition: Carrier.cpp:111
yarp::os::StringInputStream::add
void add(const std::string &txt)
Definition: StringInputStream.h:47
yarp::os::Carriers::chooseCarrier
static Carrier * chooseCarrier(const std::string &name)
Select a carrier by name.
Definition: Carriers.cpp:232
InputProtocol.h
yarp::os::OutputProtocol::getRoute
virtual const Route & getRoute() const =0
yarp::os::SystemInfo::PlatformInfo
The PlatformInfo struct holds the operating system information.
Definition: SystemInfo.h:83
yarp::os::impl::PortCore::removeCallbackLock
bool removeCallbackLock()
Definition: PortCore.cpp:3110
yarp::os::Carriers::getCarrierTemplate
static Carrier * getCarrierTemplate(const std::string &name)
Get template for carrier.
Definition: Carriers.cpp:237
yarp::os::impl::BufferedConnectionWriter
A helper for creating cached object descriptions.
Definition: BufferedConnectionWriter.h:47
yarp::os::Bottle::fromString
void fromString(const std::string &text)
Initializes bottle from a string.
Definition: Bottle.cpp:207
yarp::os::Mutex::try_lock
bool try_lock()
Lock the associated resource if possible.
Definition: Mutex.cpp:39
yarp::os::Carrier::isPush
bool isPush() const override
Check if carrier is "push" or "pull" style.
Definition: Carrier.cpp:25
__pc_rpc
static bool __pc_rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader, bool verbose)
Definition: PortCore.cpp:1542
yarp::os::impl::PortCore::isListening
bool isListening() const
Definition: PortCore.cpp:3059
yarp::os::NestedContact::getNodeName
std::string getNodeName() const
Definition: NestedContact.cpp:167
yarp::os::impl::PortCore::getName
std::string getName()
Definition: PortCore.cpp:3024
yarp::os::Carrier::acceptOutgoingData
bool acceptOutgoingData(const PortWriter &writer) override
Determine whether outgoing data should be accepted.
Definition: Carrier.cpp:98
yarp::os::Type::getName
std::string getName() const
Definition: Type.cpp:142
yarp::os::InputProtocol::setTimeout
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
yarp::os::Property::find
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Property.cpp:1034
StreamConnectionReader.h
ConnectionRecorder.h
PortCoreOutputUnit.h
yarp::os::impl::StreamConnectionReader
Lets Readable objects read from the underlying InputStream associated with the connection between two...
Definition: StreamConnectionReader.h:40
yarp::os::getpid
int getpid()
Portable wrapper for the getpid() function.
Definition: Os.cpp:94
yarp::os::impl::PortCorePacket::dec
void dec()
Decrement the usage count for this message.
Definition: PortCorePacket.h:80
yarp::os::StringOutputStream::reset
void reset()
Definition: StringOutputStream.h:38
ret
bool ret
Definition: ImplementAxisInfo.cpp:72
yarp::os::impl::PortDataModifier::releaseOutModifier
void releaseOutModifier()
Definition: PortCore.h:126
yarp::os::OutputProtocol::getInput
virtual InputProtocol & getInput()=0
Get an interface for doing read operations on the connection.
yarp::os::PortReport
Definition: PortReport.h:30
yarp::os::Carrier::setCarrierParams
void setCarrierParams(const Property &params) override
Configure carrier from port administrative commands.
Definition: Carrier.cpp:118
yarp::os::impl::PortCore::run
void run() override
The body of the main thread.
Definition: PortCore.cpp:207
yarp::os::Route
Information about a connection between two ports.
Definition: Route.h:31
LogComponent.h
rpc
static bool rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader)
Definition: RosLookup.cpp:22
yarp::os::PortWriter
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:26
yarp::os::Semaphore::wait
void wait()
Decrement the counter, even if we must wait to do that.
Definition: Semaphore.cpp:99
yarp::os::impl::PortCore::setCallbackLock
bool setCallbackLock(yarp::os::Mutex *mutex)
Definition: PortCore.cpp:3082
yarp::os::Contact::getRegName
std::string getRegName() const
Get the name associated with this Contact.
Definition: Contact.cpp:220
yarp::os::StringOutputStream
An OutputStream that produces a string.
Definition: StringOutputStream.h:24
yarp::os::impl::PortCore::removeIO
bool removeIO(const Route &route, bool synch=false)
Remove any connection matching the supplied route.
Definition: PortCore.cpp:3014
yarp::os::PortInfo
Definition: PortInfo.h:28
yarp::os::impl::PortCorePacket::inc
void inc()
Increment the usage count for this message.
Definition: PortCorePacket.h:72
yarp::os::Bottle::find
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Bottle.cpp:290
yarp::os::PortInfo::carrierName
std::string carrierName
Name of protocol type, if releveant.
Definition: PortInfo.h:69
Name.h
yarp::os::InputProtocol::getOutput
virtual OutputProtocol & getOutput()=0
Get an interface for doing write operations on the connection.
yarp::os::impl::PortCoreUnit::isInput
virtual bool isInput()
Definition: PortCoreUnit.h:57
yarp::os::impl::PortCore::notifyCompletion
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
Definition: PortCore.cpp:1484
yarp::os::impl::PortCore::PortCore
PortCore()
Constructor.
Definition: PortCore.cpp:62
yarp::os::impl::PortCore::describe
void describe(void *id, yarp::os::OutputStream *os)
Produce a text description of the port and its connections.
Definition: PortCore.cpp:1092
yarp::os::Face::close
virtual void close()=0
Stop listening.
yarp::os::Contact::toURI
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition: Contact.cpp:316
yarp::os::Bottle::addDict
Property & addDict()
Places an empty key/value object in the bottle, at the end of the list.
Definition: Bottle.cpp:191
yarp::os::Bottle::check
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Bottle.cpp:280
yarp::os::impl::PortCore::setName
void setName(const std::string &name)
Set the name of this port.
Definition: PortCore.cpp:3019
yarp::os::Bottle::findGroup
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Definition: Bottle.cpp:305
yarp::os::Face::write
virtual OutputProtocol * write(const Contact &address)=0
Try to reach out and talk to someone.
PortCoreInputUnit.h
yarp::os::Value::isString
virtual bool isString() const
Checks if value is a string.
Definition: Value.cpp:159
yarp::os::NetworkBase::disconnectInput
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
Definition: Network.cpp:1555
yarp::os::NetworkBase::queryName
static Contact queryName(const std::string &name)
Find out information about a registered name.
Definition: Network.cpp:998
yarp::os::impl::PortCore::promiseType
void promiseType(const Type &typ)
Definition: PortCore.cpp:3195
yarp::os::impl::PortCorePacket
A single message, potentially being transmitted on multiple connections.
Definition: PortCorePacket.h:24
yarp::os::Semaphore::post
void post()
Increment the counter.
Definition: Semaphore.cpp:114
yarp::os::OutputProtocol::open
virtual bool open(const Route &route)=0
Start negotiating a carrier, using the given route (this should generally match the name of the sendi...
yarp::os::PortInfo::targetName
std::string targetName
Name of connection target, if any.
Definition: PortInfo.h:66
yarp::os::impl::ThreadImpl::setPriority
int setPriority(int priority=-1, int policy=-1)
Definition: ThreadImpl.cpp:249
yarp::os::Semaphore::check
bool check()
Decrement the counter, unless that would require waiting.
Definition: Semaphore.cpp:109
yarp::os::Type::isValid
bool isValid() const
Definition: Type.cpp:157
yarp::os::PortReader
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:27
yarp::os::Bottle::get
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:249
yarp::os::impl::PortDataModifier::releaseInModifier
void releaseInModifier()
Definition: PortCore.h:135
yarp::os::ContactStyle::quiet
bool quiet
Suppress all outputs and warnings.
Definition: ContactStyle.h:39
yarp::os::Bottle::addList
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
Definition: Bottle.cpp:185
yarp::os::Contact::getPort
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:242
yarp::os::Property::toString
std::string toString() const override
Return a standard text representation of the content of the object.
Definition: Property.cpp:1052
yarp::os::Bottle::write
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
Definition: Bottle.cpp:233
yarp::os::impl::ThreadImpl::id
std::thread::id id
Definition: ThreadImpl.h:70
yarp::os::NetworkBase::unregisterName
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
Definition: Network.cpp:1026
yarp::os::ConnectionWriter
An interface for writing to a network connection.
Definition: ConnectionWriter.h:39
yarp::os::impl::PortCore::getEventCount
int getEventCount()
A diagnostic for testing purposes.
Definition: PortCore.cpp:571
yarp::os::SystemInfo::ProcessInfo::name
std::string name
Definition: SystemInfo.h:117
yarp::os::impl::ThreadImpl::tid
long tid
Definition: ThreadImpl.h:69
yarp::os::PortInfo::tag
int tag
Type of information.
Definition: PortInfo.h:51
yarp::os::Contact::fromString
static Contact fromString(const std::string &txt)
Factory method.
Definition: Contact.cpp:142
DummyConnector.h
yarp::os::impl::ConnectionRecorder::fini
void fini()
Call this when all reading/writing has been done.
Definition: ConnectionRecorder.cpp:32
yarp::os::NetworkBase::getLocalMode
static bool getLocalMode()
Get current value of flag "localMode", see setLocalMode function.
Definition: Network.cpp:1057
BufferedConnectionWriter.h
yarp::os::DummyConnector::getWriter
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written to the ConnectionWriter since it was las...
Definition: DummyConnector.cpp:108
yarp::os::Contact::getCarrier
std::string getCarrier() const
Get the carrier associated with this Contact for socket communication.
Definition: Contact.cpp:253
yarp::os::impl::PortCore::readBlock
bool readBlock(ConnectionReader &reader, void *id, yarp::os::OutputStream *os)
Read a block of regular payload data.
Definition: PortCore.cpp:1246
yarp::os::impl::PortCore::sendHelper
bool sendHelper(const yarp::os::PortWriter &writer, int mode, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a message with a specific mode (normal or log).
Definition: PortCore.cpp:1319
__tcp_check
static bool __tcp_check(const Contact &c)
Definition: PortCore.cpp:1558
yarp::os::impl::PortCore::reportUnit
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
Definition: PortCore.cpp:2942
yarp::os::Value::asString
virtual std::string asString() const
Get string value.
Definition: Value.cpp:237
yarp::os::PortReaderCreator
A creator for readers.
Definition: PortReaderCreator.h:33
yarp::os::SystemInfo::getProcessInfo
static ProcessInfo getProcessInfo(int pid=0)
gets the operating system process information given by its PID.
Definition: SystemInfo.cpp:799
yarp::os::Connection::isPush
virtual bool isPush() const =0
Check if carrier is "push" or "pull" style.
yarp::os::impl::ThreadImpl::join
int join(double seconds=-1)
Definition: ThreadImpl.cpp:123
yarp::os::PortInfo::incoming
bool incoming
True if a connection is incoming, false if outgoing.
Definition: PortInfo.h:54
yarp::os::impl::PortCore::removeOutput
void removeOutput(const std::string &dest, void *id, yarp::os::OutputStream *os)
Remove an output connection.
Definition: PortCore.cpp:1060
yarp::os::impl::PortCore::getPortModifier
yarp::os::impl::PortDataModifier & getPortModifier()
Definition: PortCore.cpp:3170
yarp::os::Name::getCarrierModifier
std::string getCarrierModifier(const char *mod, bool *hasModifier=nullptr)
Definition: Name.cpp:47
yarp::os::OutputProtocol::getConnection
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
yarp::os::OutputProtocol::rename
virtual void rename(const Route &route)=0
Relabel the route after the fact (e.g.
yarp::os::impl::PortCore::setControlRegistration
void setControlRegistration(bool flag)
Normally the port will unregister its name with the name server when shutting down.
Definition: PortCore.cpp:3054
yarp::os::PortInfo::portName
std::string portName
Name of port.
Definition: PortInfo.h:60
yarp::os::impl::BufferedConnectionWriter::toString
std::string toString() const
Definition: BufferedConnectionWriter.cpp:475
yarp::os::impl::PortCoreUnit::isOutput
virtual bool isOutput()
Definition: PortCoreUnit.h:65
yarp::os::impl::PortCoreUnit::isPupped
bool isPupped() const
Definition: PortCoreUnit.h:225
yarp::os::impl::PortCore::setReadCreator
void setReadCreator(yarp::os::PortReaderCreator &creator)
Set a callback for creating callbacks for incoming data.
Definition: PortCore.cpp:198
yarp::os::Bottle::addInt32
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
Definition: Bottle.cpp:143
yarp::os::Vocab::encode
NetInt32 encode(const std::string &str)
Convert a string into a vocabulary identifier.
Definition: Vocab.cpp:14
yarp::os::NetworkBase::initialized
static bool initialized()
Returns true if YARP has been fully initialized.
Definition: Network.cpp:1392
yarp::os::impl::PortCoreUnit::isDoomed
bool isDoomed()
Definition: PortCoreUnit.h:92
yarp::os::OutputProtocol::attachPort
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
yarp::os::impl::PortDataModifier::inputModifier
yarp::os::Carrier * inputModifier
Definition: PortCore.h:146
yarp::os::impl::PortCore::getReadCreator
yarp::os::PortReaderCreator * getReadCreator()
Get the creator of callbacks.
Definition: PortCore.cpp:3049
yarp::os::QosStyle::PacketPriorityDSCP
PacketPriorityDSCP
The PacketPriorityDSCP defines the packets quality of service (priority) using DSCP.
Definition: QosStyle.h:47
yarp::os::impl::PortCore::~PortCore
~PortCore()
Destructor.
Definition: PortCore.cpp:105
yarp::os::Name
Simple abstraction for a YARP port name.
Definition: Name.h:21
yarp::os::StringInputStream
An InputStream that reads from a string.
Definition: StringInputStream.h:24
yarp::os::PortInfo::message
std::string message
A human-readable description of contents.
Definition: PortInfo.h:72
yarp::os::Carriers::connect
static OutputProtocol * connect(const Contact &address)
Initiate a connection to an address.
Definition: Carriers.cpp:281
yarp::os::impl::PortCore::manualStart
bool manualStart(const char *sourceName)
Start up the port, but without a main thread.
Definition: PortCore.cpp:354
yarp::os::Property::check
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Property.cpp:1024
yarp::os::ConnectionReader::getWriter
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
yarp::os::impl::PortDataModifier::inputMutex
std::mutex inputMutex
Definition: PortCore.h:148
yarp::os::PortReader::getReadType
virtual Type getReadType() const
Definition: PortReader.cpp:15
yarp::os::impl::PortCoreUnit::getIndex
int getIndex()
Definition: PortCoreUnit.h:198
yarp::os::impl::PortCore::releaseProperties
void releaseProperties(Property *prop)
Definition: PortCore.cpp:3008
yarp::os::PortWriter::write
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
yarp::os::Bottle::addString
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:173
yarp::os::InputProtocol::attachPort
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
yarp::os::OutputProtocol::isOk
virtual bool isOk() const =0
Check if the connection is valid and can be used.
yarp::os::impl::PortCorePackets::getFreePacket
PortCorePacket * getFreePacket()
Get a packet that we can prepare for sending.
Definition: PortCorePackets.cpp:38
yarp::os::OutputProtocol::close
virtual void close()=0
Negotiate an end to operations.
yarp::os::Bottle::addVocab
void addVocab(int x)
Places a vocabulary item in the bottle, at the end of the list.
Definition: Bottle.cpp:167
yarp::os::impl::PortCoreUnit::getMode
std::string getMode(bool *hasMode=nullptr)
Read the "mode" of the connection - basically, whether it is used for logging or not.
Definition: PortCoreUnit.h:211
yarp::os::impl::PortCore::isWriting
bool isWriting()
Check if a message is currently being sent.
Definition: PortCore.cpp:1443
yarp::os::impl::PortCore::tryLockCallback
bool tryLockCallback()
Definition: PortCore.cpp:3140
yCAssert
#define yCAssert(component, x)
Definition: LogComponent.h:172
YARP_WARNING_POP
#define YARP_WARNING_POP
Definition: system.h:335
yarp::os::OutputProtocol::setTimeout
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
yarp::os::impl::PortCore::getAddress
const Contact & getAddress() const
Get the address associated with the port.
Definition: PortCore.cpp:3039
yarp::os::ConnectionReader
An interface for reading from a network connection.
Definition: ConnectionReader.h:39
yarp::os::impl::PortCore::resetReportCallback
void resetReportCallback()
Reset the callback to be notified of changes in port status.
Definition: PortCore.cpp:1225
StringOutputStream.h
yarp::os::impl::PortCore::addOutput
bool addOutput(const std::string &dest, void *id, yarp::os::OutputStream *os, bool onlyIfNeeded=false)
Add an output connection to this port.
Definition: PortCore.cpp:889
yarp::os::SystemInfo::ProcessInfo::schedPriority
int schedPriority
Definition: SystemInfo.h:120
yarp::os::NetType::toString
static std::string toString(int x)
Definition: NetType.cpp:138
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
yarp::os::PortWriter::onCommencement
virtual void onCommencement() const
This is called when the port is about to begin writing operations.
Definition: PortWriter.cpp:20
yarp::os::Value::asInt32
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Definition: Value.cpp:207
yarp::os::impl::PortCoreUnit::send
virtual void * send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader, const yarp::os::PortWriter *callback, void *tracker, const std::string &envelope, bool waitAfter=true, bool waitBefore=true, bool *gotReply=nullptr)
Send a message on the connection.
Definition: PortCoreUnit.h:130
yarp::os::impl::PortCoreOutputUnit
Manager for a single output from a port.
Definition: PortCoreOutputUnit.h:28
yarp::os::impl::PortCore::getInputCount
int getInputCount()
Check how many input connections there are.
Definition: PortCore.cpp:1465
yarp::os::ContactStyle::carrier
std::string carrier
Request that communication be made using a particular carrier.
Definition: ContactStyle.h:56
yarp::os::SystemInfo::ProcessInfo
The ProcessInfo struct provides the operating system process information.
Definition: SystemInfo.h:115
yarp::os::impl::PortCore::removeInput
void removeInput(const std::string &src, void *id, yarp::os::OutputStream *os)
Remove an input connection.
Definition: PortCore.cpp:1076
yarp::os::impl::PortCore::setEnvelope
void setEnvelope(const std::string &envelope)
Set some envelope information to pass along with a message without actually being part of the message...
Definition: PortCore.cpp:1508
yarp::os
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::impl::ThreadImpl::getPriority
int getPriority()
Definition: ThreadImpl.cpp:280
PORTCORE_IS_OUTPUT
#define PORTCORE_IS_OUTPUT
Definition: PortCore.h:51
PortCore.h
yarp::os::Contact::isValid
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:301
yarp::os::impl::PortCoreUnit::setCarrierParams
virtual void setCarrierParams(const yarp::os::Property &params)
Set arbitrary parameters for this connection.
Definition: PortCoreUnit.h:261
yarp::os::SystemInfo::ProcessInfo::schedPolicy
int schedPolicy
Definition: SystemInfo.h:119
yarp
Definition: environment.h:18
yarp::os::impl::PortCoreInputUnit
Manager for a single input to a port.
Definition: PortCoreInputUnit.h:26
yarp::os::impl::PortCore::report
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
Definition: PortCore.cpp:1232
PlatformUnistd.h
yarp::conf::vocab32_t
std::int32_t vocab32_t
Definition: numeric.h:52
environment.h
yarp::os::Carrier::getCarrierParams
void getCarrierParams(Property &params) const override
Get carrier configuration and deliver it by port administrative commands.
Definition: Carrier.cpp:123
yarp::os::Contact::getHost
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:231
yarp::os::impl::PortCore::setReportCallback
void setReportCallback(yarp::os::PortReport *reporter)
Set a callback to be notified of changes in port status.
Definition: PortCore.cpp:1216
yarp::os::Bottle::read
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition: Bottle.cpp:243
yarp::os::Contact
Represents how to reach a part of a YARP network.
Definition: Contact.h:38
yarp::os::Route::getToName
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:106
yarp::os::Route::setToContact
void setToContact(const Contact &toContact)
Set the destination contact of the route.
Definition: Route.cpp:121
yarp::os::Value::asList
virtual Bottle * asList() const
Get list value.
Definition: Value.cpp:243
yarp::os::impl::PortCore::getOutputCount
int getOutputCount()
Check how many output connections there are.
Definition: PortCore.cpp:1474
yarp::os::ContactStyle::timeout
double timeout
Set a timeout for communication (in units of seconds, fractional seconds allowed).
Definition: ContactStyle.h:50
Time.h
yarp::os::impl::PortCorePacket::setContent
void setContent(const yarp::os::PortWriter *writable, bool owned=false, const yarp::os::PortWriter *callback=nullptr, bool ownedCallback=false)
Configure the object being sent and where to send notifications.
Definition: PortCorePacket.h:109
yarp::os::Face::read
virtual InputProtocol * read()=0
Block and wait for someone to talk to us.
yarp::os::Mutex
Basic wrapper for mutual exclusion.
Definition: Mutex.h:34
yarp::os::InputProtocol
The input side of an active connection between two ports.
Definition: InputProtocol.h:37
yarp::os::Bottle::add
void add(const Value &value)
Add a Value to the bottle, at the end of the list.
Definition: Bottle.cpp:339
yCTrace
#define yCTrace(component,...)
Definition: LogComponent.h:88
yarp::os::Mutex::lock
void lock()
Lock the associated resource, waiting if necessary.
Definition: Mutex.cpp:34
yarp::os::impl::PortCore::isInterrupted
bool isInterrupted() const
Definition: PortCore.cpp:3069
yarp::os::impl::PortCore::checkType
void checkType(PortReader &reader)
Definition: PortCore.cpp:3175
yarp::os::impl::PortCore::interrupt
void interrupt()
Prepare the port to be shut down.
Definition: PortCore.cpp:375
yarp::os::impl::PortCore::setReadHandler
void setReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming data.
Definition: PortCore.cpp:182
yarp::os::impl::PortCoreUnit::getCarrierParams
virtual void getCarrierParams(yarp::os::Property &params)
Definition: PortCoreUnit.h:269
yarp::os::impl::ThreadImpl::close
virtual void close()
Definition: ThreadImpl.cpp:158
yarp::os::impl::ConnectionRecorder
A helper for recording entire message/reply transactions.
Definition: ConnectionRecorder.h:27
yarp::os::Value
A single value (typically within a Bottle).
Definition: Value.h:46
yarp::os::Route::getFromName
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:96
yarp::os::impl::PortCoreUnit::isBusy
virtual bool isBusy()
Definition: PortCoreUnit.h:167
PortInfo.h
YARP_OS_LOG_COMPONENT
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
YARP_DISABLE_DEPRECATED_WARNING
#define YARP_DISABLE_DEPRECATED_WARNING
Definition: system.h:336
yarp::os::impl
yarp::os::impl::PortCore::getType
yarp::os::Type getType()
Definition: PortCore.cpp:3187
yarp::os::ConnectionReader::requestDrop
virtual void requestDrop()=0
Tag the connection to be dropped after the current message.
yarp::os::RosNameSpace::fromRosName
static std::string fromRosName(const std::string &name)
Definition: RosNameSpace.cpp:678
yarp::os::impl::PortCoreUnit::getPupString
std::string getPupString() const
Definition: PortCoreUnit.h:237
yarp::os::Contact::setTimeout
void setTimeout(float timeout)
Set timeout for this Contact.
Definition: Contact.cpp:285
yarp::os::impl::ThreadImpl::getTid
long getTid()
Definition: ThreadImpl.cpp:334
yarp::os::impl::PortCore::adminBlock
bool adminBlock(ConnectionReader &reader, void *id)
Read a block of administrative data.
Definition: PortCore.cpp:1710
Bottle.h
yarp::os::SystemInfo::PlatformInfo::name
std::string name
Definition: SystemInfo.h:85
yarp::os::SystemInfo::ProcessInfo::arguments
std::string arguments
Definition: SystemInfo.h:118
yarp::os::QosStyle::DSCP_Invalid
@ DSCP_Invalid
Definition: QosStyle.h:49
yarp::os::OutputStream::setTypeOfService
virtual bool setTypeOfService(int tos)
Definition: OutputStream.cpp:45
yarp::os::Property
A class for storing options and configuration information.
Definition: Property.h:34
yarp::os::impl::PortCore::close
void close() override
Shut down port.
Definition: PortCore.cpp:309
yarp::os::impl::PortCore::listen
bool listen(const Contact &address, bool shouldAnnounce=true)
Begin service at a given address.
Definition: PortCore.cpp:112
yarp::os::Mutex::unlock
void unlock()
Unlock the associated resource.
Definition: Mutex.cpp:44
SystemInfo.h
yarp::os::NetworkBase::disconnect
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
Definition: Network.cpp:703
yarp::os::impl::PortCore::resetPortName
void resetPortName(const std::string &str)
Definition: PortCore.cpp:3044
yarp::os::NetInt32
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition: NetInt32.h:33
yarp::os::impl::PortCore::setTimeout
void setTimeout(float timeout)
Definition: PortCore.cpp:3074
yarp::os::OutputStream::getTypeOfService
virtual int getTypeOfService()
Definition: OutputStream.cpp:51
yarp::os::PortInfo::sourceName
std::string sourceName
Name of connection source, if any.
Definition: PortInfo.h:63