YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
PortCore.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
10
11#include <yarp/os/Bottle.h>
14#include <yarp/os/Name.h>
15#include <yarp/os/Network.h>
16#include <yarp/os/PortInfo.h>
18#include <yarp/os/SystemInfo.h>
19#include <yarp/os/Time.h>
27
28#include <cstdio>
29#include <functional>
30#include <random>
31#include <regex>
32#include <vector>
33
34#ifdef YARP_HAS_ACE
35# include <ace/INET_Addr.h>
36# include <ace/Sched_Params.h>
37// In one of the ACE headers there is a definition of "main" for WIN32
38# ifdef main
39# undef main
40# endif
41#endif
42
43#if defined(__linux__) // used for configuring scheduling properties
44# include <dirent.h>
45# include <sys/types.h>
46# include <unistd.h>
47#endif
48
49
50using namespace yarp::os::impl;
51using namespace yarp::os;
52using namespace yarp;
53
54namespace {
55YARP_OS_LOG_COMPONENT(PORTCORE, "yarp.os.impl.PortCore")
56} // namespace
57
58PortCore::PortCore() = default;
59
65
66
67bool PortCore::listen(const Contact& address, bool shouldAnnounce)
68{
69 yCIDebug(PORTCORE, getName(), "Starting listening on %s", address.toURI().c_str());
70 // If we're using ACE, we really need to have it initialized before
71 // this point.
73 yCIError(PORTCORE, getName(), "YARP not initialized; create a yarp::os::Network object before using ports");
74 return false;
75 }
76
77 yCITrace(PORTCORE, getName(), "listen");
78
79 {
80 // Critical section
81 std::lock_guard<std::mutex> lock(m_stateMutex);
82
83 // This method assumes we are not already on the network.
84 // We can assume this because it is not a user-facing class,
85 // and we carefully never call this method again without
86 // calling close().
87 yCIAssert(PORTCORE, getName(), !m_listening);
88 yCIAssert(PORTCORE, getName(), !m_running);
89 yCIAssert(PORTCORE, getName(), !m_closing.load());
90 yCIAssert(PORTCORE, getName(), !m_finished.load());
91 yCIAssert(PORTCORE, getName(), m_face == nullptr);
92
93 // Try to put the port on the network, using the user-supplied
94 // address (which may be incomplete). You can think of
95 // this as getting a server socket.
96 m_address = address;
97 setName(address.getRegName());
98 if (m_timeout > 0) {
99 m_address.setTimeout(m_timeout);
100 }
101 m_face = Carriers::listen(m_address);
102
103 // We failed, abort.
104 if (m_face == nullptr) {
105 return false;
106 }
107
108 // Update our address if it was incomplete.
109 if (m_address.getPort() <= 0) {
110 m_address = m_face->getLocalAddress();
111 if (m_address.getRegName() == "...") {
112 m_address.setName(std::string("/") + m_address.getHost() + "_" + yarp::conf::numeric::to_string(m_address.getPort()));
113 setName(m_address.getRegName());
114 }
115 }
116
117 // Move into listening phase
118 m_listening.store(true);
119 }
120
121 // Now that we are on the network, we can let the name server know this.
122 if (shouldAnnounce) {
124 std::string portName = address.getRegName();
125 Bottle cmd;
126 Bottle reply;
127 cmd.addString("announce");
128 cmd.addString(portName);
129 ContactStyle style;
130 NetworkBase::writeToNameServer(cmd, reply, style);
131 }
132 }
133
134 // Success!
135 return true;
136}
137
138
140{
141 // Don't even try to do this when the port is hot, it'll burn you
142 yCIAssert(PORTCORE, getName(), !m_running.load());
143 yCIAssert(PORTCORE, getName(), m_reader == nullptr);
144 m_reader = &reader;
145}
146
148{
149 // Don't even try to do this when the port is hot, it'll burn you
150 yCIAssert(PORTCORE, getName(), !m_running.load());
151 yCIAssert(PORTCORE, getName(), m_adminReader == nullptr);
152 m_adminReader = &reader;
153}
154
156{
157 // Don't even try to do this when the port is hot, it'll burn you
158 yCIAssert(PORTCORE, getName(), !m_running.load());
159 yCIAssert(PORTCORE, getName(), m_readableCreator == nullptr);
160 m_readableCreator = &creator;
161}
162
163
165{
166 yCITrace(PORTCORE, getName(), "run");
167
168 // This is the server thread for the port. We listen on
169 // the network and handle any incoming connections.
170 // We don't touch those connections, just shove them
171 // in a list and move on. It is important that this
172 // thread doesn't make a connecting client wait just
173 // because some other client is slow.
174
175 // We assume that listen() has succeeded and that
176 // start() has been called.
177 yCIAssert(PORTCORE, getName(), m_listening.load());
178 yCIAssert(PORTCORE, getName(), !m_running.load());
179 yCIAssert(PORTCORE, getName(), !m_closing.load());
180 yCIAssert(PORTCORE, getName(), !m_finished.load());
181 yCIAssert(PORTCORE, getName(), m_starting.load());
182
183 // Enter running phase
184 {
185 // Critical section
186 std::lock_guard<std::mutex> lock(m_stateMutex);
187 m_running.store(true);
188 m_starting.store(false);
189 }
190
191 // Notify the start() thread that the run() thread is running
192 m_stateCv.notify_one();
193
194 yCITrace(PORTCORE, getName(), "run running");
195
196 // Enter main loop, where we block on incoming connections.
197 // The loop is exited when PortCore#closing is set. One last
198 // connection will be needed to de-block this thread and ensure
199 // that it checks PortCore#closing.
200 bool shouldStop = false;
201 while (!shouldStop) {
202
203 // Block and wait for a connection
204 InputProtocol* ip = m_face->read();
205
206 {
207 // Critical section
208 std::lock_guard<std::mutex> lock(m_stateMutex);
209
210 // Attach the connection to this port and update its timeout setting
211 if (ip != nullptr) {
212 ip->attachPort(m_contactable);
213 yCIDebug(PORTCORE, getName(), "received something");
214 if (m_timeout > 0) {
215 ip->setTimeout(m_timeout);
216 }
217 }
218
219 // Check whether we should shut down
220 shouldStop |= m_closing.load();
221
222 // Increment a global count of connection events
223 m_events++;
224 }
225
226 // If we are not shutting down, spin off the connection.
227 // It starts life as an input connection (although it
228 // may later morph into an output).
229 if (!shouldStop) {
230 if (ip != nullptr) {
231 addInput(ip);
232 }
233 yCIDebug(PORTCORE, getName(), "spun off a connection");
234 ip = nullptr;
235 }
236
237 // If the connection wasn't spun off, just shut it down.
238 if (ip != nullptr) {
239 ip->close();
240 delete ip;
241 ip = nullptr;
242 }
243
244 // Remove any defunct connections.
245 reapUnits();
246
247 // Notify anyone listening for connection changes.
248 std::lock_guard<std::mutex> lock(m_stateMutex);
249 m_connectionListeners = 0;
250 m_connectionChangeCv.notify_all();
251 }
252
253 yCITrace(PORTCORE, getName(), "run closing");
254
255 // The server thread is shutting down.
256 std::lock_guard<std::mutex> lock(m_stateMutex);
257 m_connectionListeners = 0;
258 m_connectionChangeCv.notify_all();
259 m_finished.store(true);
260}
261
262
264{
265 closeMain();
266
267 if (m_prop != nullptr) {
268 delete m_prop;
269 m_prop = nullptr;
270 }
271 m_modifier.releaseOutModifier();
272 m_modifier.releaseInModifier();
273}
274
275
277{
278 yCITrace(PORTCORE, getName(), "start");
279
280 // This wait will, on success, be matched by a post in run()
281 std::unique_lock<std::mutex> lock(m_stateMutex);
282
283 // We assume that listen() has been called.
284 yCIAssert(PORTCORE, getName(), m_listening.load());
285 yCIAssert(PORTCORE, getName(), !m_running.load());
286 yCIAssert(PORTCORE, getName(), !m_starting.load());
287 yCIAssert(PORTCORE, getName(), !m_finished.load());
288 yCIAssert(PORTCORE, getName(), !m_closing.load());
289
290 m_starting.store(true);
291
292 // Start the server thread.
293 bool started = ThreadImpl::start();
294 if (!started) {
295 // run() won't be happening
296 yAssert(false);
297
298 } else {
299 // Wait for the signal from the run thread before returning.
300 m_stateCv.wait(lock, [&]{ return m_running.load(); });
301 yCIAssert(PORTCORE, getName(), m_running.load());
302 }
303 return started;
304}
305
306
307bool PortCore::manualStart(const char* sourceName)
308{
309 yCITrace(PORTCORE, getName(), "manualStart");
310
311 // This variant of start() does not create a server thread.
312 // That is appropriate if we never expect to receive incoming
313 // connections for any reason. No incoming data, no requests
314 // for state information, no requests to change connections,
315 // nothing. We set the port's name to something fake, and
316 // act like nothing is wrong.
317 m_interruptable = false;
318 m_manual = true;
319 setName(sourceName);
320 return true;
321}
322
323
325{
326 // We are no longer interrupted.
327 m_interrupted = false;
328}
329
331{
332 yCITrace(PORTCORE, getName(), "interrupt");
333
334 // This is a no-op if there is no server thread.
335 if (!m_listening.load()) {
336 return;
337 }
338
339 // Ignore any future incoming data
340 m_interrupted = true;
341
342 // What about data that is already coming in?
343 // If interruptable is not currently set, no worries, the user
344 // did not or will not end up blocked on a read.
345 if (!m_interruptable) {
346 return;
347 }
348
349 // Since interruptable is set, it is possible that the user
350 // may be blocked on a read. We send an empty message,
351 // which is reserved for giving blocking readers a chance to
352 // update their state.
353 {
354 // Critical section
355 std::lock_guard<std::mutex> lock(m_stateMutex);
356 if (m_reader != nullptr) {
357 yCIDebug(PORTCORE, getName(), "sending update-state message to listener");
359 lockCallback();
360 m_reader->read(sbr);
362 }
363 }
364}
365
366
367void PortCore::closeMain()
368{
369 yCITrace(PORTCORE, getName(), "closeMain");
370
371 {
372 // Critical section
373 std::lock_guard<std::mutex> lock(m_stateMutex);
374
375 // We may not have anything to do.
376 if (m_finishing || !(m_running.load() || m_manual)) {
377 yCITrace(PORTCORE, getName(), "closeMain - nothing to do");
378 return;
379 }
380
381 yCITrace(PORTCORE, getName(), "closeMain - Central");
382
383 // Move into official "finishing" phase.
384 m_finishing = true;
385 yCIDebug(PORTCORE, getName(), "now preparing to shut down port");
386 }
387
388 // Start disconnecting inputs. We ask the other side of the
389 // connection to do this, so it won't come as a surprise.
390 // The details of how disconnection works vary by carrier.
391 // While we are doing this, the server thread may be still running.
392 // This is because other ports may need to talk to the server
393 // to organize details of how a connection should be broken down.
394 bool done = false;
395 std::string prevName;
396 while (!done) {
397 done = true;
398 std::string removeName;
399 {
400 // Critical section
401 std::lock_guard<std::mutex> lock(m_stateMutex);
402 for (auto* unit : m_units) {
403 if ((unit != nullptr) && unit->isInput() && !unit->isDoomed()) {
404 Route r = unit->getRoute();
405 std::string s = r.getFromName();
406 if (s.length() >= 1 && s[0] == '/' && s != getName() && s != prevName) {
407 removeName = s;
408 done = false;
409 break;
410 }
411 }
412 }
413 }
414 if (!done) {
415 yCIDebug(PORTCORE, getName(), "requesting removal of connection from %s", removeName.c_str());
417 getName(),
418 true);
419 if (!result) {
422 true);
423 }
425 }
426 }
427
428 // Start disconnecting outputs. We don't negotiate with the
429 // other side, we just break them down.
430 done = false;
431 while (!done) {
432 done = true;
434 {
435 // Critical section
436 std::lock_guard<std::mutex> lock(m_stateMutex);
437 for (auto* unit : m_units) {
438 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
439 removeRoute = unit->getRoute();
440 if (removeRoute.getFromName() == getName()) {
441 done = false;
442 break;
443 }
444 }
445 }
446 }
447 if (!done) {
448 removeUnit(removeRoute, true);
449 }
450 }
451
452 bool stopRunning = m_running.load();
453
454 // If the server thread is still running, we need to bring it down.
455 if (stopRunning) {
456 // Let the server thread know we no longer need its services.
457 m_closing.store(true);
458
459 // Wake up the server thread the only way we can, by sending
460 // a message to it. Then join it, it is done.
461 if (!m_manual) {
462 OutputProtocol* op = m_face->write(m_address);
463 if (op != nullptr) {
464 op->close();
465 delete op;
466 }
467 join();
468 }
469
470 // We should be finished now.
471 yCIAssert(PORTCORE, getName(), m_finished.load());
472
473 // Clean up our connection list. We couldn't do this earlier,
474 // because the server thread was running.
475 closeUnits();
476
477 // Reset some state flags.
478 {
479 // Critical section
480 std::lock_guard<std::mutex> lock(m_stateMutex);
481 m_finished.store(false);
482 m_closing.store(false);
483 m_running.store(false);
484 }
485 }
486
487 // There should be no other threads at this point and we
488 // can stop listening on the network.
489 if (m_listening.load()) {
490 yCIAssert(PORTCORE, getName(), m_face != nullptr);
491 m_face->close();
492 delete m_face;
493 m_face = nullptr;
494 m_listening.store(false);
495 }
496
497 // Check if the client is waiting for input. If so, wake them up
498 // with the bad news. An empty message signifies a request to
499 // check the port state.
500 if (m_reader != nullptr) {
501 yCIDebug(PORTCORE, getName(), "sending end-of-port message to listener");
503 m_reader->read(sbr);
504 m_reader = nullptr;
505 }
506
507 // We may need to unregister the port with the name server.
508 if (stopRunning) {
509 std::string name = getName();
510 if (name != std::string("")) {
511 if (m_controlRegistration) {
513 }
514 }
515 }
516
517 // We are done with the finishing process.
518 m_finishing = false;
519
520 // We are fresh as a daisy.
521 yCIAssert(PORTCORE, getName(), !m_listening.load());
522 yCIAssert(PORTCORE, getName(), !m_running.load());
523 yCIAssert(PORTCORE, getName(), !m_starting.load());
524 yCIAssert(PORTCORE, getName(), !m_closing.load());
525 yCIAssert(PORTCORE, getName(), !m_finished.load());
526 yCIAssert(PORTCORE, getName(), !m_finishing);
527 yCIAssert(PORTCORE, getName(), m_face == nullptr);
528}
529
530
532{
533 // How many times has the server thread spun off a connection.
534 std::lock_guard<std::mutex> lock(m_stateMutex);
535 int ct = m_events;
536 return ct;
537}
538
539
540void PortCore::closeUnits()
541{
542 // Empty the PortCore#units list. This is only possible when
543 // the server thread is finished.
544 yCIAssert(PORTCORE, getName(), m_finished.load());
545
546 // In the "finished" phase, nobody else touches the units,
547 // so we can go ahead and shut them down and delete them.
548 for (auto& i : m_units) {
549 PortCoreUnit* unit = i;
550 if (unit != nullptr) {
551 yCIDebug(PORTCORE, getName(), "closing a unit");
552 unit->close();
553 yCIDebug(PORTCORE, getName(), "joining a unit");
554 unit->join();
555 delete unit;
556 yCIDebug(PORTCORE, getName(), "deleting a unit");
557 i = nullptr;
558 }
559 }
560
561 // Get rid of all our nulls. Done!
562 m_units.clear();
563}
564
565void PortCore::reapUnits()
566{
567 // Connections that should be shut down get tagged as "doomed"
568 // but aren't otherwise touched until it is safe to do so.
569 if (!m_finished.load()) {
570 std::lock_guard<std::mutex> lock(m_stateMutex);
571 for (auto* unit : m_units) {
572 if ((unit != nullptr) && unit->isDoomed() && !unit->isFinished()) {
573 std::string s = unit->getRoute().toString();
574 yCIDebug(PORTCORE, getName(), "Informing connection %s that it is doomed", s.c_str());
575 unit->close();
576 yCIDebug(PORTCORE, getName(), "Closed connection %s", s.c_str());
577 unit->join();
578 yCIDebug(PORTCORE, getName(), "Joined thread of connection %s", s.c_str());
579 }
580 }
581 }
582 cleanUnits();
583}
584
585void PortCore::cleanUnits(bool blocking)
586{
587 // We will remove any connections that have finished operating from
588 // the PortCore#units list.
589
590 // Depending on urgency, either wait for a safe time to do this
591 // or skip if unsafe.
592 std::unique_lock<std::mutex> lock(m_stateMutex, std::defer_lock);
593 if (blocking) {
594 lock.lock();
595 } else {
596 bool have_lock = lock.try_lock();
597 if (!have_lock) {
598 return;
599 }
600 }
601 // here we have the lock
602
603 // We will update our connection counts as a by-product.
604 int updatedInputCount = 0;
605 int updatedOutputCount = 0;
607 yCIDebug(PORTCORE, getName(), "/ routine check of connections to this port begins");
608 if (!m_finished.load()) {
609
610 // First, we delete and null out any defunct entries in the list.
611 for (auto& i : m_units) {
612 PortCoreUnit* unit = i;
613 if (unit != nullptr) {
614 yCIDebug(PORTCORE, getName(), "| checking connection %s %s", unit->getRoute().toString().c_str(), unit->getMode().c_str());
615 if (unit->isFinished()) {
616 std::string con = unit->getRoute().toString();
617 yCIDebug(PORTCORE, getName(), "| removing connection %s", con.c_str());
618 unit->close();
619 unit->join();
620 delete unit;
621 i = nullptr;
622 yCIDebug(PORTCORE, getName(), "| removed connection %s", con.c_str());
623 } else {
624 // No work to do except updating connection counts.
625 if (!unit->isDoomed()) {
626 if (unit->isOutput()) {
628 if (unit->getMode().empty()) {
630 }
631 }
632 if (unit->isInput()) {
633 if (unit->getRoute().getFromName() != "admin") {
635 }
636 }
637 }
638 }
639 }
640 }
641
642 // Now we do some awkward shuffling (list class may be from ACE
643 // or STL, if ACE it is quite limited). We move the nulls to
644 // the end of the list ...
645 size_t rem = 0;
646 for (size_t i2 = 0; i2 < m_units.size(); i2++) {
647 if (m_units[i2] != nullptr) {
648 if (rem < i2) {
649 m_units[rem] = m_units[i2];
650 m_units[i2] = nullptr;
651 }
652 rem++;
653 }
654 }
655
656 // ... Now we get rid of the null entries
657 for (size_t i3 = 0; i3 < m_units.size() - rem; i3++) {
658 m_units.pop_back();
659 }
660 }
661
662 // Finalize the connection counts.
663 m_dataOutputCount = updatedDataOutputCount;
664 lock.unlock();
665
666 m_packetMutex.lock();
667 m_inputCount = updatedInputCount;
668 m_outputCount = updatedOutputCount;
669 m_packetMutex.unlock();
670 yCIDebug(PORTCORE, getName(), "\\ routine check of connections to this port ends");
671}
672
673
674void PortCore::addInput(InputProtocol* ip)
675{
676 yCITrace(PORTCORE, getName(), "addInput");
677
678 // This method is only called by the server thread in its running phase.
679 // It wraps up a network connection as a unit and adds it to
680 // PortCore#units. The unit will have its own thread associated
681 // with it.
682
683 yCIAssert(PORTCORE, getName(), ip != nullptr);
684 std::lock_guard<std::mutex> lock(m_stateMutex);
686 getNextIndex(),
687 ip,
688 false);
689 yCIAssert(PORTCORE, getName(), unit != nullptr);
690 unit->start();
691 m_units.push_back(unit);
692 yCITrace(PORTCORE, getName(), "there are now %zu units", m_units.size());
693}
694
695
697{
698 yCITrace(PORTCORE, getName(), "addOutput");
699
700 // This method is called from threads associated with input
701 // connections.
702 // It wraps up a network connection as a unit and adds it to
703 // PortCore#units. The unit will start with its own thread
704 // associated with it, but that thread will be very short-lived
705 // if the port is not configured to do background writes.
706
707 yCIAssert(PORTCORE, getName(), op != nullptr);
708 if (!m_finished.load()) {
709 std::lock_guard<std::mutex> lock(m_stateMutex);
710 PortCoreUnit* unit = new PortCoreOutputUnit(*this, getNextIndex(), op);
711 yCIAssert(PORTCORE, getName(), unit != nullptr);
712 unit->start();
713 m_units.push_back(unit);
714 }
715}
716
717
718bool PortCore::isUnit(const Route& route, int index)
719{
720 // Check if a connection with a specified route (and optional ID) is present
721 bool needReap = false;
722 if (!m_finished.load()) {
723 for (auto* unit : m_units) {
724 if (unit != nullptr) {
725 Route alt = unit->getRoute();
726 std::string wild = "*";
727 bool ok = true;
728 if (index >= 0) {
729 ok = ok && (unit->getIndex() == index);
730 }
731 if (ok) {
732 if (route.getFromName() != wild) {
733 ok = ok && (route.getFromName() == alt.getFromName());
734 }
735 if (route.getToName() != wild) {
736 ok = ok && (route.getToName() == alt.getToName());
737 }
738 if (route.getCarrierName() != wild) {
739 ok = ok && (route.getCarrierName() == alt.getCarrierName());
740 }
741 }
742 if (ok) {
743 needReap = true;
744 break;
745 }
746 }
747 }
748 }
749 return needReap;
750}
751
752
753bool PortCore::removeUnit(const Route& route, bool synch, bool* except)
754{
755 // This is a request to remove a connection. It could arise from any
756 // input thread.
757
758 if (except != nullptr) {
759 yCIDebug(PORTCORE, getName(), "asked to remove connection in the way of %s", route.toString().c_str());
760 *except = false;
761 } else {
762 yCIDebug(PORTCORE, getName(), "asked to remove connection %s", route.toString().c_str());
763 }
764
765 // Scan for units that match the given route, and collect their IDs.
766 // Mark them as "doomed".
767 std::vector<int> removals;
768
769 bool needReap = false;
770 if (!m_finished.load()) {
771 std::lock_guard<std::mutex> lock(m_stateMutex);
772 for (auto* unit : m_units) {
773 if (unit != nullptr) {
774 Route alt = unit->getRoute();
775 std::string wild = "*";
776 bool ok = true;
777 if (route.getFromName() != wild) {
778 ok = ok && (route.getFromName() == alt.getFromName());
779 }
780 if (route.getToName() != wild) {
781 ok = ok && (route.getToName() == alt.getToName());
782 }
783 if (route.getCarrierName() != wild) {
784 if (except == nullptr) {
785 ok = ok && (route.getCarrierName() == alt.getCarrierName());
786 } else {
787 if (route.getCarrierName() == alt.getCarrierName()) {
788 *except = true;
789 ok = false;
790 }
791 }
792 }
793
794 if (ok) {
795 yCIDebug(PORTCORE, getName(), "removing connection %s", alt.toString().c_str());
796 removals.push_back(unit->getIndex());
797 unit->setDoomed();
798 needReap = true;
799 }
800 }
801 }
802 }
803
804 // If we find one or more matches, we need to do some work.
805 // We've marked those matches as "doomed" so they'll get cleared
806 // up eventually, but we can speed this up by waking up the
807 // server thread.
808 if (needReap) {
809 yCIDebug(PORTCORE, getName(), "one or more connections need prodding to die");
810
811 if (m_manual) {
812 // No server thread, no problems.
813 reapUnits();
814 } else {
815 // Send a blank message to make up server thread.
816 OutputProtocol* op = m_face->write(m_address);
817 if (op != nullptr) {
818 op->close();
819 delete op;
820 }
821 yCIDebug(PORTCORE, getName(), "sent message to prod connection death");
822
823 if (synch) {
824 // Wait for connections to be cleaned up.
825 yCIDebug(PORTCORE, getName(), "synchronizing with connection death");
826 {
827 // Critical section
828 std::unique_lock<std::mutex> lock(m_stateMutex);
829 while (std::any_of(removals.begin(), removals.end(), [&](int removal){ return isUnit(route, removal); })) {
830 m_connectionListeners++;
831 m_connectionChangeCv.wait(lock, [&]{ return m_connectionListeners == 0; });
832 }
833 }
834 }
835 }
836 }
837 return needReap;
838}
839
840
841bool PortCore::addOutput(const std::string& dest,
842 void* id,
843 OutputStream* os,
844 bool onlyIfNeeded)
845{
846 YARP_UNUSED(id);
847 yCIDebug(PORTCORE, getName(), "asked to add output to %s", dest.c_str());
848
849 // Buffer to store text describing outcome (successful connection,
850 // or a failure).
852
853 // Look up the address we'll be connecting to.
854 Contact parts = Name(dest).toAddress();
855 Contact contact = NetworkBase::queryName(parts.getRegName());
856 Contact address = contact;
857
858 // If we can't find it, say so and abort.
859 if (!address.isValid()) {
860 bw.appendLine(std::string("Do not know how to connect to ") + dest);
861 if (os != nullptr) {
862 bw.write(*os);
863 }
864 return false;
865 }
866
867 // We clean all existing connections to the desired destination,
868 // optionally stopping if we find one with the right carrier.
869 if (onlyIfNeeded) {
870 // Remove any existing connections between source and destination
871 // with a different carrier. If we find a connection already
872 // present with the correct carrier, then we are done.
873 bool except = false;
874 removeUnit(Route(getName(), address.getRegName(), address.getCarrier()), true, &except);
875 if (except) {
876 // Connection already present.
877 yCIDebug(PORTCORE, getName(), "output already present to %s", dest.c_str());
878 bw.appendLine(std::string("Desired connection already present from ") + getName() + " to " + dest);
879 if (os != nullptr) {
880 bw.write(*os);
881 }
882 return true;
883 }
884 } else {
885 // Remove any existing connections between source and destination.
886 removeUnit(Route(getName(), address.getRegName(), "*"), true);
887 }
888
889 // Set up a named route for this connection.
890 std::string aname = address.getRegName();
891 if (aname.empty()) {
892 aname = address.toURI(false);
893 }
894 Route r(getName(),
895 aname,
896 ((!parts.getCarrier().empty()) ? parts.getCarrier() : address.getCarrier()));
897 r.setToContact(contact);
898
899 // Check for any restrictions on the port. Perhaps it can only
900 // read, or write.
901 bool allowed = true;
902 std::string err;
903 std::string append;
904 unsigned int f = getFlags();
905 bool allow_output = (f & PORTCORE_IS_OUTPUT) != 0;
906 bool rpc = (f & PORTCORE_IS_RPC) != 0;
907 Name name(r.getCarrierName() + std::string("://test"));
908 std::string mode = name.getCarrierModifier("log");
909 bool is_log = (!mode.empty());
910 if (is_log) {
911 if (mode != "in") {
912 err = "Logger configured as log." + mode + ", but only log.in is supported";
913 allowed = false;
914 } else {
915 append = "; " + r.getFromName() + " will forward messages and replies (if any) to " + r.getToName();
916 }
917 }
918 if (!allow_output) {
919 if (!is_log) {
920 bool push = false;
922 if (c != nullptr) {
923 push = c->isPush();
924 }
925 if (push) {
926 err = "Outputs not allowed";
927 allowed = false;
928 }
929 }
930 } else if (rpc) {
931 if (m_dataOutputCount >= 1 && !is_log) {
932 err = "RPC output already connected";
933 allowed = false;
934 }
935 }
936
937 // If we found a relevant restriction, abort.
938 if (!allowed) {
939 bw.appendLine(err);
940 if (os != nullptr) {
941 bw.write(*os);
942 }
943 return false;
944 }
945
946 // Ok! We can go ahead and make a connection.
947 OutputProtocol* op = nullptr;
948 if (m_timeout > 0) {
949 address.setTimeout(m_timeout);
950 }
951 op = Carriers::connect(address);
952 if (op != nullptr) {
953 op->attachPort(m_contactable);
954 if (m_timeout > 0) {
955 op->setTimeout(m_timeout);
956 }
957
958 bool ok = op->open(r);
959 if (!ok) {
960 yCIDebug(PORTCORE, getName(), "open route error");
961 delete op;
962 op = nullptr;
963 }
964 }
965
966 // No connection, abort.
967 if (op == nullptr) {
968 bw.appendLine(std::string("Cannot connect to ") + dest);
969 if (os != nullptr) {
970 bw.write(*os);
971 }
972 return false;
973 }
974
975 // Ok, we have a connection, now add it to PortCore#units
976 if (op->getConnection().isPush()) {
977 // This is the normal case
978 addOutput(op);
979 } else {
980 // This is the case for connections that are initiated
981 // in the opposite direction to native YARP connections.
982 // Native YARP has push connections, initiated by the
983 // sender. HTTP and ROS have pull connections, initiated
984 // by the receiver.
985 // We invert the route, flip the protocol direction, and add.
986 r.swapNames();
987 op->rename(r);
988 InputProtocol* ip = &(op->getInput());
989 if (!m_finished.load()) {
990 std::lock_guard<std::mutex> lock(m_stateMutex);
992 getNextIndex(),
993 ip,
994 true);
995 yCIAssert(PORTCORE, getName(), unit != nullptr);
996 unit->start();
997 m_units.push_back(unit);
998 }
999 }
1000
1001 // Communicated the good news.
1002 bw.appendLine(std::string("Added connection from ") + getName() + " to " + dest + append);
1003 if (os != nullptr) {
1004 bw.write(*os);
1005 }
1006 cleanUnits();
1007 return true;
1008}
1009
1010
1011void PortCore::removeOutput(const std::string& dest, void* id, OutputStream* os)
1012{
1013 YARP_UNUSED(id);
1014 yCIDebug(PORTCORE, getName(), "asked to remove output to %s", dest.c_str());
1015
1016 // All the real work done by removeUnit().
1018 if (removeUnit(Route("*", dest, "*"), true)) {
1019 bw.appendLine(std::string("Removed connection from ") + getName() + " to " + dest);
1020 } else {
1021 bw.appendLine(std::string("Could not find an outgoing connection to ") + dest);
1022 }
1023 if (os != nullptr) {
1024 bw.write(*os);
1025 }
1026 cleanUnits();
1027}
1028
1029void PortCore::removeInput(const std::string& src, void* id, OutputStream* os)
1030{
1031 YARP_UNUSED(id);
1032 yCIDebug(PORTCORE, getName(), "asked to remove input to %s", src.c_str());
1033
1034 // All the real work done by removeUnit().
1036 if (removeUnit(Route(src, "*", "*"), true)) {
1037 bw.appendLine(std::string("Removing connection from ") + src + " to " + getName());
1038 } else {
1039 bw.appendLine(std::string("Could not find an incoming connection from ") + src);
1040 }
1041 if (os != nullptr) {
1042 bw.write(*os);
1043 }
1044 cleanUnits();
1045}
1046
1048{
1049 YARP_UNUSED(id);
1050 cleanUnits();
1051
1052 // Buffer to store a human-readable description of the port's
1053 // state.
1055
1056 {
1057 // Critical section
1058 std::lock_guard<std::mutex> lock(m_stateMutex);
1059
1060 // Report name and address.
1061 bw.appendLine(std::string("This is ") + m_address.getRegName() + " at " + m_address.toURI());
1062
1063 // Report outgoing connections.
1064 int oct = 0;
1065 for (auto* unit : m_units) {
1066 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1067 Route route = unit->getRoute();
1068 std::string msg = "There is an output connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1069 bw.appendLine(msg);
1070 oct++;
1071 }
1072 }
1073 if (oct < 1) {
1074 bw.appendLine("There are no outgoing connections");
1075 }
1076
1077 // Report incoming connections.
1078 int ict = 0;
1079 for (auto* unit : m_units) {
1080 if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1081 Route route = unit->getRoute();
1082 if (!route.getCarrierName().empty()) {
1083 std::string msg = "There is an input connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1084 bw.appendLine(msg);
1085 ict++;
1086 }
1087 }
1088 }
1089 if (ict < 1) {
1090 bw.appendLine("There are no incoming connections");
1091 }
1092 }
1093
1094 // Send description across network, or print it.
1095 if (os != nullptr) {
1096 bw.write(*os);
1097 } else {
1099 bw.write(sos);
1100 printf("%s\n", sos.toString().c_str());
1101 }
1102}
1103
1104
1106{
     // Reports the port's registered name and address, then every live
     // output connection and every live input connection, by calling
     // reporter.report() once per item. When a direction has no live
     // connection a single "There are no ... connections" message is
     // reported instead.
     // NOTE(review): the signature line of this function is not visible in
     // this listing; `reporter` is used below as an object with a report()
     // member.
1107 cleanUnits();
1108
     // m_units is walked below while holding m_stateMutex.
1109 std::lock_guard<std::mutex> lock(m_stateMutex);
1110
1111 // Report name and address of port.
     // NOTE(review): the declaration of `baseInfo` is not visible here
     // (listing numbers 1112-1113 are absent); presumably a PortInfo - confirm.
1114 std::string portName = m_address.getRegName();
1115 baseInfo.message = std::string("This is ") + portName + " at " + m_address.toURI();
1116 reporter.report(baseInfo);
1117
1118 // Report outgoing connections.
     // `oct` counts the output units reported.
1119 int oct = 0;
1120 for (auto* unit : m_units) {
1121 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1122 Route route = unit->getRoute();
1123 std::string msg = "There is an output connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1124 PortInfo info;
1125 info.message = msg;
     // NOTE(review): listing number 1126 is absent; a field of `info` may
     // be set on the missing line.
1127 info.incoming = false;
1128 info.portName = portName;
1129 info.sourceName = route.getFromName();
1130 info.targetName = route.getToName();
1131 info.carrierName = route.getCarrierName();
1132 reporter.report(info);
1133 oct++;
1134 }
1135 }
1136 if (oct < 1) {
1137 PortInfo info;
1139 info.message = "There are no outgoing connections";
1140 reporter.report(info);
1141 }
1142
1143 // Report incoming connections.
     // `ict` counts the input units reported.
1144 int ict = 0;
1145 for (auto* unit : m_units) {
1146 if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1147 Route route = unit->getRoute();
1148 std::string msg = "There is an input connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1149 PortInfo info;
1150 info.message = msg;
     // NOTE(review): listing number 1151 is absent; a field of `info` may
     // be set on the missing line.
1152 info.incoming = true;
1153 info.portName = portName;
1154 info.sourceName = route.getFromName();
1155 info.targetName = route.getToName();
1156 info.carrierName = route.getCarrierName();
1157 reporter.report(info);
1158 ict++;
1159 }
1160 }
1161 if (ict < 1) {
1162 PortInfo info;
1164 info.message = "There are no incoming connections";
1165 reporter.report(info);
1166 }
1167}
1168
1169
1171{
     // Stores `reporter` in m_eventReporter while holding m_stateMutex.
     // A null `reporter` is ignored, so a previously stored reporter stays
     // in place.
     // NOTE(review): the signature line is not visible in this listing.
1172 std::lock_guard<std::mutex> lock(m_stateMutex);
1173 if (reporter != nullptr) {
1174 m_eventReporter = reporter;
1175 }
1176}
1177
1179{
     // Clears m_eventReporter (sets it to nullptr) while holding
     // m_stateMutex.
     // NOTE(review): the signature line is not visible in this listing.
1180 std::lock_guard<std::mutex> lock(m_stateMutex);
1181 m_eventReporter = nullptr;
1182}
1183
1185{
     // Forwards `info` to the stored event reporter, if there is one.
     // NOTE(review): the signature line is not visible in this listing.
1186 // We are in the context of one of the input or output threads,
1187 // so our contact with the PortCore must be absolutely minimal.
1188 //
1189 // It is safe to pick up the address of the reporter if this is
1190 // kept constant over the lifetime of the input/output threads.
1191
     // m_eventReporter is read here without taking m_stateMutex; the
     // comment above states the condition under which that is acceptable.
1192 if (m_eventReporter != nullptr) {
1193 m_eventReporter->report(info);
1194 }
1195}
1196
1197
1199{
     // Hands an incoming message (`reader`) to the user's m_reader, or
     // reads it into a throw-away Bottle when there is no reader or the
     // port is interrupted. Returns the result of whichever read() ran.
     // NOTE(review): the signature line is not visible in this listing;
     // `id` and `os` are parameters that this body does not use.
1200 YARP_UNUSED(id);
1201 YARP_UNUSED(os);
1202 bool result = true;
1203
1204 // We are in the context of one of the input threads,
1205 // so our contact with the PortCore must be absolutely minimal.
1206 //
1207 // It is safe to pick up the address of the reader since this is
1208 // constant over the lifetime of the input threads.
1209
1210 if (m_reader != nullptr && !m_interrupted) {
1211 m_interruptable = false; // No mutexing; user of interrupt() has to be careful.
1212
1213 bool haveOutputs = (m_outputCount != 0); // No mutexing, but failure modes are benign.
1214
1215 if (m_logNeeded && haveOutputs) {
1216 // Normally, yarp doesn't pay attention to the content of
1217 // messages received by the client. Likewise, the content
1218 // of replies are not monitored. However it may sometimes
1219 // be useful to log this traffic.
1220
     // NOTE(review): the declaration of `recorder` is not visible here
     // (listing number 1221 is absent). It wraps `reader` via init() and
     // is passed to m_reader->read() in its place.
1222 recorder.init(&reader);
1223 lockCallback();
1224 result = m_reader->read(recorder);
     // NOTE(review): listing number 1225 is absent; presumably the call
     // that pairs with lockCallback() above - confirm.
1226 recorder.fini();
1227 // send off a log of this transaction to whoever wants it
     // NOTE(review): listing number 1228 is absent; the statement that
     // performs the send described above is not visible.
1229 } else {
1230 // YARP is not needed as a middleman
1231 lockCallback();
1232 result = m_reader->read(reader);
     // NOTE(review): listing number 1233 is absent; presumably the call
     // that pairs with lockCallback() above - confirm.
1234 }
1235
1236 m_interruptable = true;
1237 } else {
1238 // Read and ignore message, there is no where to send it.
1239 yCIDebug(PORTCORE, getName(), "data received, no reader for it");
1240 Bottle b;
1241 result = b.read(reader);
1242 }
1243 return result;
1244}
1245
1246
1247bool PortCore::send(const PortWriter& writer,
1248 PortReader* reader,
1249 const PortWriter* callback)
1250{
1251 // check if there is any modifier
1252 // we need to protect this part while the modifier
1253 // plugin is loading or unloading!
1254 m_modifier.outputMutex.lock();
1255 if (m_modifier.outputModifier != nullptr) {
1256 if (!m_modifier.outputModifier->acceptOutgoingData(writer)) {
1257 m_modifier.outputMutex.unlock();
1258 return false;
1259 }
1260 m_modifier.outputModifier->modifyOutgoingData(writer);
1261 }
1262 m_modifier.outputMutex.unlock();
1263 if (!m_logNeeded) {
1264 return sendHelper(writer, PORTCORE_SEND_NORMAL, reader, callback);
1265 }
1266 // logging is desired, so we need to wrap up and log this send
1267 // (and any reply it gets) -- TODO not yet supported
1268 return sendHelper(writer, PORTCORE_SEND_NORMAL, reader, callback);
1269}
1270
1272 int mode,
1273 PortReader* reader,
1274 const PortWriter* callback)
1275{
     // Passes `writer` to the live output units selected by `mode`:
     // with PORTCORE_SEND_NORMAL only units whose getMode() is empty are
     // used, with any other mode only units whose getMode() is non-empty
     // (the "logging only" connections).
     // Returns false without sending if the port is interrupted, finishing
     // or finished; otherwise returns `all_ok` as computed below.
     // NOTE(review): the first line of the signature is not visible in
     // this listing; `writer` is used below as the message being sent.
1276 if (m_interrupted || m_finishing) {
1277 return false;
1278 }
1279
1280 bool all_ok = true;
1281 bool gotReply = false;
1282 int logCount = 0;
1283 std::string envelopeString = m_envelope;
1284
1285 // Pass a message to all output units for sending on. We could
1286 // be doing more here to cache the serialization of the message
1287 // and reuse it across output connections. However, one key
1288 // optimization is present: external blocks written by
1289 // yarp::os::ConnectionWriter::appendExternalBlock are never
1290 // copied. So for example the core image array in a yarp::sig::Image
1291 // is untouched by the port communications code.
1292
1293 yCITrace(PORTCORE, getName(), "------- send in real");
1294
1295 // Give user the chance to know that this object is about to be
1296 // written.
1297 writer.onCommencement();
1298
1299 // All user-facing parts of this port will be blocked on this
1300 // operation, so we'll want to be snappy. How long the
1301 // operation lasts will depend on these flags:
1302 // * waitAfterSend
1303 // * waitBeforeSend
1304 // set by setWaitAfterSend() and setWaitBeforeSend().
1305 std::lock_guard<std::mutex> lock(m_stateMutex);
1306
1307 // If the port is shutting down, abort.
1308 if (m_finished.load()) {
1309 return false;
1310 }
1311
1312 yCITrace(PORTCORE, getName(), "------- send in");
1313 // Prepare a "packet" for tracking a single message which
1314 // may travel by multiple outputs.
     // m_packets and the packet's counter are only touched between
     // m_packetMutex.lock() and m_packetMutex.unlock() in this function.
1315 m_packetMutex.lock();
1316 PortCorePacket* packet = m_packets.getFreePacket();
1317 yCIAssert(PORTCORE, getName(), packet != nullptr);
1318 packet->setContent(&writer, false, callback);
1319 m_packetMutex.unlock();
1320
1321 // Scan connections, placing message everywhere we can.
1322 for (auto* unit : m_units) {
1323 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1324 bool log = (!unit->getMode().empty());
1325 if (log) {
1326 // Some connections are for logging only.
1327 logCount++;
1328 }
     // Skip units that do not belong to the requested mode.
1329 bool ok = (mode == PORTCORE_SEND_NORMAL) ? (!log) : (log);
1330 if (!ok) {
1331 continue;
1332 }
     // Log-mode sends always use the "wait" flag, regardless of
     // m_waitAfterSend.
1333 bool waiter = m_waitAfterSend || (mode == PORTCORE_SEND_LOG);
1334 yCITrace(PORTCORE, getName(), "------- -- inc");
1335 m_packetMutex.lock();
1336 packet->inc(); // One more connection carrying message.
1337 m_packetMutex.unlock();
1338 yCITrace(PORTCORE, getName(), "------- -- pre-send");
1339 bool gotReplyOne = false;
1340 // Send the message off on this connection.
     // If no callback was supplied, the writer itself is passed in the
     // callback position.
     // NOTE(review): listing number 1345 is absent, so one argument of
     // this call is not visible here.
1341 void* out = unit->send(writer,
1342 reader,
1343 (callback != nullptr) ? callback : (&writer),
1344 reinterpret_cast<void*>(packet),
1346 waiter,
1347 m_waitBeforeSend,
1348 &gotReplyOne);
     // NOTE(review): listing number 1349 is absent; in the visible lines
     // `gotReplyOne` is never folded into `gotReply` - presumably that
     // happens on the missing line. Confirm against the real file.
1350 yCITrace(PORTCORE, getName(), "------- -- send");
1351 if (out != nullptr) {
1352 // We got back a report of a message already sent.
1353 m_packetMutex.lock();
1354 (static_cast<PortCorePacket*>(out))->dec(); // Message on one fewer connections.
1355 m_packets.checkPacket(static_cast<PortCorePacket*>(out));
1356 m_packetMutex.unlock();
1357 }
     // When waiting was requested, a unit that reports isFinished() after
     // the send marks the whole send as not ok.
1358 if (waiter) {
1359 if (unit->isFinished()) {
1360 all_ok = false;
1361 }
1362 }
1363 yCITrace(PORTCORE, getName(), "------- -- dec");
1364 }
1365 }
1366 yCITrace(PORTCORE, getName(), "------- pack check");
1367 m_packetMutex.lock();
1368
1369 // We no longer concern ourselves with the message.
1370 // It may or may not be traveling on some connections.
1371 // But that is not our problem anymore.
1372 packet->dec();
1373
1374 m_packets.checkPacket(packet);
1375 m_packetMutex.unlock();
1376 yCITrace(PORTCORE, getName(), "------- packed");
1377 yCITrace(PORTCORE, getName(), "------- send out");
     // A log-mode send that found no logging connection switches
     // m_logNeeded off.
1378 if (mode == PORTCORE_SEND_LOG) {
1379 if (logCount == 0) {
1380 m_logNeeded = false;
1381 }
1382 }
1383
1384 yCITrace(PORTCORE, getName(), "------- send out real");
1385
     // When waiting after send and a reply reader was supplied, success
     // additionally requires `gotReply`.
1386 if (m_waitAfterSend && reader != nullptr) {
1387 all_ok = all_ok && gotReply;
1388 }
1389
1390 return all_ok;
1391}
1392
1393
1395{
     // Returns true if at least one non-null, unfinished unit in m_units
     // reports isBusy(). Returns false without looking at the units when
     // m_finished is set.
     // NOTE(review): the signature line is not visible in this listing.
1396 bool writing = false;
1397
1398 // Check if any port is currently writing. TODO optimize
1399 // this query by counting down with notifyCompletion().
1400 if (!m_finished.load()) {
     // m_units is walked while holding m_stateMutex. The loop does not
     // stop at the first busy unit; it visits every unit.
1401 std::lock_guard<std::mutex> lock(m_stateMutex);
1402 for (auto* unit : m_units) {
1403 if ((unit != nullptr) && !unit->isFinished() && unit->isBusy()) {
1404 writing = true;
1405 }
1406 }
1407 }
1408
1409 return writing;
1410}
1411
1412
1414{
     // Calls cleanUnits(false), then returns a copy of m_inputCount taken
     // while m_packetMutex is held.
     // NOTE(review): the signature line is not visible in this listing.
1415 cleanUnits(false);
1416 m_packetMutex.lock();
1417 int result = m_inputCount;
1418 m_packetMutex.unlock();
1419 return result;
1420}
1421
1423{
     // Calls cleanUnits(false), then returns a copy of m_outputCount taken
     // while m_packetMutex is held.
     // NOTE(review): the signature line is not visible in this listing.
1424 cleanUnits(false);
1425 m_packetMutex.lock();
1426 int result = m_outputCount;
1427 m_packetMutex.unlock();
1428 return result;
1429}
1430
1431
1433{
     // Treats `tracker` as a PortCorePacket*: decrements the packet and
     // hands it to m_packets.checkPacket(), all while m_packetMutex is
     // held. A null `tracker` is ignored.
     // NOTE(review): the signature line is not visible in this listing.
1434 yCITrace(PORTCORE, getName(), "starting notifyCompletion");
1435 m_packetMutex.lock();
1436 if (tracker != nullptr) {
1437 (static_cast<PortCorePacket*>(tracker))->dec();
1438 m_packets.checkPacket(static_cast<PortCorePacket*>(tracker));
1439 }
1440 m_packetMutex.unlock();
1441 yCITrace(PORTCORE, getName(), "stopping notifyCompletion");
1442}
1443
1444
1446{
     // Serializes `envelope` into m_envelopeWriter (after restarting it)
     // and, if that write succeeds, stores the resulting text through the
     // string overload of setEnvelope(). Returns the result of the write.
     // NOTE(review): the signature line is not visible in this listing.
1447 m_envelopeWriter.restart();
1448 bool ok = envelope.write(m_envelopeWriter);
1449 if (ok) {
1450 setEnvelope(m_envelopeWriter.toString());
1451 }
1452 return ok;
1453}
1454
1455
1456void PortCore::setEnvelope(const std::string& envelope)
1457{
1458 m_envelope = envelope;
1459 for (size_t i = 0; i < m_envelope.length(); i++) {
1460 // It looks like envelopes are constrained to be printable ASCII?
1461 // I'm not sure why this would be. TODO check.
1462 if (m_envelope[i] < 32) {
1463 m_envelope = m_envelope.substr(0, i);
1464 break;
1465 }
1466 }
1467 yCIDebug(PORTCORE, getName(), "set envelope to %s", m_envelope.c_str());
1468}
1469
1471{
     // Returns the stored envelope text (m_envelope).
     // NOTE(review): the signature line is not visible in this listing, so
     // whether this returns a copy or a reference cannot be told from here.
1472 return m_envelope;
1473}
1474
1476{
     // Lets `envelope` read itself from the stored envelope text: the text
     // of m_envelope followed by "\r\n" is added to `sis`, `sbr` is reset
     // on top of `sis` with a default Route, and the result of
     // envelope.read(sbr) is returned.
     // NOTE(review): the signature line and the declarations of `sis` and
     // `sbr` are not visible in this listing (listing numbers 1475, 1477
     // and 1480 are absent).
1478 sis.add(m_envelope);
1479 sis.add("\r\n");
1481 Route route;
1482 sbr.reset(sis, nullptr, route, 0, true);
1483 return envelope.read(sbr);
1484}
1485
1486// Make an RPC connection to talk to a ROS API, send a message, get reply.
1487// NOTE: ROS support can now be moved out of here, once all documentation
1488// of older ways to interoperate with it are purged and people stop
1489// doing it.
1490static bool __pc_rpc(const Contact& c,
1491 const char* carrier,
1492 Bottle& writer,
1493 Bottle& reader,
1494 bool verbose)
1495{
1496 ContactStyle style;
1497 style.quiet = !verbose;
1498 style.timeout = 4;
1499 style.carrier = carrier;
1500 bool ok = Network::write(c, writer, reader, style);
1501 return ok;
1502}
1503
1504// ACE is sometimes confused by localhost aliases, in a ROS-incompatible
1505// way. This method does a quick sanity check if we are using ROS.
//
// Diagnostic only: when built with YARP_HAS_ACE, the contact's port is
// passed to addr.set() together with, in turn, the contact's host,
// "127.0.0.1" and "127.0.1.1"; each non-zero result is logged as a
// warning. Without YARP_HAS_ACE nothing is checked. The function always
// returns true, whatever the outcome of the checks.
1506static bool __tcp_check(const Contact& c)
1507{
1508#ifdef YARP_HAS_ACE
     // NOTE(review): the declaration of `addr` is not visible in this
     // listing (listing number 1509 is absent); presumably an
     // ACE_INET_Addr, given the <ace/INET_Addr.h> include - confirm.
1510 int result = addr.set(c.getPort(), c.getHost().c_str());
1511 if (result != 0) {
1512 yCWarning(PORTCORE, "ACE choked on %s:%d\n", c.getHost().c_str(), c.getPort());
1513 }
1514 result = addr.set(c.getPort(), "127.0.0.1");
1515 if (result != 0) {
1516 yCWarning(PORTCORE, "ACE choked on 127.0.0.1:%d\n", c.getPort());
1517 }
1518 result = addr.set(c.getPort(), "127.0.1.1");
1519 if (result != 0) {
1520 yCWarning(PORTCORE, "ACE choked on 127.0.1.1:%d\n", c.getPort());
1521 }
1522#endif
1523 return true;
1524}
1525
1526namespace {
1527enum class PortCoreCommand : yarp::conf::vocab32_t
1528{
1529 Unknown = 0,
1530 Help = yarp::os::createVocab32('h', 'e', 'l', 'p'),
1531 Ver = yarp::os::createVocab32('v', 'e', 'r'),
1532 Pray = yarp::os::createVocab32('p', 'r', 'a', 'y'),
1533 Add = yarp::os::createVocab32('a', 'd', 'd'),
1534 Del = yarp::os::createVocab32('d', 'e', 'l'),
1535 Atch = yarp::os::createVocab32('a', 't', 'c', 'h'),
1536 Dtch = yarp::os::createVocab32('d', 't', 'c', 'h'),
1537 List = yarp::os::createVocab32('l', 'i', 's', 't'),
1538 Set = yarp::os::createVocab32('s', 'e', 't'),
1539 Get = yarp::os::createVocab32('g', 'e', 't'),
1540 Prop = yarp::os::createVocab32('p', 'r', 'o', 'p'),
1541};
1542
1543enum class PortCoreConnectionDirection : yarp::conf::vocab32_t
1544{
1545 Error = 0,
1546 Out = yarp::os::createVocab32('o', 'u', 't'),
1547 In = yarp::os::createVocab32('i', 'n'),
1548};
1549
1550enum class PortCorePropertyAction : yarp::conf::vocab32_t
1551{
1552 Error = 0,
1553 Get = yarp::os::createVocab32('g', 'e', 't'),
1554 Set = yarp::os::createVocab32('s', 'e', 't')
1555};
1556
1557PortCoreCommand parseCommand(const yarp::os::Value& v)
1558{
1559 auto cmd = static_cast<PortCoreCommand>(v.asVocab32());
1560 switch (cmd) {
1561 case PortCoreCommand::Help:
1562 case PortCoreCommand::Ver:
1563 case PortCoreCommand::Pray:
1564 case PortCoreCommand::Add:
1565 case PortCoreCommand::Del:
1566 case PortCoreCommand::Atch:
1567 case PortCoreCommand::Dtch:
1568 case PortCoreCommand::List:
1569 case PortCoreCommand::Set:
1570 case PortCoreCommand::Get:
1571 case PortCoreCommand::Prop:
1572 return cmd;
1573 default:
1574 return PortCoreCommand::Unknown;
1575 }
1576}
1577
1578PortCoreConnectionDirection parseConnectionDirection(yarp::conf::vocab32_t v, bool errorIsOut = false)
1579{
1580 auto dir = static_cast<PortCoreConnectionDirection>(v);
1581 switch (dir) {
1582 case PortCoreConnectionDirection::In:
1583 case PortCoreConnectionDirection::Out:
1584 return dir;
1585 default:
1586 return errorIsOut ? PortCoreConnectionDirection::Out : PortCoreConnectionDirection::Error;
1587 }
1588}
1589
1590PortCorePropertyAction parsePropertyAction(yarp::conf::vocab32_t v)
1591{
1592 auto action = static_cast<PortCorePropertyAction>(v);
1593 switch (action) {
1594 case PortCorePropertyAction::Get:
1595 case PortCorePropertyAction::Set:
1596 return action;
1597 default:
1598 return PortCorePropertyAction::Error;
1599 }
1600}
1601
// Appends a description of `route` to `result` as a sequence of nested
// lists: ("from" <from name>), ("to" <to name>), ("carrier" <carrier name>),
// followed by ("connectionless" 1) when the carrier reports
// isConnectionless() and ("push" 0) when the carrier reports !isPush().
1602void describeRoute(const Route& route, Bottle& result)
1603{
1604 Bottle& bfrom = result.addList();
1605 bfrom.addString("from");
1606 bfrom.addString(route.getFromName());
1607
1608 Bottle& bto = result.addList();
1609 bto.addString("to");
1610 bto.addString(route.getToName());
1611
1612 Bottle& bcarrier = result.addList();
1613 bcarrier.addString("carrier");
1614 bcarrier.addString(route.getCarrierName());
1615
     // NOTE(review): the line that declares and obtains `carrier` is not
     // visible in this listing (listing number 1616 is absent). It is used
     // through `->` below with no null check in the visible lines, and is
     // deleted at the end of the function, so this function appears to own
     // it - confirm against the real file.
1617 if (carrier->isConnectionless()) {
1618 Bottle& bconnectionless = result.addList();
1619 bconnectionless.addString("connectionless");
1620 bconnectionless.addInt32(1);
1621 }
     // Only the non-default case is reported: an entry is added when the
     // carrier is NOT push.
1622 if (!carrier->isPush()) {
1623 Bottle& breverse = result.addList();
1624 breverse.addString("push");
1625 breverse.addInt32(0);
1626 }
1627 delete carrier;
1628}
1629
1630} // namespace
1631
1633 void* id)
1634{
1635 Bottle cmd;
1636 Bottle result;
1637
1638 // We've received a message to the port that is marked as administrative.
1639 // That means that instead of passing it along as data to the user of the
1640 // port, the port itself is responsible for reading and responding to
1641 // it. So let's read the message and see what we're supposed to do.
1642 cmd.read(reader);
1643
1644 yCIDebug(PORTCORE, getName(), "Port %s received command %s", getName().c_str(), cmd.toString().c_str());
1645
1646 auto handleAdminHelpCmd = []() {
1647 Bottle result;
1648 // We give a list of the most useful administrative commands.
1649 result.addVocab32('m', 'a', 'n', 'y');
1650 result.addString("[help] # give this help");
1651 result.addString("[ver] # report protocol version information");
1652 result.addString("[add] $portname # add an output connection");
1653 result.addString("[add] $portname $car # add an output with a given protocol");
1654 result.addString("[del] $portname # remove an input or output connection");
1655 result.addString("[list] [in] # list input connections");
1656 result.addString("[list] [out] # list output connections");
1657 result.addString("[list] [in] $portname # give details for input");
1658 result.addString("[list] [out] $portname # give details for output");
1659 result.addString("[prop] [get] # get all user-defined port properties");
1660 result.addString("[prop] [get] $prop # get a user-defined port property (prop, val)");
1661 result.addString("[prop] [set] $prop $val # set a user-defined port property (prop, val)");
1662 result.addString("[prop] [get] $portname # get Qos properties of a connection to/from a port");
1663 result.addString("[prop] [set] $portname # set Qos properties of a connection to/from a port");
1664 result.addString("[prop] [get] $cur_port # get information about current process (e.g., scheduling priority, pid)");
1665 result.addString("[prop] [set] $cur_port # set properties of the current process (e.g., scheduling priority, pid)");
1666 result.addString("[atch] [out] $prop # attach a portmonitor plug-in to the port's output");
1667 result.addString("[atch] [in] $prop # attach a portmonitor plug-in to the port's input");
1668 result.addString("[dtch] [out] # detach portmonitor plug-in from the port's output");
1669 result.addString("[dtch] [in] # detach portmonitor plug-in from the port's input");
1670 //result.addString("[atch] $portname $prop # attach a portmonitor plug-in to the connection to/from $portname");
1671 //result.addString("[dtch] $portname # detach any portmonitor plug-in from the connection to/from $portname");
1672 return result;
1673 };
1674
1675 auto handleAdminVerCmd = []() {
1676 // Gives a version number for the administrative commands.
1677 // It is distinct from YARP library versioning.
1678 Bottle result;
1679 result.addVocab32("ver");
1680 result.addInt32(1);
1681 result.addInt32(2);
1682 result.addInt32(3);
1683 return result;
1684 };
1685
1686 auto handleAdminPrayCmd = [this]() {
1687 // Strongly inspired by nethack #pray command:
1688 // https://nethackwiki.com/wiki/Prayer
1689 // http://www.steelypips.org/nethack/pray.html
1690
1691 Bottle result;
1692
1693 bool found = false;
1694 std::string name = yarp::conf::environment::get_string("YARP_ROBOT_NAME", &found);
1695 if (!found) {
1696 name = getName();
1697 // Remove initial "/"
1698 while (name[0] == '/') {
1699 name = name.substr(1);
1700 }
1701 // Keep only the first part of the port name
1702 auto i = name.find('/');
1703 if (i != std::string::npos) {
1704 name = name.substr(0, i);
1705 }
1706 }
1707
1708 std::random_device rd;
1709 std::mt19937 mt(rd());
1710 std::uniform_int_distribution<int> dist2(0,1);
1711 auto d2 = std::bind(dist2, mt);
1712
1713 result.addString("You begin praying to " + name + ".");
1714 result.addString("You finish your prayer.");
1715
1716 static const char* godvoices[] = {
1717 "booms out",
1718 "thunders",
1719 "rings out",
1720 "booms",
1721 };
1722 std::uniform_int_distribution<int> godvoices_dist(0, (sizeof(godvoices) / sizeof(godvoices[0])) - 1);
1723 auto godvoice = [&]() {
1724 return std::string(godvoices[godvoices_dist(mt)]);
1725 };
1726
1727 static const char* creatures[] = {
1728 "mortal",
1729 "creature",
1730 "robot",
1731 };
1732 std::uniform_int_distribution<int> creatures_dist(0, (sizeof(creatures) / sizeof(creatures[0])) - 1);
1733 auto creature = [&]() {
1734 return std::string(creatures[creatures_dist(mt)]);
1735 };
1736
1737 static const char* auras[] = {
1738 "amber",
1739 "light blue",
1740 "golden",
1741 "white",
1742 "orange",
1743 "black",
1744 };
1745 std::uniform_int_distribution<int> auras_dist(0, (sizeof(auras) / sizeof(auras[0])) - 1);
1746 auto aura = [&]() {
1747 return std::string(auras[auras_dist(mt)]);
1748 };
1749
1750 static const char* items[] = {
1751 "keyboard",
1752 "mouse",
1753 "monitor",
1754 "headphones",
1755 "smartphone",
1756 "wallet",
1757 "eyeglasses",
1758 "shirt",
1759 };
1760 std::uniform_int_distribution<int> items_dist(0, (sizeof(items) / sizeof(items[0])) - 1);
1761 auto item = [&]() {
1762 return std::string(items[items_dist(mt)]);
1763 };
1764
1765 static const char* blessings[] = {
1766 "You feel more limber.",
1767 "The slime disappears.",
1768 "Your amulet vanishes! You can breathe again.",
1769 "You can breathe again.",
1770 "You are back on solid ground.",
1771 "Your stomach feels content.",
1772 "You feel better.",
1773 "You feel much better.",
1774 "Your surroundings change.",
1775 "Your shape becomes uncertain.",
1776 "Your chain disappears.",
1777 "There's a tiger in your tank.",
1778 "You feel in good health again.",
1779 "Your eye feels better.",
1780 "Your eyes feel better.",
1781 "Looks like you are back in Kansas.",
1782 "Your <ITEM> softly glows <AURA>.",
1783 };
1784 std::uniform_int_distribution<int> blessings_dist(0, (sizeof(blessings) / sizeof(blessings[0])) - 1);
1785 auto blessing = [&](){
1786 auto blessing = std::string(blessings[blessings_dist(mt)]);
1787 blessing = std::regex_replace(blessing, std::regex("<ITEM>"), item());
1788 blessing = std::regex_replace(blessing, std::regex("<AURA>"), aura());
1789 return blessing;
1790 };
1791
1792 std::uniform_int_distribution<int> dist13(0,12);
1793 switch(dist13(mt)) {
1794 case 0:
1795 case 1:
1796 result.addString("You feel that " + name + " is " + (d2() ? "bummed" : "displeased") + ".");
1797 break;
1798 case 2:
1799 case 3:
1800 result.addString("The voice of " + name + " " + godvoice() +
1801 ": \"Thou " + (d2() ? "hast strayed from the path" : "art arrogant") +
1802 ", " + creature() + ". Thou must relearn thy lessons!\"");
1803 break;
1804 case 4:
1805 case 5:
1806 result.addString("The voice of " + name + " " + godvoice() +
1807 ": \"Thou hast angered me.\"");
1808 result.addString("A black glow surrounds you.");
1809 break;
1810 case 6:
1811 result.addString("The voice of " + name + " " + godvoice() +
1812 ": \"Thou hast angered me.\"");
1813 break;
1814 case 7:
1815 case 8:
1816 result.addString("The voice of " + name + " " + godvoice() +
1817 ": \"Thou durst " + (d2() ? "scorn" : "call upon") +
1818 " me? Then die, " + creature() + "!\"");
1819 break;
1820 case 9:
1821 result.addString("You feel that " + name + " is " + (d2() ? "pleased as punch" : "well-pleased") + ".");
1822 result.addString(blessing());
1823 break;
1824 case 10:
1825 result.addString("You feel that " + name + " is " + (d2() ? "ticklish" : "pleased") + ".");
1826 result.addString(blessing());
1827 break;
1828 case 11:
1829 result.addString("You feel that " + name + " is " + (d2() ? "full" : "satisfied") + ".");
1830 result.addString(blessing());
1831 break;
1832 default:
1833 result.addString("The voice of " + name + " " + godvoice() +
1834 ": \"Thou hast angered me.\"");
1835 result.addString("Suddenly, a bolt of lightning strikes you!");
1836 result.addString("You fry to a crisp!");
1837 break;
1838 }
1839
1840 return result;
1841 };
1842
1843 auto handleAdminAddCmd = [this, id](std::string output,
1844 const std::string& carrier) {
1845 // Add an output to the port.
1846 Bottle result;
1847 StringOutputStream cache;
1848 if (!carrier.empty()) {
1849 output = carrier + ":/" + output;
1850 }
1851 addOutput(output, id, &cache, false);
1852 std::string r = cache.toString();
1853 int v = (r[0] == 'A') ? 0 : -1;
1854 result.addInt32(v);
1855 result.addString(r);
1856 return result;
1857 };
1858
1859 auto handleAdminDelCmd = [this, id](const std::string& dest) {
1860 // Delete any inputs or outputs involving the named port.
1861 Bottle result;
1862 StringOutputStream cache;
1863 removeOutput(dest, id, &cache);
1864 std::string r1 = cache.toString();
1865 cache.reset();
1866 removeInput(dest, id, &cache);
1867 std::string r2 = cache.toString();
1868 int v = (r1[0] == 'R' || r2[0] == 'R') ? 0 : -1;
1869 result.addInt32(v);
1870 if (r1[0] == 'R' && r2[0] != 'R') {
1871 result.addString(r1);
1872 } else if (r1[0] != 'R' && r2[0] == 'R') {
1873 result.addString(r2);
1874 } else {
1875 result.addString(r1 + r2);
1876 }
1877 return result;
1878 };
1879
1880 auto handleAdminAtchCmd = [this](PortCoreConnectionDirection direction,
1881 Property prop) {
1882 Bottle result;
1883 switch (direction) {
1884 case PortCoreConnectionDirection::Out: {
1885 std::string errMsg;
1886 if (!attachPortMonitor(prop, true, errMsg)) {
1887 result.addVocab32("fail");
1888 result.addString(errMsg);
1889 } else {
1890 result.addVocab32("ok");
1891 }
1892 } break;
1893 case PortCoreConnectionDirection::In: {
1894 std::string errMsg;
1895 if (!attachPortMonitor(prop, false, errMsg)) {
1896 result.addVocab32("fail");
1897 result.addString(errMsg);
1898 } else {
1899 result.addVocab32("ok");
1900 }
1901 } break;
1902 case PortCoreConnectionDirection::Error:
1903 result.addVocab32("fail");
1904 result.addString("attach command must be followed by [out] or [in]");
1905 }
1906 return result;
1907 };
1908
1909 auto handleAdminDtchCmd = [this](PortCoreConnectionDirection direction) {
1910 Bottle result;
1911 switch (direction) {
1912 case PortCoreConnectionDirection::Out: {
1913 if (detachPortMonitor(true)) {
1914 result.addVocab32("ok");
1915 } else {
1916 result.addVocab32("fail");
1917 }
1918 } break;
1919 case PortCoreConnectionDirection::In: {
1920 if (detachPortMonitor(false)) {
1921 result.addVocab32("ok");
1922 } else {
1923 result.addVocab32("fail");
1924 }
1925 } break;
1926 case PortCoreConnectionDirection::Error:
1927 result.addVocab32("fail");
1928 result.addString("detach command must be followed by [out] or [in]");
1929 };
1930 return result;
1931 };
1932
1933 auto handleAdminListCmd = [this](const PortCoreConnectionDirection direction,
1934 const std::string& target) {
1935 Bottle result;
1936 switch (direction) {
1937 case PortCoreConnectionDirection::In: {
1938 // Return a list of all input connections.
1939 std::lock_guard<std::mutex> lock(m_stateMutex);
1940 for (auto* unit : m_units) {
1941 if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1942 Route route = unit->getRoute();
1943 if (target.empty()) {
1944 const std::string& name = route.getFromName();
1945 if (!name.empty()) {
1946 result.addString(name);
1947 }
1948 } else if (route.getFromName() == target) {
1949 describeRoute(route, result);
1950 }
1951 }
1952 }
1953 } break;
1954 case PortCoreConnectionDirection::Out: {
1955 // Return a list of all output connections.
1956 std::lock_guard<std::mutex> lock(m_stateMutex);
1957 for (auto* unit : m_units) {
1958 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1959 Route route = unit->getRoute();
1960 if (target.empty()) {
1961 result.addString(route.getToName());
1962 } else if (route.getToName() == target) {
1963 describeRoute(route, result);
1964 }
1965 }
1966 }
1967 } break;
1968 case PortCoreConnectionDirection::Error:
1969 // Should never happen
1970 yCIAssert(PORTCORE, getName(), false);
1971 break;
1972 }
1973 return result;
1974 };
1975
1976 auto handleAdminSetInCmd = [this](const std::string& target,
1977 const Property& property) {
1978 Bottle result;
1979 // Set carrier parameters on a given input connection.
1980 std::lock_guard<std::mutex> lock(m_stateMutex);
1981 if (target.empty()) {
1982 result.addInt32(-1);
1983 result.addString("target port is not specified.\r\n");
1984 } else {
1985 if (target == getName()) {
1986 std::string errMsg;
1987 if (!setParamPortMonitor(property, false, errMsg)) {
1988 result.addVocab32("fail");
1989 result.addString(errMsg);
1990 } else {
1991 result.addVocab32("ok");
1992 }
1993 } else {
1994 for (auto* unit : m_units) {
1995 if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1996 Route route = unit->getRoute();
1997 if (route.getFromName() == target) {
1998 unit->setCarrierParams(property);
1999 result.addInt32(0);
2000 std::string msg = "Configured connection from ";
2001 msg += route.getFromName();
2002 msg += "\r\n";
2003 result.addString(msg);
2004 break;
2005 }
2006 }
2007 }
2008 }
2009 if (result.size() == 0) {
2010 result.addInt32(-1);
2011 std::string msg = "Could not find an incoming connection from ";
2012 msg += target;
2013 msg += "\r\n";
2014 result.addString(msg);
2015 }
2016 }
2017 return result;
2018 };
2019
2020 auto handleAdminSetOutCmd = [this](const std::string& target,
2021 const Property& property) {
2022 Bottle result;
2023 // Set carrier parameters on a given output connection.
2024 std::lock_guard<std::mutex> lock(m_stateMutex);
2025 if (target.empty()) {
2026 result.addInt32(-1);
2027 result.addString("target port is not specified.\r\n");
2028 } else {
2029 if (target == getName()) {
2030 std::string errMsg;
2031 if (!setParamPortMonitor(property, true, errMsg)) {
2032 result.addVocab32("fail");
2033 result.addString(errMsg);
2034 } else {
2035 result.addVocab32("ok");
2036 }
2037 } else {
2038 for (auto* unit : m_units) {
2039 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2040 Route route = unit->getRoute();
2041 if (route.getToName() == target) {
2042 unit->setCarrierParams(property);
2043 result.addInt32(0);
2044 std::string msg = "Configured connection to ";
2045 msg += route.getToName();
2046 msg += "\r\n";
2047 result.addString(msg);
2048 break;
2049 }
2050 }
2051 }
2052 }
2053 if (result.size() == 0) {
2054 result.addInt32(-1);
2055 std::string msg = "Could not find an incoming connection to ";
2056 msg += target;
2057 msg += "\r\n";
2058 result.addString(msg);
2059 }
2060 }
2061 return result;
2062 };
2063
2064 auto handleAdminGetInCmd = [this](const std::string& target) {
2065 Bottle result;
2066 // Get carrier parameters for a given input connection.
2067 std::lock_guard<std::mutex> lock(m_stateMutex);
2068 if (target.empty()) {
2069 result.addInt32(-1);
2070 result.addString("target port is not specified.\r\n");
2071 } else if (target == getName()) {
2072 yarp::os::Property property;
2073 std::string errMsg;
2074 if (!getParamPortMonitor(property, false, errMsg)) {
2075 result.addVocab32("fail");
2076 result.addString(errMsg);
2077 } else {
2078 result.addDict() = property;
2079 }
2080 } else {
2081 for (auto* unit : m_units) {
2082 if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
2083 Route route = unit->getRoute();
2084 if (route.getFromName() == target) {
2085 yarp::os::Property property;
2086 unit->getCarrierParams(property);
2087 result.addDict() = property;
2088 break;
2089 }
2090 }
2091 }
2092 if (result.size() == 0) {
2093 result.addInt32(-1);
2094 std::string msg = "Could not find an incoming connection from ";
2095 msg += target;
2096 msg += "\r\n";
2097 result.addString(msg);
2098 }
2099 }
2100 return result;
2101 };
2102
2103 auto handleAdminGetOutCmd = [this](const std::string& target) {
2104 Bottle result;
2105 // Get carrier parameters for a given output connection.
2106 std::lock_guard<std::mutex> lock(m_stateMutex);
2107 if (target.empty()) {
2108 result.addInt32(-1);
2109 result.addString("target port is not specified.\r\n");
2110 } else if (target == getName()) {
2111 yarp::os::Property property;
2112 std::string errMsg;
2113 if (!getParamPortMonitor(property, true, errMsg)) {
2114 result.addVocab32("fail");
2115 result.addString(errMsg);
2116 } else {
2117 result.addDict() = property;
2118 }
2119 } else {
2120 for (auto* unit : m_units) {
2121 if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2122 Route route = unit->getRoute();
2123 if (route.getToName() == target) {
2124 yarp::os::Property property;
2125 unit->getCarrierParams(property);
2126 result.addDict() = property;
2127 break;
2128 }
2129 }
2130 }
2131 if (result.size() == 0) {
2132 result.addInt32(-1);
2133 std::string msg = "Could not find an incoming connection to ";
2134 msg += target;
2135 msg += "\r\n";
2136 result.addString(msg);
2137 }
2138 }
2139 return result;
2140 };
2141
2142 auto handleAdminPropGetCmd = [this](const std::string& key) {
2143 Bottle result;
2144 Property* p = acquireProperties(false);
2145 if (p != nullptr) {
2146 if (key.empty()) {
2147 result.fromString(p->toString());
2148 } else {
2149 // request: "prop get /portname"
2150 if (key[0] == '/') {
2151 bool bFound = false;
2152 // check for their own name
2153 if (key == getName()) {
2154 bFound = true;
2155 Bottle& sched = result.addList();
2156 sched.addString("sched");
2157 Property& sched_prop = sched.addDict();
2158 sched_prop.put("tid", static_cast<int>(this->getTid()));
2159 sched_prop.put("priority", this->getPriority());
2160 sched_prop.put("policy", this->getPolicy());
2161
2163 Bottle& proc = result.addList();
2164 proc.addString("process");
2165 Property& proc_prop = proc.addDict();
2166 proc_prop.put("pid", info.pid);
2167 proc_prop.put("name", (info.pid != -1) ? info.name : "unknown");
2168 proc_prop.put("arguments", (info.pid != -1) ? info.arguments : "unknown");
2169 proc_prop.put("priority", info.schedPriority);
2170 proc_prop.put("policy", info.schedPolicy);
2171
2173 Bottle& platform = result.addList();
2174 platform.addString("platform");
2175 Property& platform_prop = platform.addDict();
2176 platform_prop.put("os", pinfo.name);
2177 platform_prop.put("hostname", m_address.getHost());
2178
2179 unsigned int f = getFlags();
2180 bool is_input = (f & PORTCORE_IS_INPUT) != 0;
2181 bool is_output = (f & PORTCORE_IS_OUTPUT) != 0;
2182 bool is_rpc = (f & PORTCORE_IS_RPC) != 0;
2183 Bottle& port = result.addList();
2184 port.addString("port");
2185 Property& port_prop = port.addDict();
2186 port_prop.put("is_input", is_input);
2187 port_prop.put("is_output", is_output);
2188 port_prop.put("is_rpc", is_rpc);
2189 port_prop.put("type", getType().getName());
2190 } else {
2191 for (auto* unit : m_units) {
2192 if ((unit != nullptr) && !unit->isFinished()) {
2193 Route route = unit->getRoute();
2194 std::string coreName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2195 if (key == coreName) {
2196 bFound = true;
2197 int priority = unit->getPriority();
2198 int policy = unit->getPolicy();
2199 int tos = getTypeOfService(unit);
2200 int tid = static_cast<int>(unit->getTid());
2201 Bottle& sched = result.addList();
2202 sched.addString("sched");
2203 Property& sched_prop = sched.addDict();
2204 sched_prop.put("tid", tid);
2205 sched_prop.put("priority", priority);
2206 sched_prop.put("policy", policy);
2207 Bottle& qos = result.addList();
2208 qos.addString("qos");
2209 Property& qos_prop = qos.addDict();
2210 qos_prop.put("tos", tos);
2211 }
2212 } // end isFinished()
2213 } // end for loop
2214 } // end portName == getname()
2215
2216 if (!bFound) { // cannot find any port matches the requested one
2217 result.addVocab32("fail");
2218 std::string msg = "cannot find any connection to/from ";
2219 msg = msg + key;
2220 result.addString(msg);
2221 }
2222 // end of (portName[0] == '/')
2223 } else {
2224 result.add(p->find(key));
2225 }
2226 }
2227 }
2229 return result;
2230 };
2231
2232 auto handleAdminPropSetCmd = [this](const std::string& key,
2233 const Value& value,
2234 const Bottle& process,
2235 const Bottle& sched,
2236 const Bottle& qos) {
2237 Bottle result;
2238 Property* p = acquireProperties(false);
2239 bool bOk = true;
2240 if (p != nullptr) {
2241 p->put(key, value);
2242 // setting scheduling properties of all threads within the process
2243 // scope through the admin port
2244 // e.g. prop set /current_port (process ((priority 30) (policy 1)))
2245 if (!process.isNull()) {
2246 std::string portName = key;
2247 if ((!portName.empty()) && (portName[0] == '/')) {
2248 // check for their own name
2249 if (portName == getName()) {
2250 bOk = false;
2251 Bottle* process_prop = process.find("process").asList();
2252 if (process_prop != nullptr) {
2253 int prio = -1;
2254 int policy = -1;
2255 if (process_prop->check("priority")) {
2256 prio = process_prop->find("priority").asInt32();
2257 }
2258 if (process_prop->check("policy")) {
2259 policy = process_prop->find("policy").asInt32();
2260 }
2261 bOk = setProcessSchedulingParam(prio, policy);
2262 }
2263 }
2264 }
2265 }
2266 // check if we need to set the PortCoreUnit scheduling policy
2267 // e.g., "prop set /portname (sched ((priority 30) (policy 1)))"
2268 // The priority and policy values on Linux are:
2269 // SCHED_OTHER : policy=0, priority=[0 .. 0]
2270 // SCHED_FIFO : policy=1, priority=[1 .. 99]
2271 // SCHED_RR : policy=2, priority=[1 .. 99]
2272 if (!sched.isNull()) {
2273 if ((!key.empty()) && (key[0] == '/')) {
2274 bOk = false;
2275 for (auto* unit : m_units) {
2276 if ((unit != nullptr) && !unit->isFinished()) {
2277 Route route = unit->getRoute();
2278 std::string portName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2279
2280 if (portName == key) {
2281 Bottle* sched_prop = sched.find("sched").asList();
2282 if (sched_prop != nullptr) {
2283 int prio = -1;
2284 int policy = -1;
2285 if (sched_prop->check("priority")) {
2286 prio = sched_prop->find("priority").asInt32();
2287 }
2288 if (sched_prop->check("policy")) {
2289 policy = sched_prop->find("policy").asInt32();
2290 }
2291 bOk = (unit->setPriority(prio, policy) != -1);
2292 } else {
2293 bOk = false;
2294 }
2295 break;
2296 }
2297 }
2298 }
2299 }
2300 }
2301 // check if we need to set the packet QOS policy
2302 // e.g., "prop set /portname (qos ((priority HIGH)))"
2303 // e.g., "prop set /portname (qos ((dscp AF12)))"
2304 // e.g., "prop set /portname (qos ((tos 12)))"
2305 if (!qos.isNull()) {
2306 if ((!key.empty()) && (key[0] == '/')) {
2307 bOk = false;
2308 for (auto* unit : m_units) {
2309 if ((unit != nullptr) && !unit->isFinished()) {
2310 Route route = unit->getRoute();
2311 std::string portName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2312 if (portName == key) {
2313 Bottle* qos_prop = qos.find("qos").asList();
2314 if (qos_prop != nullptr) {
2315 int tos = -1;
2316 if (qos_prop->check("priority")) {
2317 // set the packet TOS value on the socket based on some predefined
2318 // priority levels.
2319 // the expected levels are: LOW, NORM, HIGH, CRIT
2320 NetInt32 priority = qos_prop->find("priority").asVocab32();
2321 int dscp;
2322 switch (priority) {
2323 case yarp::os::createVocab32('L', 'O', 'W'):
2324 dscp = 10;
2325 break;
2326 case yarp::os::createVocab32('N', 'O', 'R', 'M'):
2327 dscp = 0;
2328 break;
2329 case yarp::os::createVocab32('H', 'I', 'G', 'H'):
2330 dscp = 36;
2331 break;
2332 case yarp::os::createVocab32('C', 'R', 'I', 'T'):
2333 dscp = 44;
2334 break;
2335 default:
2336 dscp = -1;
2337 }
2338 if (dscp >= 0) {
2339 tos = (dscp << 2);
2340 }
2341 } else if (qos_prop->check("dscp")) {
2342 // Set the packet TOS value on the socket based on the DSCP level
2344 int dscp = -1;
2346 auto dscp_val = qos_prop->find("dscp");
2347 if (dscp_val.isInt32()) {
2348 dscp = dscp_val.asInt32();
2349 }
2350 } else {
2351 dscp = static_cast<int>(dscp_class);
2352 }
2353 if ((dscp >= 0) && (dscp < 64)) {
2354 tos = (dscp << 2);
2355 }
2356 } else if (qos_prop->check("tos")) {
2357 // Set the TOS value directly
2358 auto tos_val = qos_prop->find("tos");
2359 if (tos_val.isInt32()) {
2360 tos = tos_val.asInt32();
2361 }
2362 }
2363 if (tos >= 0) {
2364 bOk = setTypeOfService(unit, tos);
2365 }
2366 } else {
2367 bOk = false;
2368 }
2369 break;
2370 }
2371 }
2372 }
2373 }
2374 }
2375 }
2377 result.addVocab32((bOk) ? "ok" : "fail");
2378 return result;
2379 };
2380
2381 auto handleAdminUnknownCmd = [this](const Bottle& cmd) {
2382 Bottle result;
2383 bool ok = false;
2384 if (m_adminReader != nullptr) {
2385 DummyConnector con;
2386 cmd.write(con.getWriter());
2387 lockCallback();
2388 ok = m_adminReader->read(con.getReader());
2390 if (ok) {
2391 result.read(con.getReader());
2392 }
2393 }
2394 if (!ok) {
2395 result.addVocab32("fail");
2396 result.addString("send [help] for list of valid commands");
2397 }
2398 return result;
2399 };
2400
2401 const PortCoreCommand command = parseCommand(cmd.get(0));
2402 switch (command) {
2403 case PortCoreCommand::Help:
2404 result = handleAdminHelpCmd();
2405 break;
2406 case PortCoreCommand::Ver:
2407 result = handleAdminVerCmd();
2408 break;
2409 case PortCoreCommand::Pray:
2410 result = handleAdminPrayCmd();
2411 break;
2412 case PortCoreCommand::Add: {
2413 std::string output = cmd.get(1).asString();
2414 std::string carrier = cmd.get(2).asString();
2415 result = handleAdminAddCmd(std::move(output), carrier);
2416 } break;
2417 case PortCoreCommand::Del: {
2418 const std::string dest = cmd.get(1).asString();
2419 result = handleAdminDelCmd(dest);
2420 } break;
2421 case PortCoreCommand::Atch: {
2422 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32());
2423 Property prop(cmd.get(2).asString().c_str());
2424 result = handleAdminAtchCmd(direction, std::move(prop));
2425 } break;
2426 case PortCoreCommand::Dtch: {
2427 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32());
2428 result = handleAdminDtchCmd(direction);
2429 } break;
2430 case PortCoreCommand::List: {
2431 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32(), true);
2432 const std::string target = cmd.get(2).asString();
2433 result = handleAdminListCmd(direction, target);
2434 } break;
2435 case PortCoreCommand::Set: {
2436 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32(), true);
2437 const std::string target = cmd.get(2).asString();
2438 yarp::os::Property property;
2439 property.fromString(cmd.toString());
2440 switch (direction) {
2441 case PortCoreConnectionDirection::In:
2442 result = handleAdminSetInCmd(target, property);
2443 break;
2444 case PortCoreConnectionDirection::Out:
2445 result = handleAdminSetOutCmd(target, property);
2446 break;
2447 case PortCoreConnectionDirection::Error:
2448 yCIAssert(PORTCORE, getName(), false); // Should never happen (error is out)
2449 break;
2450 }
2451 } break;
2452 case PortCoreCommand::Get: {
2453 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32(), true);
2454 const std::string target = cmd.get(2).asString();
2455 switch (direction) {
2456 case PortCoreConnectionDirection::In:
2457 result = handleAdminGetInCmd(target);
2458 break;
2459 case PortCoreConnectionDirection::Out:
2460 result = handleAdminGetOutCmd(target);
2461 break;
2462 case PortCoreConnectionDirection::Error:
2463 yCIAssert(PORTCORE, getName(), false); // Should never happen (error is out)
2464 break;
2465 }
2466 } break;
2467 case PortCoreCommand::Prop: {
2468 PortCorePropertyAction action = parsePropertyAction(cmd.get(1).asVocab32());
2469 const std::string key = cmd.get(2).asString();
2470 // Set/get arbitrary properties on a port.
2471 switch (action) {
2472 case PortCorePropertyAction::Get:
2473 result = handleAdminPropGetCmd(key);
2474 break;
2475 case PortCorePropertyAction::Set: {
2476 const Value& value = cmd.get(3);
2477 const Bottle& process = cmd.findGroup("process");
2478 const Bottle& sched = cmd.findGroup("sched");
2479 const Bottle& qos = cmd.findGroup("qos");
2480 result = handleAdminPropSetCmd(key, value, process, sched, qos);
2481 } break;
2482 case PortCorePropertyAction::Error:
2483 result.addVocab32("fail");
2484 result.addString("property action not known");
2485 break;
2486 }
2487 } break;
2488 case PortCoreCommand::Unknown:
2489 result = handleAdminUnknownCmd(cmd);
2490 break;
2491 }
2492
2493 ConnectionWriter* writer = reader.getWriter();
2494 if (writer != nullptr) {
2495 result.write(*writer);
2496 }
2497
2498 return true;
2499}
2500
2501
2502bool PortCore::setTypeOfService(PortCoreUnit* unit, int tos)
2503{
2504 if (unit == nullptr) {
2505 return false;
2506 }
2507
2508 yCIDebug(PORTCORE, getName(), "Trying to set TOS = %d", tos);
2509
2510 if (unit->isOutput()) {
2511 auto* outUnit = dynamic_cast<PortCoreOutputUnit*>(unit);
2512 if (outUnit != nullptr) {
2513 OutputProtocol* op = outUnit->getOutPutProtocol();
2514 if (op != nullptr) {
2515 yCIDebug(PORTCORE, getName(), "Trying to set TOS = %d on output unit", tos);
2516 bool ok = op->getOutputStream().setTypeOfService(tos);
2517 if (!ok) {
2518 yCIWarning(PORTCORE, getName(), "Setting TOS on output unit failed");
2519 }
2520 return ok;
2521 }
2522 }
2523 }
2524
2525 // Some of the input units may have output stream object to write back to
2526 // the connection (e.g., tcp ack and reply). Thus the QoS preferences should be
2527 // also configured for them.
2528
2529
2530 if (unit->isInput()) {
2531 auto* inUnit = dynamic_cast<PortCoreInputUnit*>(unit);
2532 if (inUnit != nullptr) {
2533 InputProtocol* ip = inUnit->getInPutProtocol();
2534 if ((ip != nullptr) && ip->getOutput().isOk()) {
2535 yCIDebug(PORTCORE, getName(), "Trying to set TOS = %d on input unit", tos);
2536 bool ok = ip->getOutput().getOutputStream().setTypeOfService(tos);
2537 if (!ok) {
2538 yCIWarning(PORTCORE, getName(), "Setting TOS on input unit failed");
2539 }
2540 return ok;
2541 }
2542 }
2543 }
2544 // if there is nothing to be set, returns true
2545 return true;
2546}
2547
2548int PortCore::getTypeOfService(PortCoreUnit* unit)
2549{
2550 if (unit == nullptr) {
2551 return -1;
2552 }
2553
2554 if (unit->isOutput()) {
2555 auto* outUnit = dynamic_cast<PortCoreOutputUnit*>(unit);
2556 if (outUnit != nullptr) {
2557 OutputProtocol* op = outUnit->getOutPutProtocol();
2558 if (op != nullptr) {
2559 return op->getOutputStream().getTypeOfService();
2560 }
2561 }
2562 }
2563
2564 // Some of the input units may have output stream object to write back to
2565 // the connection (e.g., tcp ack and reply). Thus the QoS preferences should be
2566 // also configured for them.
2567
2568
2569 if (unit->isInput()) {
2570 auto* inUnit = dynamic_cast<PortCoreInputUnit*>(unit);
2571 if (inUnit != nullptr) {
2572 InputProtocol* ip = inUnit->getInPutProtocol();
2573 if ((ip != nullptr) && ip->getOutput().isOk()) {
2574 return ip->getOutput().getOutputStream().getTypeOfService();
2575 }
2576 }
2577 }
2578 return -1;
2579}
2580
2581// attach a portmonitor plugin to the port or to a specific connection
2582bool PortCore::attachPortMonitor(yarp::os::Property& prop, bool isOutput, std::string& errMsg)
2583{
2584 // attach to the current port
2586 if (portmonitor == nullptr) {
2587 errMsg = "Portmonitor carrier modifier cannot be find or it is not enabled in YARP!";
2588 return false;
2589 }
2590
2591 if (isOutput) {
2592 detachPortMonitor(true);
2593 prop.put("source", getName());
2594 prop.put("destination", "");
2595 prop.put("sender_side", 1);
2596 prop.put("receiver_side", 0);
2597 prop.put("carrier", "");
2598 m_modifier.outputMutex.lock();
2599 m_modifier.outputModifier = portmonitor;
2600 if (!m_modifier.outputModifier->configureFromProperty(prop)) {
2601 m_modifier.releaseOutModifier();
2602 errMsg = "Failed to configure the portmonitor plug-in";
2603 m_modifier.outputMutex.unlock();
2604 return false;
2605 }
2606 m_modifier.outputMutex.unlock();
2607 } else {
2608 detachPortMonitor(false);
2609 prop.put("source", "");
2610 prop.put("destination", getName());
2611 prop.put("sender_side", 0);
2612 prop.put("receiver_side", 1);
2613 prop.put("carrier", "");
2614 m_modifier.inputMutex.lock();
2615 m_modifier.inputModifier = portmonitor;
2616 if (!m_modifier.inputModifier->configureFromProperty(prop)) {
2617 m_modifier.releaseInModifier();
2618 errMsg = "Failed to configure the portmonitor plug-in";
2619 m_modifier.inputMutex.unlock();
2620 return false;
2621 }
2622 m_modifier.inputMutex.unlock();
2623 }
2624 return true;
2625}
2626
2627// detach the portmonitor from the port or specific connection
2628bool PortCore::detachPortMonitor(bool isOutput)
2629{
2630 if (isOutput) {
2631 m_modifier.outputMutex.lock();
2632 m_modifier.releaseOutModifier();
2633 m_modifier.outputMutex.unlock();
2634 } else {
2635 m_modifier.inputMutex.lock();
2636 m_modifier.releaseInModifier();
2637 m_modifier.inputMutex.unlock();
2638 }
2639 return true;
2640}
2641
2642bool PortCore::setParamPortMonitor(const yarp::os::Property& param,
2643 bool isOutput,
2644 std::string& errMsg)
2645{
2646 if (isOutput) {
2647 m_modifier.outputMutex.lock();
2648 if (m_modifier.outputModifier == nullptr) {
2649 errMsg = "No port modifier is attached to the output";
2650 m_modifier.outputMutex.unlock();
2651 return false;
2652 }
2653 m_modifier.outputModifier->setCarrierParams(param);
2654 m_modifier.outputMutex.unlock();
2655 } else {
2656 m_modifier.inputMutex.lock();
2657 if (m_modifier.inputModifier == nullptr) {
2658 errMsg = "No port modifier is attached to the input";
2659 m_modifier.inputMutex.unlock();
2660 return false;
2661 }
2662 m_modifier.inputModifier->setCarrierParams(param);
2663 m_modifier.inputMutex.unlock();
2664 }
2665 return true;
2666}
2667
2668bool PortCore::getParamPortMonitor(yarp::os::Property& param,
2669 bool isOutput,
2670 std::string& errMsg)
2671{
2672 if (isOutput) {
2673 m_modifier.outputMutex.lock();
2674 if (m_modifier.outputModifier == nullptr) {
2675 errMsg = "No port modifier is attached to the output";
2676 m_modifier.outputMutex.unlock();
2677 return false;
2678 }
2679 m_modifier.outputModifier->getCarrierParams(param);
2680 m_modifier.outputMutex.unlock();
2681 } else {
2682 m_modifier.inputMutex.lock();
2683 if (m_modifier.inputModifier == nullptr) {
2684 errMsg = "No port modifier is attached to the input";
2685 m_modifier.inputMutex.unlock();
2686 return false;
2687 }
2688 m_modifier.inputModifier->getCarrierParams(param);
2689 m_modifier.inputMutex.unlock();
2690 }
2691 return true;
2692}
2693
2695{
2696 YARP_UNUSED(active);
2697 if (unit != nullptr) {
2698 bool isLog = (!unit->getMode().empty());
2699 if (isLog) {
2700 m_logNeeded = true;
2701 }
2702 }
2703}
2704
2705bool PortCore::setProcessSchedulingParam(int priority, int policy)
2706{
2707#if defined(__linux__)
2708 // set the sched properties of all threads within the process
2709 struct sched_param sch_param;
2710 sch_param.__sched_priority = priority;
2711
2712 DIR* dir;
2713 char path[PATH_MAX];
2714 sprintf(path, "/proc/%d/task/", yarp::os::impl::getpid());
2715
2716 dir = opendir(path);
2717 if (dir == nullptr) {
2718 return false;
2719 }
2720
2721 struct dirent* d;
2722 char* end;
2723 long tid = 0;
2724 bool ret = true;
2725 while ((d = readdir(dir)) != nullptr) {
2726 if (isdigit(static_cast<unsigned char>(*d->d_name)) == 0) {
2727 continue;
2728 }
2729
2730 tid = strtol(d->d_name, &end, 10);
2731 if (d->d_name == end || ((end != nullptr) && (*end != 0))) {
2732 closedir(dir);
2733 return false;
2734 }
2735 ret &= (sched_setscheduler(static_cast<pid_t>(tid), policy, &sch_param) == 0);
2736 }
2737 closedir(dir);
2738 return ret;
2739#elif defined(YARP_HAS_ACE) // for other platforms
2740 // TODO: Check how to set the scheduling properties of all process's threads in Windows
2741 ACE_Sched_Params param(policy, (ACE_Sched_Priority)priority, ACE_SCOPE_PROCESS);
2742 int ret = ACE_OS::set_scheduling_params(param, yarp::os::impl::getpid());
2743 return (ret != -1);
2744#else
2745 return false;
2746#endif
2747}
2748
2750{
2751 m_stateMutex.lock();
2752 if (!readOnly) {
2753 if (m_prop == nullptr) {
2754 m_prop = new Property();
2755 }
2756 }
2757 return m_prop;
2758}
2759
2761{
2762 YARP_UNUSED(prop);
2763 m_stateMutex.unlock();
2764}
2765
2766bool PortCore::removeIO(const Route& route, bool synch)
2767{
2768 return removeUnit(route, synch);
2769}
2770
2771void PortCore::setName(const std::string& name)
2772{
2773 m_name = name;
2774}
2775
2777{
2778 return m_name;
2779}
2780
2781int PortCore::getNextIndex()
2782{
2783 int result = m_counter;
2784 m_counter++;
2785 if (m_counter < 0) {
2786 m_counter = 1;
2787 }
2788 return result;
2789}
2790
2792{
2793 return m_address;
2794}
2795
2796void PortCore::resetPortName(const std::string& str)
2797{
2798 m_address.setName(str);
2799}
2800
2802{
2803 return m_readableCreator;
2804}
2805
2807{
2808 m_controlRegistration = flag;
2809}
2810
2812{
2813 return m_listening.load();
2814}
2815
2817{
2818 return m_manual;
2819}
2820
2822{
2823 return m_interrupted;
2824}
2825
2826void PortCore::setTimeout(float timeout)
2827{
2828 m_timeout = timeout;
2829}
2830
2831bool PortCore::setCallbackLock(std::mutex* mutex)
2832{
2834 if (mutex != nullptr) {
2835 m_mutex = mutex;
2836 m_mutexOwned = false;
2837 } else {
2838 m_mutex = new std::mutex;
2839 m_mutexOwned = true;
2840 }
2841 return true;
2842}
2843
2845{
2846 if (m_mutexOwned && (m_mutex != nullptr)) {
2847 delete m_mutex;
2848 }
2849 m_mutex = nullptr;
2850 m_mutexOwned = false;
2851 return true;
2852}
2853
2855{
2856 if (m_mutex == nullptr) {
2857 return false;
2858 }
2859 m_mutex->lock();
2860 return true;
2861}
2862
2864{
2865 if (m_mutex == nullptr) {
2866 return true;
2867 }
2868 return m_mutex->try_lock();
2869}
2870
2872{
2873 if (m_mutex == nullptr) {
2874 return;
2875 }
2876 m_mutex->unlock();
2877}
2878
2883
2885{
2886 m_typeMutex.lock();
2887 if (!m_checkedType) {
2888 if (!m_type.isValid()) {
2889 m_type = reader.getReadType();
2890 }
2891 m_checkedType = true;
2892 }
2893 m_typeMutex.unlock();
2894}
2895
2897{
2898 m_typeMutex.lock();
2899 Type t = m_type;
2900 m_typeMutex.unlock();
2901 return t;
2902}
2903
2905{
2906 m_typeMutex.lock();
2907 m_type = typ;
2908 m_typeMutex.unlock();
2909}
bool ret
#define yAssert(x)
Definition Log.h:388
static bool __tcp_check(const Contact &c)
static bool __pc_rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader, bool verbose)
#define PORTCORE_IS_INPUT
Definition PortCore.h:40
#define PORTCORE_SEND_LOG
Definition PortCore.h:35
#define PORTCORE_IS_RPC
Definition PortCore.h:39
#define PORTCORE_IS_OUTPUT
Definition PortCore.h:41
#define PORTCORE_SEND_NORMAL
Definition PortCore.h:34
A simple collection of objects that can be described and transmitted in a portable way.
Definition Bottle.h:64
void add(const Value &value)
Add a Value to the bottle, at the end of the list.
Definition Bottle.cpp:309
void addVocab32(yarp::conf::vocab32_t x)
Places a vocabulary item in the bottle, at the end of the list.
Definition Bottle.cpp:164
void fromString(const std::string &text)
Initializes bottle from a string.
Definition Bottle.cpp:204
Property & addDict()
Places an empty key/value object in the bottle, at the end of the list.
Definition Bottle.cpp:188
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
Definition Bottle.cpp:182
size_type size() const
Gets the number of elements in the bottle.
Definition Bottle.cpp:251
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition Bottle.cpp:240
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition Bottle.cpp:246
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Definition Bottle.cpp:293
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
Definition Bottle.cpp:230
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
Definition Bottle.cpp:140
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition Bottle.cpp:170
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition Bottle.cpp:211
A mini-server for performing network communication in the background.
void close() override
Stop port activity.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition Carrier.h:44
bool isConnectionless() const override=0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
void getCarrierParams(Property &params) const override
Get carrier configuration and deliver it by port administrative commands.
Definition Carrier.cpp:127
virtual bool configureFromProperty(yarp::os::Property &options)
Definition Carrier.cpp:115
bool isPush() const override
Check if carrier is "push" or "pull" style.
Definition Carrier.cpp:23
bool acceptOutgoingData(const PortWriter &writer) override
Determine whether outgoing data should be accepted.
Definition Carrier.cpp:102
const PortWriter & modifyOutgoingData(const PortWriter &writer) override
Modify outgoing payload data, if appropriate.
Definition Carrier.cpp:77
void setCarrierParams(const Property &params) override
Configure carrier from port administrative commands.
Definition Carrier.cpp:122
static Face * listen(const Contact &address)
Create a "proto-carrier" interface object that waits for incoming connections prior to a carrier bein...
Definition Carriers.cpp:250
static Carrier * getCarrierTemplate(const std::string &name)
Get template for carrier.
Definition Carriers.cpp:238
static Carrier * chooseCarrier(const std::string &name)
Select a carrier by name.
Definition Carriers.cpp:233
static OutputProtocol * connect(const Contact &address)
Initiate a connection to an address.
Definition Carriers.cpp:282
An interface for reading from a network connection.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
An interface for writing to a network connection.
Preferences for how to communicate with a contact.
double timeout
Set a timeout for communication (in units of seconds, fractional seconds allowed).
bool quiet
Suppress all outputs and warnings.
std::string carrier
Request that communication be made using a particular carrier.
Represents how to reach a part of a YARP network.
Definition Contact.h:33
bool isValid() const
Checks if a Contact is tagged as valid.
Definition Contact.cpp:298
std::string getRegName() const
Get the name associated with this Contact.
Definition Contact.cpp:217
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition Contact.cpp:313
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition Contact.cpp:239
void setName(const std::string &name)
Set the name associated with this Contact.
Definition Contact.cpp:222
void setTimeout(float timeout)
Set timeout for this Contact.
Definition Contact.cpp:282
std::string getCarrier() const
Get the carrier associated with this Contact for socket communication.
Definition Contact.cpp:250
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition Contact.cpp:228
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written the ConnectionWriter since it was las...
ConnectionReader & getReader(ConnectionWriter *replyWriter=nullptr)
Get the dummy ConnectionReader loaded with whatever was written the ConnectionWriter since it was las...
virtual InputProtocol * read()=0
Block and wait for someone to talk to us.
virtual Contact getLocalAddress() const
Get address after open(), if more specific than the address provided to open() - otherwise an invalid...
Definition Face.h:82
virtual OutputProtocol * write(const Contact &address)=0
Try to reach out and talk to someone.
virtual void close()=0
Stop listening.
The input side of an active connection between two ports.
virtual void close()=0
Negotiate an end to operations.
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
virtual OutputProtocol & getOutput()=0
Get an interface for doing write operations on the connection.
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
Simple abstraction for a YARP port name.
Definition Name.h:18
Contact toAddress() const
Create an address from the name.
Definition Name.cpp:27
std::string getCarrierModifier(const char *mod, bool *hasModifier=nullptr)
Definition Name.cpp:44
static bool initialized()
Returns true if YARP has been fully initialized.
Definition Network.cpp:1382
static bool getLocalMode()
Get current value of flag "localMode", see setLocalMode function.
Definition Network.cpp:1054
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
Definition Network.cpp:1023
static NameStore * getQueryBypass()
Definition Network.cpp:1405
static Contact queryName(const std::string &name)
Find out information about a registered name.
Definition Network.cpp:995
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
Definition Network.cpp:1511
static bool writeToNameServer(PortWriter &cmd, PortReader &reply, const ContactStyle &style)
Variant write method specialized to name server.
Definition Network.cpp:1942
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
Definition Network.cpp:700
static bool write(const Contact &contact, PortWriter &cmd, PortReader &reply, bool admin=false, bool quiet=false, double timeout=-1)
Send a single command to a port and await a single response.
Definition Network.cpp:1219
The output side of an active connection between two ports.
virtual OutputStream & getOutputStream()=0
Access the output stream associated with the connection.
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
virtual bool open(const Route &route)=0
Start negotiating a carrier, using the given route (this should generally match the name of the sendi...
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
virtual InputProtocol & getInput()=0
Get an interface for doing read operations on the connection.
virtual void close()=0
Negotiate an end to operations.
virtual void rename(const Route &route)=0
Relabel the route after the fact (e.g.
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
Simple specification of the minimum functions needed from output streams.
Information about a port connection or event.
Definition PortInfo.h:25
std::string message
A human-readable description of contents.
Definition PortInfo.h:68
int tag
Type of information.
Definition PortInfo.h:47
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Definition PortInfo.h:39
@ PORTINFO_MISC
Unspecified information.
Definition PortInfo.h:42
A creator for readers.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition PortReader.h:24
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
virtual Type getReadType() const
A base class for objects that want information about port status changes.
Definition PortReport.h:25
virtual void report(const PortInfo &info)=0
Callback for port event/state information.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition PortWriter.h:23
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
virtual void onCommencement() const
This is called when the port is about to begin writing operations.
A class for storing options and configuration information.
Definition Property.h:33
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
std::string toString() const override
Return a standard text representation of the content of the object.
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
Definition Property.cpp:987
PacketPriorityDSCP
The PacketPriorityDSCP defines the packets quality of service (priority) using DSCP.
Definition QosStyle.h:45
static PacketPriorityDSCP getDSCPByVocab(yarp::conf::vocab32_t vocab)
returns the IPV4/6 DSCP value given as DSCP code
Definition QosStyle.cpp:155
Information about a connection between two ports.
Definition Route.h:28
const std::string & getToName() const
Get the destination of the route.
Definition Route.cpp:103
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition Route.cpp:123
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition Route.cpp:138
const std::string & getFromName() const
Get the source of the route.
Definition Route.cpp:93
void swapNames()
Swap from and to names.
Definition Route.cpp:133
void setToContact(const Contact &toContact)
Set the destination contact of the route.
Definition Route.cpp:118
An InputStream that reads from a string.
void add(const std::string &txt)
An OutputStream that produces a string.
static ProcessInfo getProcessInfo(int pid=0)
Gets the operating system information for the process identified by the given PID.
static PlatformInfo getPlatformInfo()
Gets the operating system (platform) information.
bool isValid() const
Definition Type.cpp:154
std::string getName() const
Definition Type.cpp:139
A single value (typically within a Bottle).
Definition Value.h:43
virtual yarp::conf::vocab32_t asVocab32() const
Get vocabulary identifier as an integer.
Definition Value.cpp:228
virtual std::string asString() const
Get string value.
Definition Value.cpp:234
A helper for creating cached object descriptions.
void restart()
Tell the writer that we will be serializing a new object, but to keep any cached buffers that already...
A helper for recording entire message/reply transactions.
Manager for a single input to a port.
Manager for a single output from a port.
A single message, potentially being transmitted on multiple connections.
PortCorePacket * getFreePacket()
Get a packet that we can prepare for sending.
bool checkPacket(PortCorePacket *packet)
Move a packet to the inactive state if it has finished being sent on all connections.
This manages a single threaded resource related to a single input or output connection.
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
void resetPortName(const std::string &str)
std::string getEnvelope()
void setAdminReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming administrative messages.
Definition PortCore.cpp:147
int getOutputCount()
Check how many output connections there are.
bool readBlock(ConnectionReader &reader, void *id, yarp::os::OutputStream *os)
Read a block of regular payload data.
void setReportCallback(yarp::os::PortReport *reporter)
Set a callback to be notified of changes in port status.
void run() override
The body of the main thread.
Definition PortCore.cpp:164
bool start() override
Begin main thread.
Definition PortCore.cpp:276
void checkType(PortReader &reader)
void resume()
Undo an interrupt().
Definition PortCore.cpp:324
void setTimeout(float timeout)
yarp::os::PortReaderCreator * getReadCreator()
Get the creator of callbacks.
Property * acquireProperties(bool readOnly)
void interrupt()
Prepare the port to be shut down.
Definition PortCore.cpp:330
bool setCallbackLock(std::mutex *mutex=nullptr)
void setEnvelope(const std::string &envelope)
Set some envelope information to pass along with a message without actually being part of the message...
bool removeIO(const Route &route, bool synch=false)
Remove any connection matching the supplied route.
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc.). Generate a description of the connections associ...
bool listen(const Contact &address, bool shouldAnnounce=true)
Begin service at a given address.
Definition PortCore.cpp:67
bool sendHelper(const yarp::os::PortWriter &writer, int mode, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a message with a specific mode (normal or log).
void removeOutput(const std::string &dest, void *id, yarp::os::OutputStream *os)
Remove an output connection.
void setControlRegistration(bool flag)
Normally the port will unregister its name with the name server when shutting down.
int getInputCount()
Check how many input connections there are.
bool manualStart(const char *sourceName)
Start up the port, but without a main thread.
Definition PortCore.cpp:307
void setReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming data.
Definition PortCore.cpp:139
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
void promiseType(const Type &typ)
void releaseProperties(Property *prop)
int getEventCount()
A diagnostic for testing purposes.
Definition PortCore.cpp:531
void close() override
Shut down port.
Definition PortCore.cpp:263
const Contact & getAddress() const
Get the address associated with the port.
void describe(void *id, yarp::os::OutputStream *os)
Produce a text description of the port and its connections.
yarp::os::Type getType()
bool isWriting()
Check if a message is currently being sent.
bool adminBlock(ConnectionReader &reader, void *id)
Read a block of administrative data.
void removeInput(const std::string &src, void *id, yarp::os::OutputStream *os)
Remove an input connection.
unsigned int getFlags()
Check current configuration of port.
Definition PortCore.h:298
bool send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a normal message.
void resetReportCallback()
Reset the callback to be notified of changes in port status.
void setReadCreator(yarp::os::PortReaderCreator &creator)
Set a callback for creating callbacks for incoming data.
Definition PortCore.cpp:155
yarp::os::impl::PortDataModifier & getPortModifier()
void setName(const std::string &name)
Set the name of this port.
bool addOutput(const std::string &dest, void *id, yarp::os::OutputStream *os, bool onlyIfNeeded=false)
Add an output connection to this port.
Definition PortCore.cpp:841
This is the heart of a yarp port.
Definition PortCore.h:102
yarp::os::Carrier * outputModifier
Definition PortCore.h:135
yarp::os::Carrier * inputModifier
Definition PortCore.h:136
Lets Readable objects read from the underlying InputStream associated with the connection between two...
int join(double seconds=-1)
#define yCIAssert(component, id, x)
#define yCWarning(component,...)
#define yCIError(component, id,...)
#define yCITrace(component, id,...)
#define yCIDebug(component, id,...)
#define yCIWarning(component, id,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
Definition environment.h:66
std::string to_string(IntegerType x)
Definition numeric.h:115
std::int32_t vocab32_t
Definition numeric.h:78
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition NetInt32.h:29
constexpr yarp::conf::vocab32_t createVocab32(char a, char b=0, char c=0, char d=0)
Create a vocab from chars.
Definition Vocab.h:27
The main, catch-all namespace for YARP.
Definition dirs.h:16
The PlatformInfo struct holds the operating system information.
Definition SystemInfo.h:80
The ProcessInfo struct provides the operating system process information.
Definition SystemInfo.h:112
#define YARP_UNUSED(var)
Definition api.h:162