YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
manager.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
5
6#include <cstring>
17#include <yarp/os/LogStream.h>
18
20
21
// Upper bounds, in seconds, used while waiting for modules to change state.
#define RUN_TIMEOUT     10      // Run timeout in seconds
#define STOP_TIMEOUT    30      // Stop timeout in seconds
#define KILL_TIMEOUT    10      // Kill timeout in seconds

// Broker identifiers compared against a module's broker property.
#define BROKER_LOCAL    "local"
#define BROKER_YARPRUN  "yarprun"
#define BROKER_YARPDEV  "yarpdev"
29
30
31using namespace yarp::manager;
32
33
38Manager::Manager(bool withWatchDog) : MEvent()
39{
40 logger = ErrorLogger::Instance();
41 bWithWatchDog = withWatchDog;
42 bAutoDependancy = false;
43 bAutoConnect = false;
44 bRestricted = false;
45 strDefBroker = BROKER_YARPRUN;
46 knowledge.createFrom(nullptr, nullptr, nullptr);
47 connector.init();
48}
49
50Manager::Manager(const char* szModPath, const char* szAppPath,
51 const char* szResPath, bool withWatchDog)
52{
53 logger = ErrorLogger::Instance();
54 bWithWatchDog = withWatchDog;
55 bAutoDependancy = false;
56 bAutoConnect = false;
57 bRestricted = false;
58 strDefBroker = BROKER_YARPRUN;
59
60 XmlModLoader modload(szModPath, nullptr);
61 XmlModLoader* pModLoad = &modload;
62 if (!modload.init()) {
63 pModLoad = nullptr;
64 }
65
66 XmlAppLoader appload(szAppPath, nullptr);
67 XmlAppLoader* pAppLoad = &appload;
68 if (!appload.init()) {
69 pAppLoad = nullptr;
70 }
71
72 XmlResLoader resload(szResPath, nullptr);
73 XmlResLoader* pResLoad = &resload;
74 if (!resload.init()) {
75 pResLoad = nullptr;
76 }
77
78 knowledge.createFrom(pModLoad, pAppLoad, pResLoad);
79 connector.init();
80}
81
82
84{
85 // untopic persistent connections
86 rmconnect();
87 clearExecutables();
88}
89
90bool Manager::addApplication(const char* szFileName, char** szAppName_, bool modifyName)
91{
92 if (find(listOfXml.begin(), listOfXml.end(), szFileName) == listOfXml.end()) {
93 listOfXml.emplace_back(szFileName);
94 } else {
95 return true; //it means that the app exist already so it is safe to return true
96 }
97 XmlAppLoader appload(szFileName);
98 if (!appload.init()) {
99 return false;
100 }
101 Application* application = appload.getNextApplication();
102 if (!application) {
103 return false;
104 }
105
106 return knowledge.addApplication(application, szAppName_, modifyName);
107}
108
109
110bool Manager::addApplications(const char* szPath)
111{
112 XmlAppLoader appload(szPath, nullptr);
113 if (!appload.init()) {
114 return false;
115 }
116 Application* application;
117 while((application = appload.getNextApplication()))
118 {
119 const char* currentFile = application->getXmlFile();
120 knowledge.addApplication(application);
121 listOfXml.emplace_back(currentFile);
122 }
123 return true;
124}
125
126
127bool Manager::addModule(const char* szFileName)
128{
129 XmlModLoader modload(szFileName);
130 if (!modload.init()) {
131 return false;
132 }
133 Module* module = modload.getNextModule();
134 if (!module) {
135 return false;
136 }
137 return knowledge.addModule(module);
138}
139
140
141bool Manager::addModules(const char* szPath)
142{
143 XmlModLoader modload(szPath, nullptr);
144 if (!modload.init()) {
145 return false;
146 }
147 Module* module;
148 while ((module = modload.getNextModule())) {
149 knowledge.addModule(module);
150 }
151 return true;
152}
153
154
155bool Manager::addResource(const char* szFileName)
156{
157 XmlResLoader resload(szFileName);
158 if (!resload.init()) {
159 return false;
160 }
161 GenericResource* resource;
162 bool bloaded = false;
163 while ((resource = resload.getNextResource())) {
164 bloaded |= knowledge.addResource(resource);
165 }
166 return bloaded;
167}
168
169
170bool Manager::addResources(const char* szPath)
171{
172 XmlResLoader resload(szPath, nullptr);
173 if (!resload.init()) {
174 return false;
175 }
176 GenericResource* resource;
177 while ((resource = resload.getNextResource())) {
178 knowledge.addResource(resource);
179 }
180 return true;
181}
182
183
184bool Manager::removeApplication(const char *szFileName, const char* szAppName)
185{
186 //Note: use it with care. it is better we first check that no application
187 //is loaded.
188 if(!runnables.empty())
189 {
190 logger->addError("Application cannot be removed if there is a loaded application");
191 return false;
192 }
193 listOfXml.erase(std::remove(listOfXml.begin(), listOfXml.end(), szFileName), listOfXml.end());
194 Application* app = knowledge.getApplication(szAppName);
195 if (!app) {
196 return false;
197 }
198 return knowledge.removeApplication(app);
199}
200
201
202bool Manager::removeModule(const char* szModName)
203{
204 //Note: use it with care. it is better we first check that no application
205 //is loaded.
206 if(!runnables.empty())
207 {
208 logger->addError("Module cannot be removed if there is a loaded application");
209 return false;
210 }
211
212 Module* mod = knowledge.getModule(szModName);
213 if (!mod) {
214 return false;
215 }
216
217 return knowledge.removeModule(mod);
218}
219
220bool Manager::removeResource(const char* szResName)
221{
222 //Note: use it with care. it is better we first check that no application
223 //is loaded.
224 if(!runnables.empty())
225 {
226 logger->addError("Resource cannot be removed if there is a loaded application");
227 return false;
228 }
229
230 GenericResource* res = knowledge.getResource(szResName);
231 if (!res) {
232 return false;
233 }
234
235 return knowledge.removeResource(res);
236}
237
238
239
240bool Manager::loadApplication(const char* szAppName)
241{
242 __CHECK_NULLPTR(szAppName);
243
244 if(!allStopped())
245 {
246 logger->addError("Please stop current running application first.");
247 return false;
248 }
249
250 strAppName = szAppName;
251
252 // set all resources as unavailable
253 ResourcePContainer allresources = knowledge.getResources();
254 for(auto& allresource : allresources)
255 {
256 auto* comp = dynamic_cast<Computer*>(allresource);
257 if (comp) {
258 comp->setAvailability(false);
259 }
260 }
261
262 return prepare(true);
263}
264
265
266bool Manager::saveApplication(const char* szAppName, const char* fileName)
267{
268 Application* pApp = knowledge.getApplication();
269 __CHECK_NULLPTR(pApp);
270
271 XmlAppSaver appsaver(fileName);
272 return knowledge.saveApplication(&appsaver, pApp);
273}
274
275
277{
279 bool ret = prepare(false);
280 return ret;
281}
282
284{
285 if (id < runnables.size())
286 {
287 return runnables[id];
288 }
289 else
290 {
291 return nullptr;
292 }
293}
294
296{
297 Executable* exe = getExecutableById(id);
298 if (removeBroker(exe))
299 {
300 modules[id]->setHost(exe->getHost());
301 Broker* broker = createBroker(modules[id]);
302 if ( broker == nullptr)
303 {
304 return false;
305 }
306 broker->setDisplay(modules[id]->getDisplay());
307 exe->setAndInitializeBroker(broker);
308 }
309 else
310 {
311 return false;
312 }
313 return true;
314}
315
316
317bool Manager::prepare(bool silent)
318{
319 knowledge.reasolveDependency(strAppName.c_str(), bAutoDependancy, silent);
320
321 clearExecutables();
322 connections.clear();
323 modules.clear();
324 resources.clear();
325 connections = knowledge.getSelConnection();
326 modules = knowledge.getSelModules();
327 resources = knowledge.getSelResources();
328
337 ModulePIterator itr;
338 int id = 0;
339 for(itr=modules.begin(); itr!=modules.end(); itr++)
340 {
341 Broker* broker = createBroker(*itr);
342 broker->setDisplay((*itr)->getDisplay());
343 auto* exe = new Executable(broker, (MEvent*)this, *itr, bWithWatchDog);
344 exe->setID(id++);
345 exe->setCommand((*itr)->getName());
346 exe->setParam((*itr)->getParam());
347 exe->setHost((*itr)->getHost());
348 exe->setStdio((*itr)->getStdio());
349 exe->setWorkDir((*itr)->getWorkDir());
350 exe->setPostExecWait((*itr)->getPostExecWait());
351 exe->setPostStopWait((*itr)->getPostStopWait());
352 exe->setOriginalPostExecWait((*itr)->getPostExecWait());
353 exe->setOriginalPostStopWait((*itr)->getPostStopWait());
354 std::string env;
355 if ((*itr)->getPrefix() && strlen((*itr)->getPrefix())) {
356 env = std::string("YARP_PORT_PREFIX=") + std::string((*itr)->getPrefix());
357 }
358 if ((*itr)->getEnvironment() && strlen((*itr)->getEnvironment())) {
359 env += (env.length()) ? (std::string(";") + (*itr)->getEnvironment()) : (*itr)->getEnvironment();
360 }
361 exe->setEnv(env.c_str());
362
368 //CnnIterator cnn;
369 //for(cnn=connections.begin(); cnn!=connections.end(); cnn++)
370 // if((*cnn).owner() == (*itr))
371 // exe->addConnection(*cnn);
372
376 for(auto& resource : resources)
377 {
378 auto* res = dynamic_cast<ResYarpPort*>(resource);
379 if (res && (res->owner() == (*itr))) {
380 exe->addResource(*res);
381 }
382 }
383
384 runnables.push_back(exe);
385 }
386
387 return true;
388}
389
390Broker* Manager::createBroker(Module* module)
391{
392 if(strlen(module->getBroker()) == 0)
393 {
394 if (compareString(module->getHost(), "localhost")) {
395 return (new LocalBroker());
396 } else {
397 return (new YarpBroker());
398 }
399 }
400 else if(compareString(module->getBroker(), BROKER_YARPDEV))
401 {
402 if (compareString(module->getHost(), "localhost")) {
403 return (new YarpdevLocalBroker());
404 } else {
405 return (new YarpdevYarprunBroker());
406 }
407 } else if (compareString(module->getHost(), "localhost")) {
408 return (new ScriptLocalBroker(module->getBroker()));
409 }
410
411 return (new ScriptYarprunBroker(module->getBroker()));
412}
413
414bool Manager::removeBroker(Executable* exe)
415{
416 if (exe == nullptr)
417 {
418 return false;
419 }
420 else if(exe->state() == RUNNING)
421 {
422 exe->stop();
423 exe->stopWatchDog();
424 }
425
426 exe->removeBroker(); //TODO possible race condition in case watchdog enabled.
427 return true;
428}
429
430bool Manager::updateExecutable(unsigned int id, const char* szparam,
431 const char* szhost, const char* szstdio,
432 const char* szworkdir, const char* szenv )
433{
434 if(runnables.empty())
435 {
436 logger->addError("Application is not loaded.");
437 return false;
438 }
439
440 if(id>=runnables.size())
441 {
442 logger->addError("Module id is out of range.");
443 return false;
444 }
445
446 Executable* exe = runnables[id];
447 exe->setParam(szparam);
448 exe->setHost(szhost);
449 exe->setStdio(szstdio);
450 exe->setWorkDir(szworkdir);
451 exe->setEnv(szenv);
452 return true;
453}
454
455
456bool Manager::updateConnection(unsigned int id, const char* from,
457 const char* to, const char* carrier)
458{
459 if(id>=connections.size())
460 {
461 logger->addError("Connection id is out of range.");
462 return false;
463 }
464
465 /*
466 if(connections[id].owner())
467 {
468 OSTRINGSTREAM msg;
469 msg<<"Connection ["<<connections[id].from()<<" -> ";
470 msg<<connections[id].to()<<"] cannot be updated.";
471 logger->addWarning(msg);
472 return false;
473 }
474 */
475
476 connections[id].setFrom(from);
477 connections[id].setTo(to);
478 connections[id].setCarrier(carrier);
479
480 return true;
481}
482
483Node* Manager::getNode(std::string appName)
484{
485 Node* node = knowledge.getNode(appName);
486 return node;
487}
488
489bool Manager::exist(unsigned int id)
490{
491 if(id>=resources.size())
492 {
493 logger->addError("Resource id is out of range.");
494 return false;
495 }
496
497 GenericResource* res = resources[id];
498 if (compareString(res->getName(), "localhost")) {
499 return true;
500 }
501
502 if(dynamic_cast<Computer*>(res) || dynamic_cast<ResYarpPort*>(res))
503 {
504 if(res->getName())
505 {
506 //YarpBroker broker;
507 //broker.init();
508 std::string strPort = res->getName();
509 if (strPort[0] != '/') {
510 strPort = std::string("/") + strPort;
511 }
512 if(dynamic_cast<ResYarpPort*>(res))
513 {
514 res->setAvailability(connector.exists(strPort.c_str()));
515 }
516 else //if it is a computer I have to be sure that the port has been opened through yarp runner
517 {
518 yarp::os::Bottle cmd, reply;
519 cmd.addString("get");
520 cmd.addString(strPort);
521 cmd.addString("yarprun");
522 bool ret = yarp::os::impl::NameClient::getNameClient().send(cmd, reply);
523 if(!ret)
524 {
525 yError()<<"Manager::Cannot contact the NameClient";
526 return false;
527 }
528 if(reply.size()==6)
529 {
530 if(reply.get(5).asBool())
531 {
532 res->setAvailability(true);
533 }
534 else
535 {
536 res->setAvailability(false);
537 }
538
539 }
540 else
541 {
542 res->setAvailability(false);
543 }
544
545 }
546
547 }
548 }
549 return res->getAvailability();
550}
551
552
554{
555 YarpBroker broker;
556 broker.init();
557
558 // finding all available yarp ports
559 std::vector<std::string> ports;
560 broker.getAllPorts(ports);
561
562 ResourcePContainer allresources = knowledge.getResources();
563 for(auto& allresource : allresources)
564 {
565 auto* comp = dynamic_cast<Computer*>(allresource);
566 if(comp && updateResource(comp))
567 {
568 //set all as unavailable
569 for(int i=0; i<comp->peripheralCount(); i++)
570 {
571 auto* res = dynamic_cast<ResYarpPort*>(&comp->getPeripheralAt(i));
572 if (res) {
573 res->setAvailability(false);
574 }
575 }
576
577 // adding all available yarp ports as peripherals
578 for(auto& port : ports)
579 {
580 ResYarpPort resport;
581 resport.setName(port.c_str());
582 resport.setPort(port.c_str());
583
584 bool bfound = false;
585 for(int i=0; i<comp->peripheralCount(); i++)
586 {
587 auto* res = dynamic_cast<ResYarpPort*>(&comp->getPeripheralAt(i));
588 if(res && (std::string(res->getName()) == std::string(resport.getName())))
589 {
590 res->setAvailability(true);
591 bfound = true;
592 break;
593 }
594 }
595 if (!bfound) {
596 comp->addPeripheral(resport);
597 }
598 }
599 }
600 } // end of for
601
602 return true;
603}
604
605
606bool Manager::updateResource(const char* szName)
607{
608 GenericResource* res = knowledge.getResource(szName);
609 if (!res) {
610 return false;
611 }
612 return updateResource(res);
613}
614
616{
617 YarpBroker broker;
618 broker.init();
619
620 auto* comp = dynamic_cast<Computer*>(resource);
621 if (!comp || !strlen(comp->getName())) {
622 return false;
623 }
624
625 if (compareString(comp->getName(), "localhost")) {
626 return false;
627 }
628
630 std::string strServer = comp->getName();
631 if (strServer[0] != '/') {
632 strServer = std::string("/") + strServer;
633 }
634 if(!broker.getSystemInfo(strServer.c_str(), info))
635 {
636 logger->addError(broker.error());
637 comp->setAvailability(false);
638 }
639 else
640 {
641 comp->setAvailability(true);
642
643 comp->getMemory().setTotalSpace(info.memory.totalSpace*1024);
644 comp->getMemory().setFreeSpace(info.memory.freeSpace*1024);
645
646 comp->getStorage().setTotalSpace(info.storage.totalSpace*1024);
647 comp->getStorage().setFreeSpace(info.storage.freeSpace*1024);
648
649 //comp->getNetwork().setIP4(info.network.ip4.c_str());
650 //comp->getNetwork().setIP6(info.network.ip6.c_str());
651 //comp->getNetwork().setMAC(info.network.mac.c_str());
652
653
654 comp->getProcessor().setArchitecture(info.processor.architecture.c_str());
655 comp->getProcessor().setCores(info.processor.cores);
656 comp->getProcessor().setSiblings(info.processor.siblings);
657 comp->getProcessor().setFrequency(info.processor.frequency);
658 comp->getProcessor().setModel(info.processor.model.c_str());
659 LoadAvg load;
660 load.loadAverageInstant = (double)info.load.cpuLoadInstant;
661 load.loadAverage1 = info.load.cpuLoad1;
662 load.loadAverage5 = info.load.cpuLoad5;
663 load.loadAverage15 = info.load.cpuLoad15;
664 comp->getProcessor().setCPULoad(load);
665
666 comp->getPlatform().setName(info.platform.name.c_str());
667 comp->getPlatform().setDistribution(info.platform.distribution.c_str());
668 comp->getPlatform().setRelease(info.platform.release.c_str());
669 }
670 return true;
671}
672
673bool Manager::waitingModuleRun(unsigned int id)
674{
675 double base = yarp::os::Time::now();
676 double wait = runnables[id]->getPostExecWait() + RUN_TIMEOUT;
677 while (!timeout(base, wait)) {
678 if (running(id)) {
679 return true;
680 }
681 }
682
683 OSTRINGSTREAM msg;
684 msg<<"Failed to run "<<runnables[id]->getCommand();
685 msg<<" on "<<runnables[id]->getHost();
686 msg<<". (State: "<<runnables[id]->state();
687 msg<<", parameter: "<<runnables[id]->getParam()<<")";
688 logger->addError(msg);
689 return false;
690
691}
692
693bool Manager::waitingModuleStop(unsigned int id)
694{
695 double base = yarp::os::Time::now();
696 while (!timeout(base, STOP_TIMEOUT)) {
697 if (!running(id)) {
698 return true;
699 }
700 }
701
702 OSTRINGSTREAM msg;
703 msg<<"Failed to stop "<<runnables[id]->getCommand();
704 msg<<" on "<<runnables[id]->getHost();
705 msg<<". (State: "<<runnables[id]->state();
706 msg<<", paramete: "<<runnables[id]->getParam()<<")";
707 logger->addError(msg);
708 return false;
709}
710
711bool Manager::waitingModuleKill(unsigned int id)
712{
713 double base = yarp::os::Time::now();
714 while (!timeout(base, KILL_TIMEOUT)) {
715 if (!running(id)) {
716 return true;
717 }
718 }
719
720 OSTRINGSTREAM msg;
721 msg<<"Failed to kill "<<runnables[id]->getCommand();
722 msg<<" on "<<runnables[id]->getHost();
723 msg<<". (State: "<<runnables[id]->state();
724 msg<<", paramete: "<<runnables[id]->getParam()<<")";
725 logger->addError(msg);
726 return false;
727
728}
729
730
731bool Manager::existPortFrom(unsigned int id)
732{
733 if(id>=connections.size())
734 {
735 logger->addError("Connection id is out of range.");
736 return false;
737 }
738 std::string portName(connections[id].from());
739 if(portName.find(' ') != std::string::npos)
740 {
741 std::string message = "Port name \"" + portName + "\" contains spaces.";
742 logger->addError(message.c_str());
743 return false;
744 }
745
746 bool exists = connector.exists(portName.c_str());
747 connections[id].setFromExists(exists);
748 return exists;
749}
750
751
752bool Manager::existPortTo(unsigned int id)
753{
754 if(id>=connections.size())
755 {
756 logger->addError("Connection id is out of range.");
757 return false;
758 }
759 std::string portName(connections[id].to());
760 if(portName.find(' ') != std::string::npos)
761 {
762 std::string message = "Port name \"" + portName + "\" contains spaces.";
763 logger->addError(message.c_str());
764 return false;
765 }
766
767 bool exists = connector.exists(portName.c_str());
768 connections[id].setToExists(exists);
769 return exists;
770}
771
772
774{
780 bool ret = true;
781 ResourcePIterator itrRes;
782 for(itrRes=resources.begin(); itrRes!=resources.end(); itrRes++)
783 {
784 if(!(*itrRes)->getAvailability())
785 {
786 ret = false;
787 OSTRINGSTREAM err;
788 err<<"Resource "<<(*itrRes)->getName()<<" is not available!";
789 logger->addError(err);
790 }
791 }
792
793 return ret;
794}
795
796
797
798bool Manager::run(unsigned int id, bool async)
799{
800 if(runnables.empty())
801 {
802 logger->addError("Application is not loaded.");
803 return false;
804 }
805
806 if(id>=runnables.size())
807 {
808 logger->addError("Module id is out of range.");
809 return false;
810 }
811
812 if (runnables[id]->shouldChangeBroker())
813 {
814 if (!switchBroker(id))
815 {
816 logger->addError("Failing to switch broker");
817 return false;
818 }
819 }
820
821 runnables[id]->disableAutoConnect();
822 runnables[id]->start();
823 if(bWithWatchDog) {
825 runnables[id]->startWatchDog();
826 }
827 if (async) {
828 return true;
829 }
830
831 // waiting for running
832 return waitingModuleRun(id);
833}
834
836{
837 if(runnables.empty())
838 {
839 logger->addError("Application is not loaded.");
840 return false;
841 }
842
843 if(!checkDependency())
844 {
845 if(bRestricted)
846 {
847 logger->addError("Some of external ports dependency are not satisfied.");
848 return false;
849 } else {
850 logger->addWarning("Some of external ports dependency are not satisfied.");
851 }
852 }
853
855 double wait = 0.0;
856 for(itr=runnables.begin(); itr!=runnables.end(); itr++)
857 {
858 if (bAutoConnect) {
859 (*itr)->enableAutoConnect();
860 } else {
861 (*itr)->disableAutoConnect();
862 }
863 (*itr)->start();
865 wait = (wait > (*itr)->getPostExecWait()) ? wait : (*itr)->getPostExecWait();
866 }
867
868 // waiting for running
869 double base = yarp::os::SystemClock::nowSystem();
870 while (!timeout(base, wait + RUN_TIMEOUT)) {
871 if (allRunning()) {
872 break;
873 }
874 }
875
876 // starting the watchdog if needed
877 if(bWithWatchDog) {
878 for (itr = runnables.begin(); itr != runnables.end(); itr++) {
879 (*itr)->startWatchDog();
880 }
881 }
882
883 if(!allRunning())
884 {
886 for (itr = runnables.begin(); itr != runnables.end(); itr++) {
887 if((*itr)->state() != RUNNING)
888 {
889 OSTRINGSTREAM msg;
890 msg<<"Failed to run "<<(*itr)->getCommand();
891 msg<<" on "<<(*itr)->getHost();
892 msg<<". (State: "<<(*itr)->state();
893 msg<<", parameter: "<<(*itr)->getParam()<<")";
894 logger->addError(msg);
895 }
896 }
897
898 if(bRestricted)
899 {
900 kill();
901 return false;
902 }
903 }
904
905 /* connecting extra ports*/
906 if (bAutoConnect) {
907 if(!connectExtraPorts())
908 {
909 logger->addError("Failed to stablish some of connections.");
910 if (bRestricted) {
911 return false;
912 }
913 }
914 }
915
916 return true;
917}
918
919bool Manager::stop(unsigned int id, bool async)
920{
921 if(runnables.empty())
922 {
923 logger->addError("Application is not loaded.");
924 return false;
925 }
926
927 if(id>=runnables.size())
928 {
929 logger->addError("Module id is out of range.");
930 return false;
931 }
932
933 runnables[id]->stop();
934
935 if (async) {
936 return true;
937 }
938
939 // waiting for stop
940 return waitingModuleStop(id);
941}
942
943
945{
946 if (runnables.empty()) {
947 return true;
948 }
949
951 for(itr=runnables.begin(); itr!=runnables.end(); itr++)
952 {
953 (*itr)->stop();
955 }
956
957 double base = yarp::os::SystemClock::nowSystem();
958 while (!timeout(base, STOP_TIMEOUT)) {
959 if (allStopped()) {
960 break;
961 }
962 }
963
964 if(!allStopped())
965 {
967 for (itr = runnables.begin(); itr != runnables.end(); itr++) {
968 if( ((*itr)->state() != SUSPENDED) &&
969 ((*itr)->state() != DEAD))
970 {
971 OSTRINGSTREAM msg;
972 msg<<"Failed to stop "<<(*itr)->getCommand();
973 msg<<" on "<<(*itr)->getHost();
974 msg<<". (State: "<<(*itr)->state();
975 msg<<", paramete: "<<(*itr)->getParam()<<")";
976 logger->addError(msg);
977 }
978 }
979 return false;
980 }
981
982 return true;
983}
984
985bool Manager::kill(unsigned int id, bool async)
986{
987 if(runnables.empty())
988 {
989 logger->addError("Application is not loaded.");
990 return false;
991 }
992
993 if(id>=runnables.size())
994 {
995 logger->addError("Module id is out of range.");
996 return false;
997 }
998
999 runnables[id]->kill();
1000
1001 if (async) {
1002 return true;
1003 }
1004 return waitingModuleKill(id);
1005}
1006
1007
1009{
1010 if (runnables.empty()) {
1011 return true;
1012 }
1013
1015 for(itr=runnables.begin(); itr!=runnables.end(); itr++)
1016 {
1017 (*itr)->kill();
1019 }
1020
1021 double base = yarp::os::SystemClock::nowSystem();
1022 while (!timeout(base, KILL_TIMEOUT)) {
1023 if (allStopped()) {
1024 break;
1025 }
1026 }
1027
1028 if(!allStopped())
1029 {
1031 for (itr = runnables.begin(); itr != runnables.end(); itr++) {
1032 if( ((*itr)->state() != SUSPENDED) &&
1033 ((*itr)->state() != DEAD))
1034 {
1035 OSTRINGSTREAM msg;
1036 msg<<"Failed to kill "<<(*itr)->getCommand();
1037 msg<<" on "<<(*itr)->getHost();
1038 msg<<". (State: "<<(*itr)->state();
1039 msg<<", paramete: "<<(*itr)->getParam()<<")";
1040 logger->addError(msg);
1041 }
1042 }
1043 return false;
1044 }
1045
1046 return true;
1047}
1048
1049
1050void Manager::clearExecutables()
1051{
1053 for(itr=runnables.begin(); itr!=runnables.end(); itr++)
1054 {
1055 // broker will be deleted by Executable
1056 delete (*itr);
1057 }
1058 runnables.clear();
1059}
1060
1061
1062bool Manager::connect(unsigned int id)
1063{
1064 if(id>=connections.size())
1065 {
1066 logger->addError("Connection id is out of range.");
1067 return false;
1068 }
1069
1070 //YarpBroker connector;
1071 //connector.init();
1072
1073 if( !connector.connect(connections[id].from(),
1074 connections[id].to(),
1075 connections[id].carrier(),
1076 connections[id].isPersistent()) )
1077 {
1078 logger->addError(connector.error());
1079 //cout<<connector.error()<<endl;
1080 return false;
1081 }
1082
1083 // setting the connection Qos if specified
1084 return connector.setQos(connections[id].from(),
1085 connections[id].to(),
1086 connections[id].qosFrom(),
1087 connections[id].qosTo());
1088}
1089
1091{
1092 //YarpBroker connector;
1093 //connector.init();
1094 CnnIterator cnn;
1095 for(cnn=connections.begin(); cnn!=connections.end(); cnn++) {
1096 if( !(*cnn).getFromExists() ||
1097 !(*cnn).getToExists() ||
1098 !connector.connect((*cnn).from(), (*cnn).to(),
1099 (*cnn).carrier(), (*cnn).isPersistent()) )
1100 {
1101 logger->addError(connector.error());
1102 //cout<<connector.error()<<endl;
1103 if (bRestricted) {
1104 return false;
1105 }
1106 }
1107
1108 // setting the connection Qos if specified
1109 if(! connector.setQos((*cnn).from(), (*cnn).to(),
1110 (*cnn).qosFrom(), (*cnn).qosTo())) {
1111 if (bRestricted) {
1112 return false;
1113 }
1114 }
1115 }
1116 return true;
1117}
1118
1119bool Manager::disconnect(unsigned int id)
1120{
1121 if(id>=connections.size())
1122 {
1123 logger->addError("Connection id is out of range.");
1124 return false;
1125 }
1126
1127 //YarpBroker connector;
1128 //connector.init();
1129
1130 if( !connector.disconnect(connections[id].from(),
1131 connections[id].to(),
1132 connections[id].carrier()) )
1133 {
1134 logger->addError(connector.error());
1135 //cout<<connector.error()<<endl;
1136 return false;
1137 }
1138
1139 return true;
1140}
1141
1143{
1144 //YarpBroker connector;
1145 //connector.init();
1146 CnnIterator cnn;
1147 for (cnn = connections.begin(); cnn != connections.end(); cnn++) {
1148 if( !connector.disconnect((*cnn).from(), (*cnn).to(), (*cnn).carrier()) )
1149 {
1150 logger->addError(connector.error());
1151 //cout<<connector.error()<<endl;
1152 return false;
1153 }
1154 }
1155 return true;
1156}
1157
1158
1159bool Manager::rmconnect(unsigned int id)
1160{
1161 if(id>=connections.size())
1162 {
1163 logger->addError("Connection id is out of range.");
1164 return false;
1165 }
1166
1167 if(!connector.rmconnect(connections[id].from(),
1168 connections[id].to()) )
1169 {
1170 logger->addError(connector.error());
1171 return false;
1172 }
1173
1174 return true;
1175}
1176
1177
1179{
1180 CnnIterator cnn;
1181 for (cnn = connections.begin(); cnn != connections.end(); cnn++) {
1182 if( !connector.rmconnect((*cnn).from(), (*cnn).to()) )
1183 {
1184 logger->addError(connector.error());
1185 return false;
1186 }
1187 }
1188 return true;
1189}
1190
1191
1192bool Manager::connected(unsigned int id)
1193{
1194 if(id>=connections.size())
1195 {
1196 logger->addError("Connection id is out of range.");
1197 return false;
1198 }
1199
1200 return connections[id].getFromExists() &&
1201 connections[id].getToExists() &&
1202 connector.connected(connections[id].from(),
1203 connections[id].to(),
1204 connections[id].carrier());
1205}
1206
1207
1209{
1210 //YarpBroker connector;
1211 //connector.init();
1212 CnnIterator cnn;
1213 bool bConnected = true;
1214 for (cnn = connections.begin(); cnn != connections.end(); cnn++) {
1215 if (!(*cnn).getFromExists() || !(*cnn).getToExists() || !connector.connected((*cnn).from(), (*cnn).to(), (*cnn).carrier())) {
1216 bConnected = false;
1217 }
1218 }
1219 return bConnected;
1220}
1221
1222bool Manager::checkPortsAvailable(Broker* broker)
1223{
1224 CnnIterator itr;
1225 for(itr=connections.begin(); itr!=connections.end(); itr++)
1226 {
1227 //if(!(*itr).owner() )
1228 // {
1229 if (!broker->exists((*itr).to()) || !broker->exists((*itr).from())) {
1230 return false;
1231 }
1232 // }
1233 }
1234 return true;
1235}
1236
1237
1238bool Manager::connectExtraPorts()
1239{
1240 //YarpBroker connector;
1241 //connector.init();
1242
1243 double base = yarp::os::SystemClock::nowSystem();
1244 while (!timeout(base, 10.0)) {
1245 if (checkPortsAvailable(&connector)) {
1246 break;
1247 }
1248 }
1249
1250 CnnIterator cnn;
1251 for(cnn=connections.begin(); cnn!=connections.end(); cnn++)
1252 {
1253 //if(!(*cnn).owner() )
1254 //{
1255 if( !connector.connect((*cnn).from(), (*cnn).to(),
1256 (*cnn).carrier()) )
1257 {
1258 logger->addError(connector.error());
1259 //cout<<connector.error()<<endl;
1260 return false;
1261 }
1262 //}
1263 }
1264 return true;
1265}
1266
1267bool Manager::running(unsigned int id)
1268{
1269 if(id>=runnables.size())
1270 {
1271 logger->addError("Module id is out of range.");
1272 return false;
1273 }
1274
1275 RSTATE st = runnables[id]->state();
1276 if ((st == RUNNING) || (st == CONNECTING) || (st == DYING)) {
1277 return true;
1278 }
1279 return false;
1280}
1281
1282
1283bool Manager::allRunning()
1284{
1285 if (!runnables.size()) {
1286 return false;
1287 }
1289 for(itr=runnables.begin(); itr!=runnables.end(); itr++)
1290 {
1291 RSTATE st = (*itr)->state();
1292 if ((st != RUNNING) && (st != CONNECTING) && (st != DYING)) {
1293 return false;
1294 }
1295 }
1296 return true;
1297}
1298
1299
1300bool Manager::suspended(unsigned int id)
1301{
1302 if(id>=runnables.size())
1303 {
1304 logger->addError("Module id is out of range.");
1305 return false;
1306 }
1307 RSTATE st = runnables[id]->state();
1308 if ((st == SUSPENDED) || (st == DEAD)) {
1309 return true;
1310 }
1311 return false;
1312}
1313
1314
1315bool Manager::allStopped()
1316{
1317 if (!runnables.size()) {
1318 return true;
1319 }
1321 for(itr=runnables.begin(); itr!=runnables.end(); itr++)
1322 {
1323 RSTATE st = (*itr)->state();
1324 if ((st != SUSPENDED) && (st != DEAD)) {
1325 return false;
1326 }
1327 }
1328 return true;
1329}
1330
1331bool Manager::attachStdout(unsigned int id)
1332{
1333 if(id>=runnables.size())
1334 {
1335 logger->addError("Module id is out of range.");
1336 return false;
1337 }
1338
1339 if(!runnables[id]->getBroker()->attachStdout())
1340 {
1341 OSTRINGSTREAM msg;
1342 msg<<"Cannot attach to stdout of "<<runnables[id]->getCommand();
1343 msg<<" on "<<runnables[id]->getHost();
1344 msg<<". (State: "<<runnables[id]->state();
1345 msg<<", paramete: "<<runnables[id]->getParam()<<") ";
1346 msg<<"because "<<runnables[id]->getBroker()->error();
1347 logger->addError(msg);
1348 return false;
1349 }
1350 return true;
1351}
1352
1353bool Manager::detachStdout(unsigned int id)
1354{
1355 if(id>=runnables.size())
1356 {
1357 logger->addError("Module id is out of range.");
1358 return false;
1359 }
1360
1361 runnables[id]->getBroker()->detachStdout();
1362 return true;
1363}
1364
1365bool Manager::timeout(double base, double t)
1366{
1368 if ((yarp::os::SystemClock::nowSystem() - base) > t) {
1369 return true;
1370 }
1371 return false;
1372}
1373
1374
1375void Manager::onExecutableStart(void* which) {}
1376void Manager::onExecutableStop(void* which) {}
1377void Manager::onCnnStablished(void* which) {}
1378void Manager::onExecutableDied(void* which) {}
1380void Manager::onExecutableStdout(void* which, const char* msg) {}
1381void Manager::onCnnFailed(void* which) {}
1382void Manager::onError(void* which) {}
1383
1384
1385/*
1386bool Manager::loadModule(const char* szModule, const char* szHost)
1387{
1388 __CHECK_NULLPTR(szModule);
1389 strAppName = szModule;
1390
1391 SingleAppLoader appLoader(szModule, szHost);
1392 if(!appLoader.init())
1393 {
1394 logger->addError("Error initializing SingleAppLoader.");
1395 return false;
1396 }
1397
1398 if(!createKnowledgeBase(appLoader))
1399 {
1400 logger->addError("Cannot create knowledge base");
1401 return false;
1402 }
1403
1404 return prepare();
1405
1406}
1407*/
float t
bool ret
#define yError(...)
Definition Log.h:361
Class Application.
Class Broker.
Definition broker.h:30
void setDisplay(const char *szDisplay)
Definition broker.h:60
virtual bool exists(const std::string &port)=0
void addError(const char *szError)
Definition utility.cpp:126
void addWarning(const char *szWarning)
Definition utility.cpp:104
static ErrorLogger * Instance()
Singleton class ErrorLogger.
Definition utility.cpp:98
Class Executable.
Definition executable.h:71
void setParam(const char *val)
Definition executable.h:82
void setWorkDir(const char *val)
Definition executable.h:85
void setHost(const char *val)
Definition executable.h:83
void setStdio(const char *val)
Definition executable.h:84
void setEnv(const char *val)
Definition executable.h:86
void setAndInitializeBroker(Broker *_broker)
void setAvailability(bool flag)
Definition resource.h:22
void setName(const char *szName)
Definition resource.h:27
Module * getModule(const char *szName)
Definition kbase.h:81
bool removeModule(Module *module)
Definition kbase.cpp:177
const CnnContainer & getSelConnection()
Definition kbase.h:68
const ModulePContainer & getSelModules()
Definition kbase.h:67
GenericResource * getResource(const char *szName)
Definition kbase.h:89
bool removeResource(GenericResource *resource)
Definition kbase.cpp:182
const ResourcePContainer & getSelResources()
Definition kbase.h:69
bool reasolveDependency(const char *szName, bool bAutoDependancy=false, bool bSilent=false)
Definition kbase.cpp:894
bool addResource(GenericResource *resource)
Definition kbase.cpp:144
Node * getNode(std::string appName)
Definition kbase.cpp:1664
Application * getApplication()
Definition kbase.h:84
bool addModule(Module *module)
Definition kbase.cpp:120
bool removeApplication(Application *application)
Definition kbase.cpp:172
bool createFrom(ModuleLoader *_mloader, AppLoader *_apploader, ResourceLoader *_resloader)
Definition kbase.cpp:19
const ResourcePContainer & getResources(Application *parent=nullptr)
Definition kbase.cpp:261
bool addApplication(Application *application, char **szAppName_=nullptr, bool modifyName=false)
Definition kbase.cpp:76
bool saveApplication(AppSaver *appSaver, Application *application)
Definition kbase.cpp:1304
Class LocalBroker.
Definition localbroker.h:33
void onExecutableStdout(void *which, const char *msg) override
Definition manager.cpp:1380
void onExecutableDied(void *which) override
Definition manager.cpp:1378
void onExecutableStop(void *which) override
Definition manager.cpp:1376
Manager(bool withWatchDog=false)
Class Manager.
Definition manager.cpp:38
bool removeApplication(const char *szFileName, const char *szAppName)
Definition manager.cpp:184
bool updateResource(const char *szName)
Definition manager.cpp:606
bool detachStdout(unsigned int id)
Definition manager.cpp:1353
void onCnnFailed(void *which) override
Definition manager.cpp:1381
bool addModule(const char *szFileName)
Definition manager.cpp:127
bool updateConnection(unsigned int id, const char *from, const char *to, const char *carrier)
Definition manager.cpp:456
bool attachStdout(unsigned int id)
Definition manager.cpp:1331
bool removeModule(const char *szModName)
Definition manager.cpp:202
bool existPortFrom(unsigned int id)
Definition manager.cpp:731
bool addModules(const char *szPath)
Definition manager.cpp:141
bool addResources(const char *szPath)
Definition manager.cpp:170
Node * getNode(std::string appName)
Definition manager.cpp:483
bool addApplication(const char *szFileName, char **szAppName_=nullptr, bool modifyName=false)
Definition manager.cpp:90
bool exist(unsigned int id)
Definition manager.cpp:489
bool existPortTo(unsigned int id)
Definition manager.cpp:752
void onError(void *which) override
Definition manager.cpp:1382
bool switchBroker(size_t id)
Definition manager.cpp:295
bool addResource(const char *szFileName)
Definition manager.cpp:155
bool updateExecutable(unsigned int id, const char *szparam, const char *szhost, const char *szstdio, const char *szworkdir, const char *szenv)
Definition manager.cpp:430
bool waitingModuleKill(unsigned int id)
Definition manager.cpp:711
bool waitingModuleStop(unsigned int id)
Definition manager.cpp:693
bool removeResource(const char *szResName)
Definition manager.cpp:220
bool addApplications(const char *szPath)
Definition manager.cpp:110
void onExecutableFailed(void *which) override
Definition manager.cpp:1379
bool loadApplication(const char *szAppName)
Definition manager.cpp:240
bool saveApplication(const char *szAppName, const char *fileName=nullptr)
Definition manager.cpp:266
void onCnnStablished(void *which) override
Definition manager.cpp:1377
bool waitingModuleRun(unsigned int id)
Definition manager.cpp:673
Executable * getExecutableById(size_t id)
Definition manager.cpp:283
void onExecutableStart(void *which) override
Definition manager.cpp:1375
Class Module.
Definition module.h:99
const char * getBroker()
Definition module.h:139
const char * getHost()
Definition module.h:131
a Node of a Graph
Definition node.h:64
void setPort(const char *szPort)
Class XmlAppLoader.
Application * getNextApplication() override
Class XmlAppSaver.
Definition xmlappsaver.h:19
Class XmlModLoader.
Module * getNextModule() override
Class XmlResLoader.
GenericResource * getNextResource() override
bool rmconnect(const std::string &from, const std::string &to)
bool getAllPorts(std::vector< std::string > &stingList)
bool connected(const std::string &from, const std::string &to, const std::string &carrier) override
bool setQos(const std::string &from, const std::string &to, const std::string &qosFrom, const std::string &qosTo)
bool connect(const std::string &from, const std::string &to, const std::string &carrier, bool persist=false) override
connection broker
std::string error() override
bool disconnect(const std::string &from, const std::string &to, const std::string &carrier) override
bool exists(const std::string &port) override
bool getSystemInfo(const std::string &server, yarp::os::SystemInfoSerializer &info)
A simple collection of objects that can be described and transmitted in a portable way.
Definition Bottle.h:64
size_type size() const
Gets the number of elements in the bottle.
Definition Bottle.cpp:251
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition Bottle.cpp:246
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition Bottle.cpp:170
static double nowSystem()
static void delaySystem(double seconds)
A helper class to pass the SystemInfo object around the YARP network.
virtual bool asBool() const
Get boolean value.
Definition Value.cpp:186
static NameClient & getNameClient()
Get an instance of the name client.
#define KILL_TIMEOUT
#define RUN_TIMEOUT
#define STOP_TIMEOUT
#define BROKER_YARPDEV
Definition manager.cpp:28
#define BROKER_YARPRUN
Definition manager.cpp:27
bool compareString(const char *szFirst, const char *szSecond)
Definition utility.cpp:326
std::vector< Executable * >::iterator ExecutablePIterator
Definition executable.h:167
enum yarp::manager::__RSTATE RSTATE
std::vector< GenericResource * >::iterator ResourcePIterator
Definition resource.h:59
std::vector< Connection >::iterator CnnIterator
std::stringstream OSTRINGSTREAM
Definition utility.h:50
std::vector< GenericResource * > ResourcePContainer
Definition resource.h:58
std::vector< Module * >::iterator ModulePIterator
Definition module.h:231
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition Time.cpp:121
#define __CHECK_NULLPTR(_ptr)
Definition ymm-types.h:77