@ -83,9 +83,6 @@ Cluster::Cluster(
_id ( id ) ,
_zeroTierPhysicalEndpoints ( zeroTierPhysicalEndpoints ) ,
_members ( new _Member [ ZT_CLUSTER_MAX_MEMBERS ] ) ,
_peerAffinities ( 65536 ) ,
_lastCleanedPeerAffinities ( 0 ) ,
_lastCheckedPeersForAnnounce ( 0 ) ,
_lastFlushed ( 0 ) ,
_lastCleanedRemotePeers ( 0 )
{
@ -231,7 +228,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
unsigned int qc = 0 ;
{
Mutex : : Lock _l ( _sendViaClusterQueue_m ) ;
std : : pair < std : : multimap < Address , _SQE * > : : iterator , std : : multimap < Address , _SQE > : : iterator > er ( _sendViaClusterQueue . equal_range ( id . address ( ) ) ) ;
std : : pair < std : : multimap < Address , _SQE * > : : iterator , std : : multimap < Address , _SQE * > : : iterator > er ( _sendViaClusterQueue . equal_range ( id . address ( ) ) ) ;
for ( std : : multimap < Address , _SQE * > : : iterator qi ( er . first ) ; qi ! = er . second ; ) {
if ( qc > = ZT_CLUSTER_MAX_QUEUE_PER_SENDER ) // sanity check
break ;
@ -252,12 +249,11 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
const Address zeroTierAddress ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
SharedPtr < Peer > peer ( RR - > topology - > getPeerNoCache ( zeroTierAddress ) ) ;
if ( ( peer ) & & ( peer - > hasActiveDirectPath ( RR - > node - > now ( ) ) ) ) {
Buffer < 1024 > buf ;
peer - > identity ( ) . serialize ( buf ) ;
Mutex : : Lock _l2 ( _members [ fromMemberId ] . lock ) ;
_send ( fromMemberId , CLUSTER_MESSAGE_HAVE_PEER , buf . data ( ) , buf . size ( ) ) ;
_flush ( fromMemberId ) ; // lookups are latency sensitive
}
Buffer < 1024 > buf ;
peer - > identity ( ) . serialize ( buf ) ;
Mutex : : Lock _l2 ( _members [ fromMemberId ] . lock ) ;
_send ( fromMemberId , CLUSTER_MESSAGE_HAVE_PEER , buf . data ( ) , buf . size ( ) ) ;
_flush ( fromMemberId ) ; // lookups are latency sensitive
}
} break ;
@ -374,10 +370,16 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
}
}
bool Cluster : : sendViaCluster ( const Address & fromPeerAddress , const Address & toPeerAddress , const void * data , unsigned int len , bool unite )
void Cluster : : sendViaCluster ( const Address & fromPeerAddress , const Address & toPeerAddress , const void * data , unsigned int len , bool unite )
{
if ( len > ZT_PROTO_MAX_PACKET_LENGTH ) // sanity check
return false ;
return ;
_sendViaClusterQueue_m . lock ( ) ;
const unsigned long queueCount = _sendViaClusterQueue . count ( fromPeerAddress ) ;
_sendViaClusterQueue_m . unlock ( ) ;
if ( queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER )
return ;
const uint64_t now = RR - > node - > now ( ) ;
unsigned int canHasPeer = 0 ;
@ -399,6 +401,8 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
const uint64_t age = now - mostRecentTs ;
if ( age > = ( ZT_PEER_ACTIVITY_TIMEOUT / 3 ) ) {
const bool enqueueAndWait = ( ( age > = ZT_PEER_ACTIVITY_TIMEOUT ) | | ( mostRecentMemberId > 0xffff ) ) ;
// Poll everyone with WANT_PEER if the age of our most recent entry is
// approaching expiration (or has expired, or does not exist).
char tmp [ ZT_ADDRESS_LENGTH ] ;
@ -408,18 +412,17 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
_send ( * mid , CLUSTER_MESSAGE_WANT_PEER , tmp , ZT_ADDRESS_LENGTH ) ;
if ( mostRecentMemberId > 0xffff )
_flush ( * mid ) ; // latency sensitive if we don't have one
if ( ( enqueueAndWait ) & & ( queueCount = = 0 ) )
_flush ( * mid ) ; // send first query immediately to reduce latency
}
}
// If there isn't a good place to send via, then enqueue this for retrying
// later and return after having broadcasted a WANT_PEER.
if ( ( age > = ZT_PEER_ACTIVITY_TIMEOUT ) | | ( mostRecentMemberId > 0xffff ) ) {
if ( enqueueAndWait ) {
Mutex : : Lock _l ( _sendViaClusterQueue_m ) ;
if ( _sendViaClusterQueue . count ( fromPeerAddress ) < ZT_CLUSTER_MAX_QUEUE_PER_SENDER )
_sendViaClusterQueue . insert ( std : : pair < Address , _SQE * > ( fromPeerAddress , new _SQE ( now , toPeerAddress , data , len , unite ) ) ) ;
return true ;
_sendViaClusterQueue . insert ( std : : pair < Address , _SQE * > ( fromPeerAddress , new _SQE ( now , toPeerAddress , data , len , unite ) ) ) ;
return ;
}
}
@ -458,8 +461,6 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
}
TRACE ( " sendViaCluster(): relaying %u bytes from %s to %s by way of %u " , len , fromPeerAddress . toString ( ) . c_str ( ) , toPeerAddress . toString ( ) . c_str ( ) , ( unsigned int ) canHasPeer ) ;
return true ;
}
void Cluster : : sendDistributedQuery ( const Packet & pkt )
@ -643,18 +644,20 @@ void Cluster::status(ZT_ClusterStatus &status) const
status . myId = _id ;
ms [ _id ] = & ( status . members [ status . clusterSize + + ] ) ;
ms [ _id ] - > id = _id ;
ms [ _id ] - > alive = 1 ;
ms [ _id ] - > x = _x ;
ms [ _id ] - > y = _y ;
ms [ _id ] - > z = _z ;
ms [ _id ] - > load = 0 ; // TODO
ms [ _id ] - > peers = RR - > topology - > countActive ( ) ;
for ( std : : vector < InetAddress > : : const_iterator ep ( _zeroTierPhysicalEndpoints . begin ( ) ) ; ep ! = _zeroTierPhysicalEndpoints . end ( ) ; + + ep ) {
if ( ms [ _id ] - > numZeroTierPhysicalEndpoints > = ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES ) // sanity check
break ;
memcpy ( & ( ms [ _id ] - > zeroTierPhysicalEndpoints [ ms [ _id ] - > numZeroTierPhysicalEndpoints + + ] ) , & ( * ep ) , sizeof ( struct sockaddr_storage ) ) ;
{
ZT_ClusterMemberStatus * const s = & ( status . members [ status . clusterSize + + ] ) ;
s - > id = _id ;
s - > alive = 1 ;
s - > x = _x ;
s - > y = _y ;
s - > z = _z ;
s - > load = 0 ; // TODO
s - > peers = RR - > topology - > countActive ( ) ;
for ( std : : vector < InetAddress > : : const_iterator ep ( _zeroTierPhysicalEndpoints . begin ( ) ) ; ep ! = _zeroTierPhysicalEndpoints . end ( ) ; + + ep ) {
if ( s - > numZeroTierPhysicalEndpoints > = ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES ) // sanity check
break ;
memcpy ( & ( s - > zeroTierPhysicalEndpoints [ s - > numZeroTierPhysicalEndpoints + + ] ) , & ( * ep ) , sizeof ( struct sockaddr_storage ) ) ;
}
}
{