YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
PortCore.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
10
11#include <yarp/os/Bottle.h>
14#include <yarp/os/Name.h>
15#include <yarp/os/Network.h>
16#include <yarp/os/PortInfo.h>
19#include <yarp/os/SystemInfo.h>
20#include <yarp/os/Time.h>
28
29#include <cstdio>
30#include <functional>
31#include <random>
32#include <regex>
33#include <vector>
34
35#ifdef YARP_HAS_ACE
36# include <ace/INET_Addr.h>
37# include <ace/Sched_Params.h>
38// In one the ACE headers there is a definition of "main" for WIN32
39# ifdef main
40# undef main
41# endif
42#endif
43
44#if defined(__linux__) // used for configuring scheduling properties
45# include <dirent.h>
46# include <sys/types.h>
47# include <unistd.h>
48#endif
49
50
51using namespace yarp::os::impl;
52using namespace yarp::os;
53using namespace yarp;
54
55namespace {
56YARP_OS_LOG_COMPONENT(PORTCORE, "yarp.os.impl.PortCore")
57} // namespace
58
59PortCore::PortCore() = default;
60
66
67
68bool PortCore::listen(const Contact& address, bool shouldAnnounce)
69{
70 yCIDebug(PORTCORE, getName(), "Starting listening on %s", address.toURI().c_str());
71 // If we're using ACE, we really need to have it initialized before
72 // this point.
74 yCIError(PORTCORE, getName(), "YARP not initialized; create a yarp::os::Network object before using ports");
75 return false;
76 }
77
78 yCITrace(PORTCORE, getName(), "listen");
79
80 {
81 // Critical section
82 std::lock_guard<std::mutex> lock(m_stateMutex);
83
84 // This method assumes we are not already on the network.
85 // We can assume this because it is not a user-facing class,
86 // and we carefully never call this method again without
87 // calling close().
88 yCIAssert(PORTCORE, getName(), !m_listening);
89 yCIAssert(PORTCORE, getName(), !m_running);
90 yCIAssert(PORTCORE, getName(), !m_closing.load());
91 yCIAssert(PORTCORE, getName(), !m_finished.load());
92 yCIAssert(PORTCORE, getName(), m_face == nullptr);
93
94 // Try to put the port on the network, using the user-supplied
95 // address (which may be incomplete). You can think of
96 // this as getting a server socket.
97 m_address = address;
98 setName(address.getRegName());
99 if (m_timeout > 0) {
100 m_address.setTimeout(m_timeout);
101 }
102 m_face = Carriers::listen(m_address);
103
104 // We failed, abort.
105 if (m_face == nullptr) {
106 return false;
107 }
108
109 // Update our address if it was incomplete.
110 if (m_address.getPort() <= 0) {
111 m_address = m_face->getLocalAddress();
112 if (m_address.getRegName() == "...") {
113 m_address.setName(std::string("/") + m_address.getHost() + "_" + yarp::conf::numeric::to_string(m_address.getPort()));
114 setName(m_address.getRegName());
115 }
116 }
117
118 // Move into listening phase
119 m_listening.store(true);
120 }
121
122 // Now that we are on the network, we can let the name server know this.
123 if (shouldAnnounce) {
125 std::string portName = address.getRegName();
126 Bottle cmd;
127 Bottle reply;
128 cmd.addString("announce");
129 cmd.addString(portName);
130 ContactStyle style;
131 NetworkBase::writeToNameServer(cmd, reply, style);
132 }
133 }
134
135 // Success!
136 return true;
137}
138
139
141{
142 // Don't even try to do this when the port is hot, it'll burn you
143 yCIAssert(PORTCORE, getName(), !m_running.load());
144 yCIAssert(PORTCORE, getName(), m_reader == nullptr);
145 m_reader = &reader;
146}
147
149{
150 // Don't even try to do this when the port is hot, it'll burn you
151 yCIAssert(PORTCORE, getName(), !m_running.load());
152 yCIAssert(PORTCORE, getName(), m_adminReader == nullptr);
153 m_adminReader = &reader;
154}
155
157{
158 // Don't even try to do this when the port is hot, it'll burn you
159 yCIAssert(PORTCORE, getName(), !m_running.load());
160 yCIAssert(PORTCORE, getName(), m_readableCreator == nullptr);
161 m_readableCreator = &creator;
162}
163
164
166{
167 yCITrace(PORTCORE, getName(), "run");
168
169 // This is the server thread for the port. We listen on
170 // the network and handle any incoming connections.
171 // We don't touch those connections, just shove them
172 // in a list and move on. It is important that this
173 // thread doesn't make a connecting client wait just
174 // because some other client is slow.
175
176 // We assume that listen() has succeeded and that
177 // start() has been called.
178 yCIAssert(PORTCORE, getName(), m_listening.load());
179 yCIAssert(PORTCORE, getName(), !m_running.load());
180 yCIAssert(PORTCORE, getName(), !m_closing.load());
181 yCIAssert(PORTCORE, getName(), !m_finished.load());
182 yCIAssert(PORTCORE, getName(), m_starting.load());
183
184 // Enter running phase
185 {
186 // Critical section
187 std::lock_guard<std::mutex> lock(m_stateMutex);
188 m_running.store(true);
189 m_starting.store(false);
190 }
191
192 // Notify the start() thread that the run() thread is running
193 m_stateCv.notify_one();
194
195 yCITrace(PORTCORE, getName(), "run running");
196
197 // Enter main loop, where we block on incoming connections.
198 // The loop is exited when PortCore#closing is set. One last
199 // connection will be needed to de-block this thread and ensure
200 // that it checks PortCore#closing.
201 bool shouldStop = false;
202 while (!shouldStop) {
203
204 // Block and wait for a connection
205 InputProtocol* ip = m_face->read();
206
207 {
208 // Critical section
209 std::lock_guard<std::mutex> lock(m_stateMutex);
210
211 // Attach the connection to this port and update its timeout setting
212 if (ip != nullptr) {
213 ip->attachPort(m_contactable);
214 yCIDebug(PORTCORE, getName(), "received something");
215 if (m_timeout > 0) {
216 ip->setTimeout(m_timeout);
217 }
218 }
219
220 // Check whether we should shut down
221 shouldStop |= m_closing.load();
222
223 // Increment a global count of connection events
224 m_events++;
225 }
226
227 // It we are not shutting down, spin off the connection.
228 // It starts life as an input connection (although it
229 // may later morph into an output).
230 if (!shouldStop) {
231 if (ip != nullptr) {
232 addInput(ip);
233 }
234 yCIDebug(PORTCORE, getName(), "spun off a connection");
235 ip = nullptr;
236 }
237
238 // If the connection wasn't spun off, just shut it down.
239 if (ip != nullptr) {
240 ip->close();
241 delete ip;
242 ip = nullptr;
243 }
244
245 // Remove any defunct connections.
246 reapUnits();
247
248 // Notify anyone listening for connection changes.
249 std::lock_guard<std::mutex> lock(m_stateMutex);
250 m_connectionListeners = 0;
251 m_connectionChangeCv.notify_all();
252 }
253
254 yCITrace(PORTCORE, getName(), "run closing");
255
256 // The server thread is shutting down.
257 std::lock_guard<std::mutex> lock(m_stateMutex);
258 m_connectionListeners = 0;
259 m_connectionChangeCv.notify_all();
260 m_finished.store(true);
261}
262
263
265{
266 closeMain();
267
268 if (m_prop != nullptr) {
269 delete m_prop;
270 m_prop = nullptr;
271 }
272 m_modifier.releaseOutModifier();
273 m_modifier.releaseInModifier();
274}
275
276
278{
279 yCITrace(PORTCORE, getName(), "start");
280
281 // This wait will, on success, be matched by a post in run()
282 std::unique_lock<std::mutex> lock(m_stateMutex);
283
284 // We assume that listen() has been called.
285 yCIAssert(PORTCORE, getName(), m_listening.load());
286 yCIAssert(PORTCORE, getName(), !m_running.load());
287 yCIAssert(PORTCORE, getName(), !m_starting.load());
288 yCIAssert(PORTCORE, getName(), !m_finished.load());
289 yCIAssert(PORTCORE, getName(), !m_closing.load());
290
291 m_starting.store(true);
292
293 // Start the server thread.
294 bool started = ThreadImpl::start();
295 if (!started) {
296 // run() won't be happening
297 yAssert(false);
298
299 } else {
300 // Wait for the signal from the run thread before returning.
301 m_stateCv.wait(lock, [&]{ return m_running.load(); });
302 yCIAssert(PORTCORE, getName(), m_running.load());
303 }
304 return started;
305}
306
307
308bool PortCore::manualStart(const char* sourceName)
309{
310 yCITrace(PORTCORE, getName(), "manualStart");
311
312 // This variant of start() does not create a server thread.
313 // That is appropriate if we never expect to receive incoming
314 // connections for any reason. No incoming data, no requests
315 // for state information, no requests to change connections,
316 // nothing. We set the port's name to something fake, and
317 // act like nothing is wrong.
318 m_interruptable = false;
319 m_manual = true;
320 setName(sourceName);
321 return true;
322}
323
324
326{
327 // We are no longer interrupted.
328 m_interrupted = false;
329}
330
332{
333 yCITrace(PORTCORE, getName(), "interrupt");
334
335 // This is a no-op if there is no server thread.
336 if (!m_listening.load()) {
337 return;
338 }
339
340 // Ignore any future incoming data
341 m_interrupted = true;
342
343 // What about data that is already coming in?
344 // If interruptable is not currently set, no worries, the user
345 // did not or will not end up blocked on a read.
346 if (!m_interruptable) {
347 return;
348 }
349
350 // Since interruptable is set, it is possible that the user
351 // may be blocked on a read. We send an empty message,
352 // which is reserved for giving blocking readers a chance to
353 // update their state.
354 {
355 // Critical section
356 std::lock_guard<std::mutex> lock(m_stateMutex);
357 if (m_reader != nullptr) {
358 yCIDebug(PORTCORE, getName(), "sending update-state message to listener");
360 lockCallback();
361 m_reader->read(sbr);
363 }
364 }
365}
366
367
368void PortCore::closeMain()
369{
370 yCITrace(PORTCORE, getName(), "closeMain");
371
372 {
373 // Critical section
374 std::lock_guard<std::mutex> lock(m_stateMutex);
375
376 // We may not have anything to do.
377 if (m_finishing || !(m_running.load() || m_manual)) {
378 yCITrace(PORTCORE, getName(), "closeMain - nothing to do");
379 return;
380 }
381
382 yCITrace(PORTCORE, getName(), "closeMain - Central");
383
384 // Move into official "finishing" phase.
385 m_finishing = true;
386 yCIDebug(PORTCORE, getName(), "now preparing to shut down port");
387 }
388
389 // Start disconnecting inputs. We ask the other side of the
390 // connection to do this, so it won't come as a surprise.
391 // The details of how disconnection works vary by carrier.
392 // While we are doing this, the server thread may be still running.
393 // This is because other ports may need to talk to the server
394 // to organize details of how a connection should be broken down.
395 bool done = false;
396 std::string prevName;
397 while (!done) {
398 done = true;
399 std::string removeName;
400 {
401 // Critical section
402 std::lock_guard<std::mutex> lock(m_stateMutex);
403 for (auto* unit : m_units) {
404 if ((unit != nullptr) && unit->isInput() && !unit->isDoomed()) {
405 Route r = unit->getRoute();
406 std::string s = r.getFromName();
407 if (s.length() >= 1 && s[0] == '/' && s != getName() && s != prevName) {
408 removeName = s;
409 done = false;
410 break;
411 }
412 }
413 }
414 }
415 if (!done) {
416 yCIDebug(PORTCORE, getName(), "requesting removal of connection from %s", removeName.c_str());
418 getName(),
419 true);
420 if (!result) {
423 true);
424 }
426 }
427 }
428
429 // Start disconnecting outputs. We don't negotiate with the
430 // other side, we just break them down.
431 done = false;
432 while (!done) {
433 done = true;
435 {
436 // Critical section
437 std::lock_guard<std::mutex> lock(m_stateMutex);
438 for (auto* unit : m_units) {
439 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
440 removeRoute = unit->getRoute();
441 if (removeRoute.getFromName() == getName()) {
442 done = false;
443 break;
444 }
445 }
446 }
447 }
448 if (!done) {
449 removeUnit(removeRoute, true);
450 }
451 }
452
453 bool stopRunning = m_running.load();
454
455 // If the server thread is still running, we need to bring it down.
456 if (stopRunning) {
457 // Let the server thread know we no longer need its services.
458 m_closing.store(true);
459
460 // Wake up the server thread the only way we can, by sending
461 // a message to it. Then join it, it is done.
462 if (!m_manual) {
463 OutputProtocol* op = m_face->write(m_address);
464 if (op != nullptr) {
465 op->close();
466 delete op;
467 }
468 join();
469 }
470
471 // We should be finished now.
472 yCIAssert(PORTCORE, getName(), m_finished.load());
473
474 // Clean up our connection list. We couldn't do this earlier,
475 // because the server thread was running.
476 closeUnits();
477
478 // Reset some state flags.
479 {
480 // Critical section
481 std::lock_guard<std::mutex> lock(m_stateMutex);
482 m_finished.store(false);
483 m_closing.store(false);
484 m_running.store(false);
485 }
486 }
487
488 // There should be no other threads at this point and we
489 // can stop listening on the network.
490 if (m_listening.load()) {
491 yCIAssert(PORTCORE, getName(), m_face != nullptr);
492 m_face->close();
493 delete m_face;
494 m_face = nullptr;
495 m_listening.store(false);
496 }
497
498 // Check if the client is waiting for input. If so, wake them up
499 // with the bad news. An empty message signifies a request to
500 // check the port state.
501 if (m_reader != nullptr) {
502 yCIDebug(PORTCORE, getName(), "sending end-of-port message to listener");
504 m_reader->read(sbr);
505 m_reader = nullptr;
506 }
507
508 // We may need to unregister the port with the name server.
509 if (stopRunning) {
510 std::string name = getName();
511 if (name != std::string("")) {
512 if (m_controlRegistration) {
514 }
515 }
516 }
517
518 // We are done with the finishing process.
519 m_finishing = false;
520
521 // We are fresh as a daisy.
522 yCIAssert(PORTCORE, getName(), !m_listening.load());
523 yCIAssert(PORTCORE, getName(), !m_running.load());
524 yCIAssert(PORTCORE, getName(), !m_starting.load());
525 yCIAssert(PORTCORE, getName(), !m_closing.load());
526 yCIAssert(PORTCORE, getName(), !m_finished.load());
527 yCIAssert(PORTCORE, getName(), !m_finishing);
528 yCIAssert(PORTCORE, getName(), m_face == nullptr);
529}
530
531
533{
534 // How many times has the server thread spun off a connection.
535 std::lock_guard<std::mutex> lock(m_stateMutex);
536 int ct = m_events;
537 return ct;
538}
539
540
541void PortCore::closeUnits()
542{
543 // Empty the PortCore#units list. This is only possible when
544 // the server thread is finished.
545 yCIAssert(PORTCORE, getName(), m_finished.load());
546
547 // In the "finished" phase, nobody else touches the units,
548 // so we can go ahead and shut them down and delete them.
549 for (auto& i : m_units) {
551 if (unit != nullptr) {
552 yCIDebug(PORTCORE, getName(), "closing a unit");
553 unit->close();
554 yCIDebug(PORTCORE, getName(), "joining a unit");
555 unit->join();
556 delete unit;
557 yCIDebug(PORTCORE, getName(), "deleting a unit");
558 i = nullptr;
559 }
560 }
561
562 // Get rid of all our nulls. Done!
563 m_units.clear();
564}
565
566void PortCore::reapUnits()
567{
568 // Connections that should be shut down get tagged as "doomed"
569 // but aren't otherwise touched until it is safe to do so.
570 if (!m_finished.load()) {
571 std::lock_guard<std::mutex> lock(m_stateMutex);
572 for (auto* unit : m_units) {
573 if ((unit != nullptr) && unit->isDoomed() && !unit->isFinished()) {
574 std::string s = unit->getRoute().toString();
575 yCIDebug(PORTCORE, getName(), "Informing connection %s that it is doomed", s.c_str());
576 unit->close();
577 yCIDebug(PORTCORE, getName(), "Closed connection %s", s.c_str());
578 unit->join();
579 yCIDebug(PORTCORE, getName(), "Joined thread of connection %s", s.c_str());
580 }
581 }
582 }
583 cleanUnits();
584}
585
586void PortCore::cleanUnits(bool blocking)
587{
588 // We will remove any connections that have finished operating from
589 // the PortCore#units list.
590
591 // Depending on urgency, either wait for a safe time to do this
592 // or skip if unsafe.
593 std::unique_lock<std::mutex> lock(m_stateMutex, std::defer_lock);
594 if (blocking) {
595 lock.lock();
596 } else {
597 bool have_lock = lock.try_lock();
598 if (!have_lock) {
599 return;
600 }
601 }
602 // here we have the lock
603
604 // We will update our connection counts as a by-product.
605 int updatedInputCount = 0;
606 int updatedOutputCount = 0;
608 yCIDebug(PORTCORE, getName(), "/ routine check of connections to this port begins");
609 if (!m_finished.load()) {
610
611 // First, we delete and null out any defunct entries in the list.
612 for (auto& i : m_units) {
614 if (unit != nullptr) {
615 yCIDebug(PORTCORE, getName(), "| checking connection %s %s", unit->getRoute().toString().c_str(), unit->getMode().c_str());
616 if (unit->isFinished()) {
617 std::string con = unit->getRoute().toString();
618 yCIDebug(PORTCORE, getName(), "| removing connection %s", con.c_str());
619 unit->close();
620 unit->join();
621 delete unit;
622 i = nullptr;
623 yCIDebug(PORTCORE, getName(), "| removed connection %s", con.c_str());
624 } else {
625 // No work to do except updating connection counts.
626 if (!unit->isDoomed()) {
627 if (unit->isOutput()) {
629 if (unit->getMode().empty()) {
631 }
632 }
633 if (unit->isInput()) {
634 if (unit->getRoute().getFromName() != "admin") {
636 }
637 }
638 }
639 }
640 }
641 }
642
643 // Now we do some awkward shuffling (list class may be from ACE
644 // or STL, if ACE it is quite limited). We move the nulls to
645 // the end of the list ...
646 size_t rem = 0;
647 for (size_t i2 = 0; i2 < m_units.size(); i2++) {
648 if (m_units[i2] != nullptr) {
649 if (rem < i2) {
650 m_units[rem] = m_units[i2];
651 m_units[i2] = nullptr;
652 }
653 rem++;
654 }
655 }
656
657 // ... Now we get rid of the null entries
658 for (size_t i3 = 0; i3 < m_units.size() - rem; i3++) {
659 m_units.pop_back();
660 }
661 }
662
663 // Finalize the connection counts.
664 m_dataOutputCount = updatedDataOutputCount;
665 lock.unlock();
666
667 m_packetMutex.lock();
668 m_inputCount = updatedInputCount;
669 m_outputCount = updatedOutputCount;
670 m_packetMutex.unlock();
671 yCIDebug(PORTCORE, getName(), "\\ routine check of connections to this port ends");
672}
673
674
675void PortCore::addInput(InputProtocol* ip)
676{
677 yCITrace(PORTCORE, getName(), "addInput");
678
679 // This method is only called by the server thread in its running phase.
680 // It wraps up a network connection as a unit and adds it to
681 // PortCore#units. The unit will have its own thread associated
682 // with it.
683
684 yCIAssert(PORTCORE, getName(), ip != nullptr);
685 std::lock_guard<std::mutex> lock(m_stateMutex);
687 getNextIndex(),
688 ip,
689 false);
690 yCIAssert(PORTCORE, getName(), unit != nullptr);
691 unit->start();
692 m_units.push_back(unit);
693 yCITrace(PORTCORE, getName(), "there are now %zu units", m_units.size());
694}
695
696
698{
699 yCITrace(PORTCORE, getName(), "addOutput");
700
701 // This method is called from threads associated with input
702 // connections.
703 // It wraps up a network connection as a unit and adds it to
704 // PortCore#units. The unit will start with its own thread
705 // associated with it, but that thread will be very short-lived
706 // if the port is not configured to do background writes.
707
708 yCIAssert(PORTCORE, getName(), op != nullptr);
709 if (!m_finished.load()) {
710 std::lock_guard<std::mutex> lock(m_stateMutex);
711 PortCoreUnit* unit = new PortCoreOutputUnit(*this, getNextIndex(), op);
712 yCIAssert(PORTCORE, getName(), unit != nullptr);
713 unit->start();
714 m_units.push_back(unit);
715 }
716}
717
718
719bool PortCore::isUnit(const Route& route, int index)
720{
721 // Check if a connection with a specified route (and optional ID) is present
722 bool needReap = false;
723 if (!m_finished.load()) {
724 for (auto* unit : m_units) {
725 if (unit != nullptr) {
726 Route alt = unit->getRoute();
727 std::string wild = "*";
728 bool ok = true;
729 if (index >= 0) {
730 ok = ok && (unit->getIndex() == index);
731 }
732 if (ok) {
733 if (route.getFromName() != wild) {
734 ok = ok && (route.getFromName() == alt.getFromName());
735 }
736 if (route.getToName() != wild) {
737 ok = ok && (route.getToName() == alt.getToName());
738 }
739 if (route.getCarrierName() != wild) {
740 ok = ok && (route.getCarrierName() == alt.getCarrierName());
741 }
742 }
743 if (ok) {
744 needReap = true;
745 break;
746 }
747 }
748 }
749 }
750 return needReap;
751}
752
753
754bool PortCore::removeUnit(const Route& route, bool synch, bool* except)
755{
756 // This is a request to remove a connection. It could arise from any
757 // input thread.
758
759 if (except != nullptr) {
760 yCIDebug(PORTCORE, getName(), "asked to remove connection in the way of %s", route.toString().c_str());
761 *except = false;
762 } else {
763 yCIDebug(PORTCORE, getName(), "asked to remove connection %s", route.toString().c_str());
764 }
765
766 // Scan for units that match the given route, and collect their IDs.
767 // Mark them as "doomed".
768 std::vector<int> removals;
769
770 bool needReap = false;
771 if (!m_finished.load()) {
772 std::lock_guard<std::mutex> lock(m_stateMutex);
773 for (auto* unit : m_units) {
774 if (unit != nullptr) {
775 Route alt = unit->getRoute();
776 std::string wild = "*";
777 bool ok = true;
778 if (route.getFromName() != wild) {
779 ok = ok && (route.getFromName() == alt.getFromName());
780 }
781 if (route.getToName() != wild) {
782 ok = ok && (route.getToName() == alt.getToName());
783 }
784 if (route.getCarrierName() != wild) {
785 if (except == nullptr) {
786 ok = ok && (route.getCarrierName() == alt.getCarrierName());
787 } else {
788 if (route.getCarrierName() == alt.getCarrierName()) {
789 *except = true;
790 ok = false;
791 }
792 }
793 }
794
795 if (ok) {
796 yCIDebug(PORTCORE, getName(), "removing connection %s", alt.toString().c_str());
797 removals.push_back(unit->getIndex());
798 unit->setDoomed();
799 needReap = true;
800 }
801 }
802 }
803 }
804
805 // If we find one or more matches, we need to do some work.
806 // We've marked those matches as "doomed" so they'll get cleared
807 // up eventually, but we can speed this up by waking up the
808 // server thread.
809 if (needReap) {
810 yCIDebug(PORTCORE, getName(), "one or more connections need prodding to die");
811
812 if (m_manual) {
813 // No server thread, no problems.
814 reapUnits();
815 } else {
816 // Send a blank message to make up server thread.
817 OutputProtocol* op = m_face->write(m_address);
818 if (op != nullptr) {
819 op->close();
820 delete op;
821 }
822 yCIDebug(PORTCORE, getName(), "sent message to prod connection death");
823
824 if (synch) {
825 // Wait for connections to be cleaned up.
826 yCIDebug(PORTCORE, getName(), "synchronizing with connection death");
827 {
828 // Critical section
829 std::unique_lock<std::mutex> lock(m_stateMutex);
830 while (std::any_of(removals.begin(), removals.end(), [&](int removal){ return isUnit(route, removal); })) {
831 m_connectionListeners++;
832 m_connectionChangeCv.wait(lock, [&]{ return m_connectionListeners == 0; });
833 }
834 }
835 }
836 }
837 }
838 return needReap;
839}
840
841
842bool PortCore::addOutput(const std::string& dest,
843 void* id,
844 OutputStream* os,
845 bool onlyIfNeeded)
846{
847 YARP_UNUSED(id);
848 yCIDebug(PORTCORE, getName(), "asked to add output to %s", dest.c_str());
849
850 // Buffer to store text describing outcome (successful connection,
851 // or a failure).
853
854 // Look up the address we'll be connecting to.
855 Contact parts = Name(dest).toAddress();
856 Contact contact = NetworkBase::queryName(parts.getRegName());
857 Contact address = contact;
858
859 // If we can't find it, say so and abort.
860 if (!address.isValid()) {
861 bw.appendLine(std::string("Do not know how to connect to ") + dest);
862 if (os != nullptr) {
863 bw.write(*os);
864 }
865 return false;
866 }
867
868 // We clean all existing connections to the desired destination,
869 // optionally stopping if we find one with the right carrier.
870 if (onlyIfNeeded) {
871 // Remove any existing connections between source and destination
872 // with a different carrier. If we find a connection already
873 // present with the correct carrier, then we are done.
874 bool except = false;
875 removeUnit(Route(getName(), address.getRegName(), address.getCarrier()), true, &except);
876 if (except) {
877 // Connection already present.
878 yCIDebug(PORTCORE, getName(), "output already present to %s", dest.c_str());
879 bw.appendLine(std::string("Desired connection already present from ") + getName() + " to " + dest);
880 if (os != nullptr) {
881 bw.write(*os);
882 }
883 return true;
884 }
885 } else {
886 // Remove any existing connections between source and destination.
887 removeUnit(Route(getName(), address.getRegName(), "*"), true);
888 }
889
890 // Set up a named route for this connection.
891 std::string aname = address.getRegName();
892 if (aname.empty()) {
893 aname = address.toURI(false);
894 }
895 Route r(getName(),
896 aname,
897 ((!parts.getCarrier().empty()) ? parts.getCarrier() : address.getCarrier()));
898 r.setToContact(contact);
899
900 // Check for any restrictions on the port. Perhaps it can only
901 // read, or write.
902 bool allowed = true;
903 std::string err;
904 std::string append;
905 unsigned int f = getFlags();
906 bool allow_output = (f & PORTCORE_IS_OUTPUT) != 0;
907 bool rpc = (f & PORTCORE_IS_RPC) != 0;
908 Name name(r.getCarrierName() + std::string("://test"));
909 std::string mode = name.getCarrierModifier("log");
910 bool is_log = (!mode.empty());
911 if (is_log) {
912 if (mode != "in") {
913 err = "Logger configured as log." + mode + ", but only log.in is supported";
914 allowed = false;
915 } else {
916 append = "; " + r.getFromName() + " will forward messages and replies (if any) to " + r.getToName();
917 }
918 }
919 if (!allow_output) {
920 if (!is_log) {
921 bool push = false;
923 if (c != nullptr) {
924 push = c->isPush();
925 }
926 if (push) {
927 err = "Outputs not allowed";
928 allowed = false;
929 }
930 }
931 } else if (rpc) {
932 if (m_dataOutputCount >= 1 && !is_log) {
933 err = "RPC output already connected";
934 allowed = false;
935 }
936 }
937
938 // If we found a relevant restriction, abort.
939 if (!allowed) {
940 bw.appendLine(err);
941 if (os != nullptr) {
942 bw.write(*os);
943 }
944 return false;
945 }
946
947 // Ok! We can go ahead and make a connection.
948 OutputProtocol* op = nullptr;
949 if (m_timeout > 0) {
950 address.setTimeout(m_timeout);
951 }
952 op = Carriers::connect(address);
953 if (op != nullptr) {
954 op->attachPort(m_contactable);
955 if (m_timeout > 0) {
956 op->setTimeout(m_timeout);
957 }
958
959 bool ok = op->open(r);
960 if (!ok) {
961 yCIDebug(PORTCORE, getName(), "open route error");
962 delete op;
963 op = nullptr;
964 }
965 }
966
967 // No connection, abort.
968 if (op == nullptr) {
969 bw.appendLine(std::string("Cannot connect to ") + dest);
970 if (os != nullptr) {
971 bw.write(*os);
972 }
973 return false;
974 }
975
976 // Ok, we have a connection, now add it to PortCore#units
977 if (op->getConnection().isPush()) {
978 // This is the normal case
979 addOutput(op);
980 } else {
981 // This is the case for connections that are initiated
982 // in the opposite direction to native YARP connections.
983 // Native YARP has push connections, initiated by the
984 // sender. HTTP and ROS have pull connections, initiated
985 // by the receiver.
986 // We invert the route, flip the protocol direction, and add.
987 r.swapNames();
988 op->rename(r);
989 InputProtocol* ip = &(op->getInput());
990 if (!m_finished.load()) {
991 std::lock_guard<std::mutex> lock(m_stateMutex);
993 getNextIndex(),
994 ip,
995 true);
996 yCIAssert(PORTCORE, getName(), unit != nullptr);
997 unit->start();
998 m_units.push_back(unit);
999 }
1000 }
1001
1002 // Communicated the good news.
1003 bw.appendLine(std::string("Added connection from ") + getName() + " to " + dest + append);
1004 if (os != nullptr) {
1005 bw.write(*os);
1006 }
1007 cleanUnits();
1008 return true;
1009}
1010
1011
1012void PortCore::removeOutput(const std::string& dest, void* id, OutputStream* os)
1013{
1014 YARP_UNUSED(id);
1015 yCIDebug(PORTCORE, getName(), "asked to remove output to %s", dest.c_str());
1016
1017 // All the real work done by removeUnit().
1019 if (removeUnit(Route("*", dest, "*"), true)) {
1020 bw.appendLine(std::string("Removed connection from ") + getName() + " to " + dest);
1021 } else {
1022 bw.appendLine(std::string("Could not find an outgoing connection to ") + dest);
1023 }
1024 if (os != nullptr) {
1025 bw.write(*os);
1026 }
1027 cleanUnits();
1028}
1029
1030void PortCore::removeInput(const std::string& src, void* id, OutputStream* os)
1031{
1032 YARP_UNUSED(id);
1033 yCIDebug(PORTCORE, getName(), "asked to remove input to %s", src.c_str());
1034
1035 // All the real work done by removeUnit().
1037 if (removeUnit(Route(src, "*", "*"), true)) {
1038 bw.appendLine(std::string("Removing connection from ") + src + " to " + getName());
1039 } else {
1040 bw.appendLine(std::string("Could not find an incoming connection from ") + src);
1041 }
1042 if (os != nullptr) {
1043 bw.write(*os);
1044 }
1045 cleanUnits();
1046}
1047
1049{
1050 YARP_UNUSED(id);
1051 cleanUnits();
1052
1053 // Buffer to store a human-readable description of the port's
1054 // state.
1056
1057 {
1058 // Critical section
1059 std::lock_guard<std::mutex> lock(m_stateMutex);
1060
1061 // Report name and address.
1062 bw.appendLine(std::string("This is ") + m_address.getRegName() + " at " + m_address.toURI());
1063
1064 // Report outgoing connections.
1065 int oct = 0;
1066 for (auto* unit : m_units) {
1067 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1068 Route route = unit->getRoute();
1069 std::string msg = "There is an output connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1070 bw.appendLine(msg);
1071 oct++;
1072 }
1073 }
1074 if (oct < 1) {
1075 bw.appendLine("There are no outgoing connections");
1076 }
1077
1078 // Report incoming connections.
1079 int ict = 0;
1080 for (auto* unit : m_units) {
1081 if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1082 Route route = unit->getRoute();
1083 if (!route.getCarrierName().empty()) {
1084 std::string msg = "There is an input connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1085 bw.appendLine(msg);
1086 ict++;
1087 }
1088 }
1089 }
1090 if (ict < 1) {
1091 bw.appendLine("There are no incoming connections");
1092 }
1093 }
1094
1095 // Send description across network, or print it.
1096 if (os != nullptr) {
1097 bw.write(*os);
1098 } else {
1100 bw.write(sos);
1101 printf("%s\n", sos.toString().c_str());
1102 }
1103}
1104
1105
1107{
1108 cleanUnits();
1109
1110 std::lock_guard<std::mutex> lock(m_stateMutex);
1111
1112 // Report name and address of port.
1115 std::string portName = m_address.getRegName();
1116 baseInfo.message = std::string("This is ") + portName + " at " + m_address.toURI();
1117 reporter.report(baseInfo);
1118
1119 // Report outgoing connections.
1120 int oct = 0;
1121 for (auto* unit : m_units) {
1122 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1123 Route route = unit->getRoute();
1124 std::string msg = "There is an output connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1125 PortInfo info;
1126 info.message = msg;
1128 info.incoming = false;
1129 info.portName = portName;
1130 info.sourceName = route.getFromName();
1131 info.targetName = route.getToName();
1132 info.carrierName = route.getCarrierName();
1133 reporter.report(info);
1134 oct++;
1135 }
1136 }
1137 if (oct < 1) {
1138 PortInfo info;
1140 info.message = "There are no outgoing connections";
1141 reporter.report(info);
1142 }
1143
1144 // Report incoming connections.
1145 int ict = 0;
1146 for (auto* unit : m_units) {
1147 if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1148 Route route = unit->getRoute();
1149 std::string msg = "There is an input connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1150 PortInfo info;
1151 info.message = msg;
1153 info.incoming = true;
1154 info.portName = portName;
1155 info.sourceName = route.getFromName();
1156 info.targetName = route.getToName();
1157 info.carrierName = route.getCarrierName();
1158 reporter.report(info);
1159 ict++;
1160 }
1161 }
1162 if (ict < 1) {
1163 PortInfo info;
1165 info.message = "There are no incoming connections";
1166 reporter.report(info);
1167 }
1168}
1169
1170
1172{
1173 std::lock_guard<std::mutex> lock(m_stateMutex);
1174 if (reporter != nullptr) {
1175 m_eventReporter = reporter;
1176 }
1177}
1178
1180{
1181 std::lock_guard<std::mutex> lock(m_stateMutex);
1182 m_eventReporter = nullptr;
1183}
1184
1186{
1187 // We are in the context of one of the input or output threads,
1188 // so our contact with the PortCore must be absolutely minimal.
1189 //
1190 // It is safe to pick up the address of the reporter if this is
1191 // kept constant over the lifetime of the input/output threads.
1192
1193 if (m_eventReporter != nullptr) {
1194 m_eventReporter->report(info);
1195 }
1196}
1197
1198
1200{
1201 YARP_UNUSED(id);
1202 YARP_UNUSED(os);
1203 bool result = true;
1204
1205 // We are in the context of one of the input threads,
1206 // so our contact with the PortCore must be absolutely minimal.
1207 //
1208 // It is safe to pick up the address of the reader since this is
1209 // constant over the lifetime of the input threads.
1210
1211 if (m_reader != nullptr && !m_interrupted) {
1212 m_interruptable = false; // No mutexing; user of interrupt() has to be careful.
1213
1214 bool haveOutputs = (m_outputCount != 0); // No mutexing, but failure modes are benign.
1215
1216 if (m_logNeeded && haveOutputs) {
1217 // Normally, yarp doesn't pay attention to the content of
1218 // messages received by the client. Likewise, the content
1219 // of replies are not monitored. However it may sometimes
1220 // be useful to log this traffic.
1221
1223 recorder.init(&reader);
1224 lockCallback();
1225 result = m_reader->read(recorder);
1227 recorder.fini();
1228 // send off a log of this transaction to whoever wants it
1230 } else {
1231 // YARP is not needed as a middleman
1232 lockCallback();
1233 result = m_reader->read(reader);
1235 }
1236
1237 m_interruptable = true;
1238 } else {
1239 // Read and ignore message, there is no where to send it.
1240 yCIDebug(PORTCORE, getName(), "data received, no reader for it");
1241 Bottle b;
1242 result = b.read(reader);
1243 }
1244 return result;
1245}
1246
1247
1248bool PortCore::send(const PortWriter& writer,
1249 PortReader* reader,
1250 const PortWriter* callback)
1251{
1252 // check if there is any modifier
1253 // we need to protect this part while the modifier
1254 // plugin is loading or unloading!
1255 m_modifier.outputMutex.lock();
1256 if (m_modifier.outputModifier != nullptr) {
1257 if (!m_modifier.outputModifier->acceptOutgoingData(writer)) {
1258 m_modifier.outputMutex.unlock();
1259 return false;
1260 }
1261 m_modifier.outputModifier->modifyOutgoingData(writer);
1262 }
1263 m_modifier.outputMutex.unlock();
1264 if (!m_logNeeded) {
1265 return sendHelper(writer, PORTCORE_SEND_NORMAL, reader, callback);
1266 }
1267 // logging is desired, so we need to wrap up and log this send
1268 // (and any reply it gets) -- TODO not yet supported
1269 return sendHelper(writer, PORTCORE_SEND_NORMAL, reader, callback);
1270}
1271
1273 int mode,
1274 PortReader* reader,
1275 const PortWriter* callback)
1276{
1277 if (m_interrupted || m_finishing) {
1278 return false;
1279 }
1280
1281 bool all_ok = true;
1282 bool gotReply = false;
1283 int logCount = 0;
1284 std::string envelopeString = m_envelope;
1285
1286 // Pass a message to all output units for sending on. We could
1287 // be doing more here to cache the serialization of the message
1288 // and reuse it across output connections. However, one key
1289 // optimization is present: external blocks written by
1290 // yarp::os::ConnectionWriter::appendExternalBlock are never
1291 // copied. So for example the core image array in a yarp::sig::Image
1292 // is untouched by the port communications code.
1293
1294 yCITrace(PORTCORE, getName(), "------- send in real");
1295
1296 // Give user the chance to know that this object is about to be
1297 // written.
1298 writer.onCommencement();
1299
1300 // All user-facing parts of this port will be blocked on this
1301 // operation, so we'll want to be snappy. How long the
1302 // operation lasts will depend on these flags:
1303 // * waitAfterSend
1304 // * waitBeforeSend
1305 // set by setWaitAfterSend() and setWaitBeforeSend().
1306 std::lock_guard<std::mutex> lock(m_stateMutex);
1307
1308 // If the port is shutting down, abort.
1309 if (m_finished.load()) {
1310 return false;
1311 }
1312
1313 yCITrace(PORTCORE, getName(), "------- send in");
1314 // Prepare a "packet" for tracking a single message which
1315 // may travel by multiple outputs.
1316 m_packetMutex.lock();
1317 PortCorePacket* packet = m_packets.getFreePacket();
1318 yCIAssert(PORTCORE, getName(), packet != nullptr);
1319 packet->setContent(&writer, false, callback);
1320 m_packetMutex.unlock();
1321
1322 // Scan connections, placing message everywhere we can.
1323 for (auto* unit : m_units) {
1324 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1325 bool log = (!unit->getMode().empty());
1326 if (log) {
1327 // Some connections are for logging only.
1328 logCount++;
1329 }
1330 bool ok = (mode == PORTCORE_SEND_NORMAL) ? (!log) : (log);
1331 if (!ok) {
1332 continue;
1333 }
1334 bool waiter = m_waitAfterSend || (mode == PORTCORE_SEND_LOG);
1335 yCITrace(PORTCORE, getName(), "------- -- inc");
1336 m_packetMutex.lock();
1337 packet->inc(); // One more connection carrying message.
1338 m_packetMutex.unlock();
1339 yCITrace(PORTCORE, getName(), "------- -- pre-send");
1340 bool gotReplyOne = false;
1341 // Send the message off on this connection.
1342 void* out = unit->send(writer,
1343 reader,
1344 (callback != nullptr) ? callback : (&writer),
1345 reinterpret_cast<void*>(packet),
1347 waiter,
1348 m_waitBeforeSend,
1349 &gotReplyOne);
1351 yCITrace(PORTCORE, getName(), "------- -- send");
1352 if (out != nullptr) {
1353 // We got back a report of a message already sent.
1354 m_packetMutex.lock();
1355 (static_cast<PortCorePacket*>(out))->dec(); // Message on one fewer connections.
1356 m_packets.checkPacket(static_cast<PortCorePacket*>(out));
1357 m_packetMutex.unlock();
1358 }
1359 if (waiter) {
1360 if (unit->isFinished()) {
1361 all_ok = false;
1362 }
1363 }
1364 yCITrace(PORTCORE, getName(), "------- -- dec");
1365 }
1366 }
1367 yCITrace(PORTCORE, getName(), "------- pack check");
1368 m_packetMutex.lock();
1369
1370 // We no longer concern ourselves with the message.
1371 // It may or may not be traveling on some connections.
1372 // But that is not our problem anymore.
1373 packet->dec();
1374
1375 m_packets.checkPacket(packet);
1376 m_packetMutex.unlock();
1377 yCITrace(PORTCORE, getName(), "------- packed");
1378 yCITrace(PORTCORE, getName(), "------- send out");
1379 if (mode == PORTCORE_SEND_LOG) {
1380 if (logCount == 0) {
1381 m_logNeeded = false;
1382 }
1383 }
1384
1385 yCITrace(PORTCORE, getName(), "------- send out real");
1386
1387 if (m_waitAfterSend && reader != nullptr) {
1388 all_ok = all_ok && gotReply;
1389 }
1390
1391 return all_ok;
1392}
1393
1394
1396{
1397 bool writing = false;
1398
1399 // Check if any port is currently writing. TODO optimize
1400 // this query by counting down with notifyCompletion().
1401 if (!m_finished.load()) {
1402 std::lock_guard<std::mutex> lock(m_stateMutex);
1403 for (auto* unit : m_units) {
1404 if ((unit != nullptr) && !unit->isFinished() && unit->isBusy()) {
1405 writing = true;
1406 }
1407 }
1408 }
1409
1410 return writing;
1411}
1412
1413
1415{
1416 cleanUnits(false);
1417 m_packetMutex.lock();
1418 int result = m_inputCount;
1419 m_packetMutex.unlock();
1420 return result;
1421}
1422
1424{
1425 cleanUnits(false);
1426 m_packetMutex.lock();
1427 int result = m_outputCount;
1428 m_packetMutex.unlock();
1429 return result;
1430}
1431
1432
1434{
1435 yCITrace(PORTCORE, getName(), "starting notifyCompletion");
1436 m_packetMutex.lock();
1437 if (tracker != nullptr) {
1438 (static_cast<PortCorePacket*>(tracker))->dec();
1439 m_packets.checkPacket(static_cast<PortCorePacket*>(tracker));
1440 }
1441 m_packetMutex.unlock();
1442 yCITrace(PORTCORE, getName(), "stopping notifyCompletion");
1443}
1444
1445
1447{
1448 m_envelopeWriter.restart();
1449 bool ok = envelope.write(m_envelopeWriter);
1450 if (ok) {
1451 setEnvelope(m_envelopeWriter.toString());
1452 }
1453 return ok;
1454}
1455
1456
1457void PortCore::setEnvelope(const std::string& envelope)
1458{
1459 m_envelope = envelope;
1460 for (size_t i = 0; i < m_envelope.length(); i++) {
1461 // It looks like envelopes are constrained to be printable ASCII?
1462 // I'm not sure why this would be. TODO check.
1463 if (m_envelope[i] < 32) {
1464 m_envelope = m_envelope.substr(0, i);
1465 break;
1466 }
1467 }
1468 yCIDebug(PORTCORE, getName(), "set envelope to %s", m_envelope.c_str());
1469}
1470
1472{
1473 return m_envelope;
1474}
1475
1477{
1479 sis.add(m_envelope);
1480 sis.add("\r\n");
1482 Route route;
1483 sbr.reset(sis, nullptr, route, 0, true);
1484 return envelope.read(sbr);
1485}
1486
1487// Make an RPC connection to talk to a ROS API, send a message, get reply.
1488// NOTE: ROS support can now be moved out of here, once all documentation
1489// of older ways to interoperate with it are purged and people stop
1490// doing it.
1491static bool __pc_rpc(const Contact& c,
1492 const char* carrier,
1493 Bottle& writer,
1494 Bottle& reader,
1495 bool verbose)
1496{
1497 ContactStyle style;
1498 style.quiet = !verbose;
1499 style.timeout = 4;
1500 style.carrier = carrier;
1501 bool ok = Network::write(c, writer, reader, style);
1502 return ok;
1503}
1504
1505// ACE is sometimes confused by localhost aliases, in a ROS-incompatible
1506// way. This method does a quick sanity check if we are using ROS.
1507static bool __tcp_check(const Contact& c)
1508{
1509#ifdef YARP_HAS_ACE
1511 int result = addr.set(c.getPort(), c.getHost().c_str());
1512 if (result != 0) {
1513 yCWarning(PORTCORE, "ACE choked on %s:%d\n", c.getHost().c_str(), c.getPort());
1514 }
1515 result = addr.set(c.getPort(), "127.0.0.1");
1516 if (result != 0) {
1517 yCWarning(PORTCORE, "ACE choked on 127.0.0.1:%d\n", c.getPort());
1518 }
1519 result = addr.set(c.getPort(), "127.0.1.1");
1520 if (result != 0) {
1521 yCWarning(PORTCORE, "ACE choked on 127.0.1.1:%d\n", c.getPort());
1522 }
1523#endif
1524 return true;
1525}
1526
1527namespace {
1528enum class PortCoreCommand : yarp::conf::vocab32_t
1529{
1530 Unknown = 0,
1531 Help = yarp::os::createVocab32('h', 'e', 'l', 'p'),
1532 Ver = yarp::os::createVocab32('v', 'e', 'r'),
1533 Pray = yarp::os::createVocab32('p', 'r', 'a', 'y'),
1534 Add = yarp::os::createVocab32('a', 'd', 'd'),
1535 Del = yarp::os::createVocab32('d', 'e', 'l'),
1536 Atch = yarp::os::createVocab32('a', 't', 'c', 'h'),
1537 Dtch = yarp::os::createVocab32('d', 't', 'c', 'h'),
1538 List = yarp::os::createVocab32('l', 'i', 's', 't'),
1539 Set = yarp::os::createVocab32('s', 'e', 't'),
1540 Get = yarp::os::createVocab32('g', 'e', 't'),
1541 Prop = yarp::os::createVocab32('p', 'r', 'o', 'p'),
1542 RosPublisherUpdate = yarp::os::createVocab32('r', 'p', 'u', 'p'),
1543 RosRequestTopic = yarp::os::createVocab32('r', 't', 'o', 'p'),
1544 RosGetPid = yarp::os::createVocab32('p', 'i', 'd'),
1545 RosGetBusInfo = yarp::os::createVocab32('b', 'u', 's'),
1546};
1547
1548enum class PortCoreConnectionDirection : yarp::conf::vocab32_t
1549{
1550 Error = 0,
1551 Out = yarp::os::createVocab32('o', 'u', 't'),
1552 In = yarp::os::createVocab32('i', 'n'),
1553};
1554
1555enum class PortCorePropertyAction : yarp::conf::vocab32_t
1556{
1557 Error = 0,
1558 Get = yarp::os::createVocab32('g', 'e', 't'),
1559 Set = yarp::os::createVocab32('s', 'e', 't')
1560};
1561
1562PortCoreCommand parseCommand(const yarp::os::Value& v)
1563{
1564 if (v.isString()) {
1565 // We support ROS client API these days. Here we recode some long ROS
1566 // command names, just for convenience.
1567 std::string cmd = v.asString();
1568 if (cmd == "publisherUpdate") {
1569 return PortCoreCommand::RosPublisherUpdate;
1570 }
1571 if (cmd == "requestTopic") {
1572 return PortCoreCommand::RosRequestTopic;
1573 }
1574 if (cmd == "getPid") {
1575 return PortCoreCommand::RosGetPid;
1576 }
1577 if (cmd == "getBusInfo") {
1578 return PortCoreCommand::RosGetBusInfo;
1579 }
1580 }
1581
1582 auto cmd = static_cast<PortCoreCommand>(v.asVocab32());
1583 switch (cmd) {
1584 case PortCoreCommand::Help:
1585 case PortCoreCommand::Ver:
1586 case PortCoreCommand::Pray:
1587 case PortCoreCommand::Add:
1588 case PortCoreCommand::Del:
1589 case PortCoreCommand::Atch:
1590 case PortCoreCommand::Dtch:
1591 case PortCoreCommand::List:
1592 case PortCoreCommand::Set:
1593 case PortCoreCommand::Get:
1594 case PortCoreCommand::Prop:
1595 case PortCoreCommand::RosPublisherUpdate:
1596 case PortCoreCommand::RosRequestTopic:
1597 case PortCoreCommand::RosGetPid:
1598 case PortCoreCommand::RosGetBusInfo:
1599 return cmd;
1600 default:
1601 return PortCoreCommand::Unknown;
1602 }
1603}
1604
1605PortCoreConnectionDirection parseConnectionDirection(yarp::conf::vocab32_t v, bool errorIsOut = false)
1606{
1607 auto dir = static_cast<PortCoreConnectionDirection>(v);
1608 switch (dir) {
1609 case PortCoreConnectionDirection::In:
1610 case PortCoreConnectionDirection::Out:
1611 return dir;
1612 default:
1613 return errorIsOut ? PortCoreConnectionDirection::Out : PortCoreConnectionDirection::Error;
1614 }
1615}
1616
1617PortCorePropertyAction parsePropertyAction(yarp::conf::vocab32_t v)
1618{
1619 auto action = static_cast<PortCorePropertyAction>(v);
1620 switch (action) {
1621 case PortCorePropertyAction::Get:
1622 case PortCorePropertyAction::Set:
1623 return action;
1624 default:
1625 return PortCorePropertyAction::Error;
1626 }
1627}
1628
1629void describeRoute(const Route& route, Bottle& result)
1630{
1631 Bottle& bfrom = result.addList();
1632 bfrom.addString("from");
1633 bfrom.addString(route.getFromName());
1634
1635 Bottle& bto = result.addList();
1636 bto.addString("to");
1637 bto.addString(route.getToName());
1638
1639 Bottle& bcarrier = result.addList();
1640 bcarrier.addString("carrier");
1641 bcarrier.addString(route.getCarrierName());
1642
1644 if (carrier->isConnectionless()) {
1645 Bottle& bconnectionless = result.addList();
1646 bconnectionless.addString("connectionless");
1647 bconnectionless.addInt32(1);
1648 }
1649 if (!carrier->isPush()) {
1650 Bottle& breverse = result.addList();
1651 breverse.addString("push");
1652 breverse.addInt32(0);
1653 }
1654 delete carrier;
1655}
1656
1657} // namespace
1658
1660 void* id)
1661{
1662 Bottle cmd;
1663 Bottle result;
1664
1665 // We've received a message to the port that is marked as administrative.
1666 // That means that instead of passing it along as data to the user of the
1667 // port, the port itself is responsible for reading and responding to
1668 // it. So let's read the message and see what we're supposed to do.
1669 cmd.read(reader);
1670
1671 yCIDebug(PORTCORE, getName(), "Port %s received command %s", getName().c_str(), cmd.toString().c_str());
1672
1673 auto handleAdminHelpCmd = []() {
1674 Bottle result;
1675 // We give a list of the most useful administrative commands.
1676 result.addVocab32('m', 'a', 'n', 'y');
1677 result.addString("[help] # give this help");
1678 result.addString("[ver] # report protocol version information");
1679 result.addString("[add] $portname # add an output connection");
1680 result.addString("[add] $portname $car # add an output with a given protocol");
1681 result.addString("[del] $portname # remove an input or output connection");
1682 result.addString("[list] [in] # list input connections");
1683 result.addString("[list] [out] # list output connections");
1684 result.addString("[list] [in] $portname # give details for input");
1685 result.addString("[list] [out] $portname # give details for output");
1686 result.addString("[prop] [get] # get all user-defined port properties");
1687 result.addString("[prop] [get] $prop # get a user-defined port property (prop, val)");
1688 result.addString("[prop] [set] $prop $val # set a user-defined port property (prop, val)");
1689 result.addString("[prop] [get] $portname # get Qos properties of a connection to/from a port");
1690 result.addString("[prop] [set] $portname # set Qos properties of a connection to/from a port");
1691 result.addString("[prop] [get] $cur_port # get information about current process (e.g., scheduling priority, pid)");
1692 result.addString("[prop] [set] $cur_port # set properties of the current process (e.g., scheduling priority, pid)");
1693 result.addString("[atch] [out] $prop # attach a portmonitor plug-in to the port's output");
1694 result.addString("[atch] [in] $prop # attach a portmonitor plug-in to the port's input");
1695 result.addString("[dtch] [out] # detach portmonitor plug-in from the port's output");
1696 result.addString("[dtch] [in] # detach portmonitor plug-in from the port's input");
1697 //result.addString("[atch] $portname $prop # attach a portmonitor plug-in to the connection to/from $portname");
1698 //result.addString("[dtch] $portname # detach any portmonitor plug-in from the connection to/from $portname");
1699 return result;
1700 };
1701
1702 auto handleAdminVerCmd = []() {
1703 // Gives a version number for the administrative commands.
1704 // It is distinct from YARP library versioning.
1705 Bottle result;
1706 result.addVocab32("ver");
1707 result.addInt32(1);
1708 result.addInt32(2);
1709 result.addInt32(3);
1710 return result;
1711 };
1712
1713 auto handleAdminPrayCmd = [this]() {
1714 // Strongly inspired by nethack #pray command:
1715 // https://nethackwiki.com/wiki/Prayer
1716 // http://www.steelypips.org/nethack/pray.html
1717
1718 Bottle result;
1719
1720 bool found = false;
1721 std::string name = yarp::conf::environment::get_string("YARP_ROBOT_NAME", &found);
1722 if (!found) {
1723 name = getName();
1724 // Remove initial "/"
1725 while (name[0] == '/') {
1726 name = name.substr(1);
1727 }
1728 // Keep only the first part of the port name
1729 auto i = name.find('/');
1730 if (i != std::string::npos) {
1731 name = name.substr(0, i);
1732 }
1733 }
1734
1735 std::random_device rd;
1736 std::mt19937 mt(rd());
1737 std::uniform_int_distribution<int> dist2(0,1);
1738 auto d2 = std::bind(dist2, mt);
1739
1740 result.addString("You begin praying to " + name + ".");
1741 result.addString("You finish your prayer.");
1742
1743 static const char* godvoices[] = {
1744 "booms out",
1745 "thunders",
1746 "rings out",
1747 "booms",
1748 };
1749 std::uniform_int_distribution<int> godvoices_dist(0, (sizeof(godvoices) / sizeof(godvoices[0])) - 1);
1750 auto godvoice = [&]() {
1751 return std::string(godvoices[godvoices_dist(mt)]);
1752 };
1753
1754 static const char* creatures[] = {
1755 "mortal",
1756 "creature",
1757 "robot",
1758 };
1759 std::uniform_int_distribution<int> creatures_dist(0, (sizeof(creatures) / sizeof(creatures[0])) - 1);
1760 auto creature = [&]() {
1761 return std::string(creatures[creatures_dist(mt)]);
1762 };
1763
1764 static const char* auras[] = {
1765 "amber",
1766 "light blue",
1767 "golden",
1768 "white",
1769 "orange",
1770 "black",
1771 };
1772 std::uniform_int_distribution<int> auras_dist(0, (sizeof(auras) / sizeof(auras[0])) - 1);
1773 auto aura = [&]() {
1774 return std::string(auras[auras_dist(mt)]);
1775 };
1776
1777 static const char* items[] = {
1778 "keyboard",
1779 "mouse",
1780 "monitor",
1781 "headphones",
1782 "smartphone",
1783 "wallet",
1784 "eyeglasses",
1785 "shirt",
1786 };
1787 std::uniform_int_distribution<int> items_dist(0, (sizeof(items) / sizeof(items[0])) - 1);
1788 auto item = [&]() {
1789 return std::string(items[items_dist(mt)]);
1790 };
1791
1792 static const char* blessings[] = {
1793 "You feel more limber.",
1794 "The slime disappears.",
1795 "Your amulet vanishes! You can breathe again.",
1796 "You can breathe again.",
1797 "You are back on solid ground.",
1798 "Your stomach feels content.",
1799 "You feel better.",
1800 "You feel much better.",
1801 "Your surroundings change.",
1802 "Your shape becomes uncertain.",
1803 "Your chain disappears.",
1804 "There's a tiger in your tank.",
1805 "You feel in good health again.",
1806 "Your eye feels better.",
1807 "Your eyes feel better.",
1808 "Looks like you are back in Kansas.",
1809 "Your <ITEM> softly glows <AURA>.",
1810 };
1811 std::uniform_int_distribution<int> blessings_dist(0, (sizeof(blessings) / sizeof(blessings[0])) - 1);
1812 auto blessing = [&](){
1813 auto blessing = std::string(blessings[blessings_dist(mt)]);
1814 blessing = std::regex_replace(blessing, std::regex("<ITEM>"), item());
1815 blessing = std::regex_replace(blessing, std::regex("<AURA>"), aura());
1816 return blessing;
1817 };
1818
1819 std::uniform_int_distribution<int> dist13(0,12);
1820 switch(dist13(mt)) {
1821 case 0:
1822 case 1:
1823 result.addString("You feel that " + name + " is " + (d2() ? "bummed" : "displeased") + ".");
1824 break;
1825 case 2:
1826 case 3:
1827 result.addString("The voice of " + name + " " + godvoice() +
1828 ": \"Thou " + (d2() ? "hast strayed from the path" : "art arrogant") +
1829 ", " + creature() + ". Thou must relearn thy lessons!\"");
1830 break;
1831 case 4:
1832 case 5:
1833 result.addString("The voice of " + name + " " + godvoice() +
1834 ": \"Thou hast angered me.\"");
1835 result.addString("A black glow surrounds you.");
1836 break;
1837 case 6:
1838 result.addString("The voice of " + name + " " + godvoice() +
1839 ": \"Thou hast angered me.\"");
1840 break;
1841 case 7:
1842 case 8:
1843 result.addString("The voice of " + name + " " + godvoice() +
1844 ": \"Thou durst " + (d2() ? "scorn" : "call upon") +
1845 " me? Then die, " + creature() + "!\"");
1846 break;
1847 case 9:
1848 result.addString("You feel that " + name + " is " + (d2() ? "pleased as punch" : "well-pleased") + ".");
1849 result.addString(blessing());
1850 break;
1851 case 10:
1852 result.addString("You feel that " + name + " is " + (d2() ? "ticklish" : "pleased") + ".");
1853 result.addString(blessing());
1854 break;
1855 case 11:
1856 result.addString("You feel that " + name + " is " + (d2() ? "full" : "satisfied") + ".");
1857 result.addString(blessing());
1858 break;
1859 default:
1860 result.addString("The voice of " + name + " " + godvoice() +
1861 ": \"Thou hast angered me.\"");
1862 result.addString("Suddenly, a bolt of lightning strikes you!");
1863 result.addString("You fry to a crisp!");
1864 break;
1865 }
1866
1867 return result;
1868 };
1869
1870 auto handleAdminAddCmd = [this, id](std::string output,
1871 const std::string& carrier) {
1872 // Add an output to the port.
1873 Bottle result;
1874 StringOutputStream cache;
1875 if (!carrier.empty()) {
1876 output = carrier + ":/" + output;
1877 }
1878 addOutput(output, id, &cache, false);
1879 std::string r = cache.toString();
1880 int v = (r[0] == 'A') ? 0 : -1;
1881 result.addInt32(v);
1882 result.addString(r);
1883 return result;
1884 };
1885
1886 auto handleAdminDelCmd = [this, id](const std::string& dest) {
1887 // Delete any inputs or outputs involving the named port.
1888 Bottle result;
1889 StringOutputStream cache;
1890 removeOutput(dest, id, &cache);
1891 std::string r1 = cache.toString();
1892 cache.reset();
1893 removeInput(dest, id, &cache);
1894 std::string r2 = cache.toString();
1895 int v = (r1[0] == 'R' || r2[0] == 'R') ? 0 : -1;
1896 result.addInt32(v);
1897 if (r1[0] == 'R' && r2[0] != 'R') {
1898 result.addString(r1);
1899 } else if (r1[0] != 'R' && r2[0] == 'R') {
1900 result.addString(r2);
1901 } else {
1902 result.addString(r1 + r2);
1903 }
1904 return result;
1905 };
1906
1907 auto handleAdminAtchCmd = [this](PortCoreConnectionDirection direction,
1908 Property prop) {
1909 Bottle result;
1910 switch (direction) {
1911 case PortCoreConnectionDirection::Out: {
1912 std::string errMsg;
1913 if (!attachPortMonitor(prop, true, errMsg)) {
1914 result.addVocab32("fail");
1915 result.addString(errMsg);
1916 } else {
1917 result.addVocab32("ok");
1918 }
1919 } break;
1920 case PortCoreConnectionDirection::In: {
1921 std::string errMsg;
1922 if (!attachPortMonitor(prop, false, errMsg)) {
1923 result.addVocab32("fail");
1924 result.addString(errMsg);
1925 } else {
1926 result.addVocab32("ok");
1927 }
1928 } break;
1929 case PortCoreConnectionDirection::Error:
1930 result.addVocab32("fail");
1931 result.addString("attach command must be followed by [out] or [in]");
1932 }
1933 return result;
1934 };
1935
1936 auto handleAdminDtchCmd = [this](PortCoreConnectionDirection direction) {
1937 Bottle result;
1938 switch (direction) {
1939 case PortCoreConnectionDirection::Out: {
1940 if (detachPortMonitor(true)) {
1941 result.addVocab32("ok");
1942 } else {
1943 result.addVocab32("fail");
1944 }
1945 } break;
1946 case PortCoreConnectionDirection::In: {
1947 if (detachPortMonitor(false)) {
1948 result.addVocab32("ok");
1949 } else {
1950 result.addVocab32("fail");
1951 }
1952 } break;
1953 case PortCoreConnectionDirection::Error:
1954 result.addVocab32("fail");
1955 result.addString("detach command must be followed by [out] or [in]");
1956 };
1957 return result;
1958 };
1959
1960 auto handleAdminListCmd = [this](const PortCoreConnectionDirection direction,
1961 const std::string& target) {
1962 Bottle result;
1963 switch (direction) {
1964 case PortCoreConnectionDirection::In: {
1965 // Return a list of all input connections.
1966 std::lock_guard<std::mutex> lock(m_stateMutex);
1967 for (auto* unit : m_units) {
1968 if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1969 Route route = unit->getRoute();
1970 if (target.empty()) {
1971 const std::string& name = route.getFromName();
1972 if (!name.empty()) {
1973 result.addString(name);
1974 }
1975 } else if (route.getFromName() == target) {
1976 describeRoute(route, result);
1977 }
1978 }
1979 }
1980 } break;
1981 case PortCoreConnectionDirection::Out: {
1982 // Return a list of all output connections.
1983 std::lock_guard<std::mutex> lock(m_stateMutex);
1984 for (auto* unit : m_units) {
1985 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1986 Route route = unit->getRoute();
1987 if (target.empty()) {
1988 result.addString(route.getToName());
1989 } else if (route.getToName() == target) {
1990 describeRoute(route, result);
1991 }
1992 }
1993 }
1994 } break;
1995 case PortCoreConnectionDirection::Error:
1996 // Should never happen
1997 yCIAssert(PORTCORE, getName(), false);
1998 break;
1999 }
2000 return result;
2001 };
2002
2003 auto handleAdminSetInCmd = [this](const std::string& target,
2004 const Property& property) {
2005 Bottle result;
2006 // Set carrier parameters on a given input connection.
2007 std::lock_guard<std::mutex> lock(m_stateMutex);
2008 if (target.empty()) {
2009 result.addInt32(-1);
2010 result.addString("target port is not specified.\r\n");
2011 } else {
2012 if (target == getName()) {
2013 std::string errMsg;
2014 if (!setParamPortMonitor(property, false, errMsg)) {
2015 result.addVocab32("fail");
2016 result.addString(errMsg);
2017 } else {
2018 result.addVocab32("ok");
2019 }
2020 } else {
2021 for (auto* unit : m_units) {
2022 if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
2023 Route route = unit->getRoute();
2024 if (route.getFromName() == target) {
2025 unit->setCarrierParams(property);
2026 result.addInt32(0);
2027 std::string msg = "Configured connection from ";
2028 msg += route.getFromName();
2029 msg += "\r\n";
2030 result.addString(msg);
2031 break;
2032 }
2033 }
2034 }
2035 }
2036 if (result.size() == 0) {
2037 result.addInt32(-1);
2038 std::string msg = "Could not find an incoming connection from ";
2039 msg += target;
2040 msg += "\r\n";
2041 result.addString(msg);
2042 }
2043 }
2044 return result;
2045 };
2046
2047 auto handleAdminSetOutCmd = [this](const std::string& target,
2048 const Property& property) {
2049 Bottle result;
2050 // Set carrier parameters on a given output connection.
2051 std::lock_guard<std::mutex> lock(m_stateMutex);
2052 if (target.empty()) {
2053 result.addInt32(-1);
2054 result.addString("target port is not specified.\r\n");
2055 } else {
2056 if (target == getName()) {
2057 std::string errMsg;
2058 if (!setParamPortMonitor(property, true, errMsg)) {
2059 result.addVocab32("fail");
2060 result.addString(errMsg);
2061 } else {
2062 result.addVocab32("ok");
2063 }
2064 } else {
2065 for (auto* unit : m_units) {
2066 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2067 Route route = unit->getRoute();
2068 if (route.getToName() == target) {
2069 unit->setCarrierParams(property);
2070 result.addInt32(0);
2071 std::string msg = "Configured connection to ";
2072 msg += route.getToName();
2073 msg += "\r\n";
2074 result.addString(msg);
2075 break;
2076 }
2077 }
2078 }
2079 }
2080 if (result.size() == 0) {
2081 result.addInt32(-1);
2082 std::string msg = "Could not find an incoming connection to ";
2083 msg += target;
2084 msg += "\r\n";
2085 result.addString(msg);
2086 }
2087 }
2088 return result;
2089 };
2090
2091 auto handleAdminGetInCmd = [this](const std::string& target) {
2092 Bottle result;
2093 // Get carrier parameters for a given input connection.
2094 std::lock_guard<std::mutex> lock(m_stateMutex);
2095 if (target.empty()) {
2096 result.addInt32(-1);
2097 result.addString("target port is not specified.\r\n");
2098 } else if (target == getName()) {
2099 yarp::os::Property property;
2100 std::string errMsg;
2101 if (!getParamPortMonitor(property, false, errMsg)) {
2102 result.addVocab32("fail");
2103 result.addString(errMsg);
2104 } else {
2105 result.addDict() = property;
2106 }
2107 } else {
2108 for (auto* unit : m_units) {
2109 if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
2110 Route route = unit->getRoute();
2111 if (route.getFromName() == target) {
2112 yarp::os::Property property;
2113 unit->getCarrierParams(property);
2114 result.addDict() = property;
2115 break;
2116 }
2117 }
2118 }
2119 if (result.size() == 0) {
2120 result.addInt32(-1);
2121 std::string msg = "Could not find an incoming connection from ";
2122 msg += target;
2123 msg += "\r\n";
2124 result.addString(msg);
2125 }
2126 }
2127 return result;
2128 };
2129
2130 auto handleAdminGetOutCmd = [this](const std::string& target) {
2131 Bottle result;
2132 // Get carrier parameters for a given output connection.
2133 std::lock_guard<std::mutex> lock(m_stateMutex);
2134 if (target.empty()) {
2135 result.addInt32(-1);
2136 result.addString("target port is not specified.\r\n");
2137 } else if (target == getName()) {
2138 yarp::os::Property property;
2139 std::string errMsg;
2140 if (!getParamPortMonitor(property, true, errMsg)) {
2141 result.addVocab32("fail");
2142 result.addString(errMsg);
2143 } else {
2144 result.addDict() = property;
2145 }
2146 } else {
2147 for (auto* unit : m_units) {
2148 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2149 Route route = unit->getRoute();
2150 if (route.getToName() == target) {
2151 yarp::os::Property property;
2152 unit->getCarrierParams(property);
2153 result.addDict() = property;
2154 break;
2155 }
2156 }
2157 }
2158 if (result.size() == 0) {
2159 result.addInt32(-1);
2160 std::string msg = "Could not find an incoming connection to ";
2161 msg += target;
2162 msg += "\r\n";
2163 result.addString(msg);
2164 }
2165 }
2166 return result;
2167 };
2168
2169 auto handleAdminPropGetCmd = [this](const std::string& key) {
2170 Bottle result;
2171 Property* p = acquireProperties(false);
2172 if (p != nullptr) {
2173 if (key.empty()) {
2174 result.fromString(p->toString());
2175 } else {
2176 // request: "prop get /portname"
2177 if (key[0] == '/') {
2178 bool bFound = false;
2179 // check for their own name
2180 if (key == getName()) {
2181 bFound = true;
2182 Bottle& sched = result.addList();
2183 sched.addString("sched");
2184 Property& sched_prop = sched.addDict();
2185 sched_prop.put("tid", static_cast<int>(this->getTid()));
2186 sched_prop.put("priority", this->getPriority());
2187 sched_prop.put("policy", this->getPolicy());
2188
2190 Bottle& proc = result.addList();
2191 proc.addString("process");
2192 Property& proc_prop = proc.addDict();
2193 proc_prop.put("pid", info.pid);
2194 proc_prop.put("name", (info.pid != -1) ? info.name : "unknown");
2195 proc_prop.put("arguments", (info.pid != -1) ? info.arguments : "unknown");
2196 proc_prop.put("priority", info.schedPriority);
2197 proc_prop.put("policy", info.schedPolicy);
2198
2200 Bottle& platform = result.addList();
2201 platform.addString("platform");
2202 Property& platform_prop = platform.addDict();
2203 platform_prop.put("os", pinfo.name);
2204 platform_prop.put("hostname", m_address.getHost());
2205
2206 unsigned int f = getFlags();
2207 bool is_input = (f & PORTCORE_IS_INPUT) != 0;
2208 bool is_output = (f & PORTCORE_IS_OUTPUT) != 0;
2209 bool is_rpc = (f & PORTCORE_IS_RPC) != 0;
2210 Bottle& port = result.addList();
2211 port.addString("port");
2212 Property& port_prop = port.addDict();
2213 port_prop.put("is_input", is_input);
2214 port_prop.put("is_output", is_output);
2215 port_prop.put("is_rpc", is_rpc);
2216 port_prop.put("type", getType().getName());
2217 } else {
2218 for (auto* unit : m_units) {
2219 if ((unit != nullptr) && !unit->isFinished()) {
2220 Route route = unit->getRoute();
2221 std::string coreName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2222 if (key == coreName) {
2223 bFound = true;
2224 int priority = unit->getPriority();
2225 int policy = unit->getPolicy();
2226 int tos = getTypeOfService(unit);
2227 int tid = static_cast<int>(unit->getTid());
2228 Bottle& sched = result.addList();
2229 sched.addString("sched");
2230 Property& sched_prop = sched.addDict();
2231 sched_prop.put("tid", tid);
2232 sched_prop.put("priority", priority);
2233 sched_prop.put("policy", policy);
2234 Bottle& qos = result.addList();
2235 qos.addString("qos");
2236 Property& qos_prop = qos.addDict();
2237 qos_prop.put("tos", tos);
2238 }
2239 } // end isFinished()
2240 } // end for loop
2241 } // end portName == getname()
2242
2243 if (!bFound) { // cannot find any port matches the requested one
2244 result.addVocab32("fail");
2245 std::string msg = "cannot find any connection to/from ";
2246 msg = msg + key;
2247 result.addString(msg);
2248 }
2249 // end of (portName[0] == '/')
2250 } else {
2251 result.add(p->find(key));
2252 }
2253 }
2254 }
2256 return result;
2257 };
2258
2259 auto handleAdminPropSetCmd = [this](const std::string& key,
2260 const Value& value,
2261 const Bottle& process,
2262 const Bottle& sched,
2263 const Bottle& qos) {
2264 Bottle result;
2265 Property* p = acquireProperties(false);
2266 bool bOk = true;
2267 if (p != nullptr) {
2268 p->put(key, value);
2269 // setting scheduling properties of all threads within the process
2270 // scope through the admin port
2271 // e.g. prop set /current_port (process ((priority 30) (policy 1)))
2272 if (!process.isNull()) {
2273 std::string portName = key;
2274 if ((!portName.empty()) && (portName[0] == '/')) {
2275 // check for their own name
2276 if (portName == getName()) {
2277 bOk = false;
2278 Bottle* process_prop = process.find("process").asList();
2279 if (process_prop != nullptr) {
2280 int prio = -1;
2281 int policy = -1;
2282 if (process_prop->check("priority")) {
2283 prio = process_prop->find("priority").asInt32();
2284 }
2285 if (process_prop->check("policy")) {
2286 policy = process_prop->find("policy").asInt32();
2287 }
2288 bOk = setProcessSchedulingParam(prio, policy);
2289 }
2290 }
2291 }
2292 }
2293 // check if we need to set the PortCoreUnit scheduling policy
2294 // e.g., "prop set /portname (sched ((priority 30) (policy 1)))"
2295 // The priority and policy values on Linux are:
2296 // SCHED_OTHER : policy=0, priority=[0 .. 0]
2297 // SCHED_FIFO : policy=1, priority=[1 .. 99]
2298 // SCHED_RR : policy=2, priority=[1 .. 99]
2299 if (!sched.isNull()) {
2300 if ((!key.empty()) && (key[0] == '/')) {
2301 bOk = false;
2302 for (auto* unit : m_units) {
2303 if ((unit != nullptr) && !unit->isFinished()) {
2304 Route route = unit->getRoute();
2305 std::string portName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2306
2307 if (portName == key) {
2308 Bottle* sched_prop = sched.find("sched").asList();
2309 if (sched_prop != nullptr) {
2310 int prio = -1;
2311 int policy = -1;
2312 if (sched_prop->check("priority")) {
2313 prio = sched_prop->find("priority").asInt32();
2314 }
2315 if (sched_prop->check("policy")) {
2316 policy = sched_prop->find("policy").asInt32();
2317 }
2318 bOk = (unit->setPriority(prio, policy) != -1);
2319 } else {
2320 bOk = false;
2321 }
2322 break;
2323 }
2324 }
2325 }
2326 }
2327 }
2328 // check if we need to set the packet QOS policy
2329 // e.g., "prop set /portname (qos ((priority HIGH)))"
2330 // e.g., "prop set /portname (qos ((dscp AF12)))"
2331 // e.g., "prop set /portname (qos ((tos 12)))"
2332 if (!qos.isNull()) {
2333 if ((!key.empty()) && (key[0] == '/')) {
2334 bOk = false;
2335 for (auto* unit : m_units) {
2336 if ((unit != nullptr) && !unit->isFinished()) {
2337 Route route = unit->getRoute();
2338 std::string portName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2339 if (portName == key) {
2340 Bottle* qos_prop = qos.find("qos").asList();
2341 if (qos_prop != nullptr) {
2342 int tos = -1;
2343 if (qos_prop->check("priority")) {
2344 // set the packet TOS value on the socket based on some predefined
2345 // priority levels.
2346 // the expected levels are: LOW, NORM, HIGH, CRIT
2347 NetInt32 priority = qos_prop->find("priority").asVocab32();
2348 int dscp;
2349 switch (priority) {
2350 case yarp::os::createVocab32('L', 'O', 'W'):
2351 dscp = 10;
2352 break;
2353 case yarp::os::createVocab32('N', 'O', 'R', 'M'):
2354 dscp = 0;
2355 break;
2356 case yarp::os::createVocab32('H', 'I', 'G', 'H'):
2357 dscp = 36;
2358 break;
2359 case yarp::os::createVocab32('C', 'R', 'I', 'T'):
2360 dscp = 44;
2361 break;
2362 default:
2363 dscp = -1;
2364 }
2365 if (dscp >= 0) {
2366 tos = (dscp << 2);
2367 }
2368 } else if (qos_prop->check("dscp")) {
2369 // Set the packet TOS value on the socket based on the DSCP level
2371 int dscp = -1;
2373 auto dscp_val = qos_prop->find("dscp");
2374 if (dscp_val.isInt32()) {
2375 dscp = dscp_val.asInt32();
2376 }
2377 } else {
2378 dscp = static_cast<int>(dscp_class);
2379 }
2380 if ((dscp >= 0) && (dscp < 64)) {
2381 tos = (dscp << 2);
2382 }
2383 } else if (qos_prop->check("tos")) {
2384 // Set the TOS value directly
2385 auto tos_val = qos_prop->find("tos");
2386 if (tos_val.isInt32()) {
2387 tos = tos_val.asInt32();
2388 }
2389 }
2390 if (tos >= 0) {
2391 bOk = setTypeOfService(unit, tos);
2392 }
2393 } else {
2394 bOk = false;
2395 }
2396 break;
2397 }
2398 }
2399 }
2400 }
2401 }
2402 }
2404 result.addVocab32((bOk) ? "ok" : "fail");
2405 return result;
2406 };
2407
2408 // NOTE: YARP partially supports the ROS Slave API https://wiki.ros.org/ROS/Slave_API
2409
2410 auto handleAdminRosPublisherUpdateCmd = [this](const std::string& topic, Bottle* pubs) {
2411 // When running against a ROS name server, we need to
2412 // support ROS-style callbacks for connecting publishers
2413 // with subscribers. Note: this should not be necessary
2414 // anymore, now that a dedicated yarp::os::Node class
2415 // has been implemented, but is still needed for older
2416 // ways of interfacing with ROS without using dedicated
2417 // node ports.
2418 Bottle result;
2419 if (pubs != nullptr) {
2421 for (size_t i = 0; i < pubs->size(); i++) {
2422 std::string pub = pubs->get(i).asString();
2423 listed.put(pub, 1);
2424 }
2426 {
2427 // Critical section
2428 std::lock_guard<std::mutex> lock(m_stateMutex);
2429 for (auto* unit : m_units) {
2430 if ((unit != nullptr) && unit->isPupped()) {
2431 std::string me = unit->getPupString();
2432 present.put(me, 1);
2433 if (!listed.check(me)) {
2434 unit->setDoomed();
2435 }
2436 }
2437 }
2438 }
2439 for (size_t i = 0; i < pubs->size(); i++) {
2440 std::string pub = pubs->get(i).asString();
2441 if (!present.check(pub)) {
2442 yCIDebug(PORTCORE, getName(), "ROS ADD %s", pub.c_str());
2443 Bottle req;
2444 Bottle reply;
2445 req.addString("requestTopic");
2446 NestedContact nc(getName());
2447 req.addString(nc.getNodeName());
2448 req.addString(topic);
2449 Bottle& lst = req.addList();
2450 Bottle& sublst = lst.addList();
2451 sublst.addString("TCPROS");
2452 yCIDebug(PORTCORE, getName(), "Sending [%s] to %s", req.toString().c_str(), pub.c_str());
2454 if (!__pc_rpc(c, "xmlrpc", req, reply, false)) {
2455 fprintf(stderr, "Cannot connect to ROS subscriber %s\n", pub.c_str());
2456 // show diagnosics
2457 __pc_rpc(c, "xmlrpc", req, reply, true);
2458 __tcp_check(c);
2459 } else {
2460 Bottle* pref = reply.get(2).asList();
2461 std::string hostname;
2462 std::string carrier;
2463 int portnum = 0;
2464 if (reply.get(0).asInt32() != 1) {
2465 fprintf(stderr, "Failure looking up topic %s: %s\n", topic.c_str(), reply.toString().c_str());
2466 } else if (pref == nullptr) {
2467 fprintf(stderr, "Failure looking up topic %s: expected list of protocols\n", topic.c_str());
2468 } else if (pref->get(0).asString() != "TCPROS") {
2469 fprintf(stderr, "Failure looking up topic %s: unsupported protocol %s\n", topic.c_str(), pref->get(0).asString().c_str());
2470 } else {
2471 Value hostname2 = pref->get(1);
2472 Value portnum2 = pref->get(2);
2473 hostname = hostname2.asString();
2474 portnum = portnum2.asInt32();
2475 carrier = "tcpros+role.pub+topic.";
2476 carrier += topic;
2477 yCIDebug(PORTCORE, getName(), "topic %s available at %s:%d", topic.c_str(), hostname.c_str(), portnum);
2478 }
2479 if (portnum != 0) {
2480 Contact addr(hostname, portnum);
2481 OutputProtocol* op = nullptr;
2482 Route r = Route(getName(), pub, carrier);
2483 op = Carriers::connect(addr);
2484 if (op == nullptr) {
2485 fprintf(stderr, "NO CONNECTION\n");
2486 std::exit(1);
2487 } else {
2488 op->attachPort(m_contactable);
2489 op->open(r);
2490 }
2491 Route route = op->getRoute();
2492 route.swapNames();
2493 op->rename(route);
2494 InputProtocol* ip = &(op->getInput());
2495 {
2496 // Critical section
2497 std::lock_guard<std::mutex> lock(m_stateMutex);
2498 PortCoreUnit* unit = new PortCoreInputUnit(*this,
2499 getNextIndex(),
2500 ip,
2501 true);
2502 yCIAssert(PORTCORE, getName(), unit != nullptr);
2503 unit->setPupped(pub);
2504 unit->start();
2505 m_units.push_back(unit);
2506 }
2507 }
2508 }
2509 }
2510 }
2511 }
2512 result.addInt32(1);
2513 result.addString("ok");
2514 return result;
2515 };
2516
2517 auto handleAdminRosRequestTopicCmd = [this]() {
2518 // ROS-style query for topics.
2519 Bottle result;
2520 result.addInt32(1);
2521 NestedContact nc(getName());
2522 result.addString(nc.getNodeName());
2523 Bottle& lst = result.addList();
2525 lst.addString("TCPROS");
2526 lst.addString(addr.getHost());
2527 lst.addInt32(addr.getPort());
2528 return result;
2529 };
2530
2531 auto handleAdminRosGetPidCmd = []() {
2532 // ROS-style query for PID.
2533 Bottle result;
2534 result.addInt32(1);
2535 result.addString("");
2536 result.addInt32(yarp::os::impl::getpid());
2537 return result;
2538 };
2539
2540 auto handleAdminRosGetBusInfoCmd = []() {
2541 // ROS-style query for bus information - we support this
2542 // in yarp::os::Node but not otherwise.
2543 Bottle result;
2544 result.addInt32(1);
2545 result.addString("");
2546 result.addList().addList();
2547 return result;
2548 };
2549
2550 auto handleAdminUnknownCmd = [this](const Bottle& cmd) {
2551 Bottle result;
2552 bool ok = false;
2553 if (m_adminReader != nullptr) {
2554 DummyConnector con;
2555 cmd.write(con.getWriter());
2556 lockCallback();
2557 ok = m_adminReader->read(con.getReader());
2559 if (ok) {
2560 result.read(con.getReader());
2561 }
2562 }
2563 if (!ok) {
2564 result.addVocab32("fail");
2565 result.addString("send [help] for list of valid commands");
2566 }
2567 return result;
2568 };
2569
2570 const PortCoreCommand command = parseCommand(cmd.get(0));
2571 switch (command) {
2572 case PortCoreCommand::Help:
2573 result = handleAdminHelpCmd();
2574 break;
2575 case PortCoreCommand::Ver:
2576 result = handleAdminVerCmd();
2577 break;
2578 case PortCoreCommand::Pray:
2579 result = handleAdminPrayCmd();
2580 break;
2581 case PortCoreCommand::Add: {
2582 std::string output = cmd.get(1).asString();
2583 std::string carrier = cmd.get(2).asString();
2584 result = handleAdminAddCmd(std::move(output), carrier);
2585 } break;
2586 case PortCoreCommand::Del: {
2587 const std::string dest = cmd.get(1).asString();
2588 result = handleAdminDelCmd(dest);
2589 } break;
2590 case PortCoreCommand::Atch: {
2591 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32());
2592 Property prop(cmd.get(2).asString().c_str());
2593 result = handleAdminAtchCmd(direction, std::move(prop));
2594 } break;
2595 case PortCoreCommand::Dtch: {
2596 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32());
2597 result = handleAdminDtchCmd(direction);
2598 } break;
2599 case PortCoreCommand::List: {
2600 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32(), true);
2601 const std::string target = cmd.get(2).asString();
2602 result = handleAdminListCmd(direction, target);
2603 } break;
2604 case PortCoreCommand::Set: {
2605 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32(), true);
2606 const std::string target = cmd.get(2).asString();
2607 yarp::os::Property property;
2608 property.fromString(cmd.toString());
2609 switch (direction) {
2610 case PortCoreConnectionDirection::In:
2611 result = handleAdminSetInCmd(target, property);
2612 break;
2613 case PortCoreConnectionDirection::Out:
2614 result = handleAdminSetOutCmd(target, property);
2615 break;
2616 case PortCoreConnectionDirection::Error:
2617 yCIAssert(PORTCORE, getName(), false); // Should never happen (error is out)
2618 break;
2619 }
2620 } break;
2621 case PortCoreCommand::Get: {
2622 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32(), true);
2623 const std::string target = cmd.get(2).asString();
2624 switch (direction) {
2625 case PortCoreConnectionDirection::In:
2626 result = handleAdminGetInCmd(target);
2627 break;
2628 case PortCoreConnectionDirection::Out:
2629 result = handleAdminGetOutCmd(target);
2630 break;
2631 case PortCoreConnectionDirection::Error:
2632 yCIAssert(PORTCORE, getName(), false); // Should never happen (error is out)
2633 break;
2634 }
2635 } break;
2636 case PortCoreCommand::Prop: {
2637 PortCorePropertyAction action = parsePropertyAction(cmd.get(1).asVocab32());
2638 const std::string key = cmd.get(2).asString();
2639 // Set/get arbitrary properties on a port.
2640 switch (action) {
2641 case PortCorePropertyAction::Get:
2642 result = handleAdminPropGetCmd(key);
2643 break;
2644 case PortCorePropertyAction::Set: {
2645 const Value& value = cmd.get(3);
2646 const Bottle& process = cmd.findGroup("process");
2647 const Bottle& sched = cmd.findGroup("sched");
2648 const Bottle& qos = cmd.findGroup("qos");
2649 result = handleAdminPropSetCmd(key, value, process, sched, qos);
2650 } break;
2651 case PortCorePropertyAction::Error:
2652 result.addVocab32("fail");
2653 result.addString("property action not known");
2654 break;
2655 }
2656 } break;
2657 case PortCoreCommand::RosPublisherUpdate: {
2658 yCIDebug(PORTCORE, getName(), "publisherUpdate! --> %s", cmd.toString().c_str());
2659 // std::string caller_id = cmd.get(1).asString(); // Currently unused
2660 std::string topic = RosNameSpace::fromRosName(cmd.get(2).asString());
2661 Bottle* pubs = cmd.get(3).asList();
2662 result = handleAdminRosPublisherUpdateCmd(topic, pubs);
2663 reader.requestDrop(); // ROS needs us to close down.
2664 } break;
2665 case PortCoreCommand::RosRequestTopic:
2666 yCIDebug(PORTCORE, getName(), "requestTopic! --> %s", cmd.toString().c_str());
2667 // std::string caller_id = cmd.get(1).asString(); // Currently unused
2668 // std::string topic = RosNameSpace::fromRosName(cmd.get(2).asString()); // Currently unused
2669 // Bottle protocols = cmd.get(3).asList(); // Currently unused
2671 reader.requestDrop(); // ROS likes to close down.
2672 break;
2673 case PortCoreCommand::RosGetPid:
2674 // std::string caller_id = cmd.get(1).asString(); // Currently unused
2675 result = handleAdminRosGetPidCmd();
2676 reader.requestDrop(); // ROS likes to close down.
2677 break;
2678 case PortCoreCommand::RosGetBusInfo:
2679 // std::string caller_id = cmd.get(1).asString(); // Currently unused
2680 result = handleAdminRosGetBusInfoCmd();
2681 reader.requestDrop(); // ROS likes to close down.
2682 break;
2683 case PortCoreCommand::Unknown:
2684 result = handleAdminUnknownCmd(cmd);
2685 break;
2686 }
2687
2688 ConnectionWriter* writer = reader.getWriter();
2689 if (writer != nullptr) {
2690 result.write(*writer);
2691 }
2692
2693 return true;
2694}
2695
2696
2697bool PortCore::setTypeOfService(PortCoreUnit* unit, int tos)
2698{
2699 if (unit == nullptr) {
2700 return false;
2701 }
2702
2703 yCIDebug(PORTCORE, getName(), "Trying to set TOS = %d", tos);
2704
2705 if (unit->isOutput()) {
2706 auto* outUnit = dynamic_cast<PortCoreOutputUnit*>(unit);
2707 if (outUnit != nullptr) {
2708 OutputProtocol* op = outUnit->getOutPutProtocol();
2709 if (op != nullptr) {
2710 yCIDebug(PORTCORE, getName(), "Trying to set TOS = %d on output unit", tos);
2711 bool ok = op->getOutputStream().setTypeOfService(tos);
2712 if (!ok) {
2713 yCIWarning(PORTCORE, getName(), "Setting TOS on output unit failed");
2714 }
2715 return ok;
2716 }
2717 }
2718 }
2719
2720 // Some of the input units may have output stream object to write back to
2721 // the connection (e.g., tcp ack and reply). Thus the QoS preferences should be
2722 // also configured for them.
2723
2724
2725 if (unit->isInput()) {
2726 auto* inUnit = dynamic_cast<PortCoreInputUnit*>(unit);
2727 if (inUnit != nullptr) {
2728 InputProtocol* ip = inUnit->getInPutProtocol();
2729 if ((ip != nullptr) && ip->getOutput().isOk()) {
2730 yCIDebug(PORTCORE, getName(), "Trying to set TOS = %d on input unit", tos);
2731 bool ok = ip->getOutput().getOutputStream().setTypeOfService(tos);
2732 if (!ok) {
2733 yCIWarning(PORTCORE, getName(), "Setting TOS on input unit failed");
2734 }
2735 return ok;
2736 }
2737 }
2738 }
2739 // if there is nothing to be set, returns true
2740 return true;
2741}
2742
2743int PortCore::getTypeOfService(PortCoreUnit* unit)
2744{
2745 if (unit == nullptr) {
2746 return -1;
2747 }
2748
2749 if (unit->isOutput()) {
2750 auto* outUnit = dynamic_cast<PortCoreOutputUnit*>(unit);
2751 if (outUnit != nullptr) {
2752 OutputProtocol* op = outUnit->getOutPutProtocol();
2753 if (op != nullptr) {
2754 return op->getOutputStream().getTypeOfService();
2755 }
2756 }
2757 }
2758
2759 // Some of the input units may have output stream object to write back to
2760 // the connection (e.g., tcp ack and reply). Thus the QoS preferences should be
2761 // also configured for them.
2762
2763
2764 if (unit->isInput()) {
2765 auto* inUnit = dynamic_cast<PortCoreInputUnit*>(unit);
2766 if (inUnit != nullptr) {
2767 InputProtocol* ip = inUnit->getInPutProtocol();
2768 if ((ip != nullptr) && ip->getOutput().isOk()) {
2769 return ip->getOutput().getOutputStream().getTypeOfService();
2770 }
2771 }
2772 }
2773 return -1;
2774}
2775
2776// attach a portmonitor plugin to the port or to a specific connection
2777bool PortCore::attachPortMonitor(yarp::os::Property& prop, bool isOutput, std::string& errMsg)
2778{
2779 // attach to the current port
2781 if (portmonitor == nullptr) {
2782 errMsg = "Portmonitor carrier modifier cannot be find or it is not enabled in YARP!";
2783 return false;
2784 }
2785
2786 if (isOutput) {
2787 detachPortMonitor(true);
2788 prop.put("source", getName());
2789 prop.put("destination", "");
2790 prop.put("sender_side", 1);
2791 prop.put("receiver_side", 0);
2792 prop.put("carrier", "");
2793 m_modifier.outputMutex.lock();
2794 m_modifier.outputModifier = portmonitor;
2795 if (!m_modifier.outputModifier->configureFromProperty(prop)) {
2796 m_modifier.releaseOutModifier();
2797 errMsg = "Failed to configure the portmonitor plug-in";
2798 m_modifier.outputMutex.unlock();
2799 return false;
2800 }
2801 m_modifier.outputMutex.unlock();
2802 } else {
2803 detachPortMonitor(false);
2804 prop.put("source", "");
2805 prop.put("destination", getName());
2806 prop.put("sender_side", 0);
2807 prop.put("receiver_side", 1);
2808 prop.put("carrier", "");
2809 m_modifier.inputMutex.lock();
2810 m_modifier.inputModifier = portmonitor;
2811 if (!m_modifier.inputModifier->configureFromProperty(prop)) {
2812 m_modifier.releaseInModifier();
2813 errMsg = "Failed to configure the portmonitor plug-in";
2814 m_modifier.inputMutex.unlock();
2815 return false;
2816 }
2817 m_modifier.inputMutex.unlock();
2818 }
2819 return true;
2820}
2821
2822// detach the portmonitor from the port or specific connection
2823bool PortCore::detachPortMonitor(bool isOutput)
2824{
2825 if (isOutput) {
2826 m_modifier.outputMutex.lock();
2827 m_modifier.releaseOutModifier();
2828 m_modifier.outputMutex.unlock();
2829 } else {
2830 m_modifier.inputMutex.lock();
2831 m_modifier.releaseInModifier();
2832 m_modifier.inputMutex.unlock();
2833 }
2834 return true;
2835}
2836
2837bool PortCore::setParamPortMonitor(const yarp::os::Property& param,
2838 bool isOutput,
2839 std::string& errMsg)
2840{
2841 if (isOutput) {
2842 m_modifier.outputMutex.lock();
2843 if (m_modifier.outputModifier == nullptr) {
2844 errMsg = "No port modifier is attached to the output";
2845 m_modifier.outputMutex.unlock();
2846 return false;
2847 }
2848 m_modifier.outputModifier->setCarrierParams(param);
2849 m_modifier.outputMutex.unlock();
2850 } else {
2851 m_modifier.inputMutex.lock();
2852 if (m_modifier.inputModifier == nullptr) {
2853 errMsg = "No port modifier is attached to the input";
2854 m_modifier.inputMutex.unlock();
2855 return false;
2856 }
2857 m_modifier.inputModifier->setCarrierParams(param);
2858 m_modifier.inputMutex.unlock();
2859 }
2860 return true;
2861}
2862
2863bool PortCore::getParamPortMonitor(yarp::os::Property& param,
2864 bool isOutput,
2865 std::string& errMsg)
2866{
2867 if (isOutput) {
2868 m_modifier.outputMutex.lock();
2869 if (m_modifier.outputModifier == nullptr) {
2870 errMsg = "No port modifier is attached to the output";
2871 m_modifier.outputMutex.unlock();
2872 return false;
2873 }
2874 m_modifier.outputModifier->getCarrierParams(param);
2875 m_modifier.outputMutex.unlock();
2876 } else {
2877 m_modifier.inputMutex.lock();
2878 if (m_modifier.inputModifier == nullptr) {
2879 errMsg = "No port modifier is attached to the input";
2880 m_modifier.inputMutex.unlock();
2881 return false;
2882 }
2883 m_modifier.inputModifier->getCarrierParams(param);
2884 m_modifier.inputMutex.unlock();
2885 }
2886 return true;
2887}
2888
2890{
2891 YARP_UNUSED(active);
2892 if (unit != nullptr) {
2893 bool isLog = (!unit->getMode().empty());
2894 if (isLog) {
2895 m_logNeeded = true;
2896 }
2897 }
2898}
2899
2900bool PortCore::setProcessSchedulingParam(int priority, int policy)
2901{
2902#if defined(__linux__)
2903 // set the sched properties of all threads within the process
2904 struct sched_param sch_param;
2905 sch_param.__sched_priority = priority;
2906
2907 DIR* dir;
2908 char path[PATH_MAX];
2909 sprintf(path, "/proc/%d/task/", yarp::os::impl::getpid());
2910
2911 dir = opendir(path);
2912 if (dir == nullptr) {
2913 return false;
2914 }
2915
2916 struct dirent* d;
2917 char* end;
2918 long tid = 0;
2919 bool ret = true;
2920 while ((d = readdir(dir)) != nullptr) {
2921 if (isdigit(static_cast<unsigned char>(*d->d_name)) == 0) {
2922 continue;
2923 }
2924
2925 tid = strtol(d->d_name, &end, 10);
2926 if (d->d_name == end || ((end != nullptr) && (*end != 0))) {
2927 closedir(dir);
2928 return false;
2929 }
2930 ret &= (sched_setscheduler(static_cast<pid_t>(tid), policy, &sch_param) == 0);
2931 }
2932 closedir(dir);
2933 return ret;
2934#elif defined(YARP_HAS_ACE) // for other platforms
2935 // TODO: Check how to set the scheduling properties of all process's threads in Windows
2936 ACE_Sched_Params param(policy, (ACE_Sched_Priority)priority, ACE_SCOPE_PROCESS);
2937 int ret = ACE_OS::set_scheduling_params(param, yarp::os::impl::getpid());
2938 return (ret != -1);
2939#else
2940 return false;
2941#endif
2942}
2943
2945{
2946 m_stateMutex.lock();
2947 if (!readOnly) {
2948 if (m_prop == nullptr) {
2949 m_prop = new Property();
2950 }
2951 }
2952 return m_prop;
2953}
2954
2956{
2957 YARP_UNUSED(prop);
2958 m_stateMutex.unlock();
2959}
2960
2961bool PortCore::removeIO(const Route& route, bool synch)
2962{
2963 return removeUnit(route, synch);
2964}
2965
2966void PortCore::setName(const std::string& name)
2967{
2968 m_name = name;
2969}
2970
2972{
2973 return m_name;
2974}
2975
2976int PortCore::getNextIndex()
2977{
2978 int result = m_counter;
2979 m_counter++;
2980 if (m_counter < 0) {
2981 m_counter = 1;
2982 }
2983 return result;
2984}
2985
2987{
2988 return m_address;
2989}
2990
2991void PortCore::resetPortName(const std::string& str)
2992{
2993 m_address.setName(str);
2994}
2995
2997{
2998 return m_readableCreator;
2999}
3000
3002{
3003 m_controlRegistration = flag;
3004}
3005
3007{
3008 return m_listening.load();
3009}
3010
3012{
3013 return m_manual;
3014}
3015
3017{
3018 return m_interrupted;
3019}
3020
3021void PortCore::setTimeout(float timeout)
3022{
3023 m_timeout = timeout;
3024}
3025
3026bool PortCore::setCallbackLock(std::mutex* mutex)
3027{
3029 if (mutex != nullptr) {
3030 m_mutex = mutex;
3031 m_mutexOwned = false;
3032 } else {
3033 m_mutex = new std::mutex;
3034 m_mutexOwned = true;
3035 }
3036 return true;
3037}
3038
3040{
3041 if (m_mutexOwned && (m_mutex != nullptr)) {
3042 delete m_mutex;
3043 }
3044 m_mutex = nullptr;
3045 m_mutexOwned = false;
3046 return true;
3047}
3048
3050{
3051 if (m_mutex == nullptr) {
3052 return false;
3053 }
3054 m_mutex->lock();
3055 return true;
3056}
3057
3059{
3060 if (m_mutex == nullptr) {
3061 return true;
3062 }
3063 return m_mutex->try_lock();
3064}
3065
3067{
3068 if (m_mutex == nullptr) {
3069 return;
3070 }
3071 m_mutex->unlock();
3072}
3073
3078
3080{
3081 m_typeMutex.lock();
3082 if (!m_checkedType) {
3083 if (!m_type.isValid()) {
3084 m_type = reader.getReadType();
3085 }
3086 m_checkedType = true;
3087 }
3088 m_typeMutex.unlock();
3089}
3090
3092{
3093 m_typeMutex.lock();
3094 Type t = m_type;
3095 m_typeMutex.unlock();
3096 return t;
3097}
3098
3100{
3101 m_typeMutex.lock();
3102 m_type = typ;
3103 m_typeMutex.unlock();
3104}
float t
bool ret
#define yAssert(x)
Definition Log.h:388
static bool __tcp_check(const Contact &c)
static bool __pc_rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader, bool verbose)
#define PORTCORE_IS_INPUT
Definition PortCore.h:40
#define PORTCORE_SEND_LOG
Definition PortCore.h:35
#define PORTCORE_IS_RPC
Definition PortCore.h:39
#define PORTCORE_IS_OUTPUT
Definition PortCore.h:41
#define PORTCORE_SEND_NORMAL
Definition PortCore.h:34
A simple collection of objects that can be described and transmitted in a portable way.
Definition Bottle.h:64
void add(const Value &value)
Add a Value to the bottle, at the end of the list.
Definition Bottle.cpp:309
void addVocab32(yarp::conf::vocab32_t x)
Places a vocabulary item in the bottle, at the end of the list.
Definition Bottle.cpp:164
void fromString(const std::string &text)
Initializes bottle from a string.
Definition Bottle.cpp:204
Property & addDict()
Places an empty key/value object in the bottle, at the end of the list.
Definition Bottle.cpp:188
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
Definition Bottle.cpp:182
size_type size() const
Gets the number of elements in the bottle.
Definition Bottle.cpp:251
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition Bottle.cpp:240
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition Bottle.cpp:246
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Definition Bottle.cpp:293
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
Definition Bottle.cpp:230
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
Definition Bottle.cpp:140
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition Bottle.cpp:170
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition Bottle.cpp:211
A mini-server for performing network communication in the background.
void close() override
Stop port activity.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition Carrier.h:44
bool isConnectionless() const override=0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
void getCarrierParams(Property &params) const override
Get carrier configuration and deliver it by port administrative commands.
Definition Carrier.cpp:127
virtual bool configureFromProperty(yarp::os::Property &options)
Definition Carrier.cpp:115
bool isPush() const override
Check if carrier is "push" or "pull" style.
Definition Carrier.cpp:23
bool acceptOutgoingData(const PortWriter &writer) override
Determine whether outgoing data should be accepted.
Definition Carrier.cpp:102
const PortWriter & modifyOutgoingData(const PortWriter &writer) override
Modify outgoing payload data, if appropriate.
Definition Carrier.cpp:77
void setCarrierParams(const Property &params) override
Configure carrier from port administrative commands.
Definition Carrier.cpp:122
static Face * listen(const Contact &address)
Create a "proto-carrier" interface object that waits for incoming connections prior to a carrier bein...
Definition Carriers.cpp:250
static Carrier * getCarrierTemplate(const std::string &name)
Get template for carrier.
Definition Carriers.cpp:238
static Carrier * chooseCarrier(const std::string &name)
Select a carrier by name.
Definition Carriers.cpp:233
static OutputProtocol * connect(const Contact &address)
Initiate a connection to an address.
Definition Carriers.cpp:282
An interface for reading from a network connection.
virtual void requestDrop()=0
Tag the connection to be dropped after the current message.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
An interface for writing to a network connection.
Preferences for how to communicate with a contact.
double timeout
Set a timeout for communication (in units of seconds, fractional seconds allowed).
bool quiet
Suppress all outputs and warnings.
std::string carrier
Request that communication be made using a particular carrier.
Represents how to reach a part of a YARP network.
Definition Contact.h:33
bool isValid() const
Checks if a Contact is tagged as valid.
Definition Contact.cpp:298
std::string getRegName() const
Get the name associated with this Contact.
Definition Contact.cpp:217
static Contact fromString(const std::string &txt)
Factory method.
Definition Contact.cpp:139
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition Contact.cpp:313
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition Contact.cpp:239
void setName(const std::string &name)
Set the name associated with this Contact.
Definition Contact.cpp:222
void setTimeout(float timeout)
Set timeout for this Contact.
Definition Contact.cpp:282
std::string getCarrier() const
Get the carrier associated with this Contact for socket communication.
Definition Contact.cpp:250
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition Contact.cpp:228
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written the ConnectionWriter since it was las...
ConnectionReader & getReader(ConnectionWriter *replyWriter=nullptr)
Get the dummy ConnectionReader loaded with whatever was written the ConnectionWriter since it was las...
virtual InputProtocol * read()=0
Block and wait for someone to talk to us.
virtual Contact getLocalAddress() const
Get address after open(), if more specific that the address provided to open() - otherwise an invalid...
Definition Face.h:82
virtual OutputProtocol * write(const Contact &address)=0
Try to reach out and talk to someone.
virtual void close()=0
Stop listening.
The input side of an active connection between two ports.
virtual void close()=0
Negotiate an end to operations.
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
virtual OutputProtocol & getOutput()=0
Get an interface for doing write operations on the connection.
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
Simple abstraction for a YARP port name.
Definition Name.h:18
Contact toAddress() const
Create an address from the name.
Definition Name.cpp:27
std::string getCarrierModifier(const char *mod, bool *hasModifier=nullptr)
Definition Name.cpp:44
A placeholder for rich contact information.
std::string getNodeName() const
static bool initialized()
Returns true if YARP has been fully initialized.
Definition Network.cpp:1382
static bool getLocalMode()
Get current value of flag "localMode", see setLocalMode function.
Definition Network.cpp:1054
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
Definition Network.cpp:1023
static NameStore * getQueryBypass()
Definition Network.cpp:1405
static Contact queryName(const std::string &name)
Find out information about a registered name.
Definition Network.cpp:995
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
Definition Network.cpp:1511
static bool writeToNameServer(PortWriter &cmd, PortReader &reply, const ContactStyle &style)
Variant write method specialized to name server.
Definition Network.cpp:1942
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
Definition Network.cpp:700
static bool write(const Contact &contact, PortWriter &cmd, PortReader &reply, bool admin=false, bool quiet=false, double timeout=-1)
Send a single command to a port and await a single response.
Definition Network.cpp:1219
The output side of an active connection between two ports.
virtual OutputStream & getOutputStream()=0
Access the output stream associated with the connection.
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
virtual bool open(const Route &route)=0
Start negotiating a carrier, using the given route (this should generally match the name of the sendi...
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
virtual const Route & getRoute() const =0
virtual InputProtocol & getInput()=0
Get an interface for doing read operations on the connection.
virtual void close()=0
Negotiate an end to operations.
virtual void rename(const Route &route)=0
Relabel the route after the fact (e.g.
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
Simple specification of the minimum functions needed from output streams.
Information about a port connection or event.
Definition PortInfo.h:25
std::string message
A human-readable description of contents.
Definition PortInfo.h:68
int tag
Type of information.
Definition PortInfo.h:47
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Definition PortInfo.h:39
@ PORTINFO_MISC
Unspecified information.
Definition PortInfo.h:42
A creator for readers.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition PortReader.h:24
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
virtual Type getReadType() const
A base class for objects that want information about port status changes.
Definition PortReport.h:25
virtual void report(const PortInfo &info)=0
Callback for port event/state information.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition PortWriter.h:23
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
virtual void onCommencement() const
This is called when the port is about to begin writing operations.
A class for storing options and configuration information.
Definition Property.h:33
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
Definition Property.cpp:987
PacketPriorityDSCP
The PacketPriorityDSCP defines the packets quality of service (priority) using DSCP.
Definition QosStyle.h:45
static PacketPriorityDSCP getDSCPByVocab(yarp::conf::vocab32_t vocab)
returns the IPV4/6 DSCP value given as DSCP code
Definition QosStyle.cpp:155
static std::string fromRosName(const std::string &name)
Information about a connection between two ports.
Definition Route.h:28
const std::string & getToName() const
Get the destination of the route.
Definition Route.cpp:103
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition Route.cpp:123
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition Route.cpp:138
const std::string & getFromName() const
Get the source of the route.
Definition Route.cpp:93
void swapNames()
Swap from and to names.
Definition Route.cpp:133
void setToContact(const Contact &toContact)
Set the destination contact of the route.
Definition Route.cpp:118
An InputStream that reads from a string.
void add(const std::string &txt)
An OutputStream that produces a string.
static ProcessInfo getProcessInfo(int pid=0)
gets the operating system process information given by its PID.
static PlatformInfo getPlatformInfo()
getPlatformInfo
bool isValid() const
Definition Type.cpp:154
std::string getName() const
Definition Type.cpp:139
A single value (typically within a Bottle).
Definition Value.h:43
virtual bool isString() const
Checks if value is a string.
Definition Value.cpp:156
virtual yarp::conf::vocab32_t asVocab32() const
Get vocabulary identifier as an integer.
Definition Value.cpp:228
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Definition Value.cpp:204
virtual Bottle * asList() const
Get list value.
Definition Value.cpp:240
virtual std::string asString() const
Get string value.
Definition Value.cpp:234
A helper for creating cached object descriptions.
void restart()
Tell the writer that we will be serializing a new object, but to keep any cached buffers that already...
A helper for recording entire message/reply transactions.
Manager for a single input to a port.
Manager for a single output from a port.
A single message, potentially being transmitted on multiple connections.
void setContent(const yarp::os::PortWriter *writable, bool owned=false, const yarp::os::PortWriter *callback=nullptr, bool ownedCallback=false)
Configure the object being sent and where to send notifications.
void inc()
Increment the usage count for this messagae.
void dec()
Decrement the usage count for this messagae.
PortCorePacket * getFreePacket()
Get a packet that we can prepare for sending.
bool checkPacket(PortCorePacket *packet)
Move a packet to the inactive state if it has finished being sent on all connections.
This manages a single threaded resource related to a single input or output connection.
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
void resetPortName(const std::string &str)
std::string getEnvelope()
void setAdminReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming administrative messages.
Definition PortCore.cpp:148
int getOutputCount()
Check how many output connections there are.
bool readBlock(ConnectionReader &reader, void *id, yarp::os::OutputStream *os)
Read a block of regular payload data.
void setReportCallback(yarp::os::PortReport *reporter)
Set a callback to be notified of changes in port status.
void run() override
The body of the main thread.
Definition PortCore.cpp:165
bool start() override
Begin main thread.
Definition PortCore.cpp:277
void checkType(PortReader &reader)
void resume()
Undo an interrupt()
Definition PortCore.cpp:325
void setTimeout(float timeout)
yarp::os::PortReaderCreator * getReadCreator()
Get the creator of callbacks.
Property * acquireProperties(bool readOnly)
void interrupt()
Prepare the port to be shut down.
Definition PortCore.cpp:331
bool setCallbackLock(std::mutex *mutex=nullptr)
void setEnvelope(const std::string &envelope)
Set some envelope information to pass along with a message without actually being part of the message...
bool removeIO(const Route &route, bool synch=false)
Remove any connection matching the supplied route.
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
bool listen(const Contact &address, bool shouldAnnounce=true)
Begin service at a given address.
Definition PortCore.cpp:68
bool sendHelper(const yarp::os::PortWriter &writer, int mode, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a message with a specific mode (normal or log).
void removeOutput(const std::string &dest, void *id, yarp::os::OutputStream *os)
Remove an output connection.
void setControlRegistration(bool flag)
Normally the port will unregister its name with the name server when shutting down.
int getInputCount()
Check how many input connections there are.
bool manualStart(const char *sourceName)
Start up the port, but without a main thread.
Definition PortCore.cpp:308
void setReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming data.
Definition PortCore.cpp:140
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
void promiseType(const Type &typ)
void releaseProperties(Property *prop)
int getEventCount()
A diagnostic for testing purposes.
Definition PortCore.cpp:532
void close() override
Shut down port.
Definition PortCore.cpp:264
const Contact & getAddress() const
Get the address associated with the port.
void describe(void *id, yarp::os::OutputStream *os)
Produce a text description of the port and its connections.
yarp::os::Type getType()
bool isWriting()
Check if a message is currently being sent.
bool adminBlock(ConnectionReader &reader, void *id)
Read a block of administrative data.
void removeInput(const std::string &src, void *id, yarp::os::OutputStream *os)
Remove an input connection.
unsigned int getFlags()
Check current configuration of port.
Definition PortCore.h:298
bool send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a normal message.
void resetReportCallback()
Reset the callback to be notified of changes in port status.
void setReadCreator(yarp::os::PortReaderCreator &creator)
Set a callback for creating callbacks for incoming data.
Definition PortCore.cpp:156
yarp::os::impl::PortDataModifier & getPortModifier()
void setName(const std::string &name)
Set the name of this port.
bool addOutput(const std::string &dest, void *id, yarp::os::OutputStream *os, bool onlyIfNeeded=false)
Add an output connection to this port.
Definition PortCore.cpp:842
This is the heart of a yarp port.
Definition PortCore.h:102
yarp::os::Carrier * outputModifier
Definition PortCore.h:135
yarp::os::Carrier * inputModifier
Definition PortCore.h:136
Lets Readable objects read from the underlying InputStream associated with the connection between two...
int join(double seconds=-1)
#define yCIAssert(component, id, x)
#define yCWarning(component,...)
#define yCIError(component, id,...)
#define yCITrace(component, id,...)
#define yCIDebug(component, id,...)
#define yCIWarning(component, id,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
Definition environment.h:66
std::string to_string(IntegerType x)
Definition numeric.h:115
std::int32_t vocab32_t
Definition numeric.h:78
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition NetInt32.h:29
constexpr yarp::conf::vocab32_t createVocab32(char a, char b=0, char c=0, char d=0)
Create a vocab from chars.
Definition Vocab.h:27
The main, catch-all namespace for YARP.
Definition dirs.h:16
The PlatformInfo struct holds the operating system information.
Definition SystemInfo.h:80
The ProcessInfo struct provides the operating system process information.
Definition SystemInfo.h:112
#define YARP_UNUSED(var)
Definition api.h:162