35# include <ace/INET_Addr.h>
36# include <ace/Sched_Params.h>
45# include <sys/types.h>
81 std::lock_guard<std::mutex> lock(m_stateMutex);
104 if (m_face ==
nullptr) {
109 if (m_address.
getPort() <= 0) {
118 m_listening.store(
true);
152 m_adminReader = &reader;
186 std::lock_guard<std::mutex> lock(m_stateMutex);
187 m_running.store(
true);
188 m_starting.store(
false);
192 m_stateCv.notify_one();
200 bool shouldStop =
false;
201 while (!shouldStop) {
208 std::lock_guard<std::mutex> lock(m_stateMutex);
220 shouldStop |= m_closing.load();
248 std::lock_guard<std::mutex> lock(m_stateMutex);
249 m_connectionListeners = 0;
250 m_connectionChangeCv.notify_all();
256 std::lock_guard<std::mutex> lock(m_stateMutex);
257 m_connectionListeners = 0;
258 m_connectionChangeCv.notify_all();
259 m_finished.store(
true);
267 if (m_prop !=
nullptr) {
281 std::unique_lock<std::mutex> lock(m_stateMutex);
290 m_starting.store(
true);
300 m_stateCv.wait(lock, [&]{
return m_running.load(); });
317 m_interruptable =
false;
327 m_interrupted =
false;
335 if (!m_listening.load()) {
340 m_interrupted =
true;
345 if (!m_interruptable) {
355 std::lock_guard<std::mutex> lock(m_stateMutex);
356 if (m_reader !=
nullptr) {
367void PortCore::closeMain()
373 std::lock_guard<std::mutex> lock(m_stateMutex);
376 if (m_finishing || !(m_running.load() || m_manual)) {
401 std::lock_guard<std::mutex> lock(m_stateMutex);
402 for (
auto*
unit : m_units) {
403 if ((
unit !=
nullptr) &&
unit->isInput() && !
unit->isDoomed()) {
406 if (s.length() >= 1 && s[0] ==
'/' && s !=
getName() && s !=
prevName) {
436 std::lock_guard<std::mutex> lock(m_stateMutex);
437 for (
auto*
unit : m_units) {
438 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
457 m_closing.store(
true);
480 std::lock_guard<std::mutex> lock(m_stateMutex);
481 m_finished.store(
false);
482 m_closing.store(
false);
483 m_running.store(
false);
489 if (m_listening.load()) {
494 m_listening.store(
false);
500 if (m_reader !=
nullptr) {
510 if (name != std::string(
"")) {
511 if (m_controlRegistration) {
534 std::lock_guard<std::mutex> lock(m_stateMutex);
540void PortCore::closeUnits()
548 for (
auto& i : m_units) {
550 if (
unit !=
nullptr) {
565void PortCore::reapUnits()
569 if (!m_finished.load()) {
570 std::lock_guard<std::mutex> lock(m_stateMutex);
571 for (
auto*
unit : m_units) {
572 if ((
unit !=
nullptr) &&
unit->isDoomed() && !
unit->isFinished()) {
573 std::string s =
unit->getRoute().toString();
585void PortCore::cleanUnits(
bool blocking)
592 std::unique_lock<std::mutex> lock(m_stateMutex, std::defer_lock);
608 if (!m_finished.load()) {
611 for (
auto& i : m_units) {
613 if (
unit !=
nullptr) {
615 if (
unit->isFinished()) {
616 std::string con =
unit->getRoute().toString();
625 if (!
unit->isDoomed()) {
626 if (
unit->isOutput()) {
628 if (
unit->getMode().empty()) {
632 if (
unit->isInput()) {
633 if (
unit->getRoute().getFromName() !=
"admin") {
646 for (
size_t i2 = 0;
i2 < m_units.size();
i2++) {
647 if (m_units[
i2] !=
nullptr) {
649 m_units[
rem] = m_units[
i2];
650 m_units[
i2] =
nullptr;
657 for (
size_t i3 = 0;
i3 < m_units.size() -
rem;
i3++) {
666 m_packetMutex.lock();
669 m_packetMutex.unlock();
684 std::lock_guard<std::mutex> lock(m_stateMutex);
691 m_units.push_back(
unit);
708 if (!m_finished.load()) {
709 std::lock_guard<std::mutex> lock(m_stateMutex);
713 m_units.push_back(
unit);
718bool PortCore::isUnit(
const Route& route,
int index)
722 if (!m_finished.load()) {
723 for (
auto*
unit : m_units) {
724 if (
unit !=
nullptr) {
726 std::string
wild =
"*";
729 ok = ok && (
unit->getIndex() == index);
753bool PortCore::removeUnit(
const Route& route,
bool synch,
bool*
except)
770 if (!m_finished.load()) {
771 std::lock_guard<std::mutex> lock(m_stateMutex);
772 for (
auto*
unit : m_units) {
773 if (
unit !=
nullptr) {
775 std::string
wild =
"*";
828 std::unique_lock<std::mutex> lock(m_stateMutex);
830 m_connectionListeners++;
831 m_connectionChangeCv.wait(lock, [&]{
return m_connectionListeners == 0; });
860 bw.appendLine(std::string(
"Do not know how to connect to ") + dest);
878 bw.appendLine(std::string(
"Desired connection already present from ") +
getName() +
" to " + dest);
909 bool is_log = (!mode.empty());
912 err =
"Logger configured as log." + mode +
", but only log.in is supported";
915 append =
"; " + r.
getFromName() +
" will forward messages and replies (if any) to " + r.
getToName();
926 err =
"Outputs not allowed";
931 if (m_dataOutputCount >= 1 && !
is_log) {
932 err =
"RPC output already connected";
958 bool ok = op->
open(r);
968 bw.appendLine(std::string(
"Cannot connect to ") + dest);
989 if (!m_finished.load()) {
990 std::lock_guard<std::mutex> lock(m_stateMutex);
997 m_units.push_back(
unit);
1002 bw.appendLine(std::string(
"Added connection from ") +
getName() +
" to " + dest + append);
1003 if (os !=
nullptr) {
1018 if (removeUnit(
Route(
"*", dest,
"*"),
true)) {
1019 bw.appendLine(std::string(
"Removed connection from ") +
getName() +
" to " + dest);
1021 bw.appendLine(std::string(
"Could not find an outgoing connection to ") + dest);
1023 if (os !=
nullptr) {
1036 if (removeUnit(
Route(src,
"*",
"*"),
true)) {
1037 bw.appendLine(std::string(
"Removing connection from ") + src +
" to " +
getName());
1039 bw.appendLine(std::string(
"Could not find an incoming connection from ") + src);
1041 if (os !=
nullptr) {
1058 std::lock_guard<std::mutex> lock(m_stateMutex);
1061 bw.appendLine(std::string(
"This is ") + m_address.
getRegName() +
" at " + m_address.
toURI());
1065 for (
auto*
unit : m_units) {
1066 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
1074 bw.appendLine(
"There are no outgoing connections");
1079 for (
auto*
unit : m_units) {
1080 if ((
unit !=
nullptr) &&
unit->isInput() && !
unit->isFinished()) {
1090 bw.appendLine(
"There are no incoming connections");
1095 if (os !=
nullptr) {
1109 std::lock_guard<std::mutex> lock(m_stateMutex);
1114 std::string portName = m_address.
getRegName();
1115 baseInfo.message = std::string(
"This is ") + portName +
" at " + m_address.
toURI();
1120 for (
auto*
unit : m_units) {
1121 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
1127 info.incoming =
false;
1128 info.portName = portName;
1139 info.message =
"There are no outgoing connections";
1145 for (
auto*
unit : m_units) {
1146 if ((
unit !=
nullptr) &&
unit->isInput() && !
unit->isFinished()) {
1152 info.incoming =
true;
1153 info.portName = portName;
1164 info.message =
"There are no incoming connections";
1172 std::lock_guard<std::mutex> lock(m_stateMutex);
1173 if (reporter !=
nullptr) {
1174 m_eventReporter = reporter;
1180 std::lock_guard<std::mutex> lock(m_stateMutex);
1181 m_eventReporter =
nullptr;
1192 if (m_eventReporter !=
nullptr) {
1210 if (m_reader !=
nullptr && !m_interrupted) {
1211 m_interruptable =
false;
1232 result = m_reader->
read(reader);
1236 m_interruptable =
true;
1241 result = b.
read(reader);
1276 if (m_interrupted || m_finishing) {
1305 std::lock_guard<std::mutex> lock(m_stateMutex);
1308 if (m_finished.load()) {
1315 m_packetMutex.lock();
1318 packet->setContent(&writer,
false, callback);
1319 m_packetMutex.unlock();
1322 for (
auto*
unit : m_units) {
1323 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
1324 bool log = (!
unit->getMode().empty());
1335 m_packetMutex.lock();
1337 m_packetMutex.unlock();
1341 void* out =
unit->send(writer,
1343 (callback !=
nullptr) ? callback : (&writer),
1344 reinterpret_cast<void*
>(
packet),
1351 if (out !=
nullptr) {
1353 m_packetMutex.lock();
1356 m_packetMutex.unlock();
1359 if (
unit->isFinished()) {
1367 m_packetMutex.lock();
1375 m_packetMutex.unlock();
1380 m_logNeeded =
false;
1386 if (m_waitAfterSend && reader !=
nullptr) {
1396 bool writing =
false;
1400 if (!m_finished.load()) {
1401 std::lock_guard<std::mutex> lock(m_stateMutex);
1402 for (
auto*
unit : m_units) {
1403 if ((
unit !=
nullptr) && !
unit->isFinished() &&
unit->isBusy()) {
1416 m_packetMutex.lock();
1417 int result = m_inputCount;
1418 m_packetMutex.unlock();
1425 m_packetMutex.lock();
1426 int result = m_outputCount;
1427 m_packetMutex.unlock();
1435 m_packetMutex.lock();
1440 m_packetMutex.unlock();
1448 bool ok = envelope.
write(m_envelopeWriter);
1458 m_envelope = envelope;
1459 for (
size_t i = 0; i < m_envelope.length(); i++) {
1462 if (m_envelope[i] < 32) {
1463 m_envelope = m_envelope.substr(0, i);
1478 sis.
add(m_envelope);
1482 sbr.reset(sis,
nullptr, route, 0,
true);
1491 const char* carrier,
1497 style.
quiet = !verbose;
1510 int result =
addr.set(
c.getPort(),
c.getHost().c_str());
1514 result =
addr.set(
c.getPort(),
"127.0.0.1");
1518 result =
addr.set(
c.getPort(),
"127.0.1.1");
1559 auto cmd =
static_cast<PortCoreCommand
>(v.
asVocab32());
1561 case PortCoreCommand::Help:
1562 case PortCoreCommand::Ver:
1563 case PortCoreCommand::Pray:
1564 case PortCoreCommand::Add:
1565 case PortCoreCommand::Del:
1566 case PortCoreCommand::Atch:
1567 case PortCoreCommand::Dtch:
1568 case PortCoreCommand::List:
1569 case PortCoreCommand::Set:
1570 case PortCoreCommand::Get:
1571 case PortCoreCommand::Prop:
1574 return PortCoreCommand::Unknown;
1580 auto dir =
static_cast<PortCoreConnectionDirection
>(v);
1582 case PortCoreConnectionDirection::In:
1583 case PortCoreConnectionDirection::Out:
1586 return errorIsOut ? PortCoreConnectionDirection::Out : PortCoreConnectionDirection::Error;
1592 auto action =
static_cast<PortCorePropertyAction
>(v);
1594 case PortCorePropertyAction::Get:
1595 case PortCorePropertyAction::Set:
1598 return PortCorePropertyAction::Error;
1605 bfrom.addString(
"from");
1609 bto.addString(
"to");
1622 if (!carrier->
isPush()) {
1650 result.
addString(
"[help] # give this help");
1651 result.
addString(
"[ver] # report protocol version information");
1652 result.
addString(
"[add] $portname # add an output connection");
1653 result.
addString(
"[add] $portname $car # add an output with a given protocol");
1654 result.
addString(
"[del] $portname # remove an input or output connection");
1655 result.
addString(
"[list] [in] # list input connections");
1656 result.
addString(
"[list] [out] # list output connections");
1657 result.
addString(
"[list] [in] $portname # give details for input");
1658 result.
addString(
"[list] [out] $portname # give details for output");
1659 result.
addString(
"[prop] [get] # get all user-defined port properties");
1660 result.
addString(
"[prop] [get] $prop # get a user-defined port property (prop, val)");
1661 result.
addString(
"[prop] [set] $prop $val # set a user-defined port property (prop, val)");
1662 result.
addString(
"[prop] [get] $portname # get Qos properties of a connection to/from a port");
1663 result.
addString(
"[prop] [set] $portname # set Qos properties of a connection to/from a port");
1664 result.
addString(
"[prop] [get] $cur_port # get information about current process (e.g., scheduling priority, pid)");
1665 result.
addString(
"[prop] [set] $cur_port # set properties of the current process (e.g., scheduling priority, pid)");
1666 result.
addString(
"[atch] [out] $prop # attach a portmonitor plug-in to the port's output");
1667 result.
addString(
"[atch] [in] $prop # attach a portmonitor plug-in to the port's input");
1668 result.
addString(
"[dtch] [out] # detach portmonitor plug-in from the port's output");
1669 result.
addString(
"[dtch] [in] # detach portmonitor plug-in from the port's input");
1698 while (name[0] ==
'/') {
1699 name = name.substr(1);
1702 auto i = name.find(
'/');
1703 if (i != std::string::npos) {
1704 name = name.substr(0, i);
1708 std::random_device
rd;
1709 std::mt19937
mt(
rd());
1710 std::uniform_int_distribution<int>
dist2(0,1);
1713 result.
addString(
"You begin praying to " + name +
".");
1714 result.
addString(
"You finish your prayer.");
1737 static const char*
auras[] = {
1750 static const char*
items[] = {
1766 "You feel more limber.",
1767 "The slime disappears.",
1768 "Your amulet vanishes! You can breathe again.",
1769 "You can breathe again.",
1770 "You are back on solid ground.",
1771 "Your stomach feels content.",
1773 "You feel much better.",
1774 "Your surroundings change.",
1775 "Your shape becomes uncertain.",
1776 "Your chain disappears.",
1777 "There's a tiger in your tank.",
1778 "You feel in good health again.",
1779 "Your eye feels better.",
1780 "Your eyes feel better.",
1781 "Looks like you are back in Kansas.",
1782 "Your <ITEM> softly glows <AURA>.",
1792 std::uniform_int_distribution<int>
dist13(0,12);
1796 result.
addString(
"You feel that " + name +
" is " + (
d2() ?
"bummed" :
"displeased") +
".");
1801 ": \"Thou " + (
d2() ?
"hast strayed from the path" :
"art arrogant") +
1802 ", " +
creature() +
". Thou must relearn thy lessons!\"");
1807 ": \"Thou hast angered me.\"");
1808 result.
addString(
"A black glow surrounds you.");
1812 ": \"Thou hast angered me.\"");
1817 ": \"Thou durst " + (
d2() ?
"scorn" :
"call upon") +
1818 " me? Then die, " +
creature() +
"!\"");
1821 result.
addString(
"You feel that " + name +
" is " + (
d2() ?
"pleased as punch" :
"well-pleased") +
".");
1825 result.
addString(
"You feel that " + name +
" is " + (
d2() ?
"ticklish" :
"pleased") +
".");
1829 result.
addString(
"You feel that " + name +
" is " + (
d2() ?
"full" :
"satisfied") +
".");
1834 ": \"Thou hast angered me.\"");
1835 result.
addString(
"Suddenly, a bolt of lightning strikes you!");
1836 result.
addString(
"You fry to a crisp!");
1844 const std::string& carrier) {
1848 if (!carrier.empty()) {
1849 output = carrier +
":/" + output;
1853 int v = (r[0] ==
'A') ? 0 : -1;
1868 int v = (
r1[0] ==
'R' ||
r2[0] ==
'R') ? 0 : -1;
1870 if (
r1[0] ==
'R' &&
r2[0] !=
'R') {
1872 }
else if (
r1[0] !=
'R' &&
r2[0] ==
'R') {
1883 switch (direction) {
1884 case PortCoreConnectionDirection::Out: {
1886 if (!attachPortMonitor(prop,
true,
errMsg)) {
1893 case PortCoreConnectionDirection::In: {
1895 if (!attachPortMonitor(prop,
false,
errMsg)) {
1902 case PortCoreConnectionDirection::Error:
1904 result.
addString(
"attach command must be followed by [out] or [in]");
1911 switch (direction) {
1912 case PortCoreConnectionDirection::Out: {
1913 if (detachPortMonitor(
true)) {
1919 case PortCoreConnectionDirection::In: {
1920 if (detachPortMonitor(
false)) {
1926 case PortCoreConnectionDirection::Error:
1928 result.
addString(
"detach command must be followed by [out] or [in]");
1934 const std::string& target) {
1936 switch (direction) {
1937 case PortCoreConnectionDirection::In: {
1939 std::lock_guard<std::mutex> lock(m_stateMutex);
1940 for (
auto*
unit : m_units) {
1941 if ((
unit !=
nullptr) &&
unit->isInput() && !
unit->isFinished()) {
1943 if (target.empty()) {
1945 if (!name.empty()) {
1954 case PortCoreConnectionDirection::Out: {
1956 std::lock_guard<std::mutex> lock(m_stateMutex);
1957 for (
auto*
unit : m_units) {
1958 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
1960 if (target.empty()) {
1962 }
else if (route.
getToName() == target) {
1968 case PortCoreConnectionDirection::Error:
1980 std::lock_guard<std::mutex> lock(m_stateMutex);
1981 if (target.empty()) {
1983 result.
addString(
"target port is not specified.\r\n");
1987 if (!setParamPortMonitor(property,
false,
errMsg)) {
1994 for (
auto*
unit : m_units) {
1995 if ((
unit !=
nullptr) &&
unit->isInput() && !
unit->isFinished()) {
1998 unit->setCarrierParams(property);
2000 std::string
msg =
"Configured connection from ";
2009 if (result.
size() == 0) {
2011 std::string
msg =
"Could not find an incoming connection from ";
2024 std::lock_guard<std::mutex> lock(m_stateMutex);
2025 if (target.empty()) {
2027 result.
addString(
"target port is not specified.\r\n");
2031 if (!setParamPortMonitor(property,
true,
errMsg)) {
2038 for (
auto*
unit : m_units) {
2039 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
2042 unit->setCarrierParams(property);
2044 std::string
msg =
"Configured connection to ";
2053 if (result.
size() == 0) {
2055 std::string
msg =
"Could not find an incoming connection to ";
2067 std::lock_guard<std::mutex> lock(m_stateMutex);
2068 if (target.empty()) {
2070 result.
addString(
"target port is not specified.\r\n");
2071 }
else if (target ==
getName()) {
2074 if (!getParamPortMonitor(property,
false,
errMsg)) {
2081 for (
auto*
unit : m_units) {
2082 if ((
unit !=
nullptr) &&
unit->isInput() && !
unit->isFinished()) {
2086 unit->getCarrierParams(property);
2092 if (result.
size() == 0) {
2094 std::string
msg =
"Could not find an incoming connection from ";
2106 std::lock_guard<std::mutex> lock(m_stateMutex);
2107 if (target.empty()) {
2109 result.
addString(
"target port is not specified.\r\n");
2110 }
else if (target ==
getName()) {
2113 if (!getParamPortMonitor(property,
true,
errMsg)) {
2120 for (
auto*
unit : m_units) {
2121 if ((
unit !=
nullptr) &&
unit->isOutput() && !
unit->isFinished()) {
2125 unit->getCarrierParams(property);
2131 if (result.
size() == 0) {
2133 std::string
msg =
"Could not find an incoming connection to ";
2150 if (key[0] ==
'/') {
2156 sched.addString(
"sched");
2191 for (
auto*
unit : m_units) {
2192 if ((
unit !=
nullptr) && !
unit->isFinished()) {
2197 int priority =
unit->getPriority();
2198 int policy =
unit->getPolicy();
2199 int tos = getTypeOfService(
unit);
2200 int tid =
static_cast<int>(
unit->getTid());
2202 sched.addString(
"sched");
2208 qos.addString(
"qos");
2218 std::string
msg =
"cannot find any connection to/from ";
2245 if (!process.isNull()) {
2246 std::string portName = key;
2247 if ((!portName.empty()) && (portName[0] ==
'/')) {
2261 bOk = setProcessSchedulingParam(
prio, policy);
2272 if (!
sched.isNull()) {
2273 if ((!key.empty()) && (key[0] ==
'/')) {
2275 for (
auto*
unit : m_units) {
2276 if ((
unit !=
nullptr) && !
unit->isFinished()) {
2280 if (portName == key) {
2289 policy =
sched_prop->find(
"policy").asInt32();
2305 if (!
qos.isNull()) {
2306 if ((!key.empty()) && (key[0] ==
'/')) {
2308 for (
auto*
unit : m_units) {
2309 if ((
unit !=
nullptr) && !
unit->isFinished()) {
2312 if (portName == key) {
2341 }
else if (
qos_prop->check(
"dscp")) {
2356 }
else if (
qos_prop->check(
"tos")) {
2384 if (m_adminReader !=
nullptr) {
2396 result.
addString(
"send [help] for list of valid commands");
2403 case PortCoreCommand::Help:
2406 case PortCoreCommand::Ver:
2409 case PortCoreCommand::Pray:
2412 case PortCoreCommand::Add: {
2417 case PortCoreCommand::Del: {
2421 case PortCoreCommand::Atch: {
2426 case PortCoreCommand::Dtch: {
2430 case PortCoreCommand::List: {
2435 case PortCoreCommand::Set: {
2440 switch (direction) {
2441 case PortCoreConnectionDirection::In:
2444 case PortCoreConnectionDirection::Out:
2447 case PortCoreConnectionDirection::Error:
2452 case PortCoreCommand::Get: {
2455 switch (direction) {
2456 case PortCoreConnectionDirection::In:
2459 case PortCoreConnectionDirection::Out:
2462 case PortCoreConnectionDirection::Error:
2467 case PortCoreCommand::Prop: {
2472 case PortCorePropertyAction::Get:
2475 case PortCorePropertyAction::Set: {
2482 case PortCorePropertyAction::Error:
2484 result.
addString(
"property action not known");
2488 case PortCoreCommand::Unknown:
2494 if (writer !=
nullptr) {
2495 result.
write(*writer);
2504 if (
unit ==
nullptr) {
2510 if (
unit->isOutput()) {
2514 if (op !=
nullptr) {
2530 if (
unit->isInput()) {
2534 if ((ip !=
nullptr) && ip->
getOutput().isOk()) {
2536 bool ok = ip->
getOutput().getOutputStream().setTypeOfService(
tos);
2550 if (
unit ==
nullptr) {
2554 if (
unit->isOutput()) {
2558 if (op !=
nullptr) {
2569 if (
unit->isInput()) {
2573 if ((ip !=
nullptr) && ip->
getOutput().isOk()) {
2574 return ip->
getOutput().getOutputStream().getTypeOfService();
2587 errMsg =
"Portmonitor carrier modifier cannot be find or it is not enabled in YARP!";
2592 detachPortMonitor(
true);
2594 prop.
put(
"destination",
"");
2595 prop.
put(
"sender_side", 1);
2596 prop.
put(
"receiver_side", 0);
2597 prop.
put(
"carrier",
"");
2602 errMsg =
"Failed to configure the portmonitor plug-in";
2608 detachPortMonitor(
false);
2609 prop.
put(
"source",
"");
2611 prop.
put(
"sender_side", 0);
2612 prop.
put(
"receiver_side", 1);
2613 prop.
put(
"carrier",
"");
2618 errMsg =
"Failed to configure the portmonitor plug-in";
2628bool PortCore::detachPortMonitor(
bool isOutput)
2649 errMsg =
"No port modifier is attached to the output";
2658 errMsg =
"No port modifier is attached to the input";
2675 errMsg =
"No port modifier is attached to the output";
2684 errMsg =
"No port modifier is attached to the input";
2697 if (
unit !=
nullptr) {
2698 bool isLog = (!
unit->getMode().empty());
2705bool PortCore::setProcessSchedulingParam(
int priority,
int policy)
2707#if defined(__linux__)
2714 sprintf(path,
"/proc/%d/task/", yarp::os::impl::getpid());
2717 if (
dir ==
nullptr) {
2726 if (
isdigit(
static_cast<unsigned char>(*d->d_name)) == 0) {
2731 if (d->d_name == end || ((end !=
nullptr) && (*end != 0))) {
2739#elif defined(YARP_HAS_ACE)
2742 int ret = ACE_OS::set_scheduling_params(param, yarp::os::impl::getpid());
2751 m_stateMutex.lock();
2753 if (m_prop ==
nullptr) {
2763 m_stateMutex.unlock();
2768 return removeUnit(route, synch);
2781int PortCore::getNextIndex()
2783 int result = m_counter;
2785 if (m_counter < 0) {
2803 return m_readableCreator;
2808 m_controlRegistration =
flag;
2813 return m_listening.load();
2823 return m_interrupted;
2828 m_timeout = timeout;
2834 if (mutex !=
nullptr) {
2836 m_mutexOwned =
false;
2838 m_mutex =
new std::mutex;
2839 m_mutexOwned =
true;
2846 if (m_mutexOwned && (m_mutex !=
nullptr)) {
2850 m_mutexOwned =
false;
2856 if (m_mutex ==
nullptr) {
2865 if (m_mutex ==
nullptr) {
2868 return m_mutex->try_lock();
2873 if (m_mutex ==
nullptr) {
2887 if (!m_checkedType) {
2891 m_checkedType =
true;
2893 m_typeMutex.unlock();
2900 m_typeMutex.unlock();
2908 m_typeMutex.unlock();
static bool __tcp_check(const Contact &c)
static bool __pc_rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader, bool verbose)
#define PORTCORE_IS_INPUT
#define PORTCORE_SEND_LOG
#define PORTCORE_IS_OUTPUT
#define PORTCORE_SEND_NORMAL
A simple collection of objects that can be described and transmitted in a portable way.
void add(const Value &value)
Add a Value to the bottle, at the end of the list.
void addVocab32(yarp::conf::vocab32_t x)
Places a vocabulary item in the bottle, at the end of the list.
void fromString(const std::string &text)
Initializes bottle from a string.
Property & addDict()
Places an empty key/value object in the bottle, at the end of the list.
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
size_type size() const
Gets the number of elements in the bottle.
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
void addString(const char *str)
Places a string in the bottle, at the end of the list.
std::string toString() const override
Gives a human-readable textual representation of the bottle.
A mini-server for performing network communication in the background.
void close() override
Stop port activity.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
bool isConnectionless() const override=0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
void getCarrierParams(Property ¶ms) const override
Get carrier configuration and deliver it by port administrative commands.
virtual bool configureFromProperty(yarp::os::Property &options)
bool isPush() const override
Check if carrier is "push" or "pull" style.
bool acceptOutgoingData(const PortWriter &writer) override
Determine whether outgoing data should be accepted.
const PortWriter & modifyOutgoingData(const PortWriter &writer) override
Modify outgoing payload data, if appropriate.
void setCarrierParams(const Property ¶ms) override
Configure carrier from port administrative commands.
static Face * listen(const Contact &address)
Create a "proto-carrier" interface object that waits for incoming connections prior to a carrier bein...
static Carrier * getCarrierTemplate(const std::string &name)
Get template for carrier.
static Carrier * chooseCarrier(const std::string &name)
Select a carrier by name.
static OutputProtocol * connect(const Contact &address)
Initiate a connection to an address.
An interface for reading from a network connection.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
An interface for writing to a network connection.
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written the ConnectionWriter since it was las...
ConnectionReader & getReader(ConnectionWriter *replyWriter=nullptr)
Get the dummy ConnectionReader loaded with whatever was written the ConnectionWriter since it was las...
virtual InputProtocol * read()=0
Block and wait for someone to talk to us.
virtual Contact getLocalAddress() const
Get address after open(), if more specific that the address provided to open() - otherwise an invalid...
virtual OutputProtocol * write(const Contact &address)=0
Try to reach out and talk to someone.
virtual void close()=0
Stop listening.
Simple abstraction for a YARP port name.
Contact toAddress() const
Create an address from the name.
std::string getCarrierModifier(const char *mod, bool *hasModifier=nullptr)
static bool initialized()
Returns true if YARP has been fully initialized.
static bool getLocalMode()
Get current value of flag "localMode", see setLocalMode function.
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
static NameStore * getQueryBypass()
static Contact queryName(const std::string &name)
Find out information about a registered name.
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
static bool writeToNameServer(PortWriter &cmd, PortReader &reply, const ContactStyle &style)
Variant write method specialized to name server.
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
static bool write(const Contact &contact, PortWriter &cmd, PortReader &reply, bool admin=false, bool quiet=false, double timeout=-1)
Send a single command to a port and await a single response.
The output side of an active connection between two ports.
virtual OutputStream & getOutputStream()=0
Access the output stream associated with the connection.
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
virtual bool open(const Route &route)=0
Start negotiating a carrier, using the given route (this should generally match the name of the sendi...
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
virtual InputProtocol & getInput()=0
Get an interface for doing read operations on the connection.
virtual void close()=0
Negotiate an end to operations.
virtual void rename(const Route &route)=0
Relabel the route after the fact (e.g.
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
Simple specification of the minimum functions needed from output streams.
Information about a port connection or event.
std::string message
A human-readable description of contents.
int tag
Type of information.
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
@ PORTINFO_MISC
Unspecified information.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
virtual Type getReadType() const
A base class for objects that want information about port status changes.
virtual void report(const PortInfo &info)=0
Callback for port event/state information.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
virtual void onCommencement() const
This is called when the port is about to begin writing operations.
A class for storing options and configuration information.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
std::string toString() const override
Return a standard text representation of the content of the object.
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
PacketPriorityDSCP
The PacketPriorityDSCP defines the packets quality of service (priority) using DSCP.
static PacketPriorityDSCP getDSCPByVocab(yarp::conf::vocab32_t vocab)
returns the IPV4/6 DSCP value given as DSCP code
Information about a connection between two ports.
const std::string & getToName() const
Get the destination of the route.
const std::string & getCarrierName() const
Get the carrier type of the route.
std::string toString() const
Render a text form of the route, "source->carrier->dest".
const std::string & getFromName() const
Get the source of the route.
void swapNames()
Swap from and to names.
void setToContact(const Contact &toContact)
Set the destination contact of the route.
An OutputStream that produces a string.
std::string toString() const
static ProcessInfo getProcessInfo(int pid=0)
gets the operating system process information given by its PID.
static PlatformInfo getPlatformInfo()
getPlatformInfo
std::string getName() const
A single value (typically within a Bottle).
virtual yarp::conf::vocab32_t asVocab32() const
Get vocabulary identifier as an integer.
virtual std::string asString() const
Get string value.
A helper for creating cached object descriptions.
std::string toString() const
void restart()
Tell the writer that we will be serializing a new object, but to keep any cached buffers that already...
A helper for recording entire message/reply transactions.
Manager for a single output from a port.
A single message, potentially being transmitted on multiple connections.
PortCorePacket * getFreePacket()
Get a packet that we can prepare for sending.
bool checkPacket(PortCorePacket *packet)
Move a packet to the inactive state if it has finished being sent on all connections.
This manages a single threaded resource related to a single input or output connection.
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
void resetPortName(const std::string &str)
std::string getEnvelope()
void setAdminReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming administrative messages.
int getOutputCount()
Check how many output connections there are.
bool readBlock(ConnectionReader &reader, void *id, yarp::os::OutputStream *os)
Read a block of regular payload data.
void setReportCallback(yarp::os::PortReport *reporter)
Set a callback to be notified of changes in port status.
void run() override
The body of the main thread.
bool start() override
Begin main thread.
void checkType(PortReader &reader)
void resume()
Undo an interrupt()
void setTimeout(float timeout)
yarp::os::PortReaderCreator * getReadCreator()
Get the creator of callbacks.
Property * acquireProperties(bool readOnly)
void interrupt()
Prepare the port to be shut down.
bool setCallbackLock(std::mutex *mutex=nullptr)
void setEnvelope(const std::string &envelope)
Set some envelope information to pass along with a message without actually being part of the message...
bool removeIO(const Route &route, bool synch=false)
Remove any connection matching the supplied route.
bool isInterrupted() const
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
bool listen(const Contact &address, bool shouldAnnounce=true)
Begin service at a given address.
bool sendHelper(const yarp::os::PortWriter &writer, int mode, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a message with a specific mode (normal or log).
void removeOutput(const std::string &dest, void *id, yarp::os::OutputStream *os)
Remove an output connection.
void setControlRegistration(bool flag)
Normally the port will unregister its name with the name server when shutting down.
int getInputCount()
Check how many input connections there are.
bool manualStart(const char *sourceName)
Start up the port, but without a main thread.
void setReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming data.
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
void promiseType(const Type &typ)
void releaseProperties(Property *prop)
int getEventCount()
A diagnostic for testing purposes.
void close() override
Shut down port.
const Contact & getAddress() const
Get the address associated with the port.
void describe(void *id, yarp::os::OutputStream *os)
Produce a text description of the port and its connections.
bool isWriting()
Check if a message is currently being sent.
bool adminBlock(ConnectionReader &reader, void *id)
Read a block of administrative data.
void removeInput(const std::string &src, void *id, yarp::os::OutputStream *os)
Remove an input connection.
unsigned int getFlags()
Check current configuration of port.
bool send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a normal message.
void resetReportCallback()
Reset the callback to be notified of changes in port status.
void setReadCreator(yarp::os::PortReaderCreator &creator)
Set a callback for creating callbacks for incoming data.
yarp::os::impl::PortDataModifier & getPortModifier()
void setName(const std::string &name)
Set the name of this port.
bool removeCallbackLock()
bool addOutput(const std::string &dest, void *id, yarp::os::OutputStream *os, bool onlyIfNeeded=false)
Add an output connection to this port.
This is the heart of a yarp port.
yarp::os::Carrier * outputModifier
yarp::os::Carrier * inputModifier
void releaseOutModifier()
Lets Readable objects read from the underlying InputStream associated with the connection between two...
int join(double seconds=-1)
#define yCIAssert(component, id, x)
#define yCWarning(component,...)
#define yCIError(component, id,...)
#define yCITrace(component, id,...)
#define yCIDebug(component, id,...)
#define yCIWarning(component, id,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
std::string to_string(IntegerType x)
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
std::int32_t NetInt32
Definition of the NetInt32 type.
constexpr yarp::conf::vocab32_t createVocab32(char a, char b=0, char c=0, char d=0)
Create a vocab from chars.
The main, catch-all namespace for YARP.
The ProcessInfo struct provides the operating system process information.