@ -37,6 +37,7 @@
# include "Peer.hpp"
# include "CMWC4096.hpp"
# include "C25519.hpp"
# include "NodeConfig.hpp"
# include "CertificateOfMembership.hpp"
# include "Logger.hpp"
@ -54,66 +55,67 @@ Multicaster::~Multicaster()
unsigned int Multicaster : : gather ( const Address & queryingPeer , uint64_t nwid , const MulticastGroup & mg , Packet & appendTo , unsigned int limit ) const
{
unsigned char * p ;
unsigned int n = 0 , i , rptr , skipped = 0 ;
uint64_t a , done [ ( ZT_PROTO_MAX_PACKET_LENGTH / 5 ) + 1 ] ;
unsigned int added = 0 , i , k , rptr , totalKnown = 0 ;
uint64_t a , picked [ ( ZT_PROTO_MAX_PACKET_LENGTH / 5 ) + 1 ] ;
Mutex : : Lock _l ( _groups_m ) ;
std : : map < std : : pair < uint64_t , MulticastGroup > , MulticastGroupStatus > : : const_iterator gs ( _groups . find ( std : : pair < uint64_t , MulticastGroup > ( nwid , mg ) ) ) ;
if ( ( gs = = _groups . end ( ) ) | | ( gs - > second . members . empty ( ) ) ) {
appendTo . append ( ( uint32_t ) 0 ) ;
appendTo . append ( ( uint16_t ) 0 ) ;
//TRACE("..MC Multicaster::gather() attached 0 of 0 peers for %.16llx/%s (1)",nwid,mg.toString().c_str());
if ( ! limit )
return 0 ;
if ( limit > 0xffff ) // TODO: multiple return packets not yet supported
limit = 0xffff ;
{ // Return myself if I am a member of this group
SharedPtr < Network > network ( RR - > nc - > network ( nwid ) ) ;
if ( ( network ) & & ( network - > subscribedToMulticastGroup ( mg ) ) ) {
RR - > identity . address ( ) . appendTo ( appendTo ) ;
+ + totalKnown ;
+ + added ;
}
}
if ( limit > gs - > second . members . size ( ) )
limit = ( unsigned int ) gs - > second . members . size ( ) ;
if ( limit > ( ( ZT_PROTO_MAX_PACKET_LENGTH / ZT_ADDRESS_LENGTH ) + 1 ) )
limit = ( ZT_PROTO_MAX_PACKET_LENGTH / ZT_ADDRESS_LENGTH ) + 1 ;
Mutex : : Lock _l ( _groups_m ) ;
unsigned int totalAt = appendTo . size ( ) ;
const unsigned int totalAt = appendTo . size ( ) ;
appendTo . addSize ( 4 ) ; // sizeof(uint32_t)
unsigned int nAt = appendTo . size ( ) ;
const unsigned int added At = appendTo . size ( ) ;
appendTo . addSize ( 2 ) ; // sizeof(uint16_t)
// Members are returned in random order so that repeated gather queries
// will return different subsets of a large multicast group.
while ( ( n < limit ) & & ( ( appendTo . size ( ) + ZT_ADDRESS_LENGTH ) < = ZT_PROTO_MAX_PACKET_LENGTH ) ) {
rptr = ( unsigned int ) RR - > prng - > next32 ( ) ;
std : : map < std : : pair < uint64_t , MulticastGroup > , MulticastGroupStatus > : : const_iterator gs ( _groups . find ( std : : pair < uint64_t , MulticastGroup > ( nwid , mg ) ) ) ;
if ( ( gs ! = _groups . end ( ) ) & & ( ! gs - > second . members . empty ( ) ) ) {
totalKnown + = gs - > second . members . size ( ) ;
// Members are returned in random order so that repeated gather queries
// will return different subsets of a large multicast group.
k = 0 ;
while ( ( added < limit ) & & ( k < gs - > second . members . size ( ) ) & & ( ( appendTo . size ( ) + ZT_ADDRESS_LENGTH ) < = ZT_PROTO_MAX_PACKET_LENGTH ) ) {
rptr = ( unsigned int ) RR - > prng - > next32 ( ) ;
restart_member_scan :
a = gs - > second . members [ rptr % ( unsigned int ) gs - > second . members . size ( ) ] . address . toInt ( ) ;
for ( i = 0 ; i < n ; + + i ) {
if ( done [ i ] = = a ) {
+ + rptr ;
goto restart_member_scan ;
a = gs - > second . members [ rptr % ( unsigned int ) gs - > second . members . size ( ) ] . address . toInt ( ) ;
for ( i = 0 ; i < k ; + + i ) {
if ( picked [ i ] = = a ) {
+ + rptr ;
goto restart_member_scan ;
}
}
picked [ k + + ] = a ;
if ( queryingPeer . toInt ( ) ! = a ) { // do not return the peer that is making the request as a result
p = ( unsigned char * ) appendTo . appendField ( ZT_ADDRESS_LENGTH ) ;
* ( p + + ) = ( unsigned char ) ( ( a > > 32 ) & 0xff ) ;
* ( p + + ) = ( unsigned char ) ( ( a > > 24 ) & 0xff ) ;
* ( p + + ) = ( unsigned char ) ( ( a > > 16 ) & 0xff ) ;
* ( p + + ) = ( unsigned char ) ( ( a > > 8 ) & 0xff ) ;
* p = ( unsigned char ) ( a & 0xff ) ;
+ + added ;
}
}
// Log that we've picked this one
done [ n + + ] = a ;
if ( queryingPeer . toInt ( ) = = a ) {
+ + skipped ;
} else {
// Append to packet
p = ( unsigned char * ) appendTo . appendField ( ZT_ADDRESS_LENGTH ) ;
* ( p + + ) = ( unsigned char ) ( ( a > > 32 ) & 0xff ) ;
* ( p + + ) = ( unsigned char ) ( ( a > > 24 ) & 0xff ) ;
* ( p + + ) = ( unsigned char ) ( ( a > > 16 ) & 0xff ) ;
* ( p + + ) = ( unsigned char ) ( ( a > > 8 ) & 0xff ) ;
* p = ( unsigned char ) ( a & 0xff ) ;
}
}
n - = skipped ;
appendTo . setAt ( totalAt , ( uint32_t ) ( gs - > second . members . size ( ) - skipped ) ) ;
appendTo . setAt ( nAt , ( uint16_t ) n ) ;
appendTo . setAt ( totalAt , ( uint32_t ) totalKnown ) ;
appendTo . setAt ( addedAt , ( uint16_t ) added ) ;
//TRACE("..MC Multicaster::gather() attached %u of %u peers for %.16llx/%s (2)",n,(unsigned int)(gs->second.members.size() - skipped),nwid,mg.toString().c_str());
return n ;
return added ;
}
std : : vector < Address > Multicaster : : getLegacySubscribers ( uint64_t nwid , const MulticastGroup & mg ) const
@ -383,7 +385,7 @@ void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,Multi
// this somewhere else but we'll try this for now.
gs . members . push_back ( MulticastGroupMember ( member , learnedFrom , now ) ) ;
TRACE ( " ..MC %s joined multicast group %.16llx/%s via %s " , member . toString ( ) . c_str ( ) , nwid , mg . toString ( ) . c_str ( ) , ( ( learnedFrom ) ? learnedFrom . toString ( ) . c_str ( ) : " (direct) " ) ) ;
//TRACE("..MC %s joined multicast group %.16llx/%s via %s",member.toString().c_str(),nwid,mg.toString().c_str(),((learnedFrom) ? learnedFrom.toString().c_str() : "(direct)"));
// Try to send to any outgoing multicasts that are waiting for more recipients
for ( std : : list < OutboundMulticast > : : iterator tx ( gs . txQueue . begin ( ) ) ; tx ! = gs . txQueue . end ( ) ; ) {