@@ -50,7 +50,7 @@ Multicaster::~Multicaster()
 {
 }
 
-unsigned int Multicaster::gather(const RuntimeEnvironment *RR,const Address &queryingPeer,uint64_t nwid,MulticastGroup &mg,Packet &appendTo,unsigned int limit) const
+unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Packet &appendTo,unsigned int limit) const
 {
 	unsigned char *p;
 	unsigned int n = 0,i,rptr,skipped = 0;
@@ -111,6 +111,24 @@ restart_member_scan:
 	return n;
 }
 
+std::vector<Address> Multicaster::getLegacySubscribers(uint64_t nwid,const MulticastGroup &mg) const
+{
+	std::vector<Address> ls;
+	Mutex::Lock _l(_groups_m);
+
+	std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus >::const_iterator gs(_groups.find(std::pair<uint64_t,MulticastGroup>(nwid,mg)));
+	if (gs == _groups.end())
+		return ls;
+
+	for(std::vector<MulticastGroupMember>::const_iterator m(gs->second.members.begin());m!=gs->second.members.end();++m) {
+		SharedPtr<Peer> p(RR->topology->getPeer(m->address));
+		if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+			ls.push_back(m->address);
+	}
+
+	return ls;
+}
+
 void Multicaster::send(
 	const CertificateOfMembership *com,
 	unsigned int limit,
@@ -148,12 +166,24 @@ void Multicaster::send(
 		unsigned int count = 0;
 
 		for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
+			{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+				SharedPtr<Peer> p(RR->topology->getPeer(*ast));
+				if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+					continue;
+			}
+
 			if (count++ >= limit)
 				break;
 			out.sendOnly(*(RR->sw),*ast);
 		}
 
 		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
+			{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+				SharedPtr<Peer> p(RR->topology->getPeer(m->address));
+				if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+					continue;
+			}
+
 			if (count++ >= limit)
 				break;
 			if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end())
@@ -164,7 +194,6 @@ void Multicaster::send(
 
 		if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY) {
 			gs.lastExplicitGather = now;
-
 			SharedPtr<Peer> sn(RR->topology->getBestSupernode());
 			if (sn) {
 				Packet outp(sn->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
@@ -176,13 +205,12 @@ void Multicaster::send(
 				outp.armor(sn->key(),true);
 				sn->send(RR,outp.data(),outp.size(),now);
 			}
 			gatherLimit = 0; // once we've done this we don't need to do it implicitly
-		}
-
-		if ((gatherLimit > 0)&&((now - gs.lastImplicitGather) > ZT_MULTICAST_IMPLICIT_GATHER_DELAY))
+		} else if ((now - gs.lastImplicitGather) > ZT_MULTICAST_IMPLICIT_GATHER_DELAY) {
 			gs.lastImplicitGather = now;
-		else gatherLimit = 0;
-
+		} else {
+			gatherLimit = 0; // implicit not needed
+		}
 
 		gs.txQueue.push_back(OutboundMulticast());
 		OutboundMulticast &out = gs.txQueue.back();
@@ -200,10 +228,23 @@ void Multicaster::send(
 			data,
 			len);
 
-		for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast)
+		for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
+			{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+				SharedPtr<Peer> p(RR->topology->getPeer(*ast));
+				if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+					continue;
+			}
+
 			out.sendAndLog(*(RR->sw),*ast);
+		}
 
 		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
+			{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+				SharedPtr<Peer> p(RR->topology->getPeer(m->address));
+				if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+					continue;
+			}
+
 			if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end())
 				out.sendAndLog(*(RR->sw),m->address);
 		}
@@ -211,7 +252,7 @@ void Multicaster::send(
 
 	// DEPRECATED / LEGACY / TODO:
 	// Currently we also always send a legacy P5_MULTICAST_FRAME packet to our
-	// supernode. Our supernode then takes care of relaying it down to all <1.0.0
+	// supernode. Our supernode then takes care of relaying it down to <1.0.0
 	// nodes. This code can go away (along with support for P5_MULTICAST_FRAME)
 	// once there are no more such nodes on the network.
 	{
@@ -337,6 +378,12 @@ void Multicaster::_add(uint64_t now,uint64_t nwid,MulticastGroupStatus &gs,const
 
 	// Try to send to any outgoing multicasts that are waiting for more recipients
 	for(std::list<OutboundMulticast>::iterator tx(gs.txQueue.begin());tx!=gs.txQueue.end();) {
+		{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+			SharedPtr<Peer> p(RR->topology->getPeer(member));
+			if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+				continue;
+		}
+
 		tx->sendIfNew(*(RR->sw),member);
 		if (tx->atLimit())
 			gs.txQueue.erase(tx++);