YARP
Yet Another Robot Platform
DgramTwoWayStream.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2019 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
12 #include <yarp/conf/system.h>
13 
14 #include <yarp/os/NetType.h>
15 #include <yarp/os/Time.h>
16 #include <yarp/os/impl/Logger.h>
17 
18 #if defined(YARP_HAS_ACE)
19 # include <ace/ACE.h>
20 # include <ace/Handle_Set.h>
21 # include <ace/INET_Addr.h>
22 # include <ace/Log_Msg.h>
23 # include <ace/OS_Memory.h>
24 # include <ace/OS_NS_sys_select.h>
25 # include <ace/SOCK_Dgram.h>
26 # include <ace/SOCK_Dgram_Mcast.h>
27 # include <ace/os_include/net/os_if.h>
28 // In one the ACE headers there is a definition of "main" for WIN32
29 # ifdef main
30 # undef main
31 # endif
32 #else
33 # include <arpa/inet.h>
34 # include <netinet/in.h>
35 # include <sys/socket.h>
36 # include <sys/types.h>
37 # include <unistd.h>
38 #endif
39 
40 #include <cerrno>
41 #include <cstring>
42 
43 using namespace yarp::os::impl;
44 using namespace yarp::os;
45 
46 #define CRC_SIZE 8
47 #define UDP_MAX_DATAGRAM_SIZE (65507 - CRC_SIZE)
48 
49 
50 static bool checkCrc(char* buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct, int* store_altPct = nullptr)
51 {
52  auto alt = (NetInt32)NetType::getCrc(buf + crcLength,
53  (length > crcLength) ? (length - crcLength) : 0);
54  Bytes b(buf, 4);
55  Bytes b2(buf + 4, 4);
56  NetInt32 curr = NetType::netInt(b);
57  int altPct = NetType::netInt(b2);
58  bool ok = (alt == curr && pct == altPct);
59  if (!ok) {
60  if (alt != curr) {
61  YARP_DEBUG(Logger::get(), "crc mismatch");
62  }
63  if (pct != altPct) {
64  YARP_DEBUG(Logger::get(), "packet code broken");
65  }
66  }
67  if (store_altPct != nullptr) {
68  *store_altPct = altPct;
69  }
70 
71  return ok;
72 }
73 
74 
75 static void addCrc(char* buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct)
76 {
77  auto alt = (NetInt32)NetType::getCrc(buf + crcLength,
78  (length > crcLength) ? (length - crcLength) : 0);
79  Bytes b(buf, 4);
80  Bytes b2(buf + 4, 4);
81  NetType::netInt((NetInt32)alt, b);
82  NetType::netInt((NetInt32)pct, b2);
83 }
84 
85 
86 bool DgramTwoWayStream::open(const Contact& remote)
87 {
88 #if defined(YARP_HAS_ACE)
89  ACE_INET_Addr anywhere((u_short)0, (ACE_UINT32)INADDR_ANY);
90  Contact local(anywhere.get_host_addr(),
91  anywhere.get_port_number());
92 #else
93  Contact local("localhost", -1);
94 #endif
95  return open(local, remote);
96 }
97 
98 bool DgramTwoWayStream::open(const Contact& local, const Contact& remote)
99 {
100  localAddress = local;
101  remoteAddress = remote;
102 
103 #if defined(YARP_HAS_ACE)
104  localHandle = ACE_INET_Addr((u_short)(localAddress.getPort()), (ACE_UINT32)INADDR_ANY);
105  if (remote.isValid()) {
106  remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
107  }
108  dgram = new ACE_SOCK_Dgram;
109  yAssert(dgram != nullptr);
110 
111  int result = dgram->open(localHandle,
112  ACE_PROTOCOL_FAMILY_INET,
113  0,
114  1);
115 #else
116  dgram = nullptr;
117  dgram_sockfd = -1;
118 
119  int s = -1;
120  if ((s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
121  std::exit(1);
122  }
123  struct sockaddr_in dgram_sin;
124  memset((char*)&dgram_sin, 0, sizeof(dgram_sin));
125  dgram_sin.sin_family = AF_INET;
126  dgram_sin.sin_addr.s_addr = htonl(INADDR_ANY);
127  dgram_sin.sin_port = htons(remote.getPort());
128  if (local.isValid()) {
129  if (inet_pton(AF_INET, remote.getHost().c_str(), &dgram_sin.sin_addr) == 0) {
130  YARP_ERROR(Logger::get(), "could not set up udp client\n");
131  std::exit(1);
132  }
133  if (connect(s, (struct sockaddr*)&dgram_sin, sizeof(dgram_sin)) == -1) {
134  YARP_ERROR(Logger::get(), "could not connect udp client\n");
135  std::exit(1);
136  }
137  } else {
138  if (bind(s, (struct sockaddr*)&dgram_sin, sizeof(dgram_sin)) == -1) {
139  YARP_ERROR(Logger::get(), "could not create udp server\n");
140  std::exit(1);
141  }
142  }
143  dgram_sockfd = s;
144  dgram = this;
145  int result = -1;
146  int local_port = -1;
147 
148  struct sockaddr_in sin;
149  socklen_t len = sizeof(sin);
150  if (getsockname(dgram_sockfd, (struct sockaddr*)&sin, &len) == 0 && sin.sin_family == AF_INET) {
151  result = 0;
152  local_port = ntohs(sin.sin_port);
153  }
154 #endif
155 
156  if (result != 0) {
157  YARP_ERROR(Logger::get(), "could not open datagram socket");
158  return false;
159  }
160 
161  configureSystemBuffers();
162 
163 #if defined(YARP_HAS_ACE)
164  dgram->get_local_addr(localHandle);
165  YARP_DEBUG(Logger::get(), std::string("starting DGRAM entity on port number ") + NetType::toString(localHandle.get_port_number()));
166  localAddress = Contact("127.0.0.1",
167  localHandle.get_port_number());
168 #else
169  localAddress = Contact("127.0.0.1", local_port);
170 #endif
171 
172  YARP_DEBUG(Logger::get(), std::string("Update: DGRAM from ") + localAddress.toURI() + " to " + remoteAddress.toURI());
173 
174  allocate();
175 
176  return true;
177 }
178 
179 void DgramTwoWayStream::allocate(int readSize, int writeSize)
180 {
181  //These are only as another default. We should modify the method to return bool
182  //and fail if we cannot read the socket size.
183 
184  int _read_size = -1;
185  int _write_size = -1;
186 
187  std::string _env_dgram = NetworkBase::getEnvironment("YARP_DGRAM_SIZE");
188  std::string _env_mode;
189  if (multiMode) {
190  _env_mode = NetworkBase::getEnvironment("YARP_MCAST_SIZE");
191  } else {
192  _env_mode = NetworkBase::getEnvironment("YARP_UDP_SIZE");
193  }
194  if (!_env_mode.empty()) {
195  _env_dgram = _env_mode;
196  }
197  if (!_env_dgram.empty()) {
198  int sz = NetType::toInt(_env_dgram);
199  if (sz != 0) {
200  _read_size = _write_size = sz;
201  }
202  YARP_INFO(Logger::get(), std::string("Datagram packet size set to ") + NetType::toString(_read_size));
203  }
204  if (readSize != 0) {
205  _read_size = readSize;
206  YARP_INFO(Logger::get(), std::string("Datagram read size reset to ") + NetType::toString(_read_size));
207  }
208  if (writeSize != 0) {
209  _write_size = writeSize;
210  YARP_INFO(Logger::get(), std::string("Datagram write size reset to ") + NetType::toString(_write_size));
211  }
212 
213  // force the size of the write buffer to be under the max size of a udp datagram.
214  if (_write_size > UDP_MAX_DATAGRAM_SIZE || _write_size < 0) {
215  _write_size = UDP_MAX_DATAGRAM_SIZE;
216  }
217 
218  if (_read_size < 0) {
219 #if defined(YARP_HAS_ACE)
220  //Defaults to socket size
221  if (dgram != nullptr) {
222  int len = sizeof(_read_size);
223  int result = dgram->get_option(SOL_SOCKET, SO_RCVBUF, &_read_size, &len);
224  if (result < 0) {
225  YARP_ERROR(Logger::get(), std::string("Failed to read buffer size from RCVBUF socket with error: ") + std::string(strerror(errno)) + std::string(". Setting read buffer size to UDP_MAX_DATAGRAM_SIZE."));
226  _read_size = UDP_MAX_DATAGRAM_SIZE;
227  }
228  }
229 #else
230  socklen_t len = sizeof(_read_size);
231 
232  int result = getsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, &_read_size, &len);
233  if (result < 0) {
234  YARP_ERROR(Logger::get(), std::string("Failed to read buffer size from RCVBUF socket with error: ") + std::string(strerror(errno)) + std::string(". Setting read buffer size to UDP_MAX_DATAGRAM_SIZE."));
235  _read_size = UDP_MAX_DATAGRAM_SIZE;
236  }
237 #endif
238  }
239 
240  readBuffer.allocate(_read_size);
241  writeBuffer.allocate(_write_size);
242  readAt = 0;
243  readAvail = 0;
244  writeAvail = CRC_SIZE;
245  //happy = true;
246  pct = 0;
247 }
248 
249 
250 void DgramTwoWayStream::configureSystemBuffers()
251 {
252  //By default the buffers are forced to the datagram size limit.
253  //These can be overwritten by environment variables
254  //Generic variable
255  std::string socketBufferSize = NetworkBase::getEnvironment("YARP_DGRAM_BUFFER_SIZE");
256  //Specific read
257  std::string socketReadBufferSize = NetworkBase::getEnvironment("YARP_DGRAM_RECV_BUFFER_SIZE");
258  //Specific write
259  std::string socketSendBufferSize = NetworkBase::getEnvironment("YARP_DGRAM_SND_BUFFER_SIZE");
260 
261  int readBufferSize = -1;
262  if (!socketReadBufferSize.empty()) {
263  readBufferSize = NetType::toInt(socketReadBufferSize);
264  } else if (!socketBufferSize.empty()) {
265  readBufferSize = NetType::toInt(socketBufferSize);
266  }
267 
268  int writeBufferSize = -1;
269  if (!socketSendBufferSize.empty()) {
270  writeBufferSize = NetType::toInt(socketSendBufferSize);
271  } else if (!socketBufferSize.empty()) {
272  writeBufferSize = NetType::toInt(socketBufferSize);
273  }
274  // The writeBufferSize can't be set greater than udp datagram
275  // maximum size
276  if (writeBufferSize < 0 || writeBufferSize > UDP_MAX_DATAGRAM_SIZE) {
277  if (writeBufferSize > UDP_MAX_DATAGRAM_SIZE) {
278  YARP_WARN(Logger::get(), "The desired SND buffer size is too big. It is set to the max datagram size : " + NetType::toString(UDP_MAX_DATAGRAM_SIZE));
279  }
280  writeBufferSize = UDP_MAX_DATAGRAM_SIZE;
281  }
282 
283  if (readBufferSize > 0) {
284  int actualReadSize = -1;
285 
286 #if defined(YARP_HAS_ACE)
287  int intSize = sizeof(readBufferSize);
288  int setResult = dgram->set_option(SOL_SOCKET, SO_RCVBUF, (void*)&readBufferSize, intSize);
289 
290  int getResult = dgram->get_option(SOL_SOCKET, SO_RCVBUF, (void*)&actualReadSize, &intSize);
291 #else
292  socklen_t intSize = sizeof(readBufferSize);
293  int setResult = setsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, (void*)&readBufferSize, intSize);
294  int getResult = getsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, (void*)&actualReadSize, &intSize);
295 #endif
296  // in linux the value returned by getsockopt is "doubled"
297  // for some unknown reasons (see https://linux.die.net/man/7/socket)
298 #if defined(__linux__)
299  actualReadSize /= 2;
300 #endif
301  if (setResult < 0 || getResult < 0 || readBufferSize != actualReadSize) {
302  bufferAlertNeeded = true;
303  bufferAlerted = false;
304  YARP_WARN(Logger::get(), "Failed to set RECV socket buffer to desired size. Actual: " + NetType::toString(actualReadSize) + ", Desired: " + NetType::toString(readBufferSize));
305  }
306  }
307  if (writeBufferSize > 0) {
308  int actualWriteSize = -1;
309 #if defined(YARP_HAS_ACE)
310  int intSize = sizeof(writeBufferSize);
311  int setResult = dgram->set_option(SOL_SOCKET, SO_SNDBUF, (void*)&writeBufferSize, intSize);
312  int getResult = dgram->get_option(SOL_SOCKET, SO_SNDBUF, (void*)&actualWriteSize, &intSize);
313 #else
314  socklen_t intSize = sizeof(writeBufferSize);
315  int setResult = setsockopt(dgram_sockfd, SOL_SOCKET, SO_SNDBUF, (void*)&writeBufferSize, intSize);
316  int getResult = getsockopt(dgram_sockfd, SOL_SOCKET, SO_SNDBUF, (void*)&actualWriteSize, &intSize);
317 #endif
318  // in linux the value returned by getsockopt is "doubled"
319  // for some unknown reasons (see https://linux.die.net/man/7/socket)
320 #if defined(__linux__)
321  actualWriteSize /= 2;
322 #endif
323  if (setResult < 0 || getResult < 0 || writeBufferSize != actualWriteSize) {
324  bufferAlertNeeded = true;
325  bufferAlerted = false;
326  YARP_WARN(Logger::get(), "Failed to set SND socket buffer to desired size. Actual: " + NetType::toString(actualWriteSize) + ", Desired: " + NetType::toString(writeBufferSize));
327  }
328  }
329 }
330 
331 
332 #if defined(YARP_HAS_ACE)
333 int DgramTwoWayStream::restrictMcast(ACE_SOCK_Dgram_Mcast* dmcast,
334  const Contact& group,
335  const Contact& ipLocal,
336  bool add)
337 {
338  restrictInterfaceIp = ipLocal;
339 
341  std::string("multicast connection ") + group.getHost() + " on network interface for " + ipLocal.getHost());
342  int result = -1;
343  // There's some major damage in ACE mcast interfaces.
344  // Most require interface names, yet provide no way to query
345  // these names - and in the end, convert to IP addresses.
346  // Here we try to do an end run around ACE.
347 
348  // based on: ACE_SOCK_Dgram::set_nic
349 
350  ip_mreq multicast_address;
351  ACE_INET_Addr group_addr(group.getPort(),
352  group.getHost().c_str());
353  ACE_INET_Addr interface_addr(ipLocal.getPort(),
354  ipLocal.getHost().c_str());
355  multicast_address.imr_interface.s_addr = htonl(interface_addr.get_ip_address());
356  multicast_address.imr_multiaddr.s_addr = htonl(group_addr.get_ip_address());
357 
358  if (add) {
359  YARP_DEBUG(Logger::get(), "Trying to correct mcast membership...\n");
360  result = ((ACE_SOCK*)dmcast)->set_option(IPPROTO_IP, IP_ADD_MEMBERSHIP, &multicast_address, sizeof(struct ip_mreq));
361  } else {
362  YARP_DEBUG(Logger::get(), "Trying to correct mcast output...");
363  result = ((ACE_SOCK*)dmcast)->set_option(IPPROTO_IP, IP_MULTICAST_IF, &multicast_address.imr_interface.s_addr, sizeof(struct in_addr));
364  }
365  if (result != 0) {
366  int num = errno;
368  std::string("mcast result: ") + strerror(num));
369  if (num == 98) {
370  // our membership is already correct / Address already in use
371  result = 0;
372  }
373  result = 0; // in fact, best to proceed for Windows.
374  }
375 
376  return result;
377 }
378 #endif
379 
380 
382  const Contact& ipLocal)
383 {
384 
385  multiMode = true;
386 
387  localAddress = ipLocal;
388 
389 #if defined(YARP_HAS_ACE)
390  localHandle = ACE_INET_Addr((u_short)(localAddress.getPort()),
391  (ACE_UINT32)INADDR_ANY);
392 
393  ACE_SOCK_Dgram_Mcast::options mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
394 # if defined(__APPLE__)
395  mcastOptions = static_cast<ACE_SOCK_Dgram_Mcast::options>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
396 # endif
397 
398  auto* dmcast = new ACE_SOCK_Dgram_Mcast(mcastOptions);
399  dgram = dmcast;
400  mgram = dmcast;
401  yAssert(dgram != nullptr);
402 
403  int result = -1;
404  ACE_INET_Addr addr(group.getPort(), group.getHost().c_str());
405  result = dmcast->open(addr, nullptr, 1);
406  if (result == 0) {
407  result = restrictMcast(dmcast, group, ipLocal, false);
408  }
409 
410  if (result != 0) {
411  YARP_ERROR(Logger::get(), "could not open multicast datagram socket");
412  return false;
413  }
414 
415 #else
416  dgram = nullptr;
417  dgram_sockfd = -1;
418 
419  int s = -1;
420  struct sockaddr_in dgram_sin;
421  // create what looks like an ordinary UDP socket
422  if ((s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
423  YARP_ERROR(Logger::get(), "could not create sender socket\n");
424  std::exit(1);
425  }
426  // set up destination address
427  memset((char*)&dgram_sin, 0, sizeof(dgram_sin));
428  dgram_sin.sin_family = AF_INET;
429  dgram_sin.sin_port = htons(group.getPort());
430 
431 
432  if (inet_pton(AF_INET, group.getHost().c_str(), &dgram_sin.sin_addr) == 0) {
433  YARP_ERROR(Logger::get(), "could not set up mcast client\n");
434  std::exit(1);
435  }
436  if (connect(s, (struct sockaddr*)&dgram_sin, sizeof(dgram_sin)) == -1) {
437  YARP_ERROR(Logger::get(), "could not connect mcast client\n");
438  std::exit(1);
439  }
440 
441 
442  dgram_sockfd = s;
443  dgram = this;
444  int local_port = -1;
445 
446  struct sockaddr_in sin;
447  socklen_t len = sizeof(sin);
448  if (getsockname(dgram_sockfd, (struct sockaddr*)&sin, &len) == 0 && sin.sin_family == AF_INET) {
449  local_port = ntohs(sin.sin_port);
450  }
451 
452 
453 #endif
454  configureSystemBuffers();
455  remoteAddress = group;
456 #ifdef YARP_HAS_ACE
457 
458  localHandle.set(localAddress.getPort(), localAddress.getHost().c_str());
459  remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
460 #else
461 
462  remoteAddress = group;
463  localAddress = Contact("127.0.0.1", local_port);
464  localHandle = local_port;
465  remoteHandle = remoteAddress.getPort();
466 
467 
468 #endif
469  YARP_DEBUG(Logger::get(), std::string("Update: DGRAM from ") + localAddress.toURI() + " to " + remoteAddress.toURI());
470  allocate();
471 
472  return true;
473 }
474 
475 
476 bool DgramTwoWayStream::join(const Contact& group, bool sender, const Contact& ipLocal)
477 {
478 
479  YARP_DEBUG(Logger::get(), std::string("subscribing to mcast address ") + group.toURI() + " for " + (sender ? "writing" : "reading"));
480 
481  multiMode = true;
482 
483  if (sender) {
484  if (ipLocal.isValid()) {
485  return openMcast(group, ipLocal);
486  }
487  // just use udp as normal
488  return open(group);
489  }
490 
491 #if defined(YARP_HAS_ACE)
492  ACE_SOCK_Dgram_Mcast::options mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
493 # if defined(__APPLE__)
494  mcastOptions = static_cast<ACE_SOCK_Dgram_Mcast::options>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
495 # endif
496 
497  auto* dmcast = new ACE_SOCK_Dgram_Mcast(mcastOptions);
498 
499  dgram = dmcast;
500  mgram = dmcast;
501  yAssert(dgram != nullptr);
502 
503  ACE_INET_Addr addr(group.getPort(), group.getHost().c_str());
504 
505  int result = -1;
506  if (ipLocal.isValid()) {
507  result = dmcast->join(addr, 1);
508 
509  if (result == 0) {
510  result = restrictMcast(dmcast, group, ipLocal, true);
511  }
512  } else {
513  result = dmcast->join(addr, 1);
514  }
515 
516  if (result != 0) {
517  YARP_ERROR(Logger::get(), "cannot connect to multi-cast address");
518  happy = false;
519  return false;
520  }
521 #else
522  struct ip_mreq mreq;
523  int s = -1;
524  if ((s = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
525  YARP_ERROR(Logger::get(), "could not create receiver socket\n");
526  happy = false;
527  return false;
528  }
529  struct sockaddr_in addr;
530  u_int yes = 1;
531 
532  /* set up destination address */
533  memset(&addr, 0, sizeof(addr));
534  addr.sin_family = AF_INET;
535  addr.sin_addr.s_addr = htonl(INADDR_ANY);
536  addr.sin_port = htons(group.getPort());
537 
538  // allow multiple sockets to use the same PORT number
539  if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(u_int)) < 0) {
540  YARP_ERROR(Logger::get(), "could not allow sockets use the same ADDRESS\n");
541  happy = false;
542  return false;
543  }
544 
545 # if defined(__APPLE__)
546  if (setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(u_int)) < 0) {
547  YARP_ERROR(Logger::get(), "could not allow sockets use the same PORT number\n");
548  happy = false;
549  return false;
550  }
551 # endif
552 
553  // bind to receive address
554  if (bind(s, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
555  YARP_ERROR(Logger::get(), "could not create mcast server\n");
556  happy = false;
557  return false;
558  }
559 
560  // use setsockopt() to request that the kernel join a multicast group
561  if (inet_pton(AF_INET, group.getHost().c_str(), &mreq.imr_multiaddr) == 0) {
562  YARP_ERROR(Logger::get(), "Could not set up the mcast server");
563  std::exit(1);
564  }
565  mreq.imr_interface.s_addr = htonl(INADDR_ANY);
566  if (setsockopt(s, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
567  YARP_ERROR(Logger::get(), "could not join the multicast group\n");
568  perror("sendto");
569  happy = false;
570  return false;
571  }
572 
573  dgram_sockfd = s;
574  dgram = this;
575 #endif
576  configureSystemBuffers();
577 
578  localAddress = group;
579  remoteAddress = group;
580 #ifdef YARP_HAS_ACE
581  localHandle.set(localAddress.getPort(), localAddress.getHost().c_str());
582  remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
583 #else
584  localHandle = localAddress.getPort();
585  remoteHandle = remoteAddress.getPort();
586 #endif
587  allocate();
588  return true;
589 }
590 
592 {
593  closeMain();
594 }
595 
597 {
598  bool act = false;
599  mutex.lock();
600  if ((!closed) && (!interrupting) && happy) {
601  act = true;
602  interrupting = true;
603  closed = true;
604  }
605  mutex.unlock();
606 
607  if (act) {
608  if (reader) {
609  int ct = 3;
610  while (happy && ct > 0) {
611  ct--;
612  DgramTwoWayStream tmp;
613  if (mgram != nullptr) {
615  std::string("* mcast interrupt, interface ") + restrictInterfaceIp.toString());
616  tmp.join(localAddress, true, restrictInterfaceIp);
617  } else {
618  YARP_DEBUG(Logger::get(), "* dgram interrupt");
619  tmp.open(Contact(localAddress.getHost(), 0),
620  localAddress);
621  }
623  std::string("* interrupt state ") + NetType::toString(interrupting) + " " + NetType::toString(closed) + " " + NetType::toString(happy) + " ");
624  ManagedBytes empty(10);
625  for (size_t i = 0; i < empty.length(); i++) {
626  empty.get()[i] = 0;
627  }
628 
629  // don't want this message getting into a valid packet
630  tmp.pct = -1;
631 
632  tmp.write(empty.bytes());
633  tmp.flush();
634  tmp.close();
635  if (happy) {
637  }
638  }
639  YARP_DEBUG(Logger::get(), "dgram interrupt done");
640  }
641  mutex.lock();
642  interrupting = false;
643  mutex.unlock();
644  } else {
645  // wait for interruption to be done
646  if (interrupting) {
647  while (interrupting) {
649  "waiting for dgram interrupt to be finished...");
651  }
652  }
653  }
654 }
655 
657 {
658  if (dgram != nullptr) {
659  //printf("Dgram closing, interrupt state %d\n", interrupting);
660  interrupt();
661  mutex.lock();
662  closed = true;
663  happy = false;
664  //printf("Dgram closinger, interrupt state %d\n", interrupting);
665  mutex.unlock();
666  while (interrupting) {
667  happy = false;
669  }
670  mutex.lock();
671  if (dgram != nullptr) {
672 #if defined(YARP_HAS_ACE)
673  dgram->close();
674  delete dgram;
675 #else
676  if (dgram_sockfd >= 0) {
677  ::close(dgram_sockfd);
678  }
679  dgram_sockfd = -1;
680 #endif
681  dgram = nullptr;
682  mgram = nullptr;
683  }
684  happy = false;
685  mutex.unlock();
686  }
687  happy = false;
688 }
689 
691 {
692  reader = true;
693  bool done = false;
694 
695  while (!done) {
696 
697  if (closed) {
698  happy = false;
699  return -1;
700  }
701 
702  // if nothing is available, try to grab stuff
703  if (readAvail == 0) {
704  readAt = 0;
705 
706 
707  //yAssert(dgram != nullptr);
708  //YARP_DEBUG(Logger::get(), "DGRAM Waiting for something!");
709  yarp::conf::ssize_t result = -1;
710 #if defined(YARP_HAS_ACE)
711  if ((dgram != nullptr) && restrictInterfaceIp.isValid()) {
712  /*
713  printf("Consider remote mcast\n");
714  printf("What we know:\n");
715  printf(" %s\n", restrictInterfaceIp.toString().c_str());
716  printf(" %s\n", localAddress.toString().c_str());
717  printf(" %s\n", remoteAddress.toString().c_str());
718  */
719  ACE_INET_Addr iface(restrictInterfaceIp.getPort(),
720  restrictInterfaceIp.getHost().c_str());
721  ACE_INET_Addr dummy((u_short)0, (ACE_UINT32)INADDR_ANY);
722  result = dgram->recv(readBuffer.get(), readBuffer.length(), dummy);
724  std::string("MCAST Got ") + NetType::toString((int)result) + " bytes");
725 
726  } else
727 #endif
728  if (dgram != nullptr) {
729  yAssert(dgram != nullptr);
730 #if defined(YARP_HAS_ACE)
731  ACE_INET_Addr dummy((u_short)0, (ACE_UINT32)INADDR_ANY);
732  //YARP_DEBUG(Logger::get(), "DGRAM Waiting for something!");
733  result = dgram->recv(readBuffer.get(), readBuffer.length(), dummy);
734 #else
735  result = recv(dgram_sockfd, readBuffer.get(), readBuffer.length(), 0);
736 #endif
738  std::string("DGRAM Got ") + NetType::toString((int)result) + " bytes");
739  } else {
740  onMonitorInput();
741  //printf("Monitored input of %d bytes\n", monitor.length());
742  if (monitor.length() > readBuffer.length()) {
743  printf("Too big!\n");
744  std::exit(1);
745  }
746  memcpy(readBuffer.get(), monitor.get(), monitor.length());
747  result = monitor.length();
748  }
749 
750 
751  /*
752  // this message isn't needed anymore
753  if (result>WRITE_SIZE*1.25) {
754  YARP_ERROR(Logger::get(),
755  std::string("Got big datagram: ")+NetType::toString(result)+
756  " bytes");
757  }
758  */
759  if (closed || (result < 0)) {
760  happy = false;
761  return -1;
762  }
763  readAvail = result;
764 
765  // deal with CRC
766  int altPct = 0;
767  bool crcOk = checkCrc(readBuffer.get(), readAvail, CRC_SIZE, pct, &altPct);
768  if (altPct != -1) {
769  pct++;
770  if (!crcOk) {
771  if (bufferAlertNeeded && !bufferAlerted) {
773  "*** Multicast/UDP packet dropped - checksum error ***");
775  "The UDP/MCAST system buffer limit on your system is low.");
777  "You may get packet loss under heavy conditions.");
778 #ifdef __linux__
780  "To change the buffer limit on linux: sysctl -w net.core.rmem_max=8388608");
782  "(Might be something like: sudo /sbin/sysctl -w net.core.rmem_max=8388608)");
783 #else
785  "To change the limit use: sysctl for Linux/FreeBSD, ndd for Solaris, no for AIX");
786 #endif
787  bufferAlerted = true;
788  } else {
789  errCount++;
790  double now = SystemClock::nowSystem();
791  if (now - lastReportTime > 1) {
793  std::string("*** ") + NetType::toString(errCount) + " datagram packet(s) dropped - checksum error ***");
794  lastReportTime = now;
795  errCount = 0;
796  }
797  }
798  reset();
799  return -1;
800  }
801  readAt += CRC_SIZE;
802  readAvail -= CRC_SIZE;
803  done = true;
804  } else {
805  readAvail = 0;
806  }
807  }
808 
809  // if stuff is available, take it
810  if (readAvail > 0) {
811  size_t take = readAvail;
812  if (take > b.length()) {
813  take = b.length();
814  }
815  memcpy(b.get(), readBuffer.get() + readAt, take);
816  readAt += take;
817  readAvail -= take;
818  return take;
819  }
820  }
821 
822  return 0;
823 }
824 
826 {
827  //YARP_DEBUG(Logger::get(), "DGRAM prep writing");
828  //printf("DGRAM write %d bytes\n", b.length());
829 
830  if (reader) {
831  return;
832  }
833  if (writeBuffer.get() == nullptr) {
834  return;
835  }
836 
837  Bytes local = b;
838  while (local.length() > 0) {
839  //YARP_DEBUG(Logger::get(), "DGRAM prep writing");
840  yarp::conf::ssize_t rem = local.length();
841  yarp::conf::ssize_t space = writeBuffer.length() - writeAvail;
842  bool shouldFlush = false;
843  if (rem >= space) {
844  rem = space;
845  shouldFlush = true;
846  }
847  memcpy(writeBuffer.get() + writeAvail, local.get(), rem);
848  writeAvail += rem;
849  local = Bytes(local.get() + rem, local.length() - rem);
850  if (shouldFlush) {
851  flush();
852  }
853  }
854 }
855 
856 
858 {
859  if (writeBuffer.get() == nullptr) {
860  return;
861  }
862 
863  // should set CRC
864  if (writeAvail <= CRC_SIZE) {
865  return;
866  }
867  addCrc(writeBuffer.get(), writeAvail, CRC_SIZE, pct);
868  pct++;
869 
870  if (writeAvail > 0) {
871  //yAssert(dgram != nullptr);
872  yarp::conf::ssize_t len = 0;
873 
874 #if defined(YARP_HAS_ACE)
875  if (mgram != nullptr) {
876  len = mgram->send(writeBuffer.get(), writeAvail);
878  std::string("MCAST - wrote ") + NetType::toString((int)len) + " bytes");
879  } else
880 #endif
881  if (dgram != nullptr) {
882 #if defined(YARP_HAS_ACE)
883  len = dgram->send(writeBuffer.get(), writeAvail, remoteHandle);
884 #else
885  len = send(dgram_sockfd, writeBuffer.get(), writeAvail, 0);
886 #endif
888  std::string("DGRAM - wrote ") + NetType::toString((int)len) + " bytes to " + remoteAddress.toString());
889  } else {
890  Bytes b(writeBuffer.get(), writeAvail);
891  monitor = ManagedBytes(b, false);
892  monitor.copy();
893  //printf("Monitored output of %d bytes\n", monitor.length());
894  len = monitor.length();
895  onMonitorOutput();
896  }
897  //if (len>WRITE_SIZE*0.75) {
898  if (len > writeBuffer.length() * 0.75) {
900  "long dgrams might need a little time");
901 
902  // Under heavy loads, packets could get dropped
903  // 640x480x3 images correspond to about 15 datagrams
904  // so there's not much time possible between them
905  // looked at iperf, it just does a busy-waiting delay
906  // there's an implementation below, but commented out -
907  // better solution was to increase recv buffer size
908 
909  double first = yarp::os::SystemClock::nowSystem();
910  double now;
911  int ct = 0;
912  do {
913  //printf("Busy wait... %d\n", ct);
916  ct++;
917  } while (now - first < 0.001);
918  }
919 
920  if (len < 0) {
921  happy = false;
922  YARP_DEBUG(Logger::get(), "DGRAM failed to send message with error: " + std::string(strerror(errno)));
923  return;
924  }
925  writeAvail -= len;
926 
927  if (writeAvail != 0) {
928  // well, we have a problem
929  // checksums will cause dumping
930  YARP_DEBUG(Logger::get(), "dgram/mcast send behaving badly");
931  }
932  }
933  // finally: writeAvail should be 0
934 
935  // make space for CRC
936  writeAvail = CRC_SIZE;
937 }
938 
939 
941 {
942  return happy;
943 }
944 
945 
947 {
948  readAt = 0;
949  readAvail = 0;
950  writeAvail = CRC_SIZE;
951  pct = 0;
952 }
953 
954 
956 {
957  //YARP_ERROR(Logger::get(), std::string("Packet begins: ")+(reader?"reader":"writer"));
958  pct = 0;
959 }
960 
962 {
963  //YARP_ERROR(Logger::get(), std::string("Packet ends: ")+(reader?"reader":"writer"));
964  if (!reader) {
965  pct = 0;
966  }
967 }
968 
970 {
971  return monitor.bytes();
972 }
973 
974 
976 {
977  monitor.clear();
978 }
979 
980 
982 {
983  if (dgram == nullptr) {
984  return false;
985  }
986 #if defined(YARP_HAS_ACE)
987  return (dgram->set_option(IPPROTO_IP, IP_TOS, (int*)&tos, (int)sizeof(tos)) == 0);
988 #else
989  return (setsockopt(dgram_sockfd, IPPROTO_IP, IP_TOS, (int*)&tos, (int)sizeof(tos)) == 0);
990 #endif
991 }
992 
994 {
995  int tos = -1;
996  if (dgram == nullptr) {
997  return tos;
998  }
999 #if defined(YARP_HAS_ACE)
1000  int optlen;
1001  dgram->get_option(IPPROTO_IP, IP_TOS, (int*)&tos, &optlen);
1002 #else
1003  socklen_t optlen;
1004  getsockopt(dgram_sockfd, IPPROTO_IP, IP_TOS, (int*)&tos, &optlen);
1005 #endif
1006  return tos;
1007 }
size_t length() const
Definition: Bytes.cpp:25
void beginPacket() override
Mark the beginning of a logical packet.
static std::string toString(int x)
Definition: NetType.cpp:48
#define YARP_INFO(log, x)
Definition: Logger.h:89
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:295
static bool checkCrc(char *buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct, int *store_altPct=nullptr)
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition: Contact.cpp:310
void reset() override
Reset the stream.
void close() override
Terminate the stream.
#define YARP_DEBUG(log, x)
Definition: Logger.h:91
#define YARP_WARN(log, x)
Definition: Logger.h:88
::ssize_t ssize_t
Definition: numeric.h:60
static double nowSystem()
Definition: SystemClock.cpp:37
static unsigned long int getCrc(char *buf, size_t len)
Definition: NetType.cpp:129
A simple abstraction for a block of bytes.
Definition: Bytes.h:27
virtual bool openMcast(const Contact &group, const Contact &ipLocal)
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:225
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:121
const Bytes & bytes() const
static int toInt(const std::string &x)
Definition: NetType.cpp:70
static std::string getEnvironment(const char *key, bool *found=nullptr)
Read a variable from the environment.
Definition: Network.cpp:1334
const char * get() const
Definition: Bytes.cpp:30
#define CRC_SIZE
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition: NetInt32.h:33
#define YARP_ERROR(log, x)
Definition: Logger.h:87
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:23
#define yAssert(x)
Definition: Log.h:112
void endPacket() override
Mark the end of a logical packet (see beginPacket).
static void addCrc(char *buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct)
void interrupt() override
Interrupt the stream.
static void delaySystem(double seconds)
Definition: SystemClock.cpp:32
bool setTypeOfService(int tos) override
void flush() override
Make sure all pending write operations are finished.
bool isOk() const override
Check if the stream is ok or in an error state.
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:236
Represents how to reach a part of a YARP network.
Definition: Contact.h:38
#define UDP_MAX_DATAGRAM_SIZE
virtual bool join(const Contact &group, bool sender, const Contact &ipLocal)
const char * get() const
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
static int netInt(const yarp::os::Bytes &code)
Definition: NetType.cpp:21
An interface to the operating system, including Port based communication.
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
Definition: ManagedBytes.h:24
virtual bool open(const Contact &remote)
static Logger & get()
Definition: Logger.cpp:45
The components from which ports and connections are built.
A stream abstraction for datagram communication.