36# include <ace/INET_Addr.h>
37# include <ace/Sched_Params.h>
46# include <sys/types.h>
82 std::lock_guard<std::mutex> lock(m_stateMutex);
105 if (m_face ==
nullptr) {
110 if (m_address.
getPort() <= 0) {
119 m_listening.store(
true);
153 m_adminReader = &reader;
187 std::lock_guard<std::mutex> lock(m_stateMutex);
188 m_running.store(
true);
189 m_starting.store(
false);
193 m_stateCv.notify_one();
201 bool shouldStop =
false;
202 while (!shouldStop) {
209 std::lock_guard<std::mutex> lock(m_stateMutex);
221 shouldStop |= m_closing.load();
249 std::lock_guard<std::mutex> lock(m_stateMutex);
250 m_connectionListeners = 0;
251 m_connectionChangeCv.notify_all();
257 std::lock_guard<std::mutex> lock(m_stateMutex);
258 m_connectionListeners = 0;
259 m_connectionChangeCv.notify_all();
260 m_finished.store(
true);
268 if (m_prop !=
nullptr) {
282 std::unique_lock<std::mutex> lock(m_stateMutex);
291 m_starting.store(
true);
301 m_stateCv.wait(lock, [&]{
return m_running.load(); });
318 m_interruptable =
false;
328 m_interrupted =
false;
336 if (!m_listening.load()) {
341 m_interrupted =
true;
346 if (!m_interruptable) {
356 std::lock_guard<std::mutex> lock(m_stateMutex);
357 if (m_reader !=
nullptr) {
368void PortCore::closeMain()
374 std::lock_guard<std::mutex> lock(m_stateMutex);
377 if (m_finishing || !(m_running.load() || m_manual)) {
402 std::lock_guard<std::mutex> lock(m_stateMutex);
403 for (
auto*
unit : m_units) {
404 if ((
unit !=
nullptr) &&
unit->isInput() && !
unit->isDoomed()) {
407 if (s.length() >= 1 && s[0] ==
'/' && s !=
getName() && s !=
prevName) {
437 std::lock_guard<std::mutex> lock(m_stateMutex);
438 for (
auto*
unit : m_units) {
439 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
458 m_closing.store(
true);
481 std::lock_guard<std::mutex> lock(m_stateMutex);
482 m_finished.store(
false);
483 m_closing.store(
false);
484 m_running.store(
false);
490 if (m_listening.load()) {
495 m_listening.store(
false);
501 if (m_reader !=
nullptr) {
511 if (name != std::string(
"")) {
512 if (m_controlRegistration) {
535 std::lock_guard<std::mutex> lock(m_stateMutex);
541void PortCore::closeUnits()
549 for (
auto&
i : m_units) {
551 if (
unit !=
nullptr) {
566void PortCore::reapUnits()
570 if (!m_finished.load()) {
571 std::lock_guard<std::mutex> lock(m_stateMutex);
572 for (
auto*
unit : m_units) {
573 if ((
unit !=
nullptr) &&
unit->isDoomed() && !
unit->isFinished()) {
574 std::string s =
unit->getRoute().toString();
586void PortCore::cleanUnits(
bool blocking)
593 std::unique_lock<std::mutex> lock(m_stateMutex, std::defer_lock);
609 if (!m_finished.load()) {
612 for (
auto&
i : m_units) {
614 if (
unit !=
nullptr) {
616 if (
unit->isFinished()) {
617 std::string con =
unit->getRoute().toString();
626 if (!
unit->isDoomed()) {
627 if (
unit->isOutput()) {
629 if (
unit->getMode().empty()) {
633 if (
unit->isInput()) {
634 if (
unit->getRoute().getFromName() !=
"admin") {
647 for (
size_t i2 = 0;
i2 < m_units.size();
i2++) {
648 if (m_units[
i2] !=
nullptr) {
650 m_units[
rem] = m_units[
i2];
651 m_units[
i2] =
nullptr;
658 for (
size_t i3 = 0;
i3 < m_units.size() -
rem;
i3++) {
667 m_packetMutex.lock();
670 m_packetMutex.unlock();
685 std::lock_guard<std::mutex> lock(m_stateMutex);
692 m_units.push_back(
unit);
709 if (!m_finished.load()) {
710 std::lock_guard<std::mutex> lock(m_stateMutex);
714 m_units.push_back(
unit);
719bool PortCore::isUnit(
const Route& route,
int index)
723 if (!m_finished.load()) {
724 for (
auto*
unit : m_units) {
725 if (
unit !=
nullptr) {
727 std::string
wild =
"*";
730 ok = ok && (
unit->getIndex() == index);
754bool PortCore::removeUnit(
const Route& route,
bool synch,
bool*
except)
771 if (!m_finished.load()) {
772 std::lock_guard<std::mutex> lock(m_stateMutex);
773 for (
auto*
unit : m_units) {
774 if (
unit !=
nullptr) {
776 std::string
wild =
"*";
829 std::unique_lock<std::mutex> lock(m_stateMutex);
831 m_connectionListeners++;
832 m_connectionChangeCv.wait(lock, [&]{
return m_connectionListeners == 0; });
861 bw.appendLine(std::string(
"Do not know how to connect to ") + dest);
879 bw.appendLine(std::string(
"Desired connection already present from ") +
getName() +
" to " + dest);
910 bool is_log = (!mode.empty());
913 err =
"Logger configured as log." + mode +
", but only log.in is supported";
916 append =
"; " + r.
getFromName() +
" will forward messages and replies (if any) to " + r.
getToName();
927 err =
"Outputs not allowed";
932 if (m_dataOutputCount >= 1 && !
is_log) {
933 err =
"RPC output already connected";
959 bool ok = op->
open(r);
969 bw.appendLine(std::string(
"Cannot connect to ") + dest);
990 if (!m_finished.load()) {
991 std::lock_guard<std::mutex> lock(m_stateMutex);
998 m_units.push_back(
unit);
1003 bw.appendLine(std::string(
"Added connection from ") +
getName() +
" to " + dest + append);
1004 if (os !=
nullptr) {
1019 if (removeUnit(
Route(
"*", dest,
"*"),
true)) {
1020 bw.appendLine(std::string(
"Removed connection from ") +
getName() +
" to " + dest);
1022 bw.appendLine(std::string(
"Could not find an outgoing connection to ") + dest);
1024 if (os !=
nullptr) {
1037 if (removeUnit(
Route(src,
"*",
"*"),
true)) {
1038 bw.appendLine(std::string(
"Removing connection from ") + src +
" to " +
getName());
1040 bw.appendLine(std::string(
"Could not find an incoming connection from ") + src);
1042 if (os !=
nullptr) {
1059 std::lock_guard<std::mutex> lock(m_stateMutex);
1062 bw.appendLine(std::string(
"This is ") + m_address.
getRegName() +
" at " + m_address.
toURI());
1066 for (
auto*
unit : m_units) {
1067 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
1075 bw.appendLine(
"There are no outgoing connections");
1080 for (
auto*
unit : m_units) {
1081 if ((
unit !=
nullptr) &&
unit->isInput() && !
unit->isFinished()) {
1091 bw.appendLine(
"There are no incoming connections");
1096 if (os !=
nullptr) {
1110 std::lock_guard<std::mutex> lock(m_stateMutex);
1115 std::string portName = m_address.
getRegName();
1116 baseInfo.message = std::string(
"This is ") + portName +
" at " + m_address.
toURI();
1121 for (
auto*
unit : m_units) {
1122 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
1128 info.incoming =
false;
1129 info.portName = portName;
1140 info.message =
"There are no outgoing connections";
1146 for (
auto*
unit : m_units) {
1147 if ((
unit !=
nullptr) &&
unit->isInput() && !
unit->isFinished()) {
1153 info.incoming =
true;
1154 info.portName = portName;
1165 info.message =
"There are no incoming connections";
1173 std::lock_guard<std::mutex> lock(m_stateMutex);
1181 std::lock_guard<std::mutex> lock(m_stateMutex);
1182 m_eventReporter =
nullptr;
1193 if (m_eventReporter !=
nullptr) {
1211 if (m_reader !=
nullptr && !m_interrupted) {
1212 m_interruptable =
false;
1233 result = m_reader->
read(reader);
1237 m_interruptable =
true;
1242 result = b.
read(reader);
1277 if (m_interrupted || m_finishing) {
1306 std::lock_guard<std::mutex> lock(m_stateMutex);
1309 if (m_finished.load()) {
1316 m_packetMutex.lock();
1319 packet->
setContent(&writer,
false, callback);
1320 m_packetMutex.unlock();
1323 for (
auto*
unit : m_units) {
1324 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
1325 bool log = (!
unit->getMode().empty());
1336 m_packetMutex.lock();
1338 m_packetMutex.unlock();
1342 void* out =
unit->send(writer,
1344 (callback !=
nullptr) ? callback : (&writer),
1345 reinterpret_cast<void*
>(packet),
1352 if (out !=
nullptr) {
1354 m_packetMutex.lock();
1357 m_packetMutex.unlock();
1360 if (
unit->isFinished()) {
1368 m_packetMutex.lock();
1376 m_packetMutex.unlock();
1381 m_logNeeded =
false;
1387 if (m_waitAfterSend && reader !=
nullptr) {
1397 bool writing =
false;
1401 if (!m_finished.load()) {
1402 std::lock_guard<std::mutex> lock(m_stateMutex);
1403 for (
auto*
unit : m_units) {
1404 if ((
unit !=
nullptr) && !
unit->isFinished() &&
unit->isBusy()) {
1417 m_packetMutex.lock();
1418 int result = m_inputCount;
1419 m_packetMutex.unlock();
1426 m_packetMutex.lock();
1427 int result = m_outputCount;
1428 m_packetMutex.unlock();
1436 m_packetMutex.lock();
1441 m_packetMutex.unlock();
1449 bool ok = envelope.
write(m_envelopeWriter);
1459 m_envelope = envelope;
1460 for (
size_t i = 0;
i < m_envelope.length();
i++) {
1463 if (m_envelope[
i] < 32) {
1464 m_envelope = m_envelope.substr(0,
i);
1479 sis.
add(m_envelope);
1483 sbr.reset(sis,
nullptr, route, 0,
true);
1492 const char* carrier,
1498 style.
quiet = !verbose;
1511 int result =
addr.set(
c.getPort(),
c.getHost().c_str());
1515 result =
addr.set(
c.getPort(),
"127.0.0.1");
1519 result =
addr.set(
c.getPort(),
"127.0.1.1");
1568 if (cmd ==
"publisherUpdate") {
1569 return PortCoreCommand::RosPublisherUpdate;
1571 if (cmd ==
"requestTopic") {
1572 return PortCoreCommand::RosRequestTopic;
1574 if (cmd ==
"getPid") {
1575 return PortCoreCommand::RosGetPid;
1577 if (cmd ==
"getBusInfo") {
1578 return PortCoreCommand::RosGetBusInfo;
1582 auto cmd =
static_cast<PortCoreCommand
>(v.
asVocab32());
1584 case PortCoreCommand::Help:
1585 case PortCoreCommand::Ver:
1586 case PortCoreCommand::Pray:
1587 case PortCoreCommand::Add:
1588 case PortCoreCommand::Del:
1589 case PortCoreCommand::Atch:
1590 case PortCoreCommand::Dtch:
1591 case PortCoreCommand::List:
1592 case PortCoreCommand::Set:
1593 case PortCoreCommand::Get:
1594 case PortCoreCommand::Prop:
1595 case PortCoreCommand::RosPublisherUpdate:
1596 case PortCoreCommand::RosRequestTopic:
1597 case PortCoreCommand::RosGetPid:
1598 case PortCoreCommand::RosGetBusInfo:
1601 return PortCoreCommand::Unknown;
1607 auto dir =
static_cast<PortCoreConnectionDirection
>(v);
1609 case PortCoreConnectionDirection::In:
1610 case PortCoreConnectionDirection::Out:
1613 return errorIsOut ? PortCoreConnectionDirection::Out : PortCoreConnectionDirection::Error;
1619 auto action =
static_cast<PortCorePropertyAction
>(v);
1621 case PortCorePropertyAction::Get:
1622 case PortCorePropertyAction::Set:
1625 return PortCorePropertyAction::Error;
1632 bfrom.addString(
"from");
1636 bto.addString(
"to");
1649 if (!carrier->
isPush()) {
1677 result.
addString(
"[help] # give this help");
1678 result.
addString(
"[ver] # report protocol version information");
1679 result.
addString(
"[add] $portname # add an output connection");
1680 result.
addString(
"[add] $portname $car # add an output with a given protocol");
1681 result.
addString(
"[del] $portname # remove an input or output connection");
1682 result.
addString(
"[list] [in] # list input connections");
1683 result.
addString(
"[list] [out] # list output connections");
1684 result.
addString(
"[list] [in] $portname # give details for input");
1685 result.
addString(
"[list] [out] $portname # give details for output");
1686 result.
addString(
"[prop] [get] # get all user-defined port properties");
1687 result.
addString(
"[prop] [get] $prop # get a user-defined port property (prop, val)");
1688 result.
addString(
"[prop] [set] $prop $val # set a user-defined port property (prop, val)");
1689 result.
addString(
"[prop] [get] $portname # get Qos properties of a connection to/from a port");
1690 result.
addString(
"[prop] [set] $portname # set Qos properties of a connection to/from a port");
1691 result.
addString(
"[prop] [get] $cur_port # get information about current process (e.g., scheduling priority, pid)");
1692 result.
addString(
"[prop] [set] $cur_port # set properties of the current process (e.g., scheduling priority, pid)");
1693 result.
addString(
"[atch] [out] $prop # attach a portmonitor plug-in to the port's output");
1694 result.
addString(
"[atch] [in] $prop # attach a portmonitor plug-in to the port's input");
1695 result.
addString(
"[dtch] [out] # detach portmonitor plug-in from the port's output");
1696 result.
addString(
"[dtch] [in] # detach portmonitor plug-in from the port's input");
1725 while (name[0] ==
'/') {
1726 name = name.substr(1);
1729 auto i = name.find(
'/');
1730 if (
i != std::string::npos) {
1731 name = name.substr(0,
i);
1735 std::random_device
rd;
1736 std::mt19937
mt(
rd());
1737 std::uniform_int_distribution<int>
dist2(0,1);
1740 result.
addString(
"You begin praying to " + name +
".");
1741 result.
addString(
"You finish your prayer.");
1764 static const char*
auras[] = {
1777 static const char*
items[] = {
1793 "You feel more limber.",
1794 "The slime disappears.",
1795 "Your amulet vanishes! You can breathe again.",
1796 "You can breathe again.",
1797 "You are back on solid ground.",
1798 "Your stomach feels content.",
1800 "You feel much better.",
1801 "Your surroundings change.",
1802 "Your shape becomes uncertain.",
1803 "Your chain disappears.",
1804 "There's a tiger in your tank.",
1805 "You feel in good health again.",
1806 "Your eye feels better.",
1807 "Your eyes feel better.",
1808 "Looks like you are back in Kansas.",
1809 "Your <ITEM> softly glows <AURA>.",
1819 std::uniform_int_distribution<int>
dist13(0,12);
1823 result.
addString(
"You feel that " + name +
" is " + (
d2() ?
"bummed" :
"displeased") +
".");
1828 ": \"Thou " + (
d2() ?
"hast strayed from the path" :
"art arrogant") +
1829 ", " +
creature() +
". Thou must relearn thy lessons!\"");
1834 ": \"Thou hast angered me.\"");
1835 result.
addString(
"A black glow surrounds you.");
1839 ": \"Thou hast angered me.\"");
1844 ": \"Thou durst " + (
d2() ?
"scorn" :
"call upon") +
1845 " me? Then die, " +
creature() +
"!\"");
1848 result.
addString(
"You feel that " + name +
" is " + (
d2() ?
"pleased as punch" :
"well-pleased") +
".");
1852 result.
addString(
"You feel that " + name +
" is " + (
d2() ?
"ticklish" :
"pleased") +
".");
1856 result.
addString(
"You feel that " + name +
" is " + (
d2() ?
"full" :
"satisfied") +
".");
1861 ": \"Thou hast angered me.\"");
1862 result.
addString(
"Suddenly, a bolt of lightning strikes you!");
1863 result.
addString(
"You fry to a crisp!");
1871 const std::string& carrier) {
1875 if (!carrier.empty()) {
1876 output = carrier +
":/" + output;
1880 int v = (r[0] ==
'A') ? 0 : -1;
1895 int v = (
r1[0] ==
'R' ||
r2[0] ==
'R') ? 0 : -1;
1897 if (
r1[0] ==
'R' &&
r2[0] !=
'R') {
1899 }
else if (
r1[0] !=
'R' &&
r2[0] ==
'R') {
1910 switch (direction) {
1911 case PortCoreConnectionDirection::Out: {
1913 if (!attachPortMonitor(prop,
true,
errMsg)) {
1920 case PortCoreConnectionDirection::In: {
1922 if (!attachPortMonitor(prop,
false,
errMsg)) {
1929 case PortCoreConnectionDirection::Error:
1931 result.
addString(
"attach command must be followed by [out] or [in]");
1938 switch (direction) {
1939 case PortCoreConnectionDirection::Out: {
1940 if (detachPortMonitor(
true)) {
1946 case PortCoreConnectionDirection::In: {
1947 if (detachPortMonitor(
false)) {
1953 case PortCoreConnectionDirection::Error:
1955 result.
addString(
"detach command must be followed by [out] or [in]");
1961 const std::string& target) {
1963 switch (direction) {
1964 case PortCoreConnectionDirection::In: {
1966 std::lock_guard<std::mutex> lock(m_stateMutex);
1967 for (
auto*
unit : m_units) {
1968 if ((
unit !=
nullptr) &&
unit->isInput() && !
unit->isFinished()) {
1970 if (target.empty()) {
1972 if (!name.empty()) {
1981 case PortCoreConnectionDirection::Out: {
1983 std::lock_guard<std::mutex> lock(m_stateMutex);
1984 for (
auto*
unit : m_units) {
1985 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
1987 if (target.empty()) {
1989 }
else if (route.
getToName() == target) {
1995 case PortCoreConnectionDirection::Error:
2007 std::lock_guard<std::mutex> lock(m_stateMutex);
2008 if (target.empty()) {
2010 result.
addString(
"target port is not specified.\r\n");
2014 if (!setParamPortMonitor(property,
false,
errMsg)) {
2021 for (
auto*
unit : m_units) {
2022 if ((
unit !=
nullptr) &&
unit->isInput() && !
unit->isFinished()) {
2025 unit->setCarrierParams(property);
2027 std::string msg =
"Configured connection from ";
2036 if (result.
size() == 0) {
2038 std::string msg =
"Could not find an incoming connection from ";
2051 std::lock_guard<std::mutex> lock(m_stateMutex);
2052 if (target.empty()) {
2054 result.
addString(
"target port is not specified.\r\n");
2058 if (!setParamPortMonitor(property,
true,
errMsg)) {
2065 for (
auto*
unit : m_units) {
2066 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
2069 unit->setCarrierParams(property);
2071 std::string msg =
"Configured connection to ";
2080 if (result.
size() == 0) {
2082 std::string msg =
"Could not find an incoming connection to ";
2094 std::lock_guard<std::mutex> lock(m_stateMutex);
2095 if (target.empty()) {
2097 result.
addString(
"target port is not specified.\r\n");
2098 }
else if (target ==
getName()) {
2101 if (!getParamPortMonitor(property,
false,
errMsg)) {
2108 for (
auto*
unit : m_units) {
2109 if ((
unit !=
nullptr) &&
unit->isInput() && !
unit->isFinished()) {
2113 unit->getCarrierParams(property);
2119 if (result.
size() == 0) {
2121 std::string msg =
"Could not find an incoming connection from ";
2133 std::lock_guard<std::mutex> lock(m_stateMutex);
2134 if (target.empty()) {
2136 result.
addString(
"target port is not specified.\r\n");
2137 }
else if (target ==
getName()) {
2140 if (!getParamPortMonitor(property,
true,
errMsg)) {
2147 for (
auto*
unit : m_units) {
2148 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
2152 unit->getCarrierParams(property);
2158 if (result.
size() == 0) {
2160 std::string msg =
"Could not find an incoming connection to ";
2177 if (key[0] ==
'/') {
2183 sched.addString(
"sched");
2218 for (
auto*
unit : m_units) {
2219 if ((
unit !=
nullptr) && !
unit->isFinished()) {
2224 int priority =
unit->getPriority();
2225 int policy =
unit->getPolicy();
2226 int tos = getTypeOfService(
unit);
2227 int tid =
static_cast<int>(
unit->getTid());
2229 sched.addString(
"sched");
2235 qos.addString(
"qos");
2245 std::string msg =
"cannot find any connection to/from ";
2251 result.
add(
p->find(key));
2272 if (!process.isNull()) {
2273 std::string portName = key;
2274 if ((!portName.empty()) && (portName[0] ==
'/')) {
2288 bOk = setProcessSchedulingParam(
prio, policy);
2299 if (!
sched.isNull()) {
2300 if ((!key.empty()) && (key[0] ==
'/')) {
2302 for (
auto*
unit : m_units) {
2303 if ((
unit !=
nullptr) && !
unit->isFinished()) {
2307 if (portName == key) {
2316 policy =
sched_prop->find(
"policy").asInt32();
2332 if (!
qos.isNull()) {
2333 if ((!key.empty()) && (key[0] ==
'/')) {
2335 for (
auto*
unit : m_units) {
2336 if ((
unit !=
nullptr) && !
unit->isFinished()) {
2339 if (portName == key) {
2368 }
else if (
qos_prop->check(
"dscp")) {
2383 }
else if (
qos_prop->check(
"tos")) {
2419 if (
pubs !=
nullptr) {
2421 for (
size_t i = 0;
i <
pubs->size();
i++) {
2422 std::string pub =
pubs->get(
i).asString();
2428 std::lock_guard<std::mutex> lock(m_stateMutex);
2429 for (
auto*
unit : m_units) {
2430 if ((
unit !=
nullptr) &&
unit->isPupped()) {
2431 std::string
me =
unit->getPupString();
2439 for (
size_t i = 0;
i <
pubs->size();
i++) {
2440 std::string pub =
pubs->get(
i).asString();
2451 sublst.addString(
"TCPROS");
2454 if (!
__pc_rpc(
c,
"xmlrpc", req, reply,
false)) {
2455 fprintf(
stderr,
"Cannot connect to ROS subscriber %s\n", pub.c_str());
2457 __pc_rpc(
c,
"xmlrpc", req, reply,
true);
2461 std::string hostname;
2462 std::string carrier;
2466 }
else if (
pref ==
nullptr) {
2467 fprintf(
stderr,
"Failure looking up topic %s: expected list of protocols\n", topic.c_str());
2468 }
else if (
pref->get(0).asString() !=
"TCPROS") {
2469 fprintf(
stderr,
"Failure looking up topic %s: unsupported protocol %s\n", topic.c_str(),
pref->get(0).asString().c_str());
2475 carrier =
"tcpros+role.pub+topic.";
2484 if (op ==
nullptr) {
2497 std::lock_guard<std::mutex> lock(m_stateMutex);
2503 unit->setPupped(pub);
2505 m_units.push_back(
unit);
2536 result.
addInt32(yarp::os::impl::getpid());
2553 if (m_adminReader !=
nullptr) {
2565 result.
addString(
"send [help] for list of valid commands");
2572 case PortCoreCommand::Help:
2575 case PortCoreCommand::Ver:
2578 case PortCoreCommand::Pray:
2581 case PortCoreCommand::Add: {
2586 case PortCoreCommand::Del: {
2590 case PortCoreCommand::Atch: {
2595 case PortCoreCommand::Dtch: {
2599 case PortCoreCommand::List: {
2604 case PortCoreCommand::Set: {
2609 switch (direction) {
2610 case PortCoreConnectionDirection::In:
2613 case PortCoreConnectionDirection::Out:
2616 case PortCoreConnectionDirection::Error:
2621 case PortCoreCommand::Get: {
2624 switch (direction) {
2625 case PortCoreConnectionDirection::In:
2628 case PortCoreConnectionDirection::Out:
2631 case PortCoreConnectionDirection::Error:
2636 case PortCoreCommand::Prop: {
2641 case PortCorePropertyAction::Get:
2644 case PortCorePropertyAction::Set: {
2651 case PortCorePropertyAction::Error:
2653 result.
addString(
"property action not known");
2657 case PortCoreCommand::RosPublisherUpdate: {
2665 case PortCoreCommand::RosRequestTopic:
2673 case PortCoreCommand::RosGetPid:
2678 case PortCoreCommand::RosGetBusInfo:
2683 case PortCoreCommand::Unknown:
2689 if (writer !=
nullptr) {
2690 result.
write(*writer);
2699 if (
unit ==
nullptr) {
2705 if (
unit->isOutput()) {
2709 if (op !=
nullptr) {
2725 if (
unit->isInput()) {
2729 if ((ip !=
nullptr) && ip->
getOutput().isOk()) {
2731 bool ok = ip->
getOutput().getOutputStream().setTypeOfService(
tos);
2745 if (
unit ==
nullptr) {
2749 if (
unit->isOutput()) {
2753 if (op !=
nullptr) {
2764 if (
unit->isInput()) {
2768 if ((ip !=
nullptr) && ip->
getOutput().isOk()) {
2769 return ip->
getOutput().getOutputStream().getTypeOfService();
2782 errMsg =
"Portmonitor carrier modifier cannot be find or it is not enabled in YARP!";
2787 detachPortMonitor(
true);
2789 prop.
put(
"destination",
"");
2790 prop.
put(
"sender_side", 1);
2791 prop.
put(
"receiver_side", 0);
2792 prop.
put(
"carrier",
"");
2797 errMsg =
"Failed to configure the portmonitor plug-in";
2803 detachPortMonitor(
false);
2804 prop.
put(
"source",
"");
2806 prop.
put(
"sender_side", 0);
2807 prop.
put(
"receiver_side", 1);
2808 prop.
put(
"carrier",
"");
2813 errMsg =
"Failed to configure the portmonitor plug-in";
2823bool PortCore::detachPortMonitor(
bool isOutput)
2844 errMsg =
"No port modifier is attached to the output";
2853 errMsg =
"No port modifier is attached to the input";
2870 errMsg =
"No port modifier is attached to the output";
2879 errMsg =
"No port modifier is attached to the input";
2892 if (
unit !=
nullptr) {
2893 bool isLog = (!
unit->getMode().empty());
2900bool PortCore::setProcessSchedulingParam(
int priority,
int policy)
2902#if defined(__linux__)
2909 sprintf(path,
"/proc/%d/task/", yarp::os::impl::getpid());
2912 if (
dir ==
nullptr) {
2921 if (
isdigit(
static_cast<unsigned char>(*
d->d_name)) == 0) {
2926 if (
d->d_name == end || ((end !=
nullptr) && (*end != 0))) {
2934#elif defined(YARP_HAS_ACE)
2937 int ret = ACE_OS::set_scheduling_params(param, yarp::os::impl::getpid());
2946 m_stateMutex.lock();
2948 if (m_prop ==
nullptr) {
2958 m_stateMutex.unlock();
2963 return removeUnit(route, synch);
2976int PortCore::getNextIndex()
2978 int result = m_counter;
2980 if (m_counter < 0) {
2998 return m_readableCreator;
3003 m_controlRegistration =
flag;
3008 return m_listening.load();
3018 return m_interrupted;
3023 m_timeout = timeout;
3029 if (mutex !=
nullptr) {
3031 m_mutexOwned =
false;
3033 m_mutex =
new std::mutex;
3034 m_mutexOwned =
true;
3041 if (m_mutexOwned && (m_mutex !=
nullptr)) {
3045 m_mutexOwned =
false;
3051 if (m_mutex ==
nullptr) {
3060 if (m_mutex ==
nullptr) {
3063 return m_mutex->try_lock();
3068 if (m_mutex ==
nullptr) {
3082 if (!m_checkedType) {
3086 m_checkedType =
true;
3088 m_typeMutex.unlock();
3095 m_typeMutex.unlock();
3103 m_typeMutex.unlock();
static bool __tcp_check(const Contact &c)
static bool __pc_rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader, bool verbose)
#define PORTCORE_IS_INPUT
#define PORTCORE_SEND_LOG
#define PORTCORE_IS_OUTPUT
#define PORTCORE_SEND_NORMAL
A simple collection of objects that can be described and transmitted in a portable way.
void add(const Value &value)
Add a Value to the bottle, at the end of the list.
void addVocab32(yarp::conf::vocab32_t x)
Places a vocabulary item in the bottle, at the end of the list.
void fromString(const std::string &text)
Initializes bottle from a string.
Property & addDict()
Places an empty key/value object in the bottle, at the end of the list.
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
size_type size() const
Gets the number of elements in the bottle.
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
void addString(const char *str)
Places a string in the bottle, at the end of the list.
std::string toString() const override
Gives a human-readable textual representation of the bottle.
A mini-server for performing network communication in the background.
void close() override
Stop port activity.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
bool isConnectionless() const override=0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
void getCarrierParams(Property ¶ms) const override
Get carrier configuration and deliver it by port administrative commands.
virtual bool configureFromProperty(yarp::os::Property &options)
bool isPush() const override
Check if carrier is "push" or "pull" style.
bool acceptOutgoingData(const PortWriter &writer) override
Determine whether outgoing data should be accepted.
const PortWriter & modifyOutgoingData(const PortWriter &writer) override
Modify outgoing payload data, if appropriate.
void setCarrierParams(const Property ¶ms) override
Configure carrier from port administrative commands.
static Face * listen(const Contact &address)
Create a "proto-carrier" interface object that waits for incoming connections prior to a carrier bein...
static Carrier * getCarrierTemplate(const std::string &name)
Get template for carrier.
static Carrier * chooseCarrier(const std::string &name)
Select a carrier by name.
static OutputProtocol * connect(const Contact &address)
Initiate a connection to an address.
An interface for reading from a network connection.
virtual void requestDrop()=0
Tag the connection to be dropped after the current message.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
An interface for writing to a network connection.
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written the ConnectionWriter since it was las...
ConnectionReader & getReader(ConnectionWriter *replyWriter=nullptr)
Get the dummy ConnectionReader loaded with whatever was written the ConnectionWriter since it was las...
virtual InputProtocol * read()=0
Block and wait for someone to talk to us.
virtual Contact getLocalAddress() const
Get address after open(), if more specific that the address provided to open() - otherwise an invalid...
virtual OutputProtocol * write(const Contact &address)=0
Try to reach out and talk to someone.
virtual void close()=0
Stop listening.
Simple abstraction for a YARP port name.
Contact toAddress() const
Create an address from the name.
std::string getCarrierModifier(const char *mod, bool *hasModifier=nullptr)
static bool initialized()
Returns true if YARP has been fully initialized.
static bool getLocalMode()
Get current value of flag "localMode", see setLocalMode function.
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
static NameStore * getQueryBypass()
static Contact queryName(const std::string &name)
Find out information about a registered name.
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
static bool writeToNameServer(PortWriter &cmd, PortReader &reply, const ContactStyle &style)
Variant write method specialized to name server.
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
static bool write(const Contact &contact, PortWriter &cmd, PortReader &reply, bool admin=false, bool quiet=false, double timeout=-1)
Send a single command to a port and await a single response.
The output side of an active connection between two ports.
virtual OutputStream & getOutputStream()=0
Access the output stream associated with the connection.
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
virtual bool open(const Route &route)=0
Start negotiating a carrier, using the given route (this should generally match the name of the sendi...
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
virtual const Route & getRoute() const =0
virtual InputProtocol & getInput()=0
Get an interface for doing read operations on the connection.
virtual void close()=0
Negotiate an end to operations.
virtual void rename(const Route &route)=0
Relabel the route after the fact (e.g.
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
Simple specification of the minimum functions needed from output streams.
Information about a port connection or event.
std::string message
A human-readable description of contents.
int tag
Type of information.
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
@ PORTINFO_MISC
Unspecified information.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
virtual Type getReadType() const
A base class for objects that want information about port status changes.
virtual void report(const PortInfo &info)=0
Callback for port event/state information.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
virtual void onCommencement() const
This is called when the port is about to begin writing operations.
A class for storing options and configuration information.
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
PacketPriorityDSCP
The PacketPriorityDSCP defines the packets quality of service (priority) using DSCP.
static PacketPriorityDSCP getDSCPByVocab(yarp::conf::vocab32_t vocab)
returns the IPV4/6 DSCP value given as DSCP code
static std::string fromRosName(const std::string &name)
Information about a connection between two ports.
const std::string & getToName() const
Get the destination of the route.
const std::string & getCarrierName() const
Get the carrier type of the route.
std::string toString() const
Render a text form of the route, "source->carrier->dest".
const std::string & getFromName() const
Get the source of the route.
void swapNames()
Swap from and to names.
void setToContact(const Contact &toContact)
Set the destination contact of the route.
An OutputStream that produces a string.
std::string toString() const
static ProcessInfo getProcessInfo(int pid=0)
gets the operating system process information given by its PID.
static PlatformInfo getPlatformInfo()
getPlatformInfo
std::string getName() const
A single value (typically within a Bottle).
virtual bool isString() const
Checks if value is a string.
virtual yarp::conf::vocab32_t asVocab32() const
Get vocabulary identifier as an integer.
virtual std::int32_t asInt32() const
Get 32-bit integer value.
virtual Bottle * asList() const
Get list value.
virtual std::string asString() const
Get string value.
A helper for creating cached object descriptions.
std::string toString() const
void restart()
Tell the writer that we will be serializing a new object, but to keep any cached buffers that already...
A helper for recording entire message/reply transactions.
Manager for a single output from a port.
A single message, potentially being transmitted on multiple connections.
void setContent(const yarp::os::PortWriter *writable, bool owned=false, const yarp::os::PortWriter *callback=nullptr, bool ownedCallback=false)
Configure the object being sent and where to send notifications.
void inc()
Increment the usage count for this messagae.
void dec()
Decrement the usage count for this messagae.
PortCorePacket * getFreePacket()
Get a packet that we can prepare for sending.
bool checkPacket(PortCorePacket *packet)
Move a packet to the inactive state if it has finished being sent on all connections.
This manages a single threaded resource related to a single input or output connection.
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
void resetPortName(const std::string &str)
std::string getEnvelope()
void setAdminReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming administrative messages.
int getOutputCount()
Check how many output connections there are.
bool readBlock(ConnectionReader &reader, void *id, yarp::os::OutputStream *os)
Read a block of regular payload data.
void setReportCallback(yarp::os::PortReport *reporter)
Set a callback to be notified of changes in port status.
void run() override
The body of the main thread.
bool start() override
Begin main thread.
void checkType(PortReader &reader)
void resume()
Undo an interrupt()
void setTimeout(float timeout)
yarp::os::PortReaderCreator * getReadCreator()
Get the creator of callbacks.
Property * acquireProperties(bool readOnly)
void interrupt()
Prepare the port to be shut down.
bool setCallbackLock(std::mutex *mutex=nullptr)
void setEnvelope(const std::string &envelope)
Set some envelope information to pass along with a message without actually being part of the message...
bool removeIO(const Route &route, bool synch=false)
Remove any connection matching the supplied route.
bool isInterrupted() const
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
bool listen(const Contact &address, bool shouldAnnounce=true)
Begin service at a given address.
bool sendHelper(const yarp::os::PortWriter &writer, int mode, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a message with a specific mode (normal or log).
void removeOutput(const std::string &dest, void *id, yarp::os::OutputStream *os)
Remove an output connection.
void setControlRegistration(bool flag)
Normally the port will unregister its name with the name server when shutting down.
int getInputCount()
Check how many input connections there are.
bool manualStart(const char *sourceName)
Start up the port, but without a main thread.
void setReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming data.
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
void promiseType(const Type &typ)
void releaseProperties(Property *prop)
int getEventCount()
A diagnostic for testing purposes.
void close() override
Shut down port.
const Contact & getAddress() const
Get the address associated with the port.
void describe(void *id, yarp::os::OutputStream *os)
Produce a text description of the port and its connections.
bool isWriting()
Check if a message is currently being sent.
bool adminBlock(ConnectionReader &reader, void *id)
Read a block of administrative data.
void removeInput(const std::string &src, void *id, yarp::os::OutputStream *os)
Remove an input connection.
unsigned int getFlags()
Check current configuration of port.
bool send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a normal message.
void resetReportCallback()
Reset the callback to be notified of changes in port status.
void setReadCreator(yarp::os::PortReaderCreator &creator)
Set a callback for creating callbacks for incoming data.
yarp::os::impl::PortDataModifier & getPortModifier()
void setName(const std::string &name)
Set the name of this port.
bool removeCallbackLock()
bool addOutput(const std::string &dest, void *id, yarp::os::OutputStream *os, bool onlyIfNeeded=false)
Add an output connection to this port.
This is the heart of a yarp port.
yarp::os::Carrier * outputModifier
yarp::os::Carrier * inputModifier
void releaseOutModifier()
Lets Readable objects read from the underlying InputStream associated with the connection between two...
int join(double seconds=-1)
#define yCIAssert(component, id, x)
#define yCWarning(component,...)
#define yCIError(component, id,...)
#define yCITrace(component, id,...)
#define yCIDebug(component, id,...)
#define yCIWarning(component, id,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
std::string to_string(IntegerType x)
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
std::int32_t NetInt32
Definition of the NetInt32 type.
constexpr yarp::conf::vocab32_t createVocab32(char a, char b=0, char c=0, char d=0)
Create a vocab from chars.
The main, catch-all namespace for YARP.
The ProcessInfo struct provides the operating system process information.