YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
SubscriberOnSql.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#include <cstdlib>
8#include <cstdio>
9
10#include <sqlite3.h>
11
16
17#if !defined(_WIN32)
18#include <unistd.h>
19#else
20#include <io.h>
21#define access(f,a) _access(f,a)
22#endif
23
24#include <vector>
25#include <string>
26
27#ifndef F_OK
28#define F_OK 0
29#endif
30
31#define SQLDB(x) ((sqlite3*)(x))
32
33using namespace yarp::os;
34using namespace yarp::serversql::impl;
35
36namespace {
37YARP_SERVERSQL_LOG_COMPONENT(SUBSCRIBERONSQL, "yarp.serversql.impl.SubscriberOnSql")
38} // namespace
39
40
41bool SubscriberOnSql::open(const std::string& filename, bool fresh) {
42 sqlite3 *db = nullptr;
43 if (fresh) {
44 int result = access(filename.c_str(),F_OK);
45 if (result==0) {
46 yCWarning(SUBSCRIBERONSQL, "Database needs to be recreated.");
47 yCWarning(SUBSCRIBERONSQL, "Please move %s out of the way.", filename.c_str());
48 return false;
49 }
50
51 }
52 int result = sqlite3_open_v2(filename.c_str(),
53 &db,
55 nullptr);
56 if (result!=SQLITE_OK) {
57 yCError(SUBSCRIBERONSQL, "Failed to open database %s", filename.c_str());
58 if (db != nullptr) {
59 sqlite3_close(db);
60 }
61 return false;
62 }
63
64 const char *create_subscribe_table = "CREATE TABLE IF NOT EXISTS subscriptions (\n\
65 id INTEGER PRIMARY KEY,\n\
66 src TEXT,\n\
67 dest TEXT,\n\
68 srcFull TEXT,\n\
69 destFull TEXT,\n\
70 mode TEXT);";
71
72 result = sqlite3_exec(db, create_subscribe_table, nullptr, nullptr, nullptr);
73 if (result!=SQLITE_OK) {
74 sqlite3_close(db);
75 yCError(SUBSCRIBERONSQL, "Failed to set up subscriptions table");
76 std::exit(1);
77 }
78
79 const char *check_subscriptions_size = "PRAGMA table_info(subscriptions)";
80
81 sqlite3_stmt *statement = nullptr;
82 result = sqlite3_prepare_v2(db, check_subscriptions_size, -1, &statement, nullptr);
83 if (result!=SQLITE_OK) {
84 yCError(SUBSCRIBERONSQL, "Failed to set up subscriptions table");
85 std::exit(1);
86 }
87
88 int count = 0;
90 count++;
91 }
93
94 if (count==5) {
95 const char *add_structure = "ALTER TABLE subscriptions ADD COLUMN mode";
96 result = sqlite3_exec(db, add_structure, nullptr, nullptr, nullptr);
97 if (result!=SQLITE_OK) {
98 sqlite3_close(db);
99 yCError(SUBSCRIBERONSQL, "Failed to set up subscriptions table");
100 std::exit(1);
101 }
102 }
103
104 const char *create_topic_table = "CREATE TABLE IF NOT EXISTS topics (\n\
105 id INTEGER PRIMARY KEY,\n\
106 topic TEXT,\n\
107 structure TEXT);";
108
109 result = sqlite3_exec(db, create_topic_table, nullptr, nullptr, nullptr);
110 if (result!=SQLITE_OK) {
111 sqlite3_close(db);
112 yCError(SUBSCRIBERONSQL, "Failed to set up topics table");
113 std::exit(1);
114 }
115
116 const char *check_topic_size = "PRAGMA table_info(topics)";
117
118 statement = nullptr;
119 result = sqlite3_prepare_v2(db, check_topic_size, -1, &statement, nullptr);
120 if (result!=SQLITE_OK) {
121 yCError(SUBSCRIBERONSQL, "Failed to set up topics table");
122 std::exit(1);
123 }
124
125 count = 0;
126 while (sqlite3_step(statement) == SQLITE_ROW) {
127 //sqlite3_column_text(statement,1);
128 count++;
129 }
131
132 if (count==2) {
133 const char *add_structure = "ALTER TABLE topics ADD COLUMN structure";
134 result = sqlite3_exec(db, add_structure, nullptr, nullptr, nullptr);
135 if (result!=SQLITE_OK) {
136 sqlite3_close(db);
137 yCError(SUBSCRIBERONSQL, "Failed to set up topics table");
138 std::exit(1);
139 }
140 }
141
142 const char *create_live_table = "CREATE TABLE IF NOT EXISTS live (\n\
143 id INTEGER PRIMARY KEY,\n\
144 name TEXT UNIQUE,\n\
145 stamp DATETIME);";
146
147 result = sqlite3_exec(db, create_live_table, nullptr, nullptr, nullptr);
148 if (result!=SQLITE_OK) {
149 sqlite3_close(db);
150 yCError(SUBSCRIBERONSQL, "Failed to set up live table");
151 std::exit(1);
152 }
153
154 const char *create_struct_table = "CREATE TABLE IF NOT EXISTS structures (\n\
155 name TEXT PRIMARY KEY,\n\
156 yarp TEXT);";
157
158 result = sqlite3_exec(db, create_struct_table, nullptr, nullptr, nullptr);
159 if (result!=SQLITE_OK) {
160 sqlite3_close(db);
161 yCError(SUBSCRIBERONSQL, "Failed to set up structures table");
162 std::exit(1);
163 }
164
165 implementation = db;
166 return true;
167}
168
169
171 if (implementation != nullptr) {
172 auto* db = (sqlite3 *)implementation;
173 sqlite3_close(db);
174 implementation = nullptr;
175 }
176 return true;
177}
178
179bool SubscriberOnSql::addSubscription(const std::string& src,
180 const std::string& dest,
181 const std::string& mode) {
182 removeSubscription(src,dest);
184 psrc.apply(src);
185 pdest.apply(dest);
186 if (psrc.getCarrier()=="topic") {
187 setTopic(psrc.getPortName(),"",true);
188 }
189 if (pdest.getCarrier()=="topic") {
190 setTopic(pdest.getPortName(),"",true);
191 }
192 char *msg = nullptr;
193 const char *zmode = mode.c_str();
194 if (mode == "") {
195 zmode = nullptr;
196 }
197 char *query = sqlite3_mprintf("INSERT INTO subscriptions (src,dest,srcFull,destFull,mode) VALUES(%Q,%Q,%Q,%Q,%Q)",
198 psrc.getPortName().c_str(),
199 pdest.getPortName().c_str(),
200 src.c_str(),
201 dest.c_str(),
202 zmode);
203 yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
204
205 bool ok = true;
206 int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, &msg);
207 if (result!=SQLITE_OK) {
208 ok = false;
209 if (msg != nullptr) {
210 yCError(SUBSCRIBERONSQL, "%s", msg);
211 sqlite3_free(msg);
212 }
213 }
215 if (ok) {
216 if (psrc.getCarrier()!="topic") {
217 if (pdest.getCarrier()!="topic") {
218 checkSubscription(psrc.getPortName(),
219 pdest.getPortName(),
220 src,
221 dest,
222 mode);
223 } else {
224 hookup(psrc.getPortName());
225 }
226 } else {
227 if (pdest.getCarrier()!="topic") {
228 hookup(pdest.getPortName());
229 }
230 }
231 }
232 return ok;
233}
234
235bool SubscriberOnSql::removeSubscription(const std::string& src,
236 const std::string& dest) {
238 psrc.apply(src);
239 pdest.apply(dest);
240 char *query = sqlite3_mprintf("DELETE FROM subscriptions WHERE src = %Q AND dest = %Q",
241 psrc.getPortName().c_str(),
242 pdest.getPortName().c_str());
243 yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
244
245 int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, nullptr);
246 bool ok = true;
247 if (result!=SQLITE_OK) {
248 yCError(SUBSCRIBERONSQL, "Error in query");
249 ok = false;
250 }
252
253 return ok;
254}
255
256
257bool SubscriberOnSql::welcome(const std::string& port, int activity) {
258 mutex.lock();
259
260 NameSpace *ns = getDelegate();
261 if (ns) {
262 NestedContact nc(port);
263 if (nc.getNestedName().size()>0) {
264 NameStore *store = getStore();
265 if (store != nullptr) {
266 Contact node = store->query(nc.getNodeName());
267 Contact me = store->query(port);
268 if (node.isValid() && me.isValid()) {
269 if (activity>0) {
270 ns->registerAdvanced(me,store);
271 } else {
272 ns->unregisterAdvanced(port,store);
273 }
274 }
275 }
276 }
277 }
278
279 char *msg = nullptr;
280 char *query;
281 if (activity>0) {
282 query = sqlite3_mprintf("INSERT OR IGNORE INTO live (name,stamp) VALUES(%Q,DATETIME('now'))",
283 port.c_str());
284 } else {
285 // Port not responding. Mark as non-live.
286 if (activity==0) {
287 query = sqlite3_mprintf("DELETE FROM live WHERE name=%Q AND stamp < DATETIME('now','-30 seconds')",
288 port.c_str());
289 } else {
290 // activity = -1 -- definite dodo
291 query = sqlite3_mprintf("DELETE FROM live WHERE name=%Q",
292 port.c_str());
293 }
294 }
295 yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
296
297 bool ok = true;
298 int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, &msg);
299 if (result!=SQLITE_OK) {
300 ok = false;
301 if (msg != nullptr) {
302 yCError(SUBSCRIBERONSQL, "%s", msg);
303 sqlite3_free(msg);
304 }
305 }
307 mutex.unlock();
308
309 if (activity>0) {
310 hookup(port);
311 } else if (activity<0) {
312 breakdown(port);
313 }
314 return ok;
315}
316
317bool SubscriberOnSql::hookup(const std::string& port) {
318 if (getDelegate()) {
319 NestedContact nc(port);
320 if (nc.getNestedName().size()>0) {
321 return false;
322 }
323 }
324 mutex.lock();
325 sqlite3_stmt *statement = nullptr;
326 char *query = nullptr;
327 //query = sqlite3_mprintf("SELECT * FROM subscriptions WHERE src = %Q OR dest= %Q",port, port);
328 query = sqlite3_mprintf("SELECT src,dest,srcFull,destFull FROM subscriptions WHERE (src = %Q OR dest= %Q) AND EXISTS (SELECT NULL FROM live WHERE name=src) AND EXISTS (SELECT NULL FROM live WHERE name=dest) UNION SELECT s1.src, s2.dest, s1.srcFull, s2.destFull FROM subscriptions s1, subscriptions s2, topics t WHERE (s1.dest = t.topic AND s2.src = t.topic) AND (s1.src = %Q OR s2.dest = %Q) AND EXISTS (SELECT NULL FROM live WHERE name=s1.src) AND EXISTS (SELECT NULL FROM live WHERE name=s2.dest)",port.c_str(), port.c_str(), port.c_str(), port.c_str());
329 //
330 yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
331
332 int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
333 if (result!=SQLITE_OK) {
334 const char *msg = sqlite3_errmsg(SQLDB(implementation));
335 if (msg != nullptr) {
336 yCError(SUBSCRIBERONSQL, "%s", msg);
337 }
338 }
339 while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
340 char *src = (char *)sqlite3_column_text(statement,0);
341 char *dest = (char *)sqlite3_column_text(statement,1);
342 char *srcFull = (char *)sqlite3_column_text(statement,2);
343 char *destFull = (char *)sqlite3_column_text(statement,3);
344 char *mode = (char *)sqlite3_column_text(statement,4);
345 checkSubscription(src,dest,srcFull,destFull,mode?mode:"");
346 }
349 mutex.unlock();
350
351 return false;
352}
353
354
355bool SubscriberOnSql::breakdown(const std::string& port) {
356 if (getDelegate()) {
357 NestedContact nc(port);
358 if (nc.getNestedName().size()>0) {
359 return false;
360 }
361 }
362 mutex.lock();
363 sqlite3_stmt *statement = nullptr;
364 char *query = nullptr;
365 // query = sqlite3_mprintf("SELECT src,dest,srcFull,destFull,mode FROM subscriptions WHERE ((src = %Q AND EXISTS (SELECT NULL FROM live WHERE name=dest)) OR (dest = %Q AND EXISTS (SELECT NULL FROM live WHERE name=src))) UNION SELECT s1.src, s2.dest, s1.srcFull, s2.destFull, NULL FROM subscriptions s1, subscriptions s2, topics t WHERE (s1.dest = t.topic AND s2.src = t.topic AND ((s1.src = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s2.dest)) OR (s2.dest = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s1.src))))",port, port, port, port);
366 query = sqlite3_mprintf("SELECT src,dest,srcFull,destFull,mode FROM subscriptions WHERE ((src = %Q AND (mode IS NOT NULL OR EXISTS (SELECT NULL FROM live WHERE name=dest))) OR (dest = %Q AND (mode IS NOT NULL OR EXISTS (SELECT NULL FROM live WHERE name=src)))) UNION SELECT s1.src, s2.dest, s1.srcFull, s2.destFull, NULL FROM subscriptions s1, subscriptions s2, topics t WHERE (s1.dest = t.topic AND s2.src = t.topic AND ((s1.src = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s2.dest)) OR (s2.dest = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s1.src))))",port.c_str(), port.c_str(), port.c_str(), port.c_str());
367 yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
368
369 int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
370 if (result!=SQLITE_OK) {
371 const char *msg = sqlite3_errmsg(SQLDB(implementation));
372 if (msg != nullptr) {
373 yCError(SUBSCRIBERONSQL, "%s", msg);
374 }
375 }
376 while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
377 char *src = (char *)sqlite3_column_text(statement,0);
378 char *dest = (char *)sqlite3_column_text(statement,1);
379 char *srcFull = (char *)sqlite3_column_text(statement,2);
380 char *destFull = (char *)sqlite3_column_text(statement,3);
381 char *mode = (char *)sqlite3_column_text(statement,4);
382 breakSubscription(port,src,dest,srcFull,destFull,mode?mode:"");
383 }
386 mutex.unlock();
387
388 return false;
389}
390
391
392bool SubscriberOnSql::checkSubscription(const std::string& src,const std::string& dest,
393 const std::string& srcFull,
394 const std::string& destFull,
395 const std::string& mode) {
396 if (getDelegate()) {
397 NestedContact nc(src);
398 if (nc.getNestedName().size()>0) {
399 NestedContact nc(dest);
400 if (nc.getNestedName().size()>0) {
401 return false;
402 }
403 }
404 }
406 "+++ Checking %s %s / %s %s",
407 src.c_str(),
408 dest.c_str(),
409 srcFull.c_str(),
410 destFull.c_str());
411
412 NameStore *store = getStore();
413 if (store != nullptr) {
414 Contact csrc = store->query(src);
415 Contact cdest = store->query(dest);
416 if (csrc.isValid()&&cdest.isValid()) {
417 bool srcTopic = (csrc.getCarrier()=="topic");
418 bool destTopic = (cdest.getCarrier()=="topic");
419 if (!(srcTopic||destTopic)) {
421 "++> check connection %s %s",
422 srcFull.c_str(),
423 destFull.c_str());
425 }
426 }
427 if (mode!="") {
428 std::string mode_name = mode;
429 if (mode_name=="from") {
430 if (!csrc.isValid()) {
431 removeSubscription(src,dest);
432 }
433 } else if (mode_name=="to") {
434 if (!cdest.isValid()) {
435 removeSubscription(src,dest);
436 }
437 }
438 }
439 }
440 return false;
441}
442
443
445 const std::string& src, const std::string& dest,
446 const std::string& srcFull,
447 const std::string& destFull,
448 const std::string& mode) {
449 if (getDelegate()) {
450 NestedContact nc(src);
451 if (nc.getNestedName().size()>0) {
452 NestedContact nc(dest);
453 if (nc.getNestedName().size()>0) {
454 return false;
455 }
456 }
457 }
459 "--- Checking %s %s / %s %s",
460 src.c_str(),
461 dest.c_str(),
462 srcFull.c_str(),
463 destFull.c_str());
464 NameStore *store = getStore();
465 if (store != nullptr) {
466 bool srcDrop = std::string(dropper) == src;
467 Contact contact;
468 if (srcDrop) {
469 contact = store->query(src);
470 } else {
471 contact = store->query(dest);
472 }
473 if (contact.isValid()) {
475 "--> check connection %s %s",
476 srcFull.c_str(),
477 destFull.c_str());
479 }
480 if (mode!="") {
481 std::string mode_name = mode;
482 if (mode_name=="from") {
483 if (srcDrop) {
484 removeSubscription(src,dest);
485 }
486 } else if (mode_name=="to") {
487 if (!srcDrop) {
488 removeSubscription(src,dest);
489 }
490 }
491 }
492 }
493 return false;
494}
495
496
497
498bool SubscriberOnSql::listSubscriptions(const std::string& port,
499 yarp::os::Bottle& reply) {
500 mutex.lock();
501 sqlite3_stmt *statement = nullptr;
502 char *query = nullptr;
503 if (std::string(port)!="") {
504 query = sqlite3_mprintf("SELECT s.srcFull, s.DestFull, EXISTS(SELECT topic FROM topics WHERE topic = s.src), EXISTS(SELECT topic FROM topics WHERE topic = s.dest), s.mode FROM subscriptions s WHERE s.src = %Q OR s.dest= %Q ORDER BY s.src, s.dest",port.c_str(),port.c_str());
505 } else {
506 query = sqlite3_mprintf("SELECT s.srcFull, s.destFull, EXISTS(SELECT topic FROM topics WHERE topic = s.src), EXISTS(SELECT topic FROM topics WHERE topic = s.dest), s.mode FROM subscriptions s ORDER BY s.src, s.dest");
507 }
508 yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
509
510 int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
511 if (result!=SQLITE_OK) {
512 const char *msg = sqlite3_errmsg(SQLDB(implementation));
513 if (msg != nullptr) {
514 yCError(SUBSCRIBERONSQL, "%s", msg);
515 }
516 }
517 reply.addString("subscriptions");
518 while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
519 char *src = (char *)sqlite3_column_text(statement,0);
520 char *dest = (char *)sqlite3_column_text(statement,1);
523 char *mode = (char *)sqlite3_column_text(statement,4);
524 Bottle& b = reply.addList();
525 b.addString("subscription");
526 Bottle bsrc;
527 bsrc.addString("src");
528 bsrc.addString(src);
530 bdest.addString("dest");
531 bdest.addString(dest);
532 b.addList() = bsrc;
533 b.addList() = bdest;
534 if (mode != nullptr) {
535 if (mode[0]!='\0') {
537 bmode.addString("mode");
538 bmode.addString(mode);
539 b.addList() = bmode;
540 }
541 }
542 if (srcTopic||destTopic) {
544 btopic.addString("topic");
545 btopic.addInt32(srcTopic);
546 btopic.addInt32(destTopic);
547 b.addList() = btopic;
548 }
549 }
552 mutex.unlock();
553
554 return true;
555}
556
557
558bool SubscriberOnSql::setTopic(const std::string& port, const std::string& structure,
559 bool active) {
560 if (structure!="" || !active) {
561 mutex.lock();
562 char *query = sqlite3_mprintf("DELETE FROM topics WHERE topic = %Q",
563 port.c_str());
564 yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
565
566 int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, nullptr);
567 bool ok = true;
568 if (result!=SQLITE_OK) {
569 yCError(SUBSCRIBERONSQL, "Error in query");
570 ok = false;
571 }
573 mutex.unlock();
574 if (!ok) {
575 return false;
576 }
577 if (!active) {
578 return true;
579 }
580 }
581
582 bool have_topic = false;
583 if (structure=="") {
584 mutex.lock();
585 sqlite3_stmt *statement = nullptr;
586 char *query = nullptr;
587 query = sqlite3_mprintf("SELECT topic FROM topics WHERE topic = %Q",
588 port.c_str());
589 yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
590
591 int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
592 if (result!=SQLITE_OK) {
593 yCError(SUBSCRIBERONSQL, "Error in query");
594 }
595 if (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
596 have_topic = true;
597 }
600 mutex.unlock();
601 }
602
603 if (structure!="" || !have_topic) {
604 mutex.lock();
605 char *msg = nullptr;
606 const char *pstructure = structure.c_str();
607 if (structure == "") {
608 pstructure = nullptr;
609 }
610 char *query = sqlite3_mprintf("INSERT INTO topics (topic,structure) VALUES(%Q,%Q)",
611 port.c_str(),pstructure);
612 yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
613
614 bool ok = true;
615 int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, &msg);
616 if (result!=SQLITE_OK) {
617 ok = false;
618 if (msg != nullptr) {
619 yCError(SUBSCRIBERONSQL, "%s", msg);
620 sqlite3_free(msg);
621 }
622 }
624 mutex.unlock();
625 if (!ok) {
626 return false;
627 }
628 }
629
630 std::vector<std::vector<std::string> > subs;
631
632 // go ahead and connect anything needed
633 mutex.lock();
634 sqlite3_stmt *statement = nullptr;
635 char *query = sqlite3_mprintf("SELECT s1.src, s2.dest, s1.srcFull, s2.destFull FROM subscriptions s1, subscriptions s2, topics t WHERE (t.topic = %Q AND s1.dest = t.topic AND s2.src = t.topic)", port.c_str());
636 yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
637
638 int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
639 if (result!=SQLITE_OK) {
640 const char *msg = sqlite3_errmsg(SQLDB(implementation));
641 if (msg != nullptr) {
642 yCError(SUBSCRIBERONSQL, "%s", msg);
643 }
644 }
645 while (result == SQLITE_OK &&
647 char *src = (char *)sqlite3_column_text(statement,0);
648 char *dest = (char *)sqlite3_column_text(statement,1);
649 char *srcFull = (char *)sqlite3_column_text(statement,2);
650 char *destFull = (char *)sqlite3_column_text(statement,3);
651 char *mode = (char *)sqlite3_column_text(statement,4);
652 std::vector<std::string> sub;
653 sub.emplace_back(src);
654 sub.emplace_back(dest);
655 sub.emplace_back(srcFull);
656 sub.emplace_back(destFull);
657 sub.emplace_back(mode?mode:"");
658 subs.push_back(sub);
659 }
662 mutex.unlock();
663
664 for (auto& sub : subs) {
665 checkSubscription(sub[0],sub[1],sub[2],sub[3],sub[4]);
666 }
667
668 return true;
669}
670
671
673 mutex.lock();
674 sqlite3_stmt *statement = nullptr;
675 char *query = nullptr;
676 query = sqlite3_mprintf("SELECT topic FROM topics");
677 yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
678
679 int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
680 if (result!=SQLITE_OK) {
681 yCError(SUBSCRIBERONSQL, "Error in query");
682 }
683 while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
684 char *topic = (char *)sqlite3_column_text(statement,0);
685 topics.addString(topic);
686 }
689 mutex.unlock();
690
691 return true;
692}
693
694
695bool SubscriberOnSql::setType(const std::string& family,
696 const std::string& structure,
697 const std::string& value) {
698 mutex.lock();
699 char *msg = nullptr;
700 char *query = sqlite3_mprintf("INSERT OR REPLACE INTO structures (name,%Q) VALUES(%Q,%Q)",
701 family.c_str(),
702 (structure=="") ? nullptr : structure.c_str(),
703 value.c_str());
704 yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
705
706 bool ok = true;
707 int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, &msg);
708 if (result!=SQLITE_OK) {
709 ok = false;
710 if (msg != nullptr) {
711 yCError(SUBSCRIBERONSQL, "%s", msg);
712 sqlite3_free(msg);
713 }
714 }
716 mutex.unlock();
717 return ok;
718}
719
720std::string SubscriberOnSql::getType(const std::string& family,
721 const std::string& structure) {
722 mutex.lock();
723 sqlite3_stmt *statement = nullptr;
724 char *query = nullptr;
725 query = sqlite3_mprintf("SELECT %s FROM structures WHERE name = %Q",
726 family.c_str(), structure.c_str());
727 yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
728
729 int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
730 std::string sresult;
731 if (result!=SQLITE_OK) {
732 yCError(SUBSCRIBERONSQL, "Error in query");
733 }
734 if (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
735 sresult = (const char *)sqlite3_column_text(statement,0);
736 }
739 mutex.unlock();
740
741 return sresult;
742}
#define SQLDB(x)
#define F_OK
yarp::os::Contact query(const std::string &name) override
Definition NameService.h:40
A simple collection of objects that can be described and transmitted in a portable way.
Definition Bottle.h:64
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
Definition Bottle.cpp:182
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition Bottle.cpp:170
A mini-server for performing network communication in the background.
Represents how to reach a part of a YARP network.
Definition Contact.h:33
bool isValid() const
Checks if a Contact is tagged as valid.
Definition Contact.cpp:298
An abstract name space for ports.
Definition NameSpace.h:22
virtual Contact unregisterAdvanced(const std::string &name, NameStore *store)
Remove contact information, with access to the contact information of other ports for cross-referenci...
Definition NameSpace.h:103
virtual Contact registerAdvanced(const Contact &contact, NameStore *store)
Record contact information, with access to the contact information of other ports for cross-referenci...
Definition NameSpace.h:92
Abstract interface for a database of port names.
Definition NameStore.h:19
virtual Contact query(const std::string &name)=0
A placeholder for rich contact information.
std::string getNodeName() const
std::string getNestedName() const
bool checkSubscription(const std::string &src, const std::string &dest, const std::string &srcFull, const std::string &destFull, const std::string &mode)
bool removeSubscription(const std::string &src, const std::string &dest) override
bool open(const std::string &filename, bool fresh=false)
bool hookup(const std::string &port)
bool welcome(const std::string &port, int activity) override
bool breakSubscription(const std::string &dropper, const std::string &src, const std::string &dest, const std::string &srcFull, const std::string &destFull, const std::string &mode)
bool addSubscription(const std::string &src, const std::string &dest, const std::string &mode) override
bool listSubscriptions(const std::string &port, yarp::os::Bottle &reply) override
std::string getType(const std::string &family, const std::string &structure) override
bool breakdown(const std::string &port)
bool setTopic(const std::string &port, const std::string &structure, bool active) override
bool listTopics(yarp::os::Bottle &topics) override
bool setType(const std::string &family, const std::string &structure, const std::string &value) override
void disconnect(const std::string &src, const std::string &dest, bool srcDrop)
Definition Subscriber.h:55
yarp::os::NameStore * getStore()
Definition Subscriber.h:39
yarp::os::NameSpace * getDelegate()
Definition Subscriber.h:106
void connect(const std::string &src, const std::string &dest)
Definition Subscriber.h:49
#define yCError(component,...)
#define yCWarning(component,...)
#define yCDebug(component,...)
#define YARP_SERVERSQL_LOG_COMPONENT(name, name_string)
An interface to the operating system, including Port based communication.