@ -52,18 +52,16 @@ Multicaster::~Multicaster()
{
}
void Multicaster : : addMultiple ( uint64_t now , uint64_t nwid , const MulticastGroup & mg , const Address & learnedFrom , const void * addresses , unsigned int count , unsigned int totalKnown )
void Multicaster : : addMultiple ( uint64_t now , uint64_t nwid , const MulticastGroup & mg , const void * addresses , unsigned int count , unsigned int totalKnown )
{
const unsigned char * p = ( const unsigned char * ) addresses ;
const unsigned char * e = p + ( 5 * count ) ;
Mutex : : Lock _l ( _groups_m ) ;
MulticastGroupStatus & gs = _groups [ std : : pair < uint64_t , MulticastGroup > ( nwid , mg ) ] ;
while ( p ! = e ) {
_add ( now , nwid , mg , gs , learnedFrom , Address ( p , 5 ) ) ;
_add ( now , nwid , mg , gs , Address ( p , 5 ) ) ;
p + = 5 ;
}
if ( RR - > topology - > isSupernode ( learnedFrom ) )
gs . totalKnownMembers = totalKnown ;
}
unsigned int Multicaster : : gather ( const Address & queryingPeer , uint64_t nwid , const MulticastGroup & mg , Packet & appendTo , unsigned int limit ) const
@ -160,123 +158,145 @@ void Multicaster::send(
const void * data ,
unsigned int len )
{
unsigned long idxbuf [ 8194 ] ;
unsigned long * indexes = idxbuf ;
Mutex : : Lock _l ( _groups_m ) ;
MulticastGroupStatus & gs = _groups [ std : : pair < uint64_t , MulticastGroup > ( nwid , mg ) ] ;
if ( gs . members . size ( ) > = limit ) {
// If we already have enough members, just send and we're done. We can
// skip the TX queue and skip the overhead of maintaining a send log by
// using sendOnly().
OutboundMulticast out ;
out . init (
RR ,
now ,
nwid ,
com ,
limit ,
0 ,
src ,
mg ,
etherType ,
data ,
len ) ;
if ( ! gs . members . empty ( ) ) {
// Use a stack-allocated buffer unless this multicast group is ridiculously huge
if ( gs . members . size ( ) > 8194 )
indexes = new unsigned long [ gs . members . size ( ) ] ;
// Generate a random permutation of member indexes
for ( unsigned long i = 0 ; i < gs . members . size ( ) ; + + i )
indexes [ i ] = i ;
for ( unsigned long i = gs . members . size ( ) - 1 ; i > 0 ; - - i ) {
unsigned long j = RR - > prng - > next32 ( ) % ( i + 1 ) ;
unsigned long tmp = indexes [ j ] ;
indexes [ j ] = indexes [ i ] ;
indexes [ i ] = tmp ;
}
unsigned int count = 0 ;
if ( gs . members . size ( ) > = limit ) {
// If we already have enough members, just send and we're done. We can
// skip the TX queue and skip the overhead of maintaining a send log by
// using sendOnly().
OutboundMulticast out ;
out . init (
RR ,
now ,
nwid ,
com ,
limit ,
2 , // we'll still gather a little from peers to keep multicast list fresh
src ,
mg ,
etherType ,
data ,
len ) ;
unsigned int count = 0 ;
for ( std : : vector < Address > : : const_iterator ast ( alwaysSendTo . begin ( ) ) ; ast ! = alwaysSendTo . end ( ) ; + + ast ) {
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr < Peer > p ( RR - > topology - > getPeer ( * ast ) ) ;
if ( ( p ) & & ( p - > remoteVersionKnown ( ) ) & & ( p - > remoteVersionMajor ( ) < 1 ) )
continue ;
}
for ( std : : vector < Address > : : const_iterator ast ( alwaysSendTo . begin ( ) ) ; ast ! = alwaysSendTo . end ( ) ; + + ast ) {
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr < Peer > p ( RR - > topology - > getPeer ( * ast ) ) ;
if ( ( p ) & & ( p - > remoteVersionKnown ( ) ) & & ( p - > remoteVersionMajor ( ) < 1 ) )
continue ;
out . sendOnly ( RR , * ast ) ;
if ( + + count > = limit )
break ;
}
out . sendOnly ( RR , * ast ) ;
if ( + + count > = limit )
break ;
}
unsigned long idx = 0 ;
while ( count < limit ) {
const MulticastGroupMember & m = gs . members [ indexes [ idx + + ] ] ;
if ( count < limit ) {
for ( std : : vector < MulticastGroupMember > : : const_reverse_iterator m ( gs . members . rbegin ( ) ) ; m ! = gs . members . rend ( ) ; + + m ) {
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr < Peer > p ( RR - > topology - > getPeer ( m - > address ) ) ;
SharedPtr < Peer > p ( RR - > topology - > getPeer ( m . address ) ) ;
if ( ( p ) & & ( p - > remoteVersionKnown ( ) ) & & ( p - > remoteVersionMajor ( ) < 1 ) )
continue ;
}
if ( std : : find ( alwaysSendTo . begin ( ) , alwaysSendTo . end ( ) , m - > address ) = = alwaysSendTo . end ( ) ) {
out . sendOnly ( RR , m - > address ) ;
if ( + + count > = limit )
break ;
if ( std : : find ( alwaysSendTo . begin ( ) , alwaysSendTo . end ( ) , m . address ) = = alwaysSendTo . end ( ) ) {
out . sendOnly ( RR , m . address ) ;
+ + count ;
}
}
}
} else {
unsigned int gatherLimit = ( limit - ( unsigned int ) gs . members . size ( ) ) + 1 ;
if ( ( now - gs . lastExplicitGather ) > = ZT_MULTICAST_EXPLICIT_GATHER_DELAY ) {
gs . lastExplicitGather = now ;
SharedPtr < Peer > sn ( RR - > topology - > getBestSupernode ( ) ) ;
if ( sn ) {
TRACE ( " >>MC GATHER up to %u in %.16llx/%s " , gatherLimit , nwid , mg . toString ( ) . c_str ( ) ) ;
Packet outp ( sn - > address ( ) , RR - > identity . address ( ) , Packet : : VERB_MULTICAST_GATHER ) ;
outp . append ( nwid ) ;
outp . append ( ( uint8_t ) 0 ) ;
mg . mac ( ) . appendTo ( outp ) ;
outp . append ( ( uint32_t ) mg . adi ( ) ) ;
outp . append ( ( uint32_t ) gatherLimit ) ; // +1 just means we'll have an extra in the queue if available
outp . armor ( sn - > key ( ) , true ) ;
sn - > send ( RR , outp . data ( ) , outp . size ( ) , now ) ;
} else {
unsigned int gatherLimit = ( limit - ( unsigned int ) gs . members . size ( ) ) + 1 ;
if ( ( now - gs . lastExplicitGather ) > = ZT_MULTICAST_EXPLICIT_GATHER_DELAY ) {
gs . lastExplicitGather = now ;
SharedPtr < Peer > sn ( RR - > topology - > getBestSupernode ( ) ) ;
if ( sn ) {
TRACE ( " >>MC GATHER up to %u in %.16llx/%s " , gatherLimit , nwid , mg . toString ( ) . c_str ( ) ) ;
Packet outp ( sn - > address ( ) , RR - > identity . address ( ) , Packet : : VERB_MULTICAST_GATHER ) ;
outp . append ( nwid ) ;
outp . append ( ( uint8_t ) 0 ) ;
mg . mac ( ) . appendTo ( outp ) ;
outp . append ( ( uint32_t ) mg . adi ( ) ) ;
outp . append ( ( uint32_t ) gatherLimit ) ; // +1 just means we'll have an extra in the queue if available
outp . armor ( sn - > key ( ) , true ) ;
sn - > send ( RR , outp . data ( ) , outp . size ( ) , now ) ;
}
gatherLimit = 0 ; // don't need to gather from peers this time since we consulted the core
}
gatherLimit = 0 ; // implicit not needed
}
gs . txQueue . push_back ( OutboundMulticast ( ) ) ;
OutboundMulticast & out = gs . txQueue . back ( ) ;
out . init (
RR ,
now ,
nwid ,
com ,
limit ,
gatherLimit ,
src ,
mg ,
etherType ,
data ,
len ) ;
unsigned int count = 0 ;
gs . txQueue . push_back ( OutboundMulticast ( ) ) ;
OutboundMulticast & out = gs . txQueue . back ( ) ;
out . init (
RR ,
now ,
nwid ,
com ,
limit ,
gatherLimit ,
src ,
mg ,
etherType ,
data ,
len ) ;
unsigned int count = 0 ;
for ( std : : vector < Address > : : const_iterator ast ( alwaysSendTo . begin ( ) ) ; ast ! = alwaysSendTo . end ( ) ; + + ast ) {
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr < Peer > p ( RR - > topology - > getPeer ( * ast ) ) ;
if ( ( p ) & & ( p - > remoteVersionKnown ( ) ) & & ( p - > remoteVersionMajor ( ) < 1 ) )
continue ;
}
for ( std : : vector < Address > : : const_iterator ast ( alwaysSendTo . begin ( ) ) ; ast ! = alwaysSendTo . end ( ) ; + + ast ) {
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr < Peer > p ( RR - > topology - > getPeer ( * ast ) ) ;
if ( ( p ) & & ( p - > remoteVersionKnown ( ) ) & & ( p - > remoteVersionMajor ( ) < 1 ) )
continue ;
out . sendAndLog ( RR , * ast ) ;
if ( + + count > = limit )
break ;
}
out . sendAndLog ( RR , * ast ) ;
if ( + + count > = limit )
break ;
}
unsigned long idx = 0 ;
while ( ( count < limit ) & & ( idx < gs . members . size ( ) ) ) {
const MulticastGroupMember & m = gs . members [ indexes [ idx + + ] ] ;
if ( count < limit ) {
for ( std : : vector < MulticastGroupMember > : : const_reverse_iterator m ( gs . members . rbegin ( ) ) ; m ! = gs . members . rend ( ) ; + + m ) {
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr < Peer > p ( RR - > topology - > getPeer ( m - > address ) ) ;
SharedPtr < Peer > p ( RR - > topology - > getPeer ( m . address ) ) ;
if ( ( p ) & & ( p - > remoteVersionKnown ( ) ) & & ( p - > remoteVersionMajor ( ) < 1 ) )
continue ;
}
if ( std : : find ( alwaysSendTo . begin ( ) , alwaysSendTo . end ( ) , m - > address ) = = alwaysSendTo . end ( ) ) {
out . sendAndLog ( RR , m - > address ) ;
if ( + + count > = limit )
break ;
if ( std : : find ( alwaysSendTo . begin ( ) , alwaysSendTo . end ( ) , m . address ) = = alwaysSendTo . end ( ) ) {
out . sendAndLog ( RR , m . address ) ;
+ + count ;
}
}
}
if ( indexes ! = idxbuf )
delete [ ] indexes ;
}
// DEPRECATED / LEGACY / TODO:
@ -344,25 +364,6 @@ void Multicaster::clean(uint64_t now)
while ( reader ! = mm - > second . members . end ( ) ) {
if ( ( now - reader - > timestamp ) < ZT_MULTICAST_LIKE_EXPIRE ) {
* writer = * reader ;
/* We rank in ascending order of most recent relevant activity. For peers we've learned
* about by direct LIKEs , we do this in order of their own activity . For indirectly
* acquired peers we do this minus a constant to place these categorically below directly
* learned peers . For peers with no active Peer record , we use the time we last learned
* about them minus one day ( a large constant ) to put these at the bottom of the list .
* List is sorted in ascending order of rank and multicasts are sent last - to - first . */
if ( writer - > learnedFrom ! = writer - > address ) {
SharedPtr < Peer > p ( RR - > topology - > getPeer ( writer - > learnedFrom ) ) ;
if ( p )
writer - > rank = ( RR - > topology - > amSupernode ( ) ? p - > lastDirectReceive ( ) : p - > lastUnicastFrame ( ) ) - ZT_MULTICAST_LIKE_EXPIRE ;
else writer - > rank = writer - > timestamp - ( 86400000 + ZT_MULTICAST_LIKE_EXPIRE ) ;
} else {
SharedPtr < Peer > p ( RR - > topology - > getPeer ( writer - > address ) ) ;
if ( p )
writer - > rank = ( RR - > topology - > amSupernode ( ) ? p - > lastDirectReceive ( ) : p - > lastUnicastFrame ( ) ) ;
else writer - > rank = writer - > timestamp - 86400000 ;
}
+ + writer ;
+ + count ;
}
@ -370,12 +371,9 @@ void Multicaster::clean(uint64_t now)
}
if ( count ) {
// There are remaining members, so re-sort them by rank and resize the vector
std : : sort ( mm - > second . members . begin ( ) , writer ) ; // sorts in ascending order of rank
mm - > second . members . resize ( count ) ; // trim off the ones we cut, after writer
mm - > second . members . resize ( count ) ;
+ + mm ;
} else if ( mm - > second . txQueue . empty ( ) ) {
// There are no remaining members and no pending multicasts, so erase the entry
_groups . erase ( mm + + ) ;
} else {
mm - > second . members . clear ( ) ;
@ -384,7 +382,7 @@ void Multicaster::clean(uint64_t now)
}
}
void Multicaster : : _add ( uint64_t now , uint64_t nwid , const MulticastGroup & mg , MulticastGroupStatus & gs , const Address & learnedFrom , const Address & member )
void Multicaster : : _add ( uint64_t now , uint64_t nwid , const MulticastGroup & mg , MulticastGroupStatus & gs , const Address & member )
{
// assumes _groups_m is locked
@ -392,20 +390,14 @@ void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,Multi
if ( member = = RR - > identity . address ( ) )
return ;
// Update timestamp and learnedFrom if existing
for ( std : : vector < MulticastGroupMember > : : iterator m ( gs . members . begin ( ) ) ; m ! = gs . members . end ( ) ; + + m ) {
if ( m - > address = = member ) {
if ( m - > learnedFrom ! = member ) // once we learn it directly, remember this forever
m - > learnedFrom = learnedFrom ;
m - > timestamp = now ;
return ;
}
}
// If not existing, add to end of list (highest priority) -- these will
// be resorted on next clean(). In the future we might want to insert
// this somewhere else but we'll try this for now.
gs . members . push_back ( MulticastGroupMember ( member , learnedFrom , now ) ) ;
gs . members . push_back ( MulticastGroupMember ( member , now ) ) ;
//TRACE("..MC %s joined multicast group %.16llx/%s via %s",member.toString().c_str(),nwid,mg.toString().c_str(),((learnedFrom) ? learnedFrom.toString().c_str() : "(direct)"));
@ -414,9 +406,9 @@ void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,Multi
SharedPtr < Peer > p ( RR - > topology - > getPeer ( member ) ) ;
if ( ( ! p ) | | ( ! p - > remoteVersionKnown ( ) ) | | ( p - > remoteVersionMajor ( ) > = 1 ) ) {
for ( std : : list < OutboundMulticast > : : iterator tx ( gs . txQueue . begin ( ) ) ; tx ! = gs . txQueue . end ( ) ; ) {
if ( tx - > atLimit ( ) )
if ( tx - > atLimit ( ) ) {
gs . txQueue . erase ( tx + + ) ;
else {
} else {
tx - > sendIfNew ( RR , member ) ;
if ( tx - > atLimit ( ) )
gs . txQueue . erase ( tx + + ) ;