18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19 #define IGNITION_TRANSPORT_DISCOVERY_HH_
33 #include <sys/types.h>
35 #include <sys/socket.h>
39 #include <arpa/inet.h>
43 #include <netinet/in.h>
49 #pragma warning(push, 0)
54 #pragma warning(disable: 4503)
56 #pragma warning(disable: 4996)
59 #include <ignition/msgs/discovery.pb.h>
62 #include <condition_variable>
71 #include <ignition/msgs/Utility.hh>
73 #include "ignition/transport/config.hh"
74 #include "ignition/transport/Export.hh"
86 inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
116 template<
typename Pub>
126 const bool _verbose =
false)
130 silenceInterval(kDefSilenceInterval),
131 activityInterval(kDefActivityInterval),
132 heartbeatInterval(kDefHeartbeatInterval),
133 connectionCb(nullptr),
134 disconnectionCb(nullptr),
137 numHeartbeatsUninitialized(0),
142 if (
env(
"IGN_IP", ignIp) && !ignIp.
empty())
143 this->hostInterfaces = {ignIp};
151 WORD wVersionRequested;
155 wVersionRequested = MAKEWORD(2, 2);
157 if (WSAStartup(wVersionRequested, &wsaData) != 0)
163 for (
const auto &netIface : this->hostInterfaces)
165 auto succeed = this->RegisterNetIface(netIface);
170 if (netIface == this->hostAddr && !succeed)
172 this->RegisterNetIface(
"127.0.0.1");
173 std::cerr <<
"Did you set the environment variable IGN_IP with a "
175 <<
" [" << netIface <<
"] seems an invalid local IP "
177 <<
" Using 127.0.0.1 as hostname." <<
std::endl;
178 this->hostAddr =
"127.0.0.1";
187 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
188 reinterpret_cast<const char *
>(&reuseAddr),
sizeof(reuseAddr)) != 0)
190 std::cerr <<
"Error setting socket option (SO_REUSEADDR)."
202 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
203 reinterpret_cast<const char *
>(&reusePort),
sizeof(reusePort)) != 0)
205 std::cerr <<
"Error setting socket option (SO_REUSEPORT)."
211 sockaddr_in localAddr;
212 memset(&localAddr, 0,
sizeof(localAddr));
213 localAddr.sin_family = AF_INET;
214 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
215 localAddr.sin_port = htons(
static_cast<u_short
>(this->port));
217 if (bind(this->sockets.at(0),
218 reinterpret_cast<sockaddr *
>(&localAddr),
sizeof(sockaddr_in)) < 0)
225 memset(&this->mcastAddr, 0,
sizeof(this->mcastAddr));
226 this->mcastAddr.sin_family = AF_INET;
227 this->mcastAddr.sin_addr.s_addr =
228 inet_addr(this->kMulticastGroup.c_str());
229 this->mcastAddr.sin_port = htons(
static_cast<u_short
>(this->port));
233 if (
env(
"IGN_RELAY", ignRelay) && !ignRelay.
empty())
239 for (
auto const &relayAddr : relays)
240 this->AddRelayAddress(relayAddr);
243 this->PrintCurrentState();
250 this->exitMutex.lock();
252 this->exitMutex.unlock();
255 if (this->threadReception.joinable())
256 this->threadReception.join();
264 for (
const auto &sock : this->sockets)
287 this->enabled =
true;
291 this->timeNextHeartbeat = now;
292 this->timeNextActivity = now;
295 this->threadReception =
std::thread(&Discovery::RecvMessages,
this);
311 if (!this->info.AddPublisher(_publisher))
346 cb = this->connectionCb;
350 pub.SetTopic(_topic);
351 pub.SetPUuid(this->pUuid);
358 found = this->info.Publishers(_topic, addresses);
364 for (
const auto &proc : addresses)
366 for (
const auto &node : proc.second)
414 return this->info.Publishers(_topic, _publishers);
435 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
439 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
447 msgs::Discovery::UNADVERTISE, inf);
458 return this->hostAddr;
468 return this->activityInterval;
479 return this->heartbeatInterval;
489 return this->silenceInterval;
498 this->activityInterval = _ms;
507 this->heartbeatInterval = _ms;
516 this->silenceInterval = _ms;
526 this->connectionCb = _cb;
536 this->disconnectionCb = _cb;
545 this->registrationCb = _cb;
554 this->unregistrationCb = _cb;
568 std::cout <<
"\tActivity: " << this->activityInterval
570 std::cout <<
"\tHeartbeat: " << this->heartbeatInterval
572 std::cout <<
"\tSilence: " << this->silenceInterval
581 if (this->activity.empty())
585 for (
auto &proc : this->activity)
605 this->info.TopicList(_topics);
614 if (!this->initialized)
616 this->initializedCv.wait(lk, [
this]{
return this->initialized;});
623 private:
void UpdateActivity()
636 if (now < this->timeNextActivity)
639 disconnectCb = this->disconnectionCb;
641 for (
auto it = this->activity.cbegin(); it != this->activity.cend();)
644 auto elapsed = now - it->second;
647 if (std::chrono::duration_cast<std::chrono::milliseconds>
648 (elapsed).count() > this->silenceInterval)
651 this->info.DelPublishersByProc(it->first);
656 this->activity.
erase(it++);
672 for (
auto const &uuid : uuids)
675 publisher.SetPUuid(uuid);
676 disconnectCb(publisher);
681 private:
void UpdateHeartbeat()
688 if (now < this->timeNextHeartbeat)
692 Publisher pub(
"",
"", this->pUuid,
"", AdvertiseOptions());
700 this->info.PublishersByProc(this->pUuid, nodes);
703 for (
const auto &topic : nodes)
705 for (
const auto &node : topic.second)
708 msgs::Discovery::ADVERTISE, node);
714 if (!this->initialized)
716 ++this->numHeartbeatsUninitialized;
717 if (this->numHeartbeatsUninitialized == 2)
721 this->initialized =
true;
724 this->initializedCv.notify_all();
742 private:
int NextTimeout()
const
745 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
746 auto timeUntilNextActivity = this->timeNextActivity - now;
748 int t =
static_cast<int>(
749 std::chrono::duration_cast<std::chrono::milliseconds>
750 (
std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
751 int t2 =
std::min(t, this->kTimeout);
756 private:
void RecvMessages()
758 bool timeToExit =
false;
762 int timeout = this->NextTimeout();
766 this->RecvDiscoveryUpdate();
769 this->PrintCurrentState();
772 this->UpdateHeartbeat();
773 this->UpdateActivity();
785 private:
void RecvDiscoveryUpdate()
787 char rcvStr[Discovery::kMaxRcvStr];
788 sockaddr_in clntAddr;
789 socklen_t addrLen =
sizeof(clntAddr);
791 uint16_t received = recvfrom(this->sockets.at(0),
792 reinterpret_cast<raw_type *
>(rcvStr),
794 reinterpret_cast<sockaddr *
>(&clntAddr),
795 reinterpret_cast<socklen_t *
>(&addrLen));
799 memcpy(&len, &rcvStr[0],
sizeof(len));
823 if (len +
sizeof(len) == received)
825 std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
826 uint16_t srcPort = ntohs(clntAddr.sin_port);
830 std::cout <<
"\nReceived discovery update from "
831 << srcAddr <<
": " << srcPort <<
std::endl;
834 this->DispatchDiscoveryMsg(srcAddr, rcvStr +
sizeof(len), len);
837 else if (received < 0)
839 std::cerr <<
"Discovery::RecvDiscoveryUpdate() recvfrom error"
848 private:
void DispatchDiscoveryMsg(
const std::string &_fromIp,
849 char *_msg, uint16_t _len)
851 ignition::msgs::Discovery msg;
856 if (!msg.ParseFromArray(_msg, _len))
860 if (this->Version() != msg.version())
866 if (recvPUuid == this->pUuid)
878 if (msg.has_flags() && msg.flags().relay())
881 msg.mutable_flags()->set_relay(
false);
882 msg.mutable_flags()->set_no_relay(
true);
883 this->SendMulticast(msg);
887 this->AddRelayAddress(_fromIp);
895 else if (!msg.has_flags() || !msg.flags().no_relay())
897 msg.mutable_flags()->set_relay(
true);
898 this->SendUnicast(msg);
902 DiscoveryCallback<Pub> connectCb;
903 DiscoveryCallback<Pub> disconnectCb;
904 DiscoveryCallback<Pub> registerCb;
905 DiscoveryCallback<Pub> unregisterCb;
909 connectCb = this->connectionCb;
910 disconnectCb = this->disconnectionCb;
911 registerCb = this->registrationCb;
912 unregisterCb = this->unregistrationCb;
917 case msgs::Discovery::ADVERTISE:
921 publisher.SetFromDiscovery(msg);
926 _fromIp != this->hostAddr))
935 added = this->info.AddPublisher(publisher);
938 if (added && connectCb)
941 connectCb(publisher);
946 case msgs::Discovery::SUBSCRIBE:
952 recvTopic = msg.sub().topic();
956 std::cerr <<
"Subscription discovery message is missing "
957 <<
"Subscriber information.\n";
962 Addresses_M<Pub> addresses;
965 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
970 if (!this->info.Publishers(recvTopic, addresses))
974 for (
const auto &nodeInfo : addresses[this->pUuid])
979 _fromIp != this->hostAddr))
986 msgs::Discovery::ADVERTISE, nodeInfo);
991 case msgs::Discovery::NEW_CONNECTION:
995 publisher.SetFromDiscovery(msg);
998 registerCb(publisher);
1002 case msgs::Discovery::END_CONNECTION:
1006 publisher.SetFromDiscovery(msg);
1009 unregisterCb(publisher);
1013 case msgs::Discovery::HEARTBEAT:
1018 case msgs::Discovery::BYE:
1023 this->activity.erase(recvPUuid);
1029 pub.SetPUuid(recvPUuid);
1037 this->info.DelPublishersByProc(recvPUuid);
1042 case msgs::Discovery::UNADVERTISE:
1046 publisher.SetFromDiscovery(msg);
1051 _fromIp != this->hostAddr))
1059 disconnectCb(publisher);
1065 this->info.DelPublisherByNode(publisher.Topic(),
1066 publisher.PUuid(), publisher.NUuid());
1073 std::cerr <<
"Unknown message type [" << msg.type() <<
"].\n";
1085 private:
template<
typename T>
1087 const msgs::Discovery::Type _type,
1088 const T &_pub)
const
1090 ignition::msgs::Discovery discoveryMsg;
1091 discoveryMsg.set_version(this->Version());
1092 discoveryMsg.set_type(_type);
1093 discoveryMsg.set_process_uuid(this->pUuid);
1097 case msgs::Discovery::ADVERTISE:
1098 case msgs::Discovery::UNADVERTISE:
1099 case msgs::Discovery::NEW_CONNECTION:
1100 case msgs::Discovery::END_CONNECTION:
1102 _pub.FillDiscovery(discoveryMsg);
1105 case msgs::Discovery::SUBSCRIBE:
1107 discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1110 case msgs::Discovery::HEARTBEAT:
1111 case msgs::Discovery::BYE:
1114 std::cerr <<
"Discovery::SendMsg() error: Unrecognized message"
1115 <<
" type [" << _type <<
"]" <<
std::endl;
1122 this->SendMulticast(discoveryMsg);
1130 discoveryMsg.mutable_flags()->set_relay(
true);
1131 this->SendUnicast(discoveryMsg);
1136 std::cout <<
"\t* Sending " << msgs::ToString(_type)
1137 <<
" msg [" << _pub.Topic() <<
"]" <<
std::endl;
1143 private:
void SendUnicast(
const msgs::Discovery &_msg)
const
1147 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1148 size_t msgSizeFull = _msg.ByteSizeLong();
1150 int msgSizeFull = _msg.ByteSize();
1152 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1154 std::cerr <<
"Discovery message too large to send. Discovery won't "
1155 <<
"work. This shouldn't happen.\n";
1158 msgSize = msgSizeFull;
1160 uint16_t totalSize =
sizeof(msgSize) + msgSize;
1161 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1162 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1164 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
1167 for (
const auto &sockAddr : this->relayAddrs)
1169 auto sent = sendto(this->sockets.at(0),
1170 reinterpret_cast<const raw_type *
>(
1171 reinterpret_cast<const unsigned char*
>(buffer)),
1173 reinterpret_cast<const sockaddr *
>(&sockAddr),
1176 if (sent != totalSize)
1185 std::cerr <<
"Discovery::SendUnicast: Error serializing data."
1194 private:
void SendMulticast(
const msgs::Discovery &_msg)
const
1198 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1199 size_t msgSizeFull = _msg.ByteSizeLong();
1201 int msgSizeFull = _msg.ByteSize();
1203 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1205 std::cerr <<
"Discovery message too large to send. Discovery won't "
1206 <<
"work. This shouldn't happen.\n";
1210 msgSize = msgSizeFull;
1211 uint16_t totalSize =
sizeof(msgSize) + msgSize;
1212 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1213 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1215 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
1219 for (
const auto &sock : this->Sockets())
1222 if (sendto(sock,
reinterpret_cast<const raw_type *
>(
1223 reinterpret_cast<const unsigned char*
>(buffer)),
1225 reinterpret_cast<const sockaddr *
>(this->MulticastAddr()),
1226 sizeof(*(this->MulticastAddr()))) != totalSize)
1236 if (errno != EPERM && errno != ENOBUFS)
1238 std::cerr <<
"Exception sending a multicast message:"
1247 std::cerr <<
"Discovery::SendMulticast: Error serializing data."
1258 return this->sockets;
1263 private:
const sockaddr_in *MulticastAddr()
const
1265 return &this->mcastAddr;
1270 private: uint8_t Version()
const
1273 static int topicStats =
1274 (
env(
"IGN_TRANSPORT_TOPIC_STATISTICS", ignStats) && ignStats ==
"1");
1275 return this->kWireVersion + (topicStats * 100);
1282 private:
bool RegisterNetIface(
const std::string &_ip)
1285 int sock =
static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1295 struct in_addr ifAddr;
1296 ifAddr.s_addr = inet_addr(_ip.
c_str());
1297 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1298 reinterpret_cast<const char*
>(&ifAddr),
sizeof(ifAddr)) != 0)
1300 std::cerr <<
"Error setting socket option (IP_MULTICAST_IF)."
1305 this->sockets.push_back(sock);
1310 struct ip_mreq group;
1311 group.imr_multiaddr.s_addr =
1312 inet_addr(this->kMulticastGroup.c_str());
1313 group.imr_interface.s_addr = inet_addr(_ip.
c_str());
1314 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1315 reinterpret_cast<const char*
>(&group),
sizeof(group)) != 0)
1317 std::cerr <<
"Error setting socket option (IP_ADD_MEMBERSHIP)."
1327 private:
void AddRelayAddress(
const std::string &_ip)
1330 for (
auto const &addr : this->relayAddrs)
1332 if (addr.sin_addr.s_addr == inet_addr(_ip.
c_str()))
1337 memset(&addr, 0,
sizeof(addr));
1338 addr.sin_family = AF_INET;
1339 addr.sin_addr.s_addr = inet_addr(_ip.
c_str());
1340 addr.sin_port = htons(
static_cast<u_short
>(this->port));
1342 this->relayAddrs.push_back(addr);
1348 private:
static const unsigned int kDefActivityInterval = 100;
1353 private:
static const unsigned int kDefHeartbeatInterval = 1000;
1358 private:
static const unsigned int kDefSilenceInterval = 3000;
1361 private:
const std::string kMulticastGroup =
"224.0.0.7";
1364 private:
const int kTimeout = 250;
1367 private:
static const uint16_t kMaxRcvStr =
1372 private:
static const uint8_t kWireVersion = 10;
1389 private:
unsigned int silenceInterval;
1394 private:
unsigned int activityInterval;
1399 private:
unsigned int heartbeatInterval;
1402 private: DiscoveryCallback<Pub> connectionCb;
1405 private: DiscoveryCallback<Pub> disconnectionCb;
1408 private: DiscoveryCallback<Pub> registrationCb;
1411 private: DiscoveryCallback<Pub> unregistrationCb;
1414 private: TopicStorage<Pub> info;
1423 private:
bool verbose;
1429 private: sockaddr_in mcastAddr;
1453 private:
bool initialized;
1456 private:
unsigned int numHeartbeatsUninitialized;
1465 private:
bool enabled;