|
|
|
|
@ -84,7 +84,8 @@ Cluster::Cluster(
|
|
|
|
|
_zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints), |
|
|
|
|
_members(new _Member[ZT_CLUSTER_MAX_MEMBERS]), |
|
|
|
|
_peerAffinities(65536), |
|
|
|
|
_lastCleanedPeerAffinities(0) |
|
|
|
|
_lastCleanedPeerAffinities(0), |
|
|
|
|
_lastCheckedPeersForAnnounce(0) |
|
|
|
|
{ |
|
|
|
|
uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)]; |
|
|
|
|
|
|
|
|
|
@ -328,6 +329,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
|
|
|
|
|
|
|
|
|
|
if (haveMatch) { |
|
|
|
|
_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size()); |
|
|
|
|
_flush(fromMemberId); // we want this to go ASAP, since with port restricted cone NATs success can be timing-sensitive
|
|
|
|
|
RR->sw->send(rendezvousForLocal,true,0); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -469,10 +471,45 @@ void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembers
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
struct _ClusterAnnouncePeers |
|
|
|
|
{ |
|
|
|
|
_ClusterAnnouncePeers(const uint64_t now_,Cluster *parent_) : now(now_),parent(parent_) {} |
|
|
|
|
const uint64_t now; |
|
|
|
|
Cluster *const parent; |
|
|
|
|
inline void operator()(const Topology &t,const SharedPtr<Peer> &peer) |
|
|
|
|
{ |
|
|
|
|
Path *p = peer->getBestPath(now); |
|
|
|
|
if (p) |
|
|
|
|
parent->replicateHavePeer(peer->identity(),p->address()); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
void Cluster::doPeriodicTasks() |
|
|
|
|
{ |
|
|
|
|
const uint64_t now = RR->node->now(); |
|
|
|
|
|
|
|
|
|
// Erase old peer affinity entries just to control table size
|
|
|
|
|
if ((now - _lastCleanedPeerAffinities) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5)) { |
|
|
|
|
_lastCleanedPeerAffinities = now; |
|
|
|
|
Address *k = (Address *)0; |
|
|
|
|
_PA *v = (_PA *)0; |
|
|
|
|
Mutex::Lock _l(_peerAffinities_m); |
|
|
|
|
Hashtable< Address,_PA >::Iterator i(_peerAffinities); |
|
|
|
|
while (i.next(k,v)) { |
|
|
|
|
if ((now - v->ts) >= (ZT_PEER_ACTIVITY_TIMEOUT * 5)) |
|
|
|
|
_peerAffinities.erase(*k); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Announce peers that we have active direct paths to -- note that we forget paths
|
|
|
|
|
// that other cluster members claim they have, which prevents us from fighting
|
|
|
|
|
// with other cluster members (route flapping) over specific paths.
|
|
|
|
|
if ((now - _lastCheckedPeersForAnnounce) >= (ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD / 4)) { |
|
|
|
|
_lastCheckedPeersForAnnounce = now; |
|
|
|
|
_ClusterAnnouncePeers func(now,this); |
|
|
|
|
RR->topology->eachPeer<_ClusterAnnouncePeers &>(func); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Flush outgoing packet send queue every doPeriodicTasks()
|
|
|
|
|
{ |
|
|
|
|
Mutex::Lock _l(_memberIds_m); |
|
|
|
|
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { |
|
|
|
|
@ -506,20 +543,6 @@ void Cluster::doPeriodicTasks()
|
|
|
|
|
_flush(*mid); // does nothing if nothing to flush
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
{ |
|
|
|
|
if ((now - _lastCleanedPeerAffinities) >= (ZT_PEER_ACTIVITY_TIMEOUT * 10)) { |
|
|
|
|
_lastCleanedPeerAffinities = now; |
|
|
|
|
Address *k = (Address *)0; |
|
|
|
|
_PA *v = (_PA *)0; |
|
|
|
|
Mutex::Lock _l(_peerAffinities_m); |
|
|
|
|
Hashtable< Address,_PA >::Iterator i(_peerAffinities); |
|
|
|
|
while (i.next(k,v)) { |
|
|
|
|
if ((now - v->ts) >= (ZT_PEER_ACTIVITY_TIMEOUT * 10)) |
|
|
|
|
_peerAffinities.erase(*k); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Cluster::addMember(uint16_t memberId) |
|
|
|
|
|