12#define YARPRUN_NORESPONSE 1
13#define YARPRUN_NOCONNECTION 2
14#define YARPRUN_CONNECTION_TIMOUT 3
15#define YARPRUN_SEMAPHORE_PARAM 4
16#define YARPRUN_UNDEF 5
18#define CONNECTION_TIMEOUT 2.0
19#define RUN_TIMEOUT 10.0
20#define STOP_TIMEOUT 15.0
21#define KILL_TIMEOUT 10.0
22#define EVENT_THREAD_PERIOD 0.5
29 " (Remote host does not respond) ",
30 " (Remote host does no exist) ",
31 " (Timeout while connecting to the remote host) ",
32 " (Blocked in broker semaphor) ",
33 " (Undefined message) " };
42 bOnlyConnector = bInitialized =
false;
68 strError =
"YARP network server is not up.";
72 bOnlyConnector =
true;
104 strError =
"command is not specified.";
111 strError =
"remote host port is not specified.";
117 strHost = std::string(
"/") + std::string(
szhost);
133 strStdio = std::string(
"/") + std::string(
szstdio);
145 strTag = strHost + strCmd + strParam + strEnv +
sstrID.str();
146 std::string::iterator itr;
147 for (itr = strTag.begin(); itr != strTag.end(); itr++) {
148 if (((*itr) ==
' ') || ((*itr) ==
'/')) {
153 __trace_message =
"(init) checking yarp network";
156 strError =
"YARP network server is not up.";
157 __trace_message.clear();
161 __trace_message = std::string(
"(init) checking existence of ") + strHost;
162 if(!
exists(strHost.c_str()))
165 strError +=
" does not exist. check yarprun is running as server.";
166 __trace_message.clear();
190 if (bOnlyConnector) {
195 int ret = requestServer(runProperty());
198 strError =
"cannot ask ";
200 strError +=
" to run ";
204 strError += std::string(
" due to " + __trace_message);
214 if(strStdioUUID.size())
225 strError =
"cannot run ";
237 if (bOnlyConnector) {
246 grp.addString(strHost.c_str());
249 grp.addString(
"sigterm");
250 grp.addString(strTag.c_str());
255 strError =
"cannot ask ";
257 strError +=
" to stop ";
261 strError += std::string(
" due to " + __trace_message);
276 strError =
"Timeout! Cannot stop ";
289 if (bOnlyConnector) {
298 grp.addString(strHost.c_str());
301 grp.addString(
"kill");
302 grp.addString(strTag.c_str());
308 strError =
"cannot ask ";
310 strError +=
" to kill ";
314 strError += std::string(
" due to " + __trace_message);
329 strError =
"cannot kill ";
343 if (bOnlyConnector) {
352 grp.addString(strHost.c_str());
356 grp.addString(
"isrunning");
357 grp.addString(strTag.c_str());
360 int ret = SendMsg(msg, strHost, response, 3.0);
363 strError =
"cannot ask ";
365 strError +=
" to check for status of ";
369 strError += std::string(
" due to " + __trace_message);
373 return ((response.
get(0).
asString() ==
"running")?1:0);
390 std::string cmd = strCmd + std::string(
" ") + strParam;
391 command.
put(
"cmd", cmd);
392 command.
put(
"on", strHost);
393 command.
put(
"as", strTag);
394 if (!strWorkdir.empty()) {
395 command.
put(
"workdir", strWorkdir);
397 if (!strStdio.empty()) {
398 command.
put(
"stdio", strStdio);
400 if (!strEnv.empty()) {
401 command.
put(
"env", strEnv);
411bool YarpBroker::connect(
const std::string& from,
const std::string& to,
const std::string& carrier,
bool persist)
415 strError =
"no source port is introduced.";
421 strError =
"no destination port is introduced.";
436 std::string strCarrier = carrier;
448 strError =
"cannot connect ";
450 strError +=
" to " + std::string(to);
456 std::string topic = std::string(
"topic:/") + std::string(from) + std::string(to);
461 strError =
"a persistent connection from ";
463 strError +=
" to " + std::string(to);
464 strError +=
" is created but not connected.";
478 strError =
"no source port is introduced.";
484 strError =
"no destination port is introduced.";
514 strError =
"cannot disconnect ";
516 strError +=
" from " + std::string(to);
533 if (
szport.empty() || request.empty()) {
544 if (!port.
open(
"...")) {
552 for(
int i=0;
i<10;
i++) {
595 if (!semParam.
check()) {
602 if(!port.
open(
"...")) {
603 __trace_message.clear();
610 grp.addString(
"sysinfo");
619 __trace_message =
"(getSystemInfo) connecting to " + std::string(port.
getName());
624 strError = std::string(
"Cannot connect to ") + std::string(
server);
625 __trace_message.clear();
630 __trace_message =
"(getSystemInfo) writing to " + std::string(port.
getName());
632 __trace_message =
"(getSystemInfo) disconnecting from " + std::string(port.
getName());
638 strError = std::string(
server) + std::string(
" does not respond");
639 __trace_message.clear();
645 __trace_message.clear();
661 strError =
"Failed to reach name server\n";
670 const char*
delm =
"registration name ";
672 while((
pos1 = str.find(
delm)) != std::string::npos)
675 if ((
pos2 = str.find(
' ')) != std::string::npos) {
698 int ret = SendMsg(msg,
server, response, 3.0);
701 for(
size_t i=0;
i<response.
size();
i++)
715 processes.push_back(proc);
720 strError =
"cannot ask ";
722 strError +=
" to give the list of running processes.";
725 strError += std::string(
" due to " + __trace_message);
733 std::string topic = from + to;
745bool YarpBroker::setQos(
const std::string& from,
const std::string& to,
const std::string& qosFrom,
const std::string& qosTo)
749 if (qosFrom.empty() && qosTo.empty()) {
755 if (qosFrom.empty() ==
false) {
756 if(!getQosFromString(qosFrom,
styleFrom)) {
757 strError =
"Error in parsing Qos properties of " + std::string(from);
761 if (qosTo.empty() ==
false) {
762 if(!getQosFromString(qosTo,
styleTo)) {
763 strError =
"Error in parsing Qos properties of " + std::string(to);
781 size_t p = prop.find(
':');
782 if (
p != prop.npos) {
783 std::string key = prop.substr(0,
p);
784 std::string value = prop.substr(
p+1);
785 if (key.length() > 0 && value.length() > 0) {
786 if (key ==
"LEVEL" || key==
"DSCP" || key ==
"TOS") {
791 else if (key ==
"PRIORITY") {
796 else if (key ==
"POLICY") {
798 int policy =
strtol(value.c_str(), &
p, 10);
813bool YarpBroker::timeout(
double base,
double timeout)
824 if (!strStdioUUID.size()) {
829 stdioPort.
open(
"...");
835 while(!timeout(base, 5.0))
842 strError =
"Cannot connect to stdio port ";
854 for (
size_t i = 0;
i < input->
size();
i++) {
870 if (!
exists(target.c_str())) {
874 if (!semParam.
check()) {
881 if(!port.
open(
"..."))
883 __trace_message.clear();
893 __trace_message =
"(SendMsg) connecting to " + std::string(target);
894 for(
int i=0;
i<10;
i++)
906 __trace_message.clear();
911 __trace_message =
"(SendMsg) writing to " + std::string(target);
913 __trace_message =
"(SendMsg) disconnecting from " + std::string(target);
915 __trace_message.clear();
929int YarpBroker::requestServer(
Property& config)
937 if (config.
check(
"cmd") && config.
check(
"stdio"))
949 if (config.
check(
"workdir")) {
952 if (config.
check(
"geometry")) {
955 if (config.
check(
"hold")) {
958 if (config.
check(
"env")) {
969 if (response.
size() > 2) {
980 if (config.
check(
"cmd"))
989 if (config.
check(
"workdir")) {
992 if (config.
check(
"env")) {
virtual void onBrokerStdout(const char *msg)
BrokerEventSink * eventSink
unsigned int generateID()
bool rmconnect(const std::string &from, const std::string &to)
void threadRelease() override
Release method.
bool getAllPorts(std::vector< std::string > &stingList)
bool connected(const std::string &from, const std::string &to, const std::string &carrier) override
std::string requestRpc(const std::string &szport, const std::string &request, double timeout) override
void detachStdout() override
bool setQos(const std::string &from, const std::string &to, const std::string &qosFrom, const std::string &qosTo)
bool connect(const std::string &from, const std::string &to, const std::string &carrier, bool persist=false) override
connection broker
bool getAllProcesses(const std::string &server, ProcessContainer &processes)
bool threadInit() override
Initialization method.
std::string error() override
bool disconnect(const std::string &from, const std::string &to, const std::string &carrier) override
bool exists(const std::string &port) override
void run() override
Loop function.
bool attachStdout() override
bool getSystemInfo(const std::string &server, yarp::os::SystemInfoSerializer &info)
A simple collection of objects that can be described and transmitted in a portable way.
void fromString(const std::string &text)
Initializes bottle from a string.
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
size_type size() const
Gets the number of elements in the bottle.
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
void addString(const char *str)
Places a string in the bottle, at the end of the list.
std::string toString() const override
Gives a human-readable textual representation of the bottle.
A mini-server for performing network communication in the background.
std::string getName() const override
Get name of port.
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
T * read(bool shouldWait=true) override
Read an available object from the port.
static Contact getNameServerContact()
Get the contact information for the port associated with the nameserver (usually "/root", but this can be overwritten by the "yarp namespace" command).
static bool connect(const std::string &src, const std::string &dest, const std::string &carrier="", bool quiet=true)
Request that an output port connect to an input port.
static bool exists(const std::string &port, bool quiet=true, bool checkVer=true)
Check for a port to be ready and responsive.
static bool checkNetwork()
Check if the YARP Network is up and running.
static bool writeToNameServer(PortWriter &cmd, PortReader &reply, const ContactStyle &style)
Variant write method specialized to name server.
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
static bool setConnectionQos(const std::string &src, const std::string &dest, const QosStyle &srcStyle, const QosStyle &destStyle, bool quiet=true)
Adjust the Qos preferences of a connection.
static bool write(const Contact &contact, PortWriter &cmd, PortReader &reply, bool admin=false, bool quiet=false, double timeout=-1)
Send a single command to a port and await a single response.
static bool isConnected(const std::string &src, const std::string &dest, bool quiet)
Check if a connection with tcp carrier exists between two ports.
An abstraction for a periodic thread.
bool isRunning() const
Returns true when the thread is started, false otherwise.
bool start()
Call this to start the thread.
void stop()
Call this to stop the thread; this call blocks until the thread is terminated (and releaseThread() called).
A mini-server for network communication.
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
bool setTimeout(float timeout)
Set a timeout on network operations.
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
A class for storing options and configuration information.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
void clear()
Remove all associations.
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Preferences for the port's Quality of Service.
void setThreadPriority(int priority)
Sets the communication thread priority level.
bool setPacketPriority(const std::string &priority)
Sets the packet priority from a string.
void setThreadPolicy(int policy)
Sets the communication thread scheduling policy.
void wait()
Decrement the counter, even if we must wait to do that.
bool check()
Decrement the counter, unless that would require waiting.
void post()
Increment the counter.
static double nowSystem()
static void delaySystem(double seconds)
A helper class to pass the SystemInfo object around the YARP network.
virtual bool isString() const
Checks if value is a string.
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
virtual std::string asString() const
Get string value.
#define CONNECTION_TIMEOUT
std::stringstream OSTRINGSTREAM
std::vector< Process > ProcessContainer
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
#define YARPRUN_CONNECTION_TIMOUT
#define YARPRUN_NORESPONSE
#define YARPRUN_NOCONNECTION
#define YARPRUN_SEMAPHORE_PARAM
const char * yarprun_err_msg[]
#define EVENT_THREAD_PERIOD