@ -86,7 +86,8 @@ Cluster::Cluster(
_peerAffinities ( 65536 ) ,
_lastCleanedPeerAffinities ( 0 ) ,
_lastCheckedPeersForAnnounce ( 0 ) ,
_lastFlushed ( 0 )
_lastFlushed ( 0 ) ,
_lastCleanedRemotePeers ( 0 )
{
uint16_t stmp [ ZT_SHA512_DIGEST_LEN / sizeof ( uint16_t ) ] ;
@ -107,6 +108,9 @@ Cluster::~Cluster()
Utils : : burn ( _masterSecret , sizeof ( _masterSecret ) ) ;
Utils : : burn ( _key , sizeof ( _key ) ) ;
delete [ ] _members ;
for ( std : : multimap < Address , _SQE * > : : iterator qi ( _sendViaClusterQueue . begin ( ) ) ; qi ! = _sendViaClusterQueue . end ( ) ; )
delete qi - > second ;
}
void Cluster : : handleIncomingStateMessage ( const void * msg , unsigned int len )
@ -160,222 +164,263 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
return ;
}
{
_Member & m = _members [ fromMemberId ] ;
Mutex : : Lock mlck ( m . lock ) ;
try {
while ( ptr < dmsg . size ( ) ) {
const unsigned int mlen = dmsg . at < uint16_t > ( ptr ) ; ptr + = 2 ;
const unsigned int nextPtr = ptr + mlen ;
if ( nextPtr > dmsg . size ( ) )
break ;
_Member & m = _members [ fromMemberId ] ;
int mtype = - 1 ;
try {
switch ( ( StateMessageType ) ( mtype = ( int ) dmsg [ ptr + + ] ) ) {
default :
break ;
case STATE_MESSAGE_ALIVE : {
ptr + = 7 ; // skip version stuff, not used yet
m . x = dmsg . at < int32_t > ( ptr ) ; ptr + = 4 ;
m . y = dmsg . at < int32_t > ( ptr ) ; ptr + = 4 ;
m . z = dmsg . at < int32_t > ( ptr ) ; ptr + = 4 ;
ptr + = 8 ; // skip local clock, not used
m . load = dmsg . at < uint64_t > ( ptr ) ; ptr + = 8 ;
m . peers = dmsg . at < uint64_t > ( ptr ) ; ptr + = 8 ;
ptr + = 8 ; // skip flags, unused
# ifdef ZT_TRACE
std : : string addrs ;
# endif
unsigned int physicalAddressCount = dmsg [ ptr + + ] ;
m . zeroTierPhysicalEndpoints . clear ( ) ;
for ( unsigned int i = 0 ; i < physicalAddressCount ; + + i ) {
m . zeroTierPhysicalEndpoints . push_back ( InetAddress ( ) ) ;
ptr + = m . zeroTierPhysicalEndpoints . back ( ) . deserialize ( dmsg , ptr ) ;
if ( ! ( m . zeroTierPhysicalEndpoints . back ( ) ) ) {
m . zeroTierPhysicalEndpoints . pop_back ( ) ;
}
try {
while ( ptr < dmsg . size ( ) ) {
const unsigned int mlen = dmsg . at < uint16_t > ( ptr ) ; ptr + = 2 ;
const unsigned int nextPtr = ptr + mlen ;
if ( nextPtr > dmsg . size ( ) )
break ;
int mtype = - 1 ;
try {
switch ( ( StateMessageType ) ( mtype = ( int ) dmsg [ ptr + + ] ) ) {
default :
break ;
case CLUSTER_MESSAGE_ALIVE : {
Mutex : : Lock mlck ( m . lock ) ;
ptr + = 7 ; // skip version stuff, not used yet
m . x = dmsg . at < int32_t > ( ptr ) ; ptr + = 4 ;
m . y = dmsg . at < int32_t > ( ptr ) ; ptr + = 4 ;
m . z = dmsg . at < int32_t > ( ptr ) ; ptr + = 4 ;
ptr + = 8 ; // skip local clock, not used
m . load = dmsg . at < uint64_t > ( ptr ) ; ptr + = 8 ;
m . peers = dmsg . at < uint64_t > ( ptr ) ; ptr + = 8 ;
ptr + = 8 ; // skip flags, unused
# ifdef ZT_TRACE
else {
if ( addrs . length ( ) > 0 )
addrs . push_back ( ' , ' ) ;
addrs . append ( m . zeroTierPhysicalEndpoints . back ( ) . toString ( ) ) ;
}
std : : string addrs ;
# endif
unsigned int physicalAddressCount = dmsg [ ptr + + ] ;
m . zeroTierPhysicalEndpoints . clear ( ) ;
for ( unsigned int i = 0 ; i < physicalAddressCount ; + + i ) {
m . zeroTierPhysicalEndpoints . push_back ( InetAddress ( ) ) ;
ptr + = m . zeroTierPhysicalEndpoints . back ( ) . deserialize ( dmsg , ptr ) ;
if ( ! ( m . zeroTierPhysicalEndpoints . back ( ) ) ) {
m . zeroTierPhysicalEndpoints . pop_back ( ) ;
}
# ifdef ZT_TRACE
if ( ( RR - > node - > now ( ) - m . lastReceivedAliveAnnouncement ) > = ZT_CLUSTER_TIMEOUT ) {
TRACE ( " [%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s " , ( unsigned int ) fromMemberId , m . x , m . y , m . z , addrs . c_str ( ) ) ;
else {
if ( addrs . length ( ) > 0 )
addrs . push_back ( ' , ' ) ;
addrs . append ( m . zeroTierPhysicalEndpoints . back ( ) . toString ( ) ) ;
}
# endif
m . lastReceivedAliveAnnouncement = RR - > node - > now ( ) ;
} break ;
case STATE_MESSAGE_HAVE_PEER : {
const Address zeroTierAddress ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
Mutex : : Lock _l2 ( _peerAffinities_m ) ;
_peerAffinities . set ( zeroTierAddress , fromMemberId ) ;
TRACE ( " [%u] has %s @ %s " , ( unsigned int ) fromMemberId , id . address ( ) . toString ( ) . c_str ( ) , physicalAddress . toString ( ) . c_str ( ) ) ;
} break ;
case STATE_MESSAGE_WANT_PEER : {
const Address zeroTierAddress ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
SharedPtr < Peer > peer ( RR - > topology - > getPeerNoCache ( zeroTierAddress ) ) ;
if ( ( peer ) & & ( peer - > hasActiveDirectPath ( RR - > node - > now ( ) ) ) ) {
char buf [ ZT_ADDRESS_LENGTH ] ;
peer - > address ( ) . copyTo ( buf , ZT_ADDRESS_LENGTH ) ;
}
# ifdef ZT_TRACE
if ( ( RR - > node - > now ( ) - m . lastReceivedAliveAnnouncement ) > = ZT_CLUSTER_TIMEOUT ) {
TRACE ( " [%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s " , ( unsigned int ) fromMemberId , m . x , m . y , m . z , addrs . c_str ( ) ) ;
}
# endif
m . lastReceivedAliveAnnouncement = RR - > node - > now ( ) ;
} break ;
case CLUSTER_MESSAGE_HAVE_PEER : {
Identity id ;
ptr + = id . deserialize ( dmsg , ptr ) ;
if ( id ) {
RR - > topology - > saveIdentity ( id ) ;
{
Mutex : : Lock _l ( _remotePeers_m ) ;
_remotePeers [ std : : pair < Address , unsigned int > ( id . address ( ) , ( unsigned int ) fromMemberId ) ] = RR - > node - > now ( ) ;
}
std : : pair < Address , _SQE * > q [ ZT_CLUSTER_MAX_QUEUE_PER_SENDER ] ;
unsigned int qc = 0 ;
{
Mutex : : Lock _l ( _sendViaClusterQueue_m ) ;
std : : pair < std : : multimap < Address , _SQE * > : : iterator , std : : multimap < Address , _SQE > : : iterator > er ( _sendViaClusterQueue . equal_range ( id . address ( ) ) ) ;
for ( std : : multimap < Address , _SQE * > : : iterator qi ( er . first ) ; qi ! = er . second ; ) {
if ( qc > = ZT_CLUSTER_MAX_QUEUE_PER_SENDER ) // sanity check
break ;
q [ qc + + ] = * qi ;
_sendViaClusterQueue . erase ( qi + + ) ;
}
}
for ( unsigned int i = 0 ; i < qc ; + + i ) {
this - > sendViaCluster ( q [ i ] . first , q [ i ] . second - > toPeerAddress , q [ i ] . second - > data , q [ i ] . second - > len , q [ i ] . second - > unite ) ;
delete q [ i ] . second ;
}
TRACE ( " [%u] has %s " , ( unsigned int ) fromMemberId , id . address ( ) . toString ( ) . c_str ( ) ) ;
}
} break ;
case CLUSTER_MESSAGE_WANT_PEER : {
const Address zeroTierAddress ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
SharedPtr < Peer > peer ( RR - > topology - > getPeerNoCache ( zeroTierAddress ) ) ;
if ( ( peer ) & & ( peer - > hasActiveDirectPath ( RR - > node - > now ( ) ) ) ) {
Buffer < 1024 > buf ;
peer - > identity ( ) . serialize ( buf ) ;
Mutex : : Lock _l2 ( _members [ fromMemberId ] . lock ) ;
_send ( fromMemberId , STATE_MESSAGE_HAVE_PEER , buf , ZT_ADDRESS_LENGTH ) ;
_flush ( fromMemberId ) ;
_send ( fromMemberId , CLU STER _MESSAGE_HAVE_PEER, buf . data ( ) , buf . size ( ) ) ;
_flush ( fromMemberId ) ; // lookups are latency sensitive
}
} break ;
case STATE_MESSAGE_MULTICAST_LIKE : {
const uint64_t nwid = dmsg . at < uint64_t > ( ptr ) ; ptr + = 8 ;
const Address address ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
const MAC mac ( dmsg . field ( ptr , 6 ) , 6 ) ; ptr + = 6 ;
const uint32_t adi = dmsg . at < uint32_t > ( ptr ) ; ptr + = 4 ;
RR - > mc - > add ( RR - > node - > now ( ) , nwid , MulticastGroup ( mac , adi ) , address ) ;
TRACE ( " [%u] %s likes %s/%.8x on %.16llx " , ( unsigned int ) fromMemberId , address . toString ( ) . c_str ( ) , mac . toString ( ) . c_str ( ) , ( unsigned int ) adi , nwid ) ;
} break ;
case STATE_MESSAGE_COM : {
/* not currently used so not decoded yet
CertificateOfMembership com ;
ptr + = com . deserialize ( dmsg , ptr ) ;
if ( com ) {
TRACE ( " [%u] COM for %s on %.16llu rev %llu " , ( unsigned int ) fromMemberId , com . issuedTo ( ) . toString ( ) . c_str ( ) , com . networkId ( ) , com . revision ( ) ) ;
}
} break ;
case CLUSTER_MESSAGE_REMOTE_PACKET : {
const unsigned int plen = dmsg . at < uint16_t > ( ptr ) ; ptr + = 2 ;
if ( plen ) {
Packet remotep ( dmsg . field ( ptr , plen ) , plen ) ; ptr + = plen ;
TRACE ( " remote %s from %s via %u (%u bytes) " , Packet : : verbString ( remotep . verb ( ) ) , remotep . source ( ) . toString ( ) . c_str ( ) , fromMemberId , plen ) ;
switch ( remotep . verb ( ) ) {
case Packet : : VERB_WHOIS : _doREMOTE_WHOIS ( fromMemberId , remotep ) ; break ;
case Packet : : VERB_MULTICAST_GATHER : _doREMOTE_MULTICAST_GATHER ( fromMemberId , remotep ) ; break ;
default : break ; // ignore things we don't care about across cluster
}
*/
} break ;
case STATE_MESSAGE_PROXY_UNITE : {
const Address localPeerAddress ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
const Address remotePeerAddress ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
const unsigned int numRemotePeerPaths = dmsg [ ptr + + ] ;
InetAddress remotePeerPaths [ 256 ] ; // size is 8-bit, so 256 is max
for ( unsigned int i = 0 ; i < numRemotePeerPaths ; + + i )
ptr + = remotePeerPaths [ i ] . deserialize ( dmsg , ptr ) ;
TRACE ( " [%u] requested that we unite local %s with remote %s " , ( unsigned int ) fromMemberId , localPeerAddress . toString ( ) . c_str ( ) , remotePeerAddress . toString ( ) . c_str ( ) ) ;
const uint64_t now = RR - > node - > now ( ) ;
SharedPtr < Peer > localPeer ( RR - > topology - > getPeerNoCache ( localPeerAddress ) ) ;
if ( ( localPeer ) & & ( numRemotePeerPaths > 0 ) ) {
InetAddress bestLocalV4 , bestLocalV6 ;
localPeer - > getBestActiveAddresses ( now , bestLocalV4 , bestLocalV6 ) ;
InetAddress bestRemoteV4 , bestRemoteV6 ;
for ( unsigned int i = 0 ; i < numRemotePeerPaths ; + + i ) {
if ( ( bestRemoteV4 ) & & ( bestRemoteV6 ) )
}
} break ;
case CLUSTER_MESSAGE_PROXY_UNITE : {
const Address localPeerAddress ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
const Address remotePeerAddress ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
const unsigned int numRemotePeerPaths = dmsg [ ptr + + ] ;
InetAddress remotePeerPaths [ 256 ] ; // size is 8-bit, so 256 is max
for ( unsigned int i = 0 ; i < numRemotePeerPaths ; + + i )
ptr + = remotePeerPaths [ i ] . deserialize ( dmsg , ptr ) ;
TRACE ( " [%u] requested that we unite local %s with remote %s " , ( unsigned int ) fromMemberId , localPeerAddress . toString ( ) . c_str ( ) , remotePeerAddress . toString ( ) . c_str ( ) ) ;
const uint64_t now = RR - > node - > now ( ) ;
SharedPtr < Peer > localPeer ( RR - > topology - > getPeerNoCache ( localPeerAddress ) ) ;
if ( ( localPeer ) & & ( numRemotePeerPaths > 0 ) ) {
InetAddress bestLocalV4 , bestLocalV6 ;
localPeer - > getBestActiveAddresses ( now , bestLocalV4 , bestLocalV6 ) ;
InetAddress bestRemoteV4 , bestRemoteV6 ;
for ( unsigned int i = 0 ; i < numRemotePeerPaths ; + + i ) {
if ( ( bestRemoteV4 ) & & ( bestRemoteV6 ) )
break ;
switch ( remotePeerPaths [ i ] . ss_family ) {
case AF_INET :
if ( ! bestRemoteV4 )
bestRemoteV4 = remotePeerPaths [ i ] ;
break ;
case AF_INET6 :
if ( ! bestRemoteV6 )
bestRemoteV6 = remotePeerPaths [ i ] ;
break ;
switch ( remotePeerPaths [ i ] . ss_family ) {
case AF_INET :
if ( ! bestRemoteV4 )
bestRemoteV4 = remotePeerPaths [ i ] ;
break ;
case AF_INET6 :
if ( ! bestRemoteV6 )
bestRemoteV6 = remotePeerPaths [ i ] ;
break ;
}
}
}
Packet rendezvousForLocal ( localPeerAddress , RR - > identity . address ( ) , Packet : : VERB_RENDEZVOUS ) ;
rendezvousForLocal . append ( ( uint8_t ) 0 ) ;
remotePeerAddress . appendTo ( rendezvousForLocal ) ;
Buffer < 2048 > rendezvousForRemote ;
remotePeerAddress . appendTo ( rendezvousForRemote ) ;
rendezvousForRemote . append ( ( uint8_t ) Packet : : VERB_RENDEZVOUS ) ;
const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForRemote . size ( ) ;
rendezvousForRemote . addSize ( 2 ) ; // space for actual packet payload length
rendezvousForRemote . append ( ( uint8_t ) 0 ) ; // flags == 0
localPeerAddress . appendTo ( rendezvousForRemote ) ;
bool haveMatch = false ;
if ( ( bestLocalV6 ) & & ( bestRemoteV6 ) ) {
haveMatch = true ;
rendezvousForLocal . append ( ( uint16_t ) bestRemoteV6 . port ( ) ) ;
rendezvousForLocal . append ( ( uint8_t ) 16 ) ;
rendezvousForLocal . append ( bestRemoteV6 . rawIpData ( ) , 16 ) ;
rendezvousForRemote . append ( ( uint16_t ) bestLocalV6 . port ( ) ) ;
rendezvousForRemote . append ( ( uint8_t ) 16 ) ;
rendezvousForRemote . append ( bestLocalV6 . rawIpData ( ) , 16 ) ;
rendezvousForRemote . setAt < uint16_t > ( rendezvousForOtherEndPayloadSizePtr , ( uint16_t ) ( 9 + 16 ) ) ;
} else if ( ( bestLocalV4 ) & & ( bestRemoteV4 ) ) {
haveMatch = true ;
rendezvousForLocal . append ( ( uint16_t ) bestRemoteV4 . port ( ) ) ;
rendezvousForLocal . append ( ( uint8_t ) 4 ) ;
rendezvousForLocal . append ( bestRemoteV4 . rawIpData ( ) , 4 ) ;
rendezvousForRemote . append ( ( uint16_t ) bestLocalV4 . port ( ) ) ;
rendezvousForRemote . append ( ( uint8_t ) 4 ) ;
rendezvousForRemote . append ( bestLocalV4 . rawIpData ( ) , 4 ) ;
rendezvousForRemote . setAt < uint16_t > ( rendezvousForOtherEndPayloadSizePtr , ( uint16_t ) ( 9 + 4 ) ) ;
}
Packet rendezvousForLocal ( localPeerAddress , RR - > identity . address ( ) , Packet : : VERB_RENDEZVOUS ) ;
rendezvousForLocal . append ( ( uint8_t ) 0 ) ;
remotePeerAddress . appendTo ( rendezvousForLocal ) ;
Buffer < 2048 > rendezvousForRemote ;
remotePeerAddress . appendTo ( rendezvousForRemote ) ;
rendezvousForRemote . append ( ( uint8_t ) Packet : : VERB_RENDEZVOUS ) ;
const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForRemote . size ( ) ;
rendezvousForRemote . addSize ( 2 ) ; // space for actual packet payload length
rendezvousForRemote . append ( ( uint8_t ) 0 ) ; // flags == 0
localPeerAddress . appendTo ( rendezvousForRemote ) ;
bool haveMatch = false ;
if ( ( bestLocalV6 ) & & ( bestRemoteV6 ) ) {
haveMatch = true ;
rendezvousForLocal . append ( ( uint16_t ) bestRemoteV6 . port ( ) ) ;
rendezvousForLocal . append ( ( uint8_t ) 16 ) ;
rendezvousForLocal . append ( bestRemoteV6 . rawIpData ( ) , 16 ) ;
rendezvousForRemote . append ( ( uint16_t ) bestLocalV6 . port ( ) ) ;
rendezvousForRemote . append ( ( uint8_t ) 16 ) ;
rendezvousForRemote . append ( bestLocalV6 . rawIpData ( ) , 16 ) ;
rendezvousForRemote . setAt < uint16_t > ( rendezvousForOtherEndPayloadSizePtr , ( uint16_t ) ( 9 + 16 ) ) ;
} else if ( ( bestLocalV4 ) & & ( bestRemoteV4 ) ) {
haveMatch = true ;
rendezvousForLocal . append ( ( uint16_t ) bestRemoteV4 . port ( ) ) ;
rendezvousForLocal . append ( ( uint8_t ) 4 ) ;
rendezvousForLocal . append ( bestRemoteV4 . rawIpData ( ) , 4 ) ;
rendezvousForRemote . append ( ( uint16_t ) bestLocalV4 . port ( ) ) ;
rendezvousForRemote . append ( ( uint8_t ) 4 ) ;
rendezvousForRemote . append ( bestLocalV4 . rawIpData ( ) , 4 ) ;
rendezvousForRemote . setAt < uint16_t > ( rendezvousForOtherEndPayloadSizePtr , ( uint16_t ) ( 9 + 4 ) ) ;
}
if ( haveMatch ) {
_send ( fromMemberId , STATE_MESSAGE_PROXY_SEND , rendezvousForRemote . data ( ) , rendezvousForRemote . size ( ) ) ;
_flush ( fromMemberId ) ;
RR - > sw - > send ( rendezvousForLocal , true , 0 ) ;
}
if ( haveMatch ) {
_send ( fromMemberId , CLUSTER_MESSAGE_PROXY_SEND , rendezvousForRemote . data ( ) , rendezvousForRemote . size ( ) ) ;
_flush ( fromMemberId ) ;
RR - > sw - > send ( rendezvousForLocal , true , 0 ) ;
}
} break ;
case STATE_MESSAGE_PROXY_SEND : {
const Address rcpt ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
const Packet : : Verb verb = ( Packet : : Verb ) dmsg [ ptr + + ] ;
const unsigned int len = dmsg . at < uint16_t > ( ptr ) ; ptr + = 2 ;
Packet outp ( rcpt , RR - > identity . address ( ) , verb ) ;
outp . append ( dmsg . field ( ptr , len ) , len ) ; ptr + = len ;
RR - > sw - > send ( outp , true , 0 ) ;
TRACE ( " [%u] proxy send %s to %s length %u " , ( unsigned int ) fromMemberId , Packet : : verbString ( verb ) , rcpt . toString ( ) . c_str ( ) , len ) ;
} break ;
}
} catch ( . . . ) {
TRACE ( " invalid message of size %u type %d (inner decode), discarding " , mlen , mtype ) ;
// drop invalids
}
} break ;
case CLUSTER_MESSAGE_PROXY_SEND : {
const Address rcpt ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
const Packet : : Verb verb = ( Packet : : Verb ) dmsg [ ptr + + ] ;
const unsigned int len = dmsg . at < uint16_t > ( ptr ) ; ptr + = 2 ;
Packet outp ( rcpt , RR - > identity . address ( ) , verb ) ;
outp . append ( dmsg . field ( ptr , len ) , len ) ; ptr + = len ;
RR - > sw - > send ( outp , true , 0 ) ;
//TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
} break ;
}
ptr = nextPtr ;
} catch ( . . . ) {
TRACE ( " invalid message of size %u type %d (inner decode), discarding " , mlen , mtype ) ;
// drop invalids
}
} catch ( . . . ) {
TRACE ( " invalid message (outer loop), discarding " ) ;
// drop invalids
ptr = nextPtr ;
}
} catch ( . . . ) {
TRACE ( " invalid message (outer loop), discarding " ) ;
// drop invalids
}
}
bool Cluster : : sendViaCluster ( const Address & fromPeerAddress , const Address & toPeerAddress , const void * data , unsigned int len , bool unite )
{
if ( len > 16384 ) // sanity check
if ( len > ZT_PROTO_MAX_PACKET_LENGTH ) // sanity check
return false ;
const uint64_t now = RR - > node - > now ( ) ;
unsigned int canHasPeer = 0 ;
uint64_t mostRecentTs = 0 ;
unsigned int mostRecentMemberId = 0xffffffff ;
{
Mutex : : Lock _l2 ( _peerAffinities_m ) ;
const unsigned int * pa = _peerAffinities . get ( toPeerAddress ) ;
if ( ! pa ) {
char buf [ ZT_ADDRESS_LENGTH ] ;
peerId . address ( ) . copyTo ( buf , ZT_ADDRESS_LENGTH ) ;
{
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
_send ( * mid , STATE_MESSAGE_WANT_PEER , buf , ZT_ADDRESS_LENGTH ) ;
}
Mutex : : Lock _l2 ( _remotePeers_m ) ;
std : : map < std : : pair < Address , unsigned int > , uint64_t > : : const_iterator rpe ( _remotePeers . lower_bound ( std : : pair < Address , unsigned int > ( fromPeerAddress , 0 ) ) ) ;
for ( ; ; ) {
if ( ( rpe = = _remotePeers . end ( ) ) | | ( rpe - > first . first ! = fromPeerAddress ) )
break ;
else if ( rpe - > second > mostRecentTs ) {
mostRecentTs = rpe - > second ;
mostRecentMemberId = rpe - > first . second ;
}
return false ;
}
canHasPeer = * pa ;
}
const uint64_t age = now - mostRecentTs ;
if ( age > = ( ZT_PEER_ACTIVITY_TIMEOUT / 3 ) ) {
// Poll everyone with WANT_PEER if the age of our most recent entry is
// approaching expiration (or has expired, or does not exist).
char tmp [ ZT_ADDRESS_LENGTH ] ;
toPeerAddress . copyTo ( tmp , ZT_ADDRESS_LENGTH ) ;
{
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
_send ( * mid , CLUSTER_MESSAGE_WANT_PEER , tmp , ZT_ADDRESS_LENGTH ) ;
if ( mostRecentMemberId > 0xffff )
_flush ( * mid ) ; // latency sensitive if we don't have one
}
}
// If there isn't a good place to send via, then enqueue this for retrying
// later and return after having broadcasted a WANT_PEER.
if ( ( age > = ZT_PEER_ACTIVITY_TIMEOUT ) | | ( mostRecentMemberId > 0xffff ) ) {
Mutex : : Lock _l ( _sendViaClusterQueue_m ) ;
if ( _sendViaClusterQueue . count ( fromPeerAddress ) < ZT_CLUSTER_MAX_QUEUE_PER_SENDER )
_sendViaClusterQueue . insert ( std : : pair < Address , _SQE * > ( fromPeerAddress , new _SQE ( now , toPeerAddress , data , len , unite ) ) ) ;
return true ;
}
}
Buffer < 1024 > buf ;
@ -401,11 +446,14 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
v6 . serialize ( buf ) ;
}
}
{
Mutex : : Lock _l2 ( _members [ canHasPeer ] . lock ) ;
if ( buf . size ( ) > 0 )
_send ( canHasPeer , STATE_MESSAGE_PROXY_UNITE , buf . data ( ) , buf . size ( ) ) ;
if ( _members [ canHasPeer ] . zeroTierPhysicalEndpoints . size ( ) > 0 )
Mutex : : Lock _l2 ( _members [ mostRecentMemberId ] . lock ) ;
if ( buf . size ( ) > 0 ) {
_send ( canHasPeer , CLUSTER_MESSAGE_PROXY_UNITE , buf . data ( ) , buf . size ( ) ) ;
_flush ( canHasPeer ) ; // latency sensitive
}
if ( _members [ mostRecentMemberId ] . zeroTierPhysicalEndpoints . size ( ) > 0 )
RR - > node - > putPacket ( InetAddress ( ) , _members [ canHasPeer ] . zeroTierPhysicalEndpoints . front ( ) , data , len ) ;
}
@ -414,62 +462,43 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
return true ;
}
void Cluster : : replicateHavePeer ( const Identity & peerId )
{
char buf [ ZT_ADDRESS_LENGTH ] ;
peerId . address ( ) . copyTo ( buf , ZT_ADDRESS_LENGTH ) ;
{
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
_send ( * mid , STATE_MESSAGE_HAVE_PEER , buf , ZT_ADDRESS_LENGTH ) ;
}
}
}
void Cluster : : replicateMulticastLike ( uint64_t nwid , const Address & peerAddress , const MulticastGroup & group )
{
Buffer < 1024 > buf ;
buf . append ( ( uint64_t ) nwid ) ;
peerAddress . appendTo ( buf ) ;
group . mac ( ) . appendTo ( buf ) ;
buf . append ( ( uint32_t ) group . adi ( ) ) ;
TRACE ( " replicating %s MULTICAST_LIKE %.16llx/%s/%u to all members " , peerAddress . toString ( ) . c_str ( ) , nwid , group . mac ( ) . toString ( ) . c_str ( ) , ( unsigned int ) group . adi ( ) ) ;
{
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
_send ( * mid , STATE_MESSAGE_MULTICAST_LIKE , buf . data ( ) , buf . size ( ) ) ;
}
}
}
void Cluster : : replicateCertificateOfNetworkMembership ( const CertificateOfMembership & com )
void Cluster : : sendDistributedQuery ( const Packet & pkt )
{
/* not used yet, so don't do this yet
Buffer < 4096 > buf ;
com . serialize ( buf ) ;
TRACE ( " replicating %s COM for %.16llx to all members " , com . issuedTo ( ) . toString ( ) . c_str ( ) , com . networkId ( ) ) ;
{
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
_send ( * mid , STATE_MESSAGE_COM , buf . data ( ) , buf . size ( ) ) ;
}
buf . append ( ( uint16_t ) pkt . size ( ) ) ;
buf . append ( pkt . data ( ) , pkt . size ( ) ) ;
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
_send ( * mid , CLUSTER_MESSAGE_REMOTE_PACKET , buf . data ( ) , buf . size ( ) ) ;
_flush ( * mid ) ; // these tend to be latency-sensitive
}
*/
}
void Cluster : : doPeriodicTasks ( )
{
const uint64_t now = RR - > node - > now ( ) ;
if ( ( now - _lastFlushed ) > = ZT_CLUSTER_FLUSH_PERIOD ) {
_lastFlushed = now ;
{
Mutex : : Lock _l2 ( _sendViaClusterQueue_m ) ;
for ( std : : multimap < Address , _SQE * > : : iterator qi ( _sendViaClusterQueue . begin ( ) ) ; qi ! = _sendViaClusterQueue . end ( ) ; ) {
if ( ( now - qi - > second - > timestamp ) > = ZT_CLUSTER_QUEUE_EXPIRATION ) {
delete qi - > second ;
_sendViaClusterQueue . erase ( qi + + ) ;
} else + + qi ;
}
}
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
if ( ( now - _members [ * mid ] . lastAnnouncedAliveTo ) > = ( ( ZT_CLUSTER_TIMEOUT / 2 ) - 1000 ) ) {
_members [ * mid ] . lastAnnouncedAliveTo = now ;
Buffer < 2048 > alive ;
alive . append ( ( uint16_t ) ZEROTIER_ONE_VERSION_MAJOR ) ;
alive . append ( ( uint16_t ) ZEROTIER_ONE_VERSION_MINOR ) ;
@ -491,13 +520,23 @@ void Cluster::doPeriodicTasks()
alive . append ( ( uint8_t ) _zeroTierPhysicalEndpoints . size ( ) ) ;
for ( std : : vector < InetAddress > : : const_iterator pe ( _zeroTierPhysicalEndpoints . begin ( ) ) ; pe ! = _zeroTierPhysicalEndpoints . end ( ) ; + + pe )
pe - > serialize ( alive ) ;
_send ( * mid , STATE_MESSAGE_ALIVE , alive . data ( ) , alive . size ( ) ) ;
_members [ * mid ] . lastAnnouncedAliveTo = now ;
_send ( * mid , CLUSTER_MESSAGE_ALIVE , alive . data ( ) , alive . size ( ) ) ;
}
_flush ( * mid ) ; // does nothing if nothing to flush
}
}
if ( ( now - _lastCleanedRemotePeers ) > = ( ZT_PEER_ACTIVITY_TIMEOUT * 2 ) ) {
_lastCleanedRemotePeers = now ;
Mutex : : Lock _l ( _remotePeers_m ) ;
for ( std : : map < std : : pair < Address , unsigned int > , uint64_t > : : iterator rp ( _remotePeers . begin ( ) ) ; rp ! = _remotePeers . end ( ) ; ) {
if ( ( now - rp - > second ) > = ZT_PEER_ACTIVITY_TIMEOUT )
_remotePeers . erase ( rp + + ) ;
else + + rp ;
}
}
}
void Cluster : : addMember ( uint16_t memberId )