Gazebo Transport

API Reference

8.3.0
Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19 #define IGNITION_TRANSPORT_DISCOVERY_HH_
20 #include <errno.h>
21 #include <string.h>
22 
23 #ifdef _WIN32
24  // For socket(), connect(), send(), and recv().
25  #include <Winsock2.h>
26  #include <Ws2def.h>
27  #include <Ws2ipdef.h>
28  #include <Ws2tcpip.h>
29  // Type used for raw data on this platform.
30  using raw_type = char;
31 #else
32  // For data types
33  #include <sys/types.h>
34  // For socket(), connect(), send(), and recv()
35  #include <sys/socket.h>
36  // For gethostbyname()
37  #include <netdb.h>
38  // For inet_addr()
39  #include <arpa/inet.h>
40  // For close()
41  #include <unistd.h>
42  // For sockaddr_in
43  #include <netinet/in.h>
44  // Type used for raw data on this platform
45  using raw_type = void;
46 #endif
47 
48 #ifdef _WIN32
49  #pragma warning(push, 0)
50 #endif
51 #ifdef _WIN32
52  #pragma warning(pop)
53  // Suppress "decorated name length exceed" warning in STL.
54  #pragma warning(disable: 4503)
55  // Suppress "depreted API warnings" in WINSOCK.
56  #pragma warning(disable: 4996)
57 #endif
58 
59 #include <ignition/msgs/discovery.pb.h>
60 
61 #include <algorithm>
62 #include <condition_variable>
63 #include <limits>
64 #include <map>
65 #include <memory>
66 #include <mutex>
67 #include <string>
68 #include <thread>
69 #include <vector>
70 
71 #include <ignition/msgs/Utility.hh>
72 
73 #include "ignition/transport/config.hh"
74 #include "ignition/transport/Export.hh"
80 
81 namespace ignition
82 {
83  namespace transport
84  {
85  // Inline bracket to help doxygen filtering.
86  inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
88  enum class DestinationType
89  {
91  UNICAST,
93  MULTICAST,
95  ALL
96  };
97 
98  //
104  bool IGNITION_TRANSPORT_VISIBLE pollSockets(
105  const std::vector<int> &_sockets,
106  const int _timeout);
107 
116  template<typename Pub>
117  class Discovery
118  {
124  public: Discovery(const std::string &_pUuid,
125  const int _port,
126  const bool _verbose = false)
127  : port(_port),
128  hostAddr(determineHost()),
129  pUuid(_pUuid),
130  silenceInterval(kDefSilenceInterval),
131  activityInterval(kDefActivityInterval),
132  heartbeatInterval(kDefHeartbeatInterval),
133  connectionCb(nullptr),
134  disconnectionCb(nullptr),
135  verbose(_verbose),
136  initialized(false),
137  numHeartbeatsUninitialized(0),
138  exit(false),
139  enabled(false)
140  {
141  std::string ignIp;
142  if (env("IGN_IP", ignIp) && !ignIp.empty())
143  this->hostInterfaces = {ignIp};
144  else
145  {
146  // Get the list of network interfaces in this host.
147  this->hostInterfaces = determineInterfaces();
148  }
149 
150 #ifdef _WIN32
151  WORD wVersionRequested;
152  WSADATA wsaData;
153 
154  // Request WinSock v2.2.
155  wVersionRequested = MAKEWORD(2, 2);
156  // Load WinSock DLL.
157  if (WSAStartup(wVersionRequested, &wsaData) != 0)
158  {
159  std::cerr << "Unable to load WinSock DLL" << std::endl;
160  return;
161  }
162 #endif
163  for (const auto &netIface : this->hostInterfaces)
164  {
165  auto succeed = this->RegisterNetIface(netIface);
166 
167  // If the IP address that we're selecting as the main IP address of
168  // the host is invalid, we change it to 127.0.0.1 .
169  // This is probably because IGN_IP is set to a wrong value.
170  if (netIface == this->hostAddr && !succeed)
171  {
172  this->RegisterNetIface("127.0.0.1");
173  std::cerr << "Did you set the environment variable IGN_IP with a "
174  << "correct IP address? " << std::endl
175  << " [" << netIface << "] seems an invalid local IP "
176  << "address." << std::endl
177  << " Using 127.0.0.1 as hostname." << std::endl;
178  this->hostAddr = "127.0.0.1";
179  }
180  }
181 
182  // Socket option: SO_REUSEADDR. This options is used only for receiving
183  // data. We can reuse the same socket for receiving multicast data from
184  // multiple interfaces. We will use the socket at position 0 for
185  // receiving data.
186  int reuseAddr = 1;
187  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
188  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
189  {
190  std::cerr << "Error setting socket option (SO_REUSEADDR)."
191  << std::endl;
192  return;
193  }
194 
195 #ifdef SO_REUSEPORT
196  // Socket option: SO_REUSEPORT. This options is used only for receiving
197  // data. We can reuse the same socket for receiving multicast data from
198  // multiple interfaces. We will use the socket at position 0 for
199  // receiving data.
200  int reusePort = 1;
201  // cppcheck-suppress ConfigurationNotChecked
202  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
203  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
204  {
205  std::cerr << "Error setting socket option (SO_REUSEPORT)."
206  << std::endl;
207  return;
208  }
209 #endif
210  // Bind the first socket to the discovery port.
211  sockaddr_in localAddr;
212  memset(&localAddr, 0, sizeof(localAddr));
213  localAddr.sin_family = AF_INET;
214  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
215  localAddr.sin_port = htons(static_cast<u_short>(this->port));
216 
217  if (bind(this->sockets.at(0),
218  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
219  {
220  std::cerr << "Binding to a local port failed." << std::endl;
221  return;
222  }
223 
224  // Set 'mcastAddr' to the multicast discovery group.
225  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
226  this->mcastAddr.sin_family = AF_INET;
227  this->mcastAddr.sin_addr.s_addr =
228  inet_addr(this->kMulticastGroup.c_str());
229  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
230 
232  std::string ignRelay = "";
233  if (env("IGN_RELAY", ignRelay) && !ignRelay.empty())
234  {
235  relays = transport::split(ignRelay, ':');
236  }
237 
238  // Register all unicast relays.
239  for (auto const &relayAddr : relays)
240  this->AddRelayAddress(relayAddr);
241 
242  if (this->verbose)
243  this->PrintCurrentState();
244  }
245 
247  public: virtual ~Discovery()
248  {
249  // Tell the service thread to terminate.
250  this->exitMutex.lock();
251  this->exit = true;
252  this->exitMutex.unlock();
253 
254  // Wait for the service threads to finish before exit.
255  if (this->threadReception.joinable())
256  this->threadReception.join();
257 
258  // Broadcast a BYE message to trigger the remote cancellation of
259  // all our advertised topics.
260  this->SendMsg(DestinationType::ALL, msgs::Discovery::BYE,
261  Publisher("", "", this->pUuid, "", AdvertiseOptions()));
262 
263  // Close sockets.
264  for (const auto &sock : this->sockets)
265  {
266 #ifdef _WIN32
267  closesocket(sock);
268  WSACleanup();
269 #else
270  close(sock);
271 #endif
272  }
273  }
274 
278  public: void Start()
279  {
280  {
281  std::lock_guard<std::mutex> lock(this->mutex);
282 
283  // The service is already running.
284  if (this->enabled)
285  return;
286 
287  this->enabled = true;
288  }
289 
290  auto now = std::chrono::steady_clock::now();
291  this->timeNextHeartbeat = now;
292  this->timeNextActivity = now;
293 
294  // Start the thread that receives discovery information.
295  this->threadReception = std::thread(&Discovery::RecvMessages, this);
296  }
297 
302  public: bool Advertise(const Pub &_publisher)
303  {
304  {
305  std::lock_guard<std::mutex> lock(this->mutex);
306 
307  if (!this->enabled)
308  return false;
309 
310  // Add the addressing information (local publisher).
311  if (!this->info.AddPublisher(_publisher))
312  return false;
313  }
314 
315  // Only advertise a message outside this process if the scope
316  // is not 'Process'
317  if (_publisher.Options().Scope() != Scope_t::PROCESS)
318  this->SendMsg(DestinationType::ALL, msgs::Discovery::ADVERTISE,
319  _publisher);
320 
321  return true;
322  }
323 
334  public: bool Discover(const std::string &_topic) const
335  {
337  bool found;
338  Addresses_M<Pub> addresses;
339 
340  {
341  std::lock_guard<std::mutex> lock(this->mutex);
342 
343  if (!this->enabled)
344  return false;
345 
346  cb = this->connectionCb;
347  }
348 
349  Pub pub;
350  pub.SetTopic(_topic);
351  pub.SetPUuid(this->pUuid);
352 
353  // Send a discovery request.
354  this->SendMsg(DestinationType::ALL, msgs::Discovery::SUBSCRIBE, pub);
355 
356  {
357  std::lock_guard<std::mutex> lock(this->mutex);
358  found = this->info.Publishers(_topic, addresses);
359  }
360 
361  if (found)
362  {
363  // I already have information about this topic.
364  for (const auto &proc : addresses)
365  {
366  for (const auto &node : proc.second)
367  {
368  if (cb)
369  {
370  // Execute the user's callback for a service request. Notice
371  // that we only execute one callback for preventing receive
372  // multiple service responses for a single request.
373  cb(node);
374  }
375  }
376  }
377  }
378 
379  return true;
380  }
381 
384  public: void Register(const MessagePublisher &_pub) const
385  {
386  this->SendMsg(
387  DestinationType::ALL, msgs::Discovery::NEW_CONNECTION, _pub);
388  }
389 
392  public: void Unregister(const MessagePublisher &_pub) const
393  {
394  this->SendMsg(
395  DestinationType::ALL, msgs::Discovery::END_CONNECTION, _pub);
396  }
397 
400  public: const TopicStorage<Pub> &Info() const
401  {
402  std::lock_guard<std::mutex> lock(this->mutex);
403  return this->info;
404  }
405 
410  public: bool Publishers(const std::string &_topic,
411  Addresses_M<Pub> &_publishers) const
412  {
413  std::lock_guard<std::mutex> lock(this->mutex);
414  return this->info.Publishers(_topic, _publishers);
415  }
416 
424  public: bool Unadvertise(const std::string &_topic,
425  const std::string &_nUuid)
426  {
427  Pub inf;
428  {
429  std::lock_guard<std::mutex> lock(this->mutex);
430 
431  if (!this->enabled)
432  return false;
433 
434  // Don't do anything if the topic is not advertised by any of my nodes
435  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
436  return true;
437 
438  // Remove the topic information.
439  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
440  }
441 
442  // Only unadvertise a message outside this process if the scope
443  // is not 'Process'.
444  if (inf.Options().Scope() != Scope_t::PROCESS)
445  {
446  this->SendMsg(DestinationType::ALL,
447  msgs::Discovery::UNADVERTISE, inf);
448  }
449 
450  return true;
451  }
452 
455  public: std::string HostAddr() const
456  {
457  std::lock_guard<std::mutex> lock(this->mutex);
458  return this->hostAddr;
459  }
460 
465  public: unsigned int ActivityInterval() const
466  {
467  std::lock_guard<std::mutex> lock(this->mutex);
468  return this->activityInterval;
469  }
470 
476  public: unsigned int HeartbeatInterval() const
477  {
478  std::lock_guard<std::mutex> lock(this->mutex);
479  return this->heartbeatInterval;
480  }
481 
486  public: unsigned int SilenceInterval() const
487  {
488  std::lock_guard<std::mutex> lock(this->mutex);
489  return this->silenceInterval;
490  }
491 
495  public: void SetActivityInterval(const unsigned int _ms)
496  {
497  std::lock_guard<std::mutex> lock(this->mutex);
498  this->activityInterval = _ms;
499  }
500 
504  public: void SetHeartbeatInterval(const unsigned int _ms)
505  {
506  std::lock_guard<std::mutex> lock(this->mutex);
507  this->heartbeatInterval = _ms;
508  }
509 
513  public: void SetSilenceInterval(const unsigned int _ms)
514  {
515  std::lock_guard<std::mutex> lock(this->mutex);
516  this->silenceInterval = _ms;
517  }
518 
523  public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
524  {
525  std::lock_guard<std::mutex> lock(this->mutex);
526  this->connectionCb = _cb;
527  }
528 
533  public: void DisconnectionsCb(const DiscoveryCallback<Pub> &_cb)
534  {
535  std::lock_guard<std::mutex> lock(this->mutex);
536  this->disconnectionCb = _cb;
537  }
538 
542  public: void RegistrationsCb(const DiscoveryCallback<Pub> &_cb)
543  {
544  std::lock_guard<std::mutex> lock(this->mutex);
545  this->registrationCb = _cb;
546  }
547 
551  public: void UnregistrationsCb(const DiscoveryCallback<Pub> &_cb)
552  {
553  std::lock_guard<std::mutex> lock(this->mutex);
554  this->unregistrationCb = _cb;
555  }
556 
558  public: void PrintCurrentState() const
559  {
560  std::lock_guard<std::mutex> lock(this->mutex);
561 
562  std::cout << "---------------" << std::endl;
563  std::cout << std::boolalpha << "Enabled: "
564  << this->enabled << std::endl;
565  std::cout << "Discovery state" << std::endl;
566  std::cout << "\tUUID: " << this->pUuid << std::endl;
567  std::cout << "Settings" << std::endl;
568  std::cout << "\tActivity: " << this->activityInterval
569  << " ms." << std::endl;
570  std::cout << "\tHeartbeat: " << this->heartbeatInterval
571  << "ms." << std::endl;
572  std::cout << "\tSilence: " << this->silenceInterval
573  << " ms." << std::endl;
574  std::cout << "Known information:" << std::endl;
575  this->info.Print();
576 
577  // Used to calculate the elapsed time.
579 
580  std::cout << "Activity" << std::endl;
581  if (this->activity.empty())
582  std::cout << "\t<empty>" << std::endl;
583  else
584  {
585  for (auto &proc : this->activity)
586  {
587  // Elapsed time since the last update from this publisher.
588  std::chrono::duration<double> elapsed = now - proc.second;
589 
590  std::cout << "\t" << proc.first << std::endl;
591  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
592  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
593  << std::endl;
594  }
595  }
596  std::cout << "---------------" << std::endl;
597  }
598 
601  public: void TopicList(std::vector<std::string> &_topics) const
602  {
603  this->WaitForInit();
604  std::lock_guard<std::mutex> lock(this->mutex);
605  this->info.TopicList(_topics);
606  }
607 
610  public: void WaitForInit() const
611  {
612  std::unique_lock<std::mutex> lk(this->mutex);
613 
614  if (!this->initialized)
615  {
616  this->initializedCv.wait(lk, [this]{return this->initialized;});
617  }
618  }
619 
623  private: void UpdateActivity()
624  {
625  // The UUIDs of the processes that have expired.
627 
628  // A copy of the disconnection callback.
629  DiscoveryCallback<Pub> disconnectCb;
630 
632 
633  {
634  std::lock_guard<std::mutex> lock(this->mutex);
635 
636  if (now < this->timeNextActivity)
637  return;
638 
639  disconnectCb = this->disconnectionCb;
640 
641  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
642  {
643  // Elapsed time since the last update from this publisher.
644  auto elapsed = now - it->second;
645 
646  // This publisher has expired.
647  if (std::chrono::duration_cast<std::chrono::milliseconds>
648  (elapsed).count() > this->silenceInterval)
649  {
650  // Remove all the info entries for this process UUID.
651  this->info.DelPublishersByProc(it->first);
652 
653  uuids.push_back(it->first);
654 
655  // Remove the activity entry.
656  this->activity.erase(it++);
657  }
658  else
659  ++it;
660  }
661 
662  this->timeNextActivity = std::chrono::steady_clock::now() +
663  std::chrono::milliseconds(this->activityInterval);
664  }
665 
666  if (!disconnectCb)
667  return;
668 
669  // Notify without topic information. This is useful to inform the
670  // client that a remote node is gone, even if we were not
671  // interested in its topics.
672  for (auto const &uuid : uuids)
673  {
674  Pub publisher;
675  publisher.SetPUuid(uuid);
676  disconnectCb(publisher);
677  }
678  }
679 
681  private: void UpdateHeartbeat()
682  {
684 
685  {
686  std::lock_guard<std::mutex> lock(this->mutex);
687 
688  if (now < this->timeNextHeartbeat)
689  return;
690  }
691 
692  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
693  this->SendMsg(DestinationType::ALL, msgs::Discovery::HEARTBEAT, pub);
694 
696  {
697  std::lock_guard<std::mutex> lock(this->mutex);
698 
699  // Re-advertise topics that are advertised inside this process.
700  this->info.PublishersByProc(this->pUuid, nodes);
701  }
702 
703  for (const auto &topic : nodes)
704  {
705  for (const auto &node : topic.second)
706  {
707  this->SendMsg(DestinationType::ALL,
708  msgs::Discovery::ADVERTISE, node);
709  }
710  }
711 
712  {
713  std::lock_guard<std::mutex> lock(this->mutex);
714  if (!this->initialized)
715  {
716  ++this->numHeartbeatsUninitialized;
717  if (this->numHeartbeatsUninitialized == 2)
718  {
719  // We consider the discovery initialized after two cycles of
720  // heartbeats sent.
721  this->initialized = true;
722 
723  // Notify anyone waiting for the initialization phase to finish.
724  this->initializedCv.notify_all();
725  }
726  }
727 
728  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
729  std::chrono::milliseconds(this->heartbeatInterval);
730  }
731  }
732 
742  private: int NextTimeout() const
743  {
744  auto now = std::chrono::steady_clock::now();
745  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
746  auto timeUntilNextActivity = this->timeNextActivity - now;
747 
748  int t = static_cast<int>(
749  std::chrono::duration_cast<std::chrono::milliseconds>
750  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
751  int t2 = std::min(t, this->kTimeout);
752  return std::max(t2, 0);
753  }
754 
756  private: void RecvMessages()
757  {
758  bool timeToExit = false;
759  while (!timeToExit)
760  {
761  // Calculate the timeout.
762  int timeout = this->NextTimeout();
763 
764  if (pollSockets(this->sockets, timeout))
765  {
766  this->RecvDiscoveryUpdate();
767 
768  if (this->verbose)
769  this->PrintCurrentState();
770  }
771 
772  this->UpdateHeartbeat();
773  this->UpdateActivity();
774 
775  // Is it time to exit?
776  {
777  std::lock_guard<std::mutex> lock(this->exitMutex);
778  if (this->exit)
779  timeToExit = true;
780  }
781  }
782  }
783 
785  private: void RecvDiscoveryUpdate()
786  {
787  char rcvStr[Discovery::kMaxRcvStr];
788  sockaddr_in clntAddr;
789  socklen_t addrLen = sizeof(clntAddr);
790 
791  uint16_t received = recvfrom(this->sockets.at(0),
792  reinterpret_cast<raw_type *>(rcvStr),
793  this->kMaxRcvStr, 0,
794  reinterpret_cast<sockaddr *>(&clntAddr),
795  reinterpret_cast<socklen_t *>(&addrLen));
796  if (received > 0)
797  {
798  uint16_t len = 0;
799  memcpy(&len, &rcvStr[0], sizeof(len));
800 
801  // Ignition Transport delimits each discovery message with a
802  // frame_delimiter that contains byte size information.
803  // A discovery message has the form:
804  //
805  // <frame_delimiter><frame_body>
806  //
807  // Ignition Transport version < 8 sends a frame delimiter that
808  // contains the value of sizeof(frame_delimiter)
809  // + sizeof(frame_body). In other words, the frame_delimiter
810  // contains a value that represents the total size of the
811  // frame_body and frame_delimiter in bytes.
812  //
813  // Ignition Transport version >= 8 sends a frame_delimiter
814  // that contains the value of sizeof(frame_body). In other
815  // words, the frame_delimiter contains a value that represents
816  // the total size of only the frame_body.
817  //
818  // It is possible that two incompatible versions of Ignition
819  // Transport exist on the same network. If we receive an
820  // unexpected size, then we ignore the message.
821 
822  // If-condition for version 8+
823  if (len + sizeof(len) == received)
824  {
825  std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
826  uint16_t srcPort = ntohs(clntAddr.sin_port);
827 
828  if (this->verbose)
829  {
830  std::cout << "\nReceived discovery update from "
831  << srcAddr << ": " << srcPort << std::endl;
832  }
833 
834  this->DispatchDiscoveryMsg(srcAddr, rcvStr + sizeof(len), len);
835  }
836  }
837  else if (received < 0)
838  {
839  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
840  << std::endl;
841  }
842  }
843 
848  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
849  char *_msg, uint16_t _len)
850  {
851  ignition::msgs::Discovery msg;
852 
853  // Parse the message, and return if parsing failed. Parsing could
854  // fail when another discovery node is publishing messages using an
855  // older (or newer) format.
856  if (!msg.ParseFromArray(_msg, _len))
857  return;
858 
859  // Discard the message if the wire protocol is different than mine.
860  if (this->Version() != msg.version())
861  return;
862 
863  std::string recvPUuid = msg.process_uuid();
864 
865  // Discard our own discovery messages.
866  if (recvPUuid == this->pUuid)
867  return;
868 
869  // Forwarding summary:
870  // - From a unicast peer -> to multicast group (with NO_RELAY flag).
871  // - From multicast group -> to unicast peers (with RELAY flag).
872 
873  // If the RELAY flag is set, this discovery message is coming via a
874  // unicast transmission. In this case, we don't process it, we just
875  // forward it to the multicast group, and it will be dispatched once
876  // received there. Note that we also unset the RELAY flag and set the
877  // NO_RELAY flag, to avoid forwarding the message anymore.
878  if (msg.has_flags() && msg.flags().relay())
879  {
880  // Unset the RELAY flag in the header and set the NO_RELAY.
881  msg.mutable_flags()->set_relay(false);
882  msg.mutable_flags()->set_no_relay(true);
883  this->SendMulticast(msg);
884 
885  // A unicast peer contacted me. I need to save its address for
886  // sending future messages in the future.
887  this->AddRelayAddress(_fromIp);
888  return;
889  }
890  // If we are receiving this discovery message via the multicast channel
891  // and the NO_RELAY flag is not set, we forward this message via unicast
892  // to all our relays. Note that this is the most common case, where we
893  // receive a regular multicast message and we forward it to any remote
894  // relays.
895  else if (!msg.has_flags() || !msg.flags().no_relay())
896  {
897  msg.mutable_flags()->set_relay(true);
898  this->SendUnicast(msg);
899  }
900 
901  // Update timestamp and cache the callbacks.
902  DiscoveryCallback<Pub> connectCb;
903  DiscoveryCallback<Pub> disconnectCb;
904  DiscoveryCallback<Pub> registerCb;
905  DiscoveryCallback<Pub> unregisterCb;
906  {
907  std::lock_guard<std::mutex> lock(this->mutex);
908  this->activity[recvPUuid] = std::chrono::steady_clock::now();
909  connectCb = this->connectionCb;
910  disconnectCb = this->disconnectionCb;
911  registerCb = this->registrationCb;
912  unregisterCb = this->unregistrationCb;
913  }
914 
915  switch (msg.type())
916  {
917  case msgs::Discovery::ADVERTISE:
918  {
919  // Read the rest of the fields.
920  Pub publisher;
921  publisher.SetFromDiscovery(msg);
922 
923  // Check scope of the topic.
924  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
925  (publisher.Options().Scope() == Scope_t::HOST &&
926  _fromIp != this->hostAddr))
927  {
928  return;
929  }
930 
931  // Register an advertised address for the topic.
932  bool added;
933  {
934  std::lock_guard<std::mutex> lock(this->mutex);
935  added = this->info.AddPublisher(publisher);
936  }
937 
938  if (added && connectCb)
939  {
940  // Execute the client's callback.
941  connectCb(publisher);
942  }
943 
944  break;
945  }
946  case msgs::Discovery::SUBSCRIBE:
947  {
948  std::string recvTopic;
949  // Read the topic information.
950  if (msg.has_sub())
951  {
952  recvTopic = msg.sub().topic();
953  }
954  else
955  {
956  std::cerr << "Subscription discovery message is missing "
957  << "Subscriber information.\n";
958  break;
959  }
960 
961  // Check if at least one of my nodes advertises the topic requested.
962  Addresses_M<Pub> addresses;
963  {
964  std::lock_guard<std::mutex> lock(this->mutex);
965  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
966  {
967  break;
968  }
969 
970  if (!this->info.Publishers(recvTopic, addresses))
971  break;
972  }
973 
974  for (const auto &nodeInfo : addresses[this->pUuid])
975  {
976  // Check scope of the topic.
977  if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
978  (nodeInfo.Options().Scope() == Scope_t::HOST &&
979  _fromIp != this->hostAddr))
980  {
981  continue;
982  }
983 
984  // Answer an ADVERTISE message.
985  this->SendMsg(DestinationType::ALL,
986  msgs::Discovery::ADVERTISE, nodeInfo);
987  }
988 
989  break;
990  }
991  case msgs::Discovery::NEW_CONNECTION:
992  {
993  // Read the rest of the fields.
994  Pub publisher;
995  publisher.SetFromDiscovery(msg);
996 
997  if (registerCb)
998  registerCb(publisher);
999 
1000  break;
1001  }
1002  case msgs::Discovery::END_CONNECTION:
1003  {
1004  // Read the rest of the fields.
1005  Pub publisher;
1006  publisher.SetFromDiscovery(msg);
1007 
1008  if (unregisterCb)
1009  unregisterCb(publisher);
1010 
1011  break;
1012  }
1013  case msgs::Discovery::HEARTBEAT:
1014  {
1015  // The timestamp has already been updated.
1016  break;
1017  }
1018  case msgs::Discovery::BYE:
1019  {
1020  // Remove the activity entry for this publisher.
1021  {
1022  std::lock_guard<std::mutex> lock(this->mutex);
1023  this->activity.erase(recvPUuid);
1024  }
1025 
1026  if (disconnectCb)
1027  {
1028  Pub pub;
1029  pub.SetPUuid(recvPUuid);
1030  // Notify the new disconnection.
1031  disconnectCb(pub);
1032  }
1033 
1034  // Remove the address entry for this topic.
1035  {
1036  std::lock_guard<std::mutex> lock(this->mutex);
1037  this->info.DelPublishersByProc(recvPUuid);
1038  }
1039 
1040  break;
1041  }
1042  case msgs::Discovery::UNADVERTISE:
1043  {
1044  // Read the address.
1045  Pub publisher;
1046  publisher.SetFromDiscovery(msg);
1047 
1048  // Check scope of the topic.
1049  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1050  (publisher.Options().Scope() == Scope_t::HOST &&
1051  _fromIp != this->hostAddr))
1052  {
1053  return;
1054  }
1055 
1056  if (disconnectCb)
1057  {
1058  // Notify the new disconnection.
1059  disconnectCb(publisher);
1060  }
1061 
1062  // Remove the address entry for this topic.
1063  {
1064  std::lock_guard<std::mutex> lock(this->mutex);
1065  this->info.DelPublisherByNode(publisher.Topic(),
1066  publisher.PUuid(), publisher.NUuid());
1067  }
1068 
1069  break;
1070  }
1071  default:
1072  {
1073  std::cerr << "Unknown message type [" << msg.type() << "].\n";
1074  break;
1075  }
1076  }
1077  }
1078 
1085  private: template<typename T>
1086  void SendMsg(const DestinationType &_destType,
1087  const msgs::Discovery::Type _type,
1088  const T &_pub) const
1089  {
1090  ignition::msgs::Discovery discoveryMsg;
1091  discoveryMsg.set_version(this->Version());
1092  discoveryMsg.set_type(_type);
1093  discoveryMsg.set_process_uuid(this->pUuid);
1094 
1095  switch (_type)
1096  {
1097  case msgs::Discovery::ADVERTISE:
1098  case msgs::Discovery::UNADVERTISE:
1099  case msgs::Discovery::NEW_CONNECTION:
1100  case msgs::Discovery::END_CONNECTION:
1101  {
1102  _pub.FillDiscovery(discoveryMsg);
1103  break;
1104  }
1105  case msgs::Discovery::SUBSCRIBE:
1106  {
1107  discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1108  break;
1109  }
1110  case msgs::Discovery::HEARTBEAT:
1111  case msgs::Discovery::BYE:
1112  break;
1113  default:
1114  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
1115  << " type [" << _type << "]" << std::endl;
1116  return;
1117  }
1118 
1119  if (_destType == DestinationType::MULTICAST ||
1120  _destType == DestinationType::ALL)
1121  {
1122  this->SendMulticast(discoveryMsg);
1123  }
1124 
1125  // Send the discovery message to the unicast relays.
1126  if (_destType == DestinationType::UNICAST ||
1127  _destType == DestinationType::ALL)
1128  {
1129  // Set the RELAY flag in the header.
1130  discoveryMsg.mutable_flags()->set_relay(true);
1131  this->SendUnicast(discoveryMsg);
1132  }
1133 
1134  if (this->verbose)
1135  {
1136  std::cout << "\t* Sending " << msgs::ToString(_type)
1137  << " msg [" << _pub.Topic() << "]" << std::endl;
1138  }
1139  }
1140 
1143  private: void SendUnicast(const msgs::Discovery &_msg) const
1144  {
1145  uint16_t msgSize;
1146 
1147 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1148  size_t msgSizeFull = _msg.ByteSizeLong();
1149 #else
1150  int msgSizeFull = _msg.ByteSize();
1151 #endif
1152  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1153  {
1154  std::cerr << "Discovery message too large to send. Discovery won't "
1155  << "work. This shouldn't happen.\n";
1156  return;
1157  }
1158  msgSize = msgSizeFull;
1159 
1160  uint16_t totalSize = sizeof(msgSize) + msgSize;
1161  char *buffer = static_cast<char *>(new char[totalSize]);
1162  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1163 
1164  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1165  {
1166  // Send the discovery message to the unicast relays.
1167  for (const auto &sockAddr : this->relayAddrs)
1168  {
1169  auto sent = sendto(this->sockets.at(0),
1170  reinterpret_cast<const raw_type *>(
1171  reinterpret_cast<const unsigned char*>(buffer)),
1172  totalSize, 0,
1173  reinterpret_cast<const sockaddr *>(&sockAddr),
1174  sizeof(sockAddr));
1175 
1176  if (sent != totalSize)
1177  {
1178  std::cerr << "Exception sending a unicast message" << std::endl;
1179  break;
1180  }
1181  }
1182  }
1183  else
1184  {
1185  std::cerr << "Discovery::SendUnicast: Error serializing data."
1186  << std::endl;
1187  }
1188 
1189  delete [] buffer;
1190  }
1191 
1194  private: void SendMulticast(const msgs::Discovery &_msg) const
1195  {
1196  uint16_t msgSize;
1197 
1198 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1199  size_t msgSizeFull = _msg.ByteSizeLong();
1200 #else
1201  int msgSizeFull = _msg.ByteSize();
1202 #endif
1203  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1204  {
1205  std::cerr << "Discovery message too large to send. Discovery won't "
1206  << "work. This shouldn't happen.\n";
1207  return;
1208  }
1209 
1210  msgSize = msgSizeFull;
1211  uint16_t totalSize = sizeof(msgSize) + msgSize;
1212  char *buffer = static_cast<char *>(new char[totalSize]);
1213  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1214 
1215  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1216  {
1217  // Send the discovery message to the multicast group through all the
1218  // sockets.
1219  for (const auto &sock : this->Sockets())
1220  {
1221  errno = 0;
1222  if (sendto(sock, reinterpret_cast<const raw_type *>(
1223  reinterpret_cast<const unsigned char*>(buffer)),
1224  totalSize, 0,
1225  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1226  sizeof(*(this->MulticastAddr()))) != totalSize)
1227  {
1228  // Ignore EPERM and ENOBUFS errors.
1229  //
1230  // See issue #106
1231  //
1232  // Rationale drawn from:
1233  //
1234  // * https://groups.google.com/forum/#!topic/comp.protocols.tcp-ip/Qou9Sfgr77E
1235  // * https://stackoverflow.com/questions/16555101/sendto-dgrams-do-not-block-for-enobufs-on-osx
1236  if (errno != EPERM && errno != ENOBUFS)
1237  {
1238  std::cerr << "Exception sending a multicast message:"
1239  << strerror(errno) << std::endl;
1240  }
1241  break;
1242  }
1243  }
1244  }
1245  else
1246  {
1247  std::cerr << "Discovery::SendMulticast: Error serializing data."
1248  << std::endl;
1249  }
1250 
1251  delete [] buffer;
1252  }
1253 
1256  private: const std::vector<int> &Sockets() const
1257  {
1258  return this->sockets;
1259  }
1260 
1263  private: const sockaddr_in *MulticastAddr() const
1264  {
1265  return &this->mcastAddr;
1266  }
1267 
1270  private: uint8_t Version() const
1271  {
1272  static std::string ignStats;
1273  static int topicStats =
1274  (env("IGN_TRANSPORT_TOPIC_STATISTICS", ignStats) && ignStats == "1");
1275  return this->kWireVersion + (topicStats * 100);
1276  }
1277 
1282  private: bool RegisterNetIface(const std::string &_ip)
1283  {
1284  // Make a new socket for sending discovery information.
1285  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1286  if (sock < 0)
1287  {
1288  std::cerr << "Socket creation failed." << std::endl;
1289  return false;
1290  }
1291 
1292  // Socket option: IP_MULTICAST_IF.
1293  // This socket option needs to be applied to each socket used to send
1294  // data. This option selects the source interface for outgoing messages.
1295  struct in_addr ifAddr;
1296  ifAddr.s_addr = inet_addr(_ip.c_str());
1297  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1298  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1299  {
1300  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1301  << std::endl;
1302  return false;
1303  }
1304 
1305  this->sockets.push_back(sock);
1306 
1307  // Join the multicast group. We have to do it for each network interface
1308  // but we can do it on the same socket. We will use the socket at
1309  // position 0 for receiving multicast information.
1310  struct ip_mreq group;
1311  group.imr_multiaddr.s_addr =
1312  inet_addr(this->kMulticastGroup.c_str());
1313  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1314  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1315  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1316  {
1317  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1318  << std::endl;
1319  return false;
1320  }
1321 
1322  return true;
1323  }
1324 
1327  private: void AddRelayAddress(const std::string &_ip)
1328  {
1329  // Sanity check: Make sure that this IP address is not already saved.
1330  for (auto const &addr : this->relayAddrs)
1331  {
1332  if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
1333  return;
1334  }
1335 
1336  sockaddr_in addr;
1337  memset(&addr, 0, sizeof(addr));
1338  addr.sin_family = AF_INET;
1339  addr.sin_addr.s_addr = inet_addr(_ip.c_str());
1340  addr.sin_port = htons(static_cast<u_short>(this->port));
1341 
1342  this->relayAddrs.push_back(addr);
1343  }
1344 
1348  private: static const unsigned int kDefActivityInterval = 100;
1349 
1353  private: static const unsigned int kDefHeartbeatInterval = 1000;
1354 
1358  private: static const unsigned int kDefSilenceInterval = 3000;
1359 
1361  private: const std::string kMulticastGroup = "224.0.0.7";
1362 
1364  private: const int kTimeout = 250;
1365 
1367  private: static const uint16_t kMaxRcvStr =
1369 
1372  private: static const uint8_t kWireVersion = 10;
1373 
1375  private: int port;
1376 
1378  private: std::string hostAddr;
1379 
1381  private: std::vector<std::string> hostInterfaces;
1382 
1384  private: std::string pUuid;
1385 
1389  private: unsigned int silenceInterval;
1390 
1394  private: unsigned int activityInterval;
1395 
1399  private: unsigned int heartbeatInterval;
1400 
1402  private: DiscoveryCallback<Pub> connectionCb;
1403 
1405  private: DiscoveryCallback<Pub> disconnectionCb;
1406 
1408  private: DiscoveryCallback<Pub> registrationCb;
1409 
1411  private: DiscoveryCallback<Pub> unregistrationCb;
1412 
1414  private: TopicStorage<Pub> info;
1415 
1421 
1423  private: bool verbose;
1424 
1426  private: std::vector<int> sockets;
1427 
1429  private: sockaddr_in mcastAddr;
1430 
1432  private: std::vector<sockaddr_in> relayAddrs;
1433 
1435  private: mutable std::mutex mutex;
1436 
1438  private: std::thread threadReception;
1439 
1441  private: Timestamp timeNextHeartbeat;
1442 
1444  private: Timestamp timeNextActivity;
1445 
1447  private: std::mutex exitMutex;
1448 
1453  private: bool initialized;
1454 
1456  private: unsigned int numHeartbeatsUninitialized;
1457 
1459  private: mutable std::condition_variable initializedCv;
1460 
1462  private: bool exit;
1463 
1465  private: bool enabled;
1466  };
1467 
1471 
1475  }
1476  }
1477 }
1478 
1479 #endif
void Register(const MessagePublisher &_pub) const
Register a node from this process as a remote subscriber.
Definition: Discovery.hh:384
void UnregistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a remote node unsubscribes to a topic within this proces...
Definition: Discovery.hh:551
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected,...
Definition: Discovery.hh:523
bool Publishers(const std::string &_topic, std::map< std::string, std::vector< T >> &_info) const
Get the map of publishers stored for a given topic.
Definition: TopicStorage.hh:207
T lock(T... args)
bool DelPublisherByNode(const std::string &_topic, const std::string &_pUuid, const std::string &_nUuid)
Remove a publisher associated to a given topic and UUID pair.
Definition: TopicStorage.hh:222
@ HOST
Topic/service only available to subscribers in the same machine as the publisher.
Definition: AdvertiseOptions.hh:28
STL class.
This class stores all the information about a publisher. It stores the topic name that publishes,...
Definition: Publisher.hh:44
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: Discovery.hh:1420
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:495
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds.
Definition: Discovery.hh:465
@ ALL
Topic/service available to any subscriber (default scope).
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::vector< std::string > split(const std::string &_orig, char _delim)
split at a one character delimiter to get a vector of something
STL class.
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: Discovery.hh:610
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently stored.
Definition: TopicStorage.hh:339
T strerror(T... args)
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:400
This class stores all the information about a message publisher.
Definition: Publisher.hh:222
A class for customizing the publication options for a topic or service advertised....
Definition: AdvertiseOptions.hh:59
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:486
T unlock(T... args)
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:247
T push_back(T... args)
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes....
Definition: Discovery.hh:476
void Unregister(const MessagePublisher &_pub) const
Unregister a node from this process as a remote subscriber.
Definition: Discovery.hh:392
T joinable(T... args)
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:558
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:155
STL class.
T at(T... args)
bool DelPublishersByProc(const std::string &_pUuid)
Remove all the publishers associated to a given process.
Definition: TopicStorage.hh:262
bool AddPublisher(const T &_publisher)
Add a new address associated to a given topic and node UUID.
Definition: TopicStorage.hh:53
T c_str(T... args)
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: Discovery.hh:424
T erase(T... args)
STL class.
@ ALL
Send data via unicast and multicast.
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: Discovery.hh:117
T min(T... args)
DestinationType
Options for sending discovery messages.
Definition: Discovery.hh:88
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:513
bool HasAnyPublishers(const std::string &_topic, const std::string &_pUuid) const
Return if there is any publisher stored for the given topic and process UUID.
Definition: TopicStorage.hh:134
T endl(T... args)
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:601
T duration_cast(T... args)
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:455
T boolalpha(T... args)
T cbegin(T... args)
void PublishersByProc(const std::string &_pUuid, std::map< std::string, std::vector< T >> &_pubs) const
Given a process UUID, the function returns the list of publishers contained in this process UUID with...
Definition: TopicStorage.hh:286
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active,...
Definition: Discovery.hh:533
void Print() const
Print all the information for debugging purposes.
Definition: TopicStorage.hh:346
void raw_type
Definition: Discovery.hh:45
T empty(T... args)
@ UNICAST
Send data via unicast only.
STL class.
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:124
T memcpy(T... args)
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:504
T max(T... args)
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: Discovery.hh:278
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: Discovery.hh:334
@ PROCESS
Topic/service only available to subscribers in the same process as the publisher.
void RegistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a new remote node subscribes to a topic within this proc...
Definition: Discovery.hh:542
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:302
T join(T... args)
T memset(T... args)
bool Publisher(const std::string &_topic, const std::string &_pUuid, const std::string &_nUuid, T &_publisher) const
Get the address information for a given topic and node UUID.
Definition: TopicStorage.hh:169
@ MULTICAST
Send data via multicast only.
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github....
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:410