YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
execstate.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
5
8#include <yarp/os/Time.h>
9
10#include <iostream>
11
12using namespace FSM;
13using namespace yarp::manager;
14
15
24Event* EventFactory::recoverEvent = new Event("recover");
25Event* EventFactory::startModuleEventOk = new Event("startModule:ok");
26Event* EventFactory::startModuleEventFailed = new Event("startModule:failed");
27Event* EventFactory::stopModuleEventOk = new Event("stopModule:ok");
28Event* EventFactory::stopModuleEventFailed = new Event("stopModule:failed");
29Event* EventFactory::killModuleEventOk = new Event("killModule:ok");
30Event* EventFactory::killModuleEventFailed = new Event("killModule:failed");
31Event* EventFactory::connectAllPortsEventOk = new Event("connectAllPorts:ok");
32Event* EventFactory::connectAllPortsEventFailed = new Event("connectAllPorts:failed");
33Event* EventFactory::disconnectAllPortsEventOk = new Event("disconnectAllPorts:ok");
34
35
36
41 : StateBase(pEventSink, "SUSPENDED")
42{
43 executable = pExecutable;
44}
45
46
// Defaulted destructor: no Suspended-specific teardown is written in this file.
47Suspended::~Suspended() = default;
48
53
58
63
64void Suspended::moduleFailed() { /* do nothing*/ }
65
66
67// refresh() from Suspended can be used for recovering from
68// an unexpected termination of the manager.
70{
72 int ret = executable->getBroker()->running();
73 if(ret == 1)
74 {
77 } else if (ret == -1) {
78 logger->addError(executable->getBroker()->error());
79 }
80}
81
82
86Ready::Ready(Executable* pExecutable, FSM::IEventSink* pEventSink)
87 : StateBase(pEventSink, "READY")
88{
89 executable = pExecutable;
90 bAborted = false;
91}
92
93
// Defaulted destructor: no Ready-specific teardown is written in this file.
94Ready::~Ready() = default;
95
96bool Ready::checkPriorityPorts()
97{
98 CnnIterator itr;
99 for(itr=executable->getConnections().begin();
100 itr!=executable->getConnections().end(); itr++)
101 {
102 if ((*itr).withPriority()
103 && !executable->getBroker()->exists((*itr).from())) {
104 return false;
105 }
106 }
107 return true;
108}
109
110bool Ready::checkResources(bool silent)
111{
112 bool allOK = true;
114 for(itr=executable->getResources().begin();
115 itr!=executable->getResources().end(); itr++)
116 {
117 if(!executable->getBroker()->exists((*itr).getPort())) {
118 allOK = false;
119 if (silent) {
120 return false;
121 } else {
122 OSTRINGSTREAM msg;
123 msg<<(*itr).getPort()<<" does not exist";
125 continue;
126 }
127 }
128 // check the rpc request/reply if required
129 if(strlen((*itr).getRequest()) != 0) {
130 std::string reply = executable->getBroker()->requestRpc((*itr).getPort(),
131 (*itr).getRequest(),
132 (*itr).getTimeout());
133 if(reply.empty()) {
134 allOK = false;
135 OSTRINGSTREAM msg;
136 msg<<"cannot request resource "<<(*itr).getPort()<<" for "<<(*itr).getRequest();
138 if (silent) {
139 return false;
140 } else {
141 continue;
142 }
143 }
144
145 if (!compareString(reply.c_str(), (*itr).getReply())) {
146 allOK = false;
147 OSTRINGSTREAM msg;
148 msg<<"waiting for the expected reply from resource "<<(*itr).getPort();
149 msg<<" for request "<<(*itr).getRequest();
150 msg<<". (expected="<<(*itr).getReply()<<", received="<<reply<<")";
152 if (silent) {
153 return false;
154 } else {
155 continue;
156 }
157 }
158 }
159 }
160 return allOK;
161}
162
163bool Ready::timeout(double base, double timeout)
164{
166 if ((yarp::os::SystemClock::nowSystem() - base) > timeout) {
167 return true;
168 }
169 return false;
170}
171
173{
174
176
177 // wait for priority ports if auto connect is enabled
178 if(executable->autoConnect())
179 {
180 bAborted = false;
181 while(!checkPriorityPorts())
182 {
184 if (bAborted) {
185 return;
186 }
187 }
188 }
189
190 // finding maximum resource-waiting timeout
192 double maxTimeout = 0;
193 for(itr=executable->getResources().begin();
194 itr!=executable->getResources().end(); itr++)
195 {
196 if ((*itr).getTimeout() > maxTimeout) {
197 maxTimeout = (*itr).getTimeout();
198 }
199 }
200
201 // waiting for resources
202 double base = yarp::os::SystemClock::nowSystem();
203 while(!checkResources()) {
204 if (bAborted) {
205 return;
206 }
207
208 if(timeout(base, maxTimeout)) {
209 // give it the last try and collect the error messages
210 if(!checkResources(false)) {
211 OSTRINGSTREAM msg;
212 msg<<"cannot run "<<executable->getCommand()<<" on "<<executable->getHost();
213 msg<<" : Timeout while waiting for "<<executable->getHost();
214 logger->addError(msg);
215
217 executable->getEvent()->onExecutableDied(executable);
218 return;
219 }
220 }
221 }
222
223 if (executable->getPostExecWait() > 0)
224 {
226 }
227 executable->restoreOriginalPostExecWait();
228 if(!executable->getBroker()->start())
229 {
230 OSTRINGSTREAM msg;
231 msg<<"cannot run "<<executable->getCommand()<<" on "<<executable->getHost();
232 if (!executable->getBroker()->error().empty()) {
233 msg << " : " << executable->getBroker()->error();
234 }
235 logger->addError(msg);
236
238 executable->getEvent()->onExecutableDied(executable);
239 }
240 else
241 {
243 executable->getEvent()->onExecutableStart(executable);
244 }
245}
246
247
249{
250 bAborted = true;
252}
253
254void Ready::moduleFailed() { /* do nothing */ }
255
256
261 : StateBase(pEventSink, "CONNECTING"),
262 executable(pExecutable),
263 bAborted(false)
264{}
265
// Defaulted destructor: no Connecting-specific teardown is written in this file.
266Connecting::~Connecting() = default;
267
268bool Connecting::checkNormalPorts()
269{
270 CnnIterator itr;
271 for(itr=executable->getConnections().begin();
272 itr!=executable->getConnections().end(); itr++)
273 {
274 if (!executable->getBroker()->exists((*itr).to()) || !executable->getBroker()->exists((*itr).from())) {
275 return false;
276 }
277 }
278 return true;
279}
280
281
283{
285 if(executable->autoConnect())
286 {
290 bAborted = false;
291 while(!checkNormalPorts())
292 {
294 if (bAborted) {
295 return;
296 }
297 }
298
299 CnnIterator itr;
300 for(itr=executable->getConnections().begin();
301 itr!=executable->getConnections().end(); itr++)
302 {
303 if( !executable->getBroker()->connect((*itr).from(), (*itr).to(),
304 (*itr).carrier()) )
305 {
306 OSTRINGSTREAM msg;
307 msg<<"cannot connect "<<(*itr).from() <<" to "<<(*itr).to();
308 if (!executable->getBroker()->error().empty()) {
309 msg << " : " << executable->getBroker()->error();
310 }
311 logger->addError(msg);
312 } else {
313 executable->getEvent()->onCnnStablished(&(*itr));
314 }
315 }
316 }
317
319}
320
322{
324 int ret = executable->getBroker()->running();
325 if (ret == 0) {
327 } else if (ret == -1) {
328 logger->addError(executable->getBroker()->error());
329 }
330}
331
333{
334 bAborted = true;
336}
337
339{
340 bAborted = true;
342 executable->getEvent()->onExecutableFailed(executable);
343}
344
345
350 : StateBase(pEventSink, "RUNNING")
351{
352 executable = pExecutable;
353}
354
355
// Defaulted destructor: no Running-specific teardown is written in this file.
356Running::~Running() = default;
357
359{
361 int ret = executable->getBroker()->running();
362 if (ret == 0) {
364 } else if (ret == -1) {
365 logger->addError(executable->getBroker()->error());
366 }
367}
368
370{
371 executable->getEvent()->onExecutableStart(executable);
372}
373
374
379
384
386{
388 executable->getEvent()->onExecutableFailed(executable);
389}
390
392{
393 executable->getEvent()->onCnnFailed(which);
394}
395
396
400Dying::Dying(Executable* pExecutable, FSM::IEventSink* pEventSink)
401 : StateBase(pEventSink, "DYING")
402{
403 executable = pExecutable;
404}
405
406
// Defaulted destructor: no Dying-specific teardown is written in this file.
407Dying::~Dying() = default;
408
410{
412 if (executable->getPostStopWait() > 0)
413 {
415 }
416 executable->restoreOriginalPostStopWait();
417 if(!executable->getBroker()->stop())
418 {
419 OSTRINGSTREAM msg;
420 msg<<"cannot stop "<<executable->getCommand()<<" on "<<executable->getHost();
421 if (!executable->getBroker()->error().empty()) {
422 msg << " : " << executable->getBroker()->error();
423 }
424 logger->addError(msg);
425 executable->getEvent()->onError(executable);
427 }
428 else
429 {
431 executable->getEvent()->onExecutableStop(executable);
432 }
433}
434
436{
438 if(!executable->getBroker()->kill())
439 {
440 OSTRINGSTREAM msg;
441 msg<<"cannot kill "<<executable->getCommand()<<" on "<<executable->getHost();
442 if (!executable->getBroker()->error().empty()) {
443 msg << " : " << executable->getBroker()->error();
444 }
445 logger->addError(msg);
446 executable->getEvent()->onError(executable);
448 }
449 else
450 {
452 executable->getEvent()->onExecutableDied(executable);
453 }
454}
455
456
458{
460 if(executable->autoConnect())
461 {
462 CnnIterator itr;
463 for(itr=executable->getConnections().begin();
464 itr!=executable->getConnections().end(); itr++)
465 {
466 if( !executable->getBroker()->disconnect((*itr).from(), (*itr).to(), (*itr).carrier()) )
467 {
468 OSTRINGSTREAM msg;
469 msg<<"cannot disconnect "<<(*itr).from() <<" to "<<(*itr).to();
470 if (!executable->getBroker()->error().empty()) {
471 msg << " : " << executable->getBroker()->error();
472 }
473 logger->addError(msg);
474 } else {
475 executable->getEvent()->onCnnReleased(&(*itr));
476 }
477 }
478 }
479 // We do not need to handle event disconnectAllPortsEventOk
480}
481
483{
485 int ret = executable->getBroker()->running();
486 if (ret == 0) {
488 } else if (ret == -1) {
489 logger->addError(executable->getBroker()->error());
490 }
491}
492
493void Dying::kill() { /* do nothing */ }
494
496{
497 // Notice that we should not call onExecutableFailed
498 // in DYING state!
500 executable->getEvent()->onExecutableDied(executable);
501}
502
503
504
505
509Dead::Dead(Executable* pExecutable, FSM::IEventSink* pEventSink)
510 : StateBase(pEventSink, "DEAD")
511{
512 executable = pExecutable;
513}
514
515
// Defaulted destructor: no Dead-specific teardown is written in this file.
516Dead::~Dead() = default;
517
522
524{
525 executable->getEvent()->onExecutableStop(executable);
526}
527
528
533
534// refresh() from Dead can be used for recovering from
535// an unexpected termination of the manager.
537{
539 int ret = executable->getBroker()->running();
540 if(ret == 1)
541 {
542 executable->getEvent()->onExecutableStart(executable);
544 } else if (ret == -1) {
545 logger->addError(executable->getBroker()->error());
546 }
547}
548
549
550void Dead::moduleFailed() { /* do nothing*/ }
551
552
553
558{
559 executable = pExecutable;
560 // creating states
561 suspended = new Suspended(executable, this);
562 ready = new Ready(executable, this);
563 connecting = new Connecting(executable, this);
564 running = new Running(executable, this);
565 dying = new Dying(executable, this);
566 dead = new Dead(executable, this);
567
568 // setting initial state
569 setInitState(suspended);
570
571 // transitions from suspended
572 addTransition(suspended, EventFactory::startEvent, ready);
573 addTransition(suspended, EventFactory::recoverEvent, running); //recovering
574 addTransition(suspended, EventFactory::killEvent, dying);
576 addTransition(suspended, EventFactory::failedEvent, suspended);
577
578 // transitions from ready
583
584 // transitions from connecting
586 addTransition(connecting, EventFactory::failedEvent, dead);
587 addTransition(connecting, EventFactory::killEvent, dying);
589
590 // transitions from running
591 addTransition(running, EventFactory::stopEvent, dying);
593 addTransition(running, EventFactory::killEvent, dying);
595
596 // transitions from dying
604
605 // transitions from dead
608 addTransition(dead, EventFactory::recoverEvent, running); // recovering
616
617}
618
620{
621 delete running;
622 delete suspended;
623 delete ready;
624 delete connecting;
625 delete dying;
626 delete dead;
627}
628
630{
631 auto* tr = dynamic_cast<ITransition*>(currentState());
632 if (tr) {
633 tr->refresh();
634 }
635}
636
638{
639 auto* tr = dynamic_cast<ITransition*>(currentState());
640 if (tr) {
641 tr->start();
642 }
643}
644
646{
647 auto* tr = dynamic_cast<ITransition*>(currentState());
648 if (tr) {
649 tr->stop();
650 }
651}
652
654{
655 auto* tr = dynamic_cast<ITransition*>(currentState());
656 if (tr) {
657 tr->kill();
658 }
659}
660
662{
663 auto* tr = dynamic_cast<ITransition*>(currentState());
664 if (tr) {
665 tr->startModule();
666 }
667}
668
670{
671 auto* tr = dynamic_cast<ITransition*>(currentState());
672 if (tr) {
673 tr->stopModule();
674 }
675}
676
678{
679 auto* tr = dynamic_cast<ITransition*>(currentState());
680 if (tr) {
681 tr->killModule();
682 }
683}
684
686{
687 auto* tr = dynamic_cast<ITransition*>(currentState());
688 if (tr) {
689 tr->connectAllPorts();
690 }
691}
692
694{
695 auto* tr = dynamic_cast<ITransition*>(currentState());
696 if (tr) {
697 tr->disconnectAllPorts();
698 }
699}
700
702{
703 auto* tr = dynamic_cast<ITransition*>(currentState());
704 if (tr) {
705 tr->moduleFailed();
706 }
707}
708
710{
711 auto* tr = dynamic_cast<ITransition*>(currentState());
712 if (tr) {
713 tr->connectionFailed(which);
714 }
715}
716
717// For debugging
719 Event* event, StateBase* current)
720{
721 /*
722 std::cout<<executable->getID()<<": ";
723 std::cout<<"["<<previous->getName()<<"] ";
724 std::cout<<"--- ("<<event->getName()<<"/"<<event->getTimeStamp()<<") --> ";
725 std::cout<<"["<<current->getName()<<"]"<<endl;
726 */
727}
bool ret
class IEventSink
Definition fsm.h:29
class IEventSink
Definition fsm.h:56
Class StateBase.
Definition fsm.h:77
void castEvent(Event *event)
Definition fsm.h:90
StateBase * currentState()
Definition fsm.h:115
void setInitState(StateBase *pState)
Definition fsm.h:128
void addTransition(StateBase *source, Event *event, StateBase *target)
Definition fsm.h:132
virtual bool exists(const std::string &port)=0
virtual int running()=0
virtual bool connect(const std::string &from, const std::string &to, const std::string &carrier, bool persist=false)=0
virtual bool kill()=0
virtual bool stop()=0
virtual std::string requestRpc(const std::string &szport, const std::string &request, double timeout=0.0)=0
virtual bool disconnect(const std::string &from, const std::string &to, const std::string &carrier)=0
virtual bool start()=0
virtual std::string error()=0
class Connecting
Definition execstate.h:108
Connecting(Executable *pExecutable, FSM::IEventSink *pEventSink)
Class Connecting.
void moduleFailed() override
void connectAllPorts() override
void refresh() override
Dead(Executable *pExecutable, FSM::IEventSink *pEventSink)
Class Dead.
void moduleFailed() override
void start() override
void stop() override
void kill() override
void refresh() override
Dying(Executable *pExecutable, FSM::IEventSink *pEventSink)
Class Dying.
void disconnectAllPorts() override
void refresh() override
void kill() override
void stopModule() override
void moduleFailed() override
void killModule() override
Singleton class ErrorLogger.
Definition utility.h:58
void addError(const char *szError)
Definition utility.cpp:126
void addWarning(const char *szWarning)
Definition utility.cpp:104
static ErrorLogger * Instance()
Singleton class ErrorLogger.
Definition utility.cpp:98
static FSM::Event * connectAllPortsEventFailed
Definition execstate.h:54
static FSM::Event * startModuleEventOk
Definition execstate.h:47
static FSM::Event * startModuleEventFailed
Definition execstate.h:48
static FSM::Event * stopModuleEventFailed
Definition execstate.h:50
static FSM::Event * killModuleEventOk
Definition execstate.h:51
static FSM::Event * startEvent
Initializing event factory.
Definition execstate.h:42
static FSM::Event * killEvent
Definition execstate.h:44
static FSM::Event * connectAllPortsEventOk
Definition execstate.h:53
static FSM::Event * disconnectAllPortsEventOk
Definition execstate.h:55
static FSM::Event * killModuleEventFailed
Definition execstate.h:52
static FSM::Event * failedEvent
Definition execstate.h:45
static FSM::Event * recoverEvent
Definition execstate.h:46
static FSM::Event * stopModuleEventOk
Definition execstate.h:49
static FSM::Event * stopEvent
Definition execstate.h:43
ExecMachine(Executable *pExecutable)
Class ExecMachine.
void onTransition(FSM::StateBase *previous, FSM::Event *event, FSM::StateBase *current) override
Callback onTransition represents the change in the states.
void connectionFailed(void *which)
Class Executable.
Definition executable.h:71
CnnContainer & getConnections()
Definition executable.h:89
ResourceContainer & getResources()
Definition executable.h:91
const char * getCommand()
Definition executable.h:101
all transitions are used in state machine
Definition execstate.h:22
virtual void stopModule()
Definition execstate.h:32
virtual void startModule()
Definition execstate.h:31
virtual void connectionFailed(void *which)
Definition execstate.h:28
virtual void disconnectAllPorts()
Definition execstate.h:35
virtual void connectAllPorts()
Definition execstate.h:34
virtual void moduleFailed()=0
virtual void killModule()
Definition execstate.h:33
virtual void refresh()
Definition execstate.h:27
virtual void onCnnFailed(void *which)
Definition executable.h:59
virtual void onExecutableFailed(void *which)
Definition executable.h:55
virtual void onExecutableStop(void *which)
Definition executable.h:53
virtual void onExecutableDied(void *which)
Definition executable.h:54
virtual void onCnnStablished(void *which)
Definition executable.h:57
virtual void onCnnReleased(void *which)
Definition executable.h:58
virtual void onExecutableStart(void *which)
Definition executable.h:52
virtual void onError(void *which)
Definition executable.h:60
void moduleFailed() override
Ready(Executable *pExecutable, FSM::IEventSink *pEventSink)
Class Ready.
Definition execstate.cpp:86
void startModule() override
void kill() override
void connectionFailed(void *which) override
void refresh() override
void start() override
void moduleFailed() override
void stop() override
Running(Executable *pExecutable, FSM::IEventSink *pEventSink)
Class Running.
void kill() override
class Suspended
Definition execstate.h:63
void kill() override
Definition execstate.cpp:59
void stop() override
Definition execstate.cpp:54
void refresh() override
Definition execstate.cpp:69
Executable * executable
Definition execstate.h:76
Suspended(Executable *pExecutable, FSM::IEventSink *pEventSink)
Class Suspended.
Definition execstate.cpp:40
void moduleFailed() override
Definition execstate.cpp:64
void start() override
Definition execstate.cpp:49
static double nowSystem()
static void delaySystem(double seconds)
Definition fsm.h:17
bool compareString(const char *szFirst, const char *szSecond)
Definition utility.cpp:326
std::vector< ResYarpPort >::iterator ResourceIterator
std::vector< Connection >::iterator CnnIterator
std::stringstream OSTRINGSTREAM
Definition utility.h:50