@@ -184,6 +184,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
				m.z = dmsg.at<int32_t>(ptr); ptr += 4;
				ptr += 8; // skip local clock, not used
				m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
				m.peers = dmsg.at<uint64_t>(ptr); ptr += 8;
				ptr += 8; // skip flags, unused
#ifdef ZT_TRACE
				std::string addrs;
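The ALIVE handler above reads fixed-width, big-endian fields out of the incoming message by advancing a byte cursor (ptr) past each field and skipping the ones it does not use. A minimal, self-contained sketch of that cursor pattern in plain C++ follows; it is not ZeroTier's Buffer class, and the field layout is invented purely for illustration:

#include <cstdint>
#include <cstdio>
#include <stdexcept>
#include <vector>

static uint32_t readU32(const std::vector<uint8_t> &b, std::size_t &ptr)
{
	// Big-endian 32-bit field, cursor advances past it.
	if (ptr + 4 > b.size()) throw std::out_of_range("short message");
	uint32_t v = ((uint32_t)b[ptr] << 24) | ((uint32_t)b[ptr + 1] << 16) |
	             ((uint32_t)b[ptr + 2] << 8) | (uint32_t)b[ptr + 3];
	ptr += 4;
	return v;
}

static uint64_t readU64(const std::vector<uint8_t> &b, std::size_t &ptr)
{
	// Big-endian 64-bit field, cursor advances past it.
	if (ptr + 8 > b.size()) throw std::out_of_range("short message");
	uint64_t v = 0;
	for (int i = 0; i < 8; ++i) v = (v << 8) | b[ptr + i];
	ptr += 8;
	return v;
}

int main()
{
	// Hypothetical layout: x(4) | skipped field(8) | load(8) | peers(8)
	std::vector<uint8_t> msg(28, 0);
	msg[3] = 42;   // x = 42
	msg[19] = 7;   // load = 7
	msg[27] = 3;   // peers = 3

	std::size_t ptr = 0;
	uint32_t x = readU32(msg, ptr);
	ptr += 8; // skip a field we do not use, as the handler above does
	uint64_t load = readU64(msg, ptr);
	uint64_t peers = readU64(msg, ptr);
	std::printf("x=%u load=%llu peers=%llu\n", (unsigned)x, (unsigned long long)load, (unsigned long long)peers);
	return 0;
}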
@@ -214,22 +215,23 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)

			case STATE_MESSAGE_HAVE_PEER: {
				const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
				InetAddress physicalAddress;
				ptr += physicalAddress.deserialize(dmsg,ptr);
				if (physicalAddress) {
					SharedPtr<Peer> myPeerRecord(RR->topology->getPeerNoCache(zeroTierAddress));
					if (myPeerRecord)
						myPeerRecord->removePathByAddress(physicalAddress);
				}
				{
					Mutex::Lock _l2(_peerAffinities_m);
					_PA &pa = _peerAffinities[zeroTierAddress];
					pa.ts = RR->node->now();
					pa.mid = fromMemberId;
				}
				Mutex::Lock _l2(_peerAffinities_m);
				_peerAffinities.set(zeroTierAddress,fromMemberId);
				TRACE("[%u] has %s @ %s",(unsigned int)fromMemberId,id.address().toString().c_str(),physicalAddress.toString().c_str());
			} break;

			case STATE_MESSAGE_WANT_PEER: {
				const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
				SharedPtr<Peer> peer(RR->topology->getPeerNoCache(zeroTierAddress));
				if ((peer)&&(peer->hasActiveDirectPath(RR->node->now()))) {
					char buf[ZT_ADDRESS_LENGTH];
					peer->address().copyTo(buf,ZT_ADDRESS_LENGTH);
					Mutex::Lock _l2(_members[fromMemberId].lock);
					_send(fromMemberId,STATE_MESSAGE_HAVE_PEER,buf,ZT_ADDRESS_LENGTH);
					_flush(fromMemberId);
				}
			} break;

			case STATE_MESSAGE_MULTICAST_LIKE: {
				const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
				const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
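The HAVE_PEER handler above records which cluster member claims a given ZeroTier address, and WANT_PEER lets a member ask whoever currently has a direct path to answer with HAVE_PEER. A stripped-down sketch of such an affinity table, using plain standard C++ types (a uint64_t stand-in for the address, uint16_t for the member id) rather than ZeroTier's Address and Hashtable classes:

#include <cstdint>
#include <cstdio>
#include <mutex>
#include <unordered_map>

class PeerAffinities
{
public:
	// Record that memberId most recently announced HAVE_PEER for ztAddress.
	void set(uint64_t ztAddress, uint16_t memberId)
	{
		std::lock_guard<std::mutex> l(_lock);
		_map[ztAddress] = memberId;
	}

	// Returns true and fills memberId if some member has claimed this peer.
	bool get(uint64_t ztAddress, uint16_t &memberId) const
	{
		std::lock_guard<std::mutex> l(_lock);
		auto i = _map.find(ztAddress);
		if (i == _map.end()) return false;
		memberId = i->second;
		return true;
	}

private:
	mutable std::mutex _lock;
	std::unordered_map<uint64_t, uint16_t> _map;
};

int main()
{
	PeerAffinities pa;
	pa.set(0x1122334455ULL, 2); // member 2 announced HAVE_PEER for this address

	uint16_t mid = 0;
	if (pa.get(0x1122334455ULL, mid))
		std::printf("peer is reachable via member %u\n", (unsigned)mid);
	return 0;
}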
@@ -320,7 +322,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)

				if (haveMatch) {
					_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
					_flush(fromMemberId); // we want this to go ASAP, since with port restricted cone NATs success can be timing-sensitive
					_flush(fromMemberId);
					RR->sw->send(rendezvousForLocal,true,0);
				}
			}
@@ -358,12 +360,22 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
	const uint64_t now = RR->node->now();
	unsigned int canHasPeer = 0;

	{ // Anyone got this peer?
	{
		Mutex::Lock _l2(_peerAffinities_m);
		_PA *pa = _peerAffinities.get(toPeerAddress);
		if ((pa)&&(pa->mid != _id)&&((now - pa->ts) < ZT_PEER_ACTIVITY_TIMEOUT))
			canHasPeer = pa->mid;
		else return false;
		const unsigned int *pa = _peerAffinities.get(toPeerAddress);
		if (!pa) {
			char buf[ZT_ADDRESS_LENGTH];
			peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH);
			{
				Mutex::Lock _l(_memberIds_m);
				for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
					Mutex::Lock _l2(_members[*mid].lock);
					_send(*mid,STATE_MESSAGE_WANT_PEER,buf,ZT_ADDRESS_LENGTH);
				}
			}
			return false;
		}
		canHasPeer = *pa;
	}

	Buffer<1024> buf;
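sendViaCluster() above consults the affinity table first; when no member is known to have the destination peer it broadcasts WANT_PEER to every member and reports failure for this attempt, so a later attempt can succeed once a HAVE_PEER reply has filled the table. An illustrative-only sketch of that decision flow, where the MiniCluster type and the wantPeerFromAll name are hypothetical and not part of the real code:

#include <cstdint>
#include <cstdio>
#include <optional>
#include <unordered_map>
#include <vector>

struct MiniCluster
{
	std::unordered_map<uint64_t, uint16_t> affinities; // zt address -> member id
	std::vector<uint16_t> memberIds;

	// Ask every other member whether it has a direct path to this peer.
	void wantPeerFromAll(uint64_t ztAddress)
	{
		for (uint16_t mid : memberIds)
			std::printf("  -> WANT_PEER %010llx to member %u\n", (unsigned long long)ztAddress, (unsigned)mid);
	}

	// Returns the member id to relay through, or nothing if unknown yet.
	std::optional<uint16_t> findRelay(uint64_t ztAddress)
	{
		auto i = affinities.find(ztAddress);
		if (i == affinities.end()) {
			wantPeerFromAll(ztAddress); // query the rest of the cluster
			return std::nullopt;        // caller treats this send as failed for now
		}
		return i->second;
	}
};

int main()
{
	MiniCluster c;
	c.memberIds = {1, 2, 3};

	if (!c.findRelay(0xabcdef0123ULL))
		std::printf("no affinity yet, WANT_PEER broadcast sent\n");

	c.affinities[0xabcdef0123ULL] = 2; // pretend member 2 answered HAVE_PEER
	if (auto mid = c.findRelay(0xabcdef0123ULL))
		std::printf("relay via member %u\n", (unsigned)*mid);
	return 0;
}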
@@ -402,32 +414,15 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
	return true;
}

void Cluster::replicateHavePeer(const Identity &peerId,const InetAddress &physicalAddress)
void Cluster::replicateHavePeer(const Identity &peerId)
{
	const uint64_t now = RR->node->now();
	{
		Mutex::Lock _l2(_peerAffinities_m);
		_PA &pa = _peerAffinities[peerId.address()];
		if (pa.mid != _id) {
			pa.ts = now;
			pa.mid = _id;
			// fall through to send code below
		} else if ((now - pa.ts) < ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
			return;
		} else {
			pa.ts = now;
			// fall through to send code below
		}
	}

	Buffer<1024> buf;
	peerId.address().appendTo(buf);
	physicalAddress.serialize(buf);
	char buf[ZT_ADDRESS_LENGTH];
	peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH);
	{
		Mutex::Lock _l(_memberIds_m);
		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
			Mutex::Lock _l2(_members[*mid].lock);
			_send(*mid,STATE_MESSAGE_HAVE_PEER,buf.data(),buf.size());
			_send(*mid,STATE_MESSAGE_HAVE_PEER,buf,ZT_ADDRESS_LENGTH);
		}
	}
}
@@ -465,45 +460,9 @@ void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembers
	*/
}

struct _ClusterAnnouncePeers
{
	_ClusterAnnouncePeers(const uint64_t now_,Cluster *parent_) : now(now_),parent(parent_) {}
	const uint64_t now;
	Cluster *const parent;
	inline void operator()(const Topology &t,const SharedPtr<Peer> &peer) const
	{
		Path *p = peer->getBestPath(now);
		if (p)
			parent->replicateHavePeer(peer->identity(),p->address());
	}
};
void Cluster::doPeriodicTasks()
{
	const uint64_t now = RR->node->now();

	// Erase old peer affinity entries just to control table size
	if ((now - _lastCleanedPeerAffinities) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5)) {
		_lastCleanedPeerAffinities = now;
		Address *k = (Address *)0;
		_PA *v = (_PA *)0;
		Mutex::Lock _l(_peerAffinities_m);
		Hashtable< Address,_PA >::Iterator i(_peerAffinities);
		while (i.next(k,v)) {
			if ((now - v->ts) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5))
				_peerAffinities.erase(*k);
		}
	}

	// Announce peers that we have active direct paths to -- note that we forget paths
	// that other cluster members claim they have, which prevents us from fighting
	// with other cluster members (route flapping) over specific paths.
	if ((now - _lastCheckedPeersForAnnounce) >= (ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD / 4)) {
		_lastCheckedPeersForAnnounce = now;
		_ClusterAnnouncePeers func(now,this);
		RR->topology->eachPeer<_ClusterAnnouncePeers &>(func);
	}

	// Flush outgoing packet send queue every doPeriodicTasks()
	if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) {
		_lastFlushed = now;
		Mutex::Lock _l(_memberIds_m);
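doPeriodicTasks() above gates each piece of housekeeping on a stored timestamp, so each job runs at most once per period no matter how often the function itself is called. A small sketch of that pattern; the periods and job names here are made up:

#include <cstdint>
#include <cstdio>

struct Housekeeper
{
	uint64_t lastCleaned = 0;
	uint64_t lastFlushed = 0;

	void doPeriodicTasks(uint64_t now)
	{
		const uint64_t cleanPeriod = 60000; // hypothetical: once a minute
		const uint64_t flushPeriod = 500;   // hypothetical: twice a second

		if ((now - lastCleaned) >= cleanPeriod) {
			lastCleaned = now;
			std::printf("%llu: cleaning stale table entries\n", (unsigned long long)now);
		}

		if ((now - lastFlushed) >= flushPeriod) {
			lastFlushed = now;
			std::printf("%llu: flushing outgoing queues\n", (unsigned long long)now);
		}
	}
};

int main()
{
	Housekeeper h;
	for (uint64_t now = 0; now <= 2000; now += 250)
		h.doPeriodicTasks(now); // flush fires at t=500,1000,1500,2000; the clean period is never reached in this short run
	return 0;
}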
@@ -527,6 +486,7 @@ void Cluster::doPeriodicTasks()
				}
				alive.append((uint64_t)now);
				alive.append((uint64_t)0); // TODO: compute and send load average
				alive.append((uint64_t)RR->topology->countActive());
				alive.append((uint64_t)0); // unused/reserved flags
				alive.append((uint8_t)_zeroTierPhysicalEndpoints.size());
				for(std::vector<InetAddress>::const_iterator pe(_zeroTierPhysicalEndpoints.begin());pe!=_zeroTierPhysicalEndpoints.end();++pe)
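The ALIVE announcement above is built by appending fixed-width fields, the mirror image of the parse loop earlier in this diff. A stand-in sketch using a plain byte vector and big-endian encoding instead of ZeroTier's Buffer template; the field order shown is only an approximation of the message above:

#include <cstdint>
#include <cstdio>
#include <vector>

static void appendU64(std::vector<uint8_t> &b, uint64_t v)
{
	// Append a 64-bit value in network (big-endian) byte order.
	for (int i = 7; i >= 0; --i)
		b.push_back((uint8_t)((v >> (i * 8)) & 0xff));
}

int main()
{
	std::vector<uint8_t> alive;
	appendU64(alive, 1234567890ULL); // timestamp
	appendU64(alive, 0);             // load average placeholder
	appendU64(alive, 42);            // active peer count
	appendU64(alive, 0);             // unused/reserved flags
	alive.push_back((uint8_t)2);     // number of physical endpoints to follow

	std::printf("alive announcement is %zu bytes\n", alive.size());
	return 0;
}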
@@ -641,8 +601,6 @@ void Cluster::status(ZT_ClusterStatus &status) const
{
	const uint64_t now = RR->node->now();
	memset(&status,0,sizeof(ZT_ClusterStatus));
	ZT_ClusterMemberStatus *ms[ZT_CLUSTER_MAX_MEMBERS];
	memset(ms,0,sizeof(ms));

	status.myId = _id;
@@ -652,6 +610,7 @@ void Cluster::status(ZT_ClusterStatus &status) const
	ms[_id]->x = _x;
	ms[_id]->y = _y;
	ms[_id]->z = _z;
	ms[_id]->load = 0; // TODO
	ms[_id]->peers = RR->topology->countActive();
	for(std::vector<InetAddress>::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) {
		if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
@@ -664,10 +623,11 @@ void Cluster::status(ZT_ClusterStatus &status) const
		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
			if (status.clusterSize >= ZT_CLUSTER_MAX_MEMBERS) // sanity check
				break;
			ZT_ClusterMemberStatus *s = ms[*mid] = &(status.members[status.clusterSize++]);

			_Member &m = _members[*mid];
			Mutex::Lock ml(m.lock);

			ZT_ClusterMemberStatus *const s = &(status.members[status.clusterSize++]);
			s->id = *mid;
			s->msSinceLastHeartbeat = (unsigned int)std::min((uint64_t)(~((unsigned int)0)),(now - m.lastReceivedAliveAnnouncement));
			s->alive = (s->msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT) ? 1 : 0;
@@ -675,6 +635,7 @@ void Cluster::status(ZT_ClusterStatus &status) const
			s->y = m.y;
			s->z = m.z;
			s->load = m.load;
			s->peers = m.peers;
			for(std::vector<InetAddress>::const_iterator ep(m.zeroTierPhysicalEndpoints.begin());ep!=m.zeroTierPhysicalEndpoints.end();++ep) {
				if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
					break;
@@ -682,17 +643,6 @@ void Cluster::status(ZT_ClusterStatus &status) const
			}
		}
	}

	{
		Mutex::Lock _l2(_peerAffinities_m);
		Address *k = (Address *)0;
		_PA *v = (_PA *)0;
		Hashtable< Address,_PA >::Iterator i(const_cast<Cluster *>(this)->_peerAffinities);
		while (i.next(k,v)) {
			if ( (ms[v->mid]) && (v->mid != _id) && ((now - v->ts) < ZT_PEER_ACTIVITY_TIMEOUT) )
				++ms[v->mid]->peers;
		}
	}
}

void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)