YARP  2.3.68+272-20170522.1+git50f0ae7
Yet Another Robot Platform
Creating carriers for new kinds of connections
Author
Paul Fitzpatrick

In YARP, ports talk to each other via connections. Connection data is carried by TCP by default, but this can be switched very easily to other "carriers". See Configuring YARP Connections for information about how to configure connections using existing carriers. Here, we are going to look at creating entirely new kinds of carriers. Why might you want to do this?

  • You need to use an unusual kind of network that no YARP user has yet worked with.
  • You don't like some aspect of YARP's current carriers. No problem, just make your own variation and plug it in.
  • You want to support network-level interoperation with non-YARP based programs.

The steps

Carriers are created by implementing the yarp::os::Carrier interface. If you are creating a carrier that uses TCP, that is all you need to do.

If you are creating a carrier that uses a very different kind of network than anything already in YARP, you may need to go down a level further. Before you can make a carrier, you need to have a "stream" class (implementing yarp::os::TwoWayStream) that supports shipping raw data back and forth on that network. You don't need to worry about YARP protocol issues when making this class, it is just about being able to read and write raw data.

If you want to replace TCP entirely, for the initial handshaking done when setting up a connection, you'll need to do a bit more work. You'll need to squeeze any addressing information for your network into the yarp::os::Contact class (or else you can extend that class). And you'll need to implement a subclass of yarp::os::impl::Face that operates like yarp::os::impl::TcpFace. So that's a bit more work. I wouldn't recommend that being the first new carrier you make, try something simpler first to get used to how things work.

Coding preliminaries

The implementation of carriers is in the namespace yarp::os::impl, rather than the regular namespace yarp::os. Be warned that classes in the yarp::os::impl ("implementation") namespace may change more from release to release than those in yarp::os (intended to be a stable user API).

Header files for implementation classes may have dependencies on header files from the ACE library. This means that you need to take some care to make sure that the ACE header files on your system match those used to compile YARP.

Important classes

You should skim the documentation of the following classes:

You'll be able to include them in your code as follows:

using namespace yarp::os;
using namespace yarp::os::impl;

The basics

A few things you need to know. An important property of YARP carriers is that they do not need to be constant throughout a connection. So, for example, a connection may begin using a TCP carrier (and in practice all connections currently do in fact start with TCP, though this is not required), and then switch to something more efficient after some handshaking. So watch out for that. In the YARP Guts tutorial, you can see a little more how a carrier and connections relate.

When a port is registered with the YARP name server, an initial carrier type is specified. For example, the port associated with the name server itself might be registered as:

  registration name /root ip 192.168.1.3 port 10000 type tcp

The "tcp" type specified there does not necessarily mean that this port can only accept "tcp" carriers, it just means that that is how one should begin a connection to this port.

The first eight bytes sent on a connection are special in YARP. They act as a "magic number" for identifying the carrier to switch to. Some flags for identifying variants of a carrier might be included as well. This is a lot like how common file formats work and other network protocols work. See Port and connection protocols for a table of "official" YARP magic numbers.

Once you've made up a new eight-byte sequence to name your new carrier, you then have a huge amount of freedom in how it works. The abstract phases of YARP communication are specified, but their representation "on the wire" is basically up to you. This means you have a decent shot at matching some other protocol you might have to interface with. Even the required eight-byte sequence requirement can be loosened, with a bit of care.

A first example

Here is a example that takes an existing carrier, the ordinary text-mode carrier, and modifies it slightly (see example/carrier/carrier_stub.cpp). The change is very small; we name our new carrier "test" (as opposed to "text") and change its identifying 8-byte header from "CONNECT " to "TESTTEST".

/*
* Copyright: (C) 2010 RobotCub Consortium
* Author: Paul Fitzpatrick
* CopyPolicy: Released under the terms of the LGPLv2.1 or later, see LGPL.TXT
*/
#include <stdio.h>
#include <yarp/os/all.h>
class TestCarrier : public yarp::os::impl::TextCarrier {
public:
return "test";
}
return "TESTTEST";
}
virtual Carrier *create() {
return new TestCarrier();
}
};
int main(int argc, char *argv[]) {
out.open("/test/out");
in.open("/test/in");
yarp::os::Network::connect("/test/out","/test/in","test");
out.prepare().fromString("1 2 3");
out.write();
yarp::os::Bottle * bot = in.read();
if (bot!=NULL) {
printf("Got message %s\n", bot->toString().c_str());
}
out.close();
in.close();
return 0;
}

The key steps are:

  • We make whatever customizations we want to that carrier. It is very important to override the "factory" method yarp::os::Carrier::create to return an instance of our new class. We also define the name associated with our class by overriding yarp::os::Carrier::getName.

Having taken those steps, we can make connections using our new carrier (called "test" in the example code):

Network::connect(...,...,"test");

To make connections of this type from the command line with the yarp companion (the "yarp" command), you need to insert your carrier in the YARP library, or else compile a customized version of the companion. Here is the companion's source code in its entirety:

using namespace yarp::os;
int main(int argc, char *argv[]) {
Network yarp;
return Network::main(argc,argv);
}

So there isn't much to it to just insert a call to Carriers::addCarrierPrototype in there. Otherwise, to put the new carrier in the YARP library proper, do a little pattern-matching in the constructor for the yarp::os::Carriers class at src/libYARP_OS/src/Carriers.cpp.

Customizing behavior

The previous example just shows enough to make a carrier that is a small tweak of an existing one, but doesn't help much with setting up a completely new carrier.

For that, we need to make a subclass of yarp::os::Carrier. There's a lot of methods that we need to specify. First, some easy ones:

yarp::os::Carrier::create

A factory method - just return a new instance of your carrier.

yarp::os::Carrier::getName

Give the human-readable name of your carrier - this is how you would select it in "yarp connect /from /to CARRIERNAME" for example.

yarp::os::Carrier::isConnectionless

Return a flag specifying whether your carrier is "connection-based" or "connectionless" in the tcp/udp sense. If connectionless, YARP will assume, for example, that failures of the connection on one side may not be noticed on the other side, and take appropriate action.

yarp::os::Carrier::canAccept

Return true if your implementation can handle being the destination of a connection. Normally this should be true.

yarp::os::Carrier::canOffer

Return true if your implementation can handle being the initiator of a connection. Normally this should be true.

yarp::os::Carrier::isTextMode

Return true if your carrier is text-mode, for human reading/writing. YARP will use textual variants of its administrative messages if this is the case, and objects that serialize using the YARP standard "bottle" format (see Standard data representation format) will serialize in text form.

yarp::os::Carrier::canEscape

Return true if your carrier has a way of distinguishing user payload data from administrative headers. If false, YARP will not try to send any administrative headers on your carrier (e.g. disconnection requests, for example). Normally this should be true.

yarp::os::Carrier::handleEnvelope

Carriers that do not distinguish data from administrative headers (i.e. canEscape returns false), can overload this method to handle the envelope inside the stream. On the receiving side, the InputStream will have to overload the setReadEnvelopeCallback method, and execute the callback as soon as the envelope is ready.

yarp::os::Carrier::requireAck

Does you want YARP to use flow control? If this is set, YARP will attempt to send an acknowledgement back to the sender after receiving data, even in streaming operation.

yarp::os::Carrier::supportReply

Does your carrier support bidirectional communication?

yarp::os::Carrier::isLocal

Is your carrier process-local? That is, it can only used by threads sharing the same memory space? This should normally be false (but, if true, YARP will make a lot of optimizations obviously).

yarp::os::Carrier::toString

Give a textual representation of the state of the carrier. There is no standard for this, it is not much used.

Okay, that's a lot of stuff, but it is all "meta-data" - it describes your carrier, but doesn't implement it. It is important to get right though since it controls how YARP will use your carrier.

Now let us get down to implementation. It might be worth having Port and connection protocols open on a page beside this one. Basically, a connection in YARP has several phases:

  • Initiation phase - an initial connection is been made.

  • Header phase - the initiator of the connection sends an 8 byte "magic number" identifying the desired carrier type. The recipient of the connection waits for that 8 byte number. Once that is done, the carrier implementer is free to add on any further information they'd like to send. Carriers are strongly encouraged to send the name of the port associated with the initiator of the connection, but are free to do this however they like.

  • Header reply phase - an opportunity for the recipient to talk back to the initiator, if needed. Carriers are free to do what they want here, including extended bidirectional communication.

  • Index phase - when the user (or YARP wants) to send data over the connection, the carrier is given the opportunity to send some prior information (for example, lengths).

  • Payload data phase - here the carrier encodes user (or YARP) data.

  • Acknowledgement phase - if the carrier wants acknowledgements to happen, this is when YARP will request them. If a reply from the other side is expected, this will happen before the acknowledgement.

  • ...and we loop back to the index phase.

The header and header reply phase

The first set of methods needed are those to create or check the 8-byte "magic number" identifying the carrier type. We label the methods as INITIATOR for methods that matter on the side that initiates a connection, and RECIPIENT for the other side.

yarp::os::Carrier::getHeader (INITIATOR)

Place an 8-byte "magic number" describing the carrier in the block of memory provided.

yarp::os::Carrier::checkHeader (RECIPIENT)

Check whether the "magic number" passed describes your carrier.

yarp::os::Carrier::setParameters (RECIPIENT)

Configure the carrier based on parameters that might be embedded within the "magic number" passed. You can assume that the Carrier::checkHeader test has already succeeded. For carriers with no variants, this method will do nothing.

Now we need to set up a series of callbacks which are invoked during the header and header reply phases.

yarp::os::Carrier::prepareSend (INITIATOR)

Called just before we start writing on a connection. It is fine to do nothing here, this method is not really needed and is there for historical reasons.

yarp::os::Carrier::sendHeader (INITIATOR)

Finally, we send some data! This method should first send the 8 bytes determined by yarp::os::Carrier::getHeader. The "proto" object you are passed has an OutputStream object accessible as yarp::os::ConnectionState::os() upon which you can write (you are passed a protocol object as an argument in this and most other callbacks). Be sure to flush after writing these 8 bytes. After that, you're free to send anything else you feel you need to. You should definitely pass the name of the originating port, if there is one, so that the recipient knows who it is talking to. You can get that information from proto.getRoute().getFromName() (yarp::os::ConnectionState::getRoute(), yarp::os::Route::getFromName()).

yarp::os::Carrier::expectSenderSpecifier (RECIPIENT)

On the receiving side, the first 8 bytes are received by YARP and used to set up your carrier (via yarp::os::Carrier::checkHeader and yarp::os::Carrier::setParameters). After that is done, the yarp::os::Carrier::expectSenderSpecifier is called. Your job is to pick up the name of the originating port, however you chose to send it in yarp::os::Carrier::sendHeader, and then inform YARP about it by calling proto.setRoute(proto.getRoute().addFromName(PORT_NAME)). You have access to an InputStream as proto.is() [SIDE NOTE: yes, the API between Carrier and Protocol is a bit cumbersome and over-exposes the implementation. Sorry!]

yarp::os::Carrier::expectExtraHeader (RECIPIENT)

Called after yarp::os::Carrier::expectSenderSpecifier. You can process more header information here if there is any.

yarp::os::Carrier::respondToHeader (RECIPIENT)

An opportunity to send data back to the initiator, if needed. Use the proto.os() stream. Fine to do nothing.

This is also the moment to optionally drop the current input/output streams associated with the connection (probably TCP) and create an entirely different set (e.g. MCAST, UDP, SHMEM, or whatever kind of communication you are going to use). The easiest way to do that is to create suitable yarp::os::InputStream and yarp::os::OutputStream implementations, and then substitute them in by calling proto.takeStreams (see yarp::os::ConnectionState::takeStreams). Look at UdpCarrier.h or McastCarrier.h for examples.

yarp::os::Carrier::expectReplyToHeader (INITIATOR)

If the recipient does yarp::os::Carrier::respondToHeader, the initiator needs to read that data here. Further rounds of handshaking are possible, but not formalized with callbacks - just do matching read/write sequences in yarp::os::Carrier::respondToHeader and yarp::os::Carrier::expectReplyToHeader.

This is also the moment for the initiator to optionally switch streams, in the same way as the recipient does in respondToHeader.

yarp::os::Carrier::isActive

Return true if your carrier should be given data to transmit, or false if it should be ignored. Almost always, returning true is what you want. However if there is a low-level implementation of broadcasting that you can exploit (as for multicast carrier), a set of "logical" connections may be identical underneath - so this method can be useful to avoid duplicate messages.

If you didn't understand that, don't worry - just return true.

Okay! We are all set up!

The index, payload, and ack stage

The methods we've defined so far are sufficient to get through setting up a connection to use our new carrier, and switching our input/output stream implementation if needed. Now, we just need to push any data we are asked to transmit.

One note: we've talked about an INITIATOR and a RECIPIENT carrier, based on who started the connection. After our carriers exchange payload data, you should not assume that the carriers retain the same identity. YARP reserves the right to switch around which carrier is the initiator (there is a port-level command to do this). However, you can assume that there is always exactly one initiator.

When it comes time to send/receive payload data, YARP will call one of the following commands:

yarp::os::Carrier::write (INITIATOR)

Here, you are given the usual "proto" protocol object, and an extra yarp::os::SizedWriter object that knows about the size of what is to be sent, and also its content.

Note: historically, the write operation was broken down into an index phase first, with yarp::os::Carrier::sendIndex. You still have to define that method, but it can do nothing. The write method is guaranteed to be called first.

The last act of the write method should call in turn the write method on the SizedWriter object, passing the output stream associated with the protocol object.

yarp::os::Carrier::expectIndex (RECIPIENT)

On the receiving side, this method should wait for a message to arrive. If you wrote the message with some kind of header, you could go ahead and consume that now.

After expectIndex, YARP will do one or a series of reads from the input stream associated with the protocol object to acquire the actual payload. You are not responsible for that, unlike on the writing side - there is a small asymmetry here.

Ok, at this point user data has been transmitted! Note that if you want to transform how user data itself is expressed, you need to do that with a custom InputStream/OutputStream class.

If your carrier will send/expect acknowledgements of data transmission, you should define the following two methods appropriately - otherwise they should do nothing:

yarp::os::Carrier::sendAck (RECIPIENT)

Prepare and send an acknowledgement message to the output stream associated with the protocol object.

yarp::os::Carrier::expectAck (INITIATOR)

Wait for and read an acknowledgement message from the input stream associated with the protocol object.

At this point, YARP will call write/expectIndex methods again, and we loop.

An important detail: packets

YARP divides transmitted data into logical packets. If your streams encounter a non-disastrous failure while sending data, they are expected to have a mechanism for skipping to the next logical packet. Your streams are informed about packet boundaries through the startPacket/endPacket methods.

If your streams are connection-based, you can ignore this, but if they are connectionless then it is important. Take a look at the UDP/MCAST streams (yarp::os::impl::DgramTwoWayStream) implementation, and the DgramTwoWayStreamTest.cpp regression tests.

A complete example

Here's code to create a completely new carrier called "human". It relies on a human reading information on one terminal and typing it on another (see example/carrier/carrier_human.cpp).

dot_inline_dotgraph_1.png
/*
* Copyright: (C) 2010 RobotCub Consortium
* Author: Paul Fitzpatrick
* CopyPolicy: Released under the terms of the LGPLv2.1 or later, see LGPL.TXT
*/
#include <stdio.h>
#include <yarp/os/all.h>
#include <yarp/os/Bytes.h>
#include <iostream>
#include <string>
private:
std::string inputCache;
std::string outputCache;
public:
interrupting = false;
needInterrupt = false;
inputCache = outputCache = "";
}
virtual void close() {
yInfo() << "Bye bye";
}
virtual bool isOk() {
return true;
}
virtual void interrupt() {
interrupting = true;
while (needInterrupt) {
yInfo() << "*** INTERRUPT: Please hit enter ***";
for (int i=0; i<10 && needInterrupt; i++) {
}
}
}
// InputStream
virtual ssize_t read(const yarp::os::Bytes& b) {
if (interrupting) { return -1; }
while (inputCache.size() < b.length()) {
yInfo() <<"*** CHECK OTHER TERMINAL FOR SOMETHING TO TYPE:";
char buf[1000];
needInterrupt = true; // should be mutexed, in real implementation
std::cin.getline(buf,1000);
needInterrupt = false;
if (interrupting) { return -1; }
inputCache += buf;
inputCache += "\r\n";
yInfo() << "Thank you";
}
memcpy(b.get(),inputCache.c_str(),b.length());
inputCache = inputCache.substr(b.length());
return b.length();
}
// OutputStream
virtual void write(const yarp::os::Bytes& b) {
outputCache.append(b.get(),b.length());
while (outputCache.find("\n")!=std::string::npos) {
size_t idx = outputCache.find("\n");
std::string show;
show.append(outputCache.c_str(),idx);
yInfo() << "*** TYPE THIS ON THE OTHER TERMINAL: " << show;
outputCache = outputCache.substr(idx+1);
}
}
// TwoWayStream
return *this;
}
virtual OutputStream& getOutputStream() {
return *this;
}
// left undefined
return local;
}
// left undefined
return remote;
}
virtual void reset() {
inputCache = outputCache = "";
yInfo() << "Stream reset";
}
virtual void beginPacket() {
yInfo() << "Packet begins";
inputCache = "";
outputCache = "";
}
virtual void endPacket() {
yInfo() << "Packet ends";
}
private:
};
public:
// First, the easy bits...
virtual yarp::os::Carrier *create() {
return new HumanCarrier();
}
virtual yarp::os::ConstString getName() {
return "human";
}
virtual bool isConnectionless() {
return true;
}
virtual bool canAccept() {
return true;
}
virtual bool canOffer() {
return true;
}
virtual bool isTextMode() {
// let's be text mode, for human-friendliness
return true;
}
virtual bool canEscape() {
return true;
}
virtual bool requireAck() {
return true;
}
virtual bool supportReply() {
return true;
}
virtual bool isLocal() {
return false;
}
return "humans are handy";
}
virtual void getHeader(const yarp::os::Bytes& header) {
const char *target = "HUMANITY";
for (int i=0; i<8 && i<header.length(); i++) {
header.get()[i] = target[i];
}
}
virtual bool checkHeader(const yarp::os::Bytes& header) {
if (header.length()!=8) {
return false;
}
const char *target = "HUMANITY";
for (int i=0; i<8; i++) {
if (header.get()[i] != target[i]) {
return false;
}
}
return true;
}
virtual void setParameters(const yarp::os::Bytes& header) {
// no parameters - no carrier variants
}
// Now, the initial hand-shaking
virtual bool prepareSend(yarp::os::ConnectionState& proto) {
// nothing special to do
return true;
}
virtual bool sendHeader(yarp::os::ConnectionState& proto) {
// Send the "magic number" for this carrier
getHeader(header.bytes());
proto.os().write(header.bytes());
if (!proto.os().isOk()) return false;
// Now we can do whatever we want, as long as somehow
// we also send the name of the originating port
// let's just send the port name in plain text terminated with a
// carriage-return / line-feed
yarp::os::Bytes b2((char*)from.c_str(),from.length());
proto.os().write(b2);
proto.os().write('\r');
proto.os().write('\n');
proto.os().flush();
return proto.os().isOk();
}
virtual bool expectSenderSpecifier(yarp::os::ConnectionState& proto) {
// interpret everything that sendHeader wrote
proto.setRoute(proto.getRoute().addFromName(proto.is().readLine()));
return proto.is().isOk();
}
virtual bool expectExtraHeader(yarp::os::ConnectionState& proto) {
// interpret any extra header information sent - optional
return true;
}
virtual bool respondToHeader(yarp::os::ConnectionState& proto) {
// SWITCH TO NEW STREAM TYPE
HumanStream *stream = new HumanStream();
if (stream==NULL) { return false; }
proto.takeStreams(stream);
return true;
}
virtual bool expectReplyToHeader(yarp::os::ConnectionState& proto) {
// SWITCH TO NEW STREAM TYPE
HumanStream *stream = new HumanStream();
if (stream==NULL) { return false; }
proto.takeStreams(stream);
return true;
}
virtual bool isActive() {
return true;
}
// Payload time!
virtual bool write(yarp::os::ConnectionState& proto, yarp::os::SizedWriter& writer) {
bool ok = sendIndex(proto);
if (!ok) return false;
writer.write(proto.os());
return proto.os().isOk();
}
virtual bool sendIndex(yarp::os::ConnectionState& proto) {
yarp::os::ConstString prefix = "human says ";
yarp::os::Bytes b2((char*)prefix.c_str(),prefix.length());
proto.os().write(b2);
return true;
}
virtual bool expectIndex(yarp::os::ConnectionState& proto) {
yarp::os::ConstString prefix = "human says ";
yarp::os::ConstString compare = prefix;
yarp::os::Bytes b2((char*)prefix.c_str(),prefix.length());
proto.is().read(b2);
bool ok = proto.is().isOk() && (prefix==compare);
if (!ok) yInfo() << "YOU DID NOT SAY 'human says '";
return ok;
}
// Acknowledgements, we don't do them
virtual bool sendAck(yarp::os::ConnectionState& proto) {
yarp::os::ConstString prefix = "computers rule!\r\n";
yarp::os::Bytes b2((char*)prefix.c_str(),prefix.length());
proto.os().write(b2);
return true;
}
virtual bool expectAck(yarp::os::ConnectionState& proto) {
yarp::os::ConstString prefix = "computers rule!\r\n";
yarp::os::ConstString compare = prefix;
yarp::os::Bytes b2((char*)prefix.c_str(),prefix.length());
proto.is().read(b2);
bool ok = proto.is().isOk() && (prefix==compare);
if (!ok) yInfo() << "YOU DID NOT SAY 'computers rule!'";
return ok;
}
};
int main(int argc, char *argv[]) {
if (argc<2) {
yInfo() << "Please run in two terminals as:\n";
yInfo() << " carrier_human --server\n";
yInfo() << " carrier_human --client\n";
}
std::string mode = argv[1];
if (mode == "--server") {
out.open("/test/out");
bool connected = false;
while (!connected) {
connected = yarp::os::Network::connect("/test/out","/test/in","human");
if (!connected) yarp::os::Time::delay(1);
}
bot.fromString("1 2 3");
out.write(bot);
out.close();
}
if (mode == "--client") {
in.open("/test/in");
in.read(bot);
yInfo() << "Got message " << bot.toString().c_str();
in.close();
}
return 0;
}

Here's how the carrier works.

In one terminal we run the test program as a "client" that will sit and wait for a message:

./carrier_human --client

Here's what we see:

yarp: Port /test/in active at tcp://192.168.1.3:10002

Now in another terminal we run the test program as a "server" that will connect to the client and then try to send it the message "1 2 3":

./carrier_human --server

Here's what we see:

yarp: Port /test/out active at tcp://192.168.1.3:10012
yarp: Sending output from /test/out to /test/in using human
Packet begins
*** TYPE THIS ON THE OTHER TERMINAL: human says do
*** TYPE THIS ON THE OTHER TERMINAL: 1 2 3
Packet ends
*** CHECK OTHER TERMINAL FOR SOMETHING TO TYPE:

And on the client side here's what we see:

yarp: Receiving input from /test/out to /test/in using human
Packet begins
*** CHECK OTHER TERMINAL FOR SOMETHING TO TYPE:

What has happened is that the server has made a connection to the client, and then switched over to using our "human" protocol for transmitting data. This protocol relies on reading and writing instructions to the console for a human to follow. If we follow those instructions exactly, and in order, the program will successfully complete just as for any other carrier.