@ -83,7 +83,8 @@ Cluster::Cluster(
_id ( id ) ,
_zeroTierPhysicalEndpoints ( zeroTierPhysicalEndpoints ) ,
_members ( new _Member [ ZT_CLUSTER_MAX_MEMBERS ] ) ,
_peerAffinities ( 65536 )
_peerAffinities ( 65536 ) ,
_lastCleanedPeerAffinities ( 0 )
{
uint16_t stmp [ ZT_SHA512_DIGEST_LEN / sizeof ( uint16_t ) ] ;
@ -247,11 +248,13 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
} break ;
case STATE_MESSAGE_COM : {
/* not currently used so not decoded yet
CertificateOfMembership com ;
ptr + = com . deserialize ( dmsg , ptr ) ;
if ( com ) {
TRACE ( " [%u] COM for %s on %.16llu rev %llu " , ( unsigned int ) fromMemberId , com . issuedTo ( ) . toString ( ) . c_str ( ) , com . networkId ( ) , com . revision ( ) ) ;
}
*/
} break ;
case STATE_MESSAGE_PROXY_UNITE : {
@ -262,12 +265,13 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
for ( unsigned int i = 0 ; i < numRemotePeerPaths ; + + i )
ptr + = remotePeerPaths [ i ] . deserialize ( dmsg , ptr ) ;
TRACE ( " [%u] requested proxy unite between local peer %s and remote peer %s " , ( unsigned int ) fromMemberId , localPeerAddress . toString ( ) . c_str ( ) , remotePeerAddress . toString ( ) . c_str ( ) ) ;
TRACE ( " [%u] requested that we unite local %s with remote %s " , ( unsigned int ) fromMemberId , localPeerAddress . toString ( ) . c_str ( ) , remotePeerAddress . toString ( ) . c_str ( ) ) ;
SharedPtr < Peer > localPeer ( RR - > topology - > getPeer ( localPeerAddress ) ) ;
const uint64_t now = RR - > node - > now ( ) ;
SharedPtr < Peer > localPeer ( RR - > topology - > getPeerNoCache ( localPeerAddress , now ) ) ;
if ( ( localPeer ) & & ( numRemotePeerPaths > 0 ) ) {
InetAddress bestLocalV4 , bestLocalV6 ;
localPeer - > getBestActiveAddresses ( RR - > node - > now ( ) , bestLocalV4 , bestLocalV6 ) ;
localPeer - > getBestActiveAddresses ( now , bestLocalV4 , bestLocalV6 ) ;
InetAddress bestRemoteV4 , bestRemoteV6 ;
for ( unsigned int i = 0 ; i < numRemotePeerPaths ; + + i ) {
@ -369,11 +373,11 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
else return false ;
}
Buffer < 2048 > buf ;
Buffer < 1024 > buf ;
if ( unite ) {
InetAddress v4 , v6 ;
if ( fromPeerAddress ) {
SharedPtr < Peer > fromPeer ( RR - > topology - > getPeer ( fromPeerAddress ) ) ;
SharedPtr < Peer > fromPeer ( RR - > topology - > getPeerNoCache ( fromPeerAddress , now ) ) ;
if ( fromPeer )
fromPeer - > getBestActiveAddresses ( now , v4 , v6 ) ;
}
@ -408,7 +412,7 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
void Cluster : : replicateHavePeer ( const Identity & peerId , const InetAddress & physicalAddress )
{
const uint64_t now = RR - > node - > now ( ) ;
{ // Use peer affinity table to track our own last announce time for peers
{
Mutex : : Lock _l2 ( _peerAffinities_m ) ;
_PA & pa = _peerAffinities [ peerId . address ( ) ] ;
if ( pa . mid ! = _id ) {
@ -436,7 +440,7 @@ void Cluster::replicateHavePeer(const Identity &peerId,const InetAddress &physic
void Cluster : : replicateMulticastLike ( uint64_t nwid , const Address & peerAddress , const MulticastGroup & group )
{
Buffer < 2048 > buf ;
Buffer < 1024 > buf ;
buf . append ( ( uint64_t ) nwid ) ;
peerAddress . appendTo ( buf ) ;
group . mac ( ) . appendTo ( buf ) ;
@ -453,7 +457,7 @@ void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,co
void Cluster : : replicateCertificateOfNetworkMembership ( const CertificateOfMembership & com )
{
Buffer < 2048 > buf ;
Buffer < 4096 > buf ;
com . serialize ( buf ) ;
TRACE ( " replicating %s COM for %.16llx to all members " , com . issuedTo ( ) . toString ( ) . c_str ( ) , com . networkId ( ) ) ;
{
@ -502,6 +506,20 @@ void Cluster::doPeriodicTasks()
_flush ( * mid ) ; // does nothing if nothing to flush
}
}
{
if ( ( now - _lastCleanedPeerAffinities ) > = ( ZT_PEER_ACTIVITY_TIMEOUT * 10 ) ) {
_lastCleanedPeerAffinities = now ;
Address * k = ( Address * ) 0 ;
_PA * v = ( _PA * ) 0 ;
Mutex : : Lock _l ( _peerAffinities_m ) ;
Hashtable < Address , _PA > : : Iterator i ( _peerAffinities ) ;
while ( i . next ( k , v ) ) {
if ( ( now - v - > ts ) > = ( ZT_PEER_ACTIVITY_TIMEOUT * 10 ) )
_peerAffinities . erase ( * k ) ;
}
}
}
}
void Cluster : : addMember ( uint16_t memberId )
@ -563,7 +581,7 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr
// Find member closest to this peer
const uint64_t now = RR - > node - > now ( ) ;
std : : vector < InetAddress > best ; // initial "best" is for peer to stay put
std : : vector < InetAddress > best ;
const double currentDistance = _dist3d ( _x , _y , _z , px , py , pz ) ;
double bestDistance = ( offload ? 2147483648.0 : currentDistance ) ;
unsigned int bestMember = _id ;
@ -575,7 +593,7 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr
// Consider member if it's alive and has sent us a location and one or more physical endpoints to send peers to
if ( ( ( now - m . lastReceivedAliveAnnouncement ) < ZT_CLUSTER_TIMEOUT ) & & ( ( m . x ! = 0 ) | | ( m . y ! = 0 ) | | ( m . z ! = 0 ) ) & & ( m . zeroTierPhysicalEndpoints . size ( ) > 0 ) ) {
double mdist = _dist3d ( m . x , m . y , m . z , px , py , pz ) ;
const double mdist = _dist3d ( m . x , m . y , m . z , px , py , pz ) ;
if ( mdist < bestDistance ) {
bestDistance = mdist ;
bestMember = * mid ;
@ -585,7 +603,7 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr
}
}
// Suggestion redirection if a closer member was found
// Redirect to a closer member if it has a ZeroTier endpoint address in the same ss_family
for ( std : : vector < InetAddress > : : const_iterator a ( best . begin ( ) ) ; a ! = best . end ( ) ; + + a ) {
if ( a - > ss_family = = peerPhysicalAddress . ss_family ) {
TRACE ( " %s at [%d,%d,%d] is %f from us but %f from %u, can redirect to %s " , peerAddress . toString ( ) . c_str ( ) , px , py , pz , currentDistance , bestDistance , bestMember , a - > toString ( ) . c_str ( ) ) ;