|
|
|
|
@ -41,7 +41,9 @@
|
|
|
|
|
namespace ZeroTier { |
|
|
|
|
|
|
|
|
|
Multicaster::Multicaster(const RuntimeEnvironment *renv) : |
|
|
|
|
RR(renv) |
|
|
|
|
RR(renv), |
|
|
|
|
_groups(1024), |
|
|
|
|
_groups_m() |
|
|
|
|
{ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -54,7 +56,7 @@ void Multicaster::addMultiple(uint64_t now,uint64_t nwid,const MulticastGroup &m
|
|
|
|
|
const unsigned char *p = (const unsigned char *)addresses; |
|
|
|
|
const unsigned char *e = p + (5 * count); |
|
|
|
|
Mutex::Lock _l(_groups_m); |
|
|
|
|
MulticastGroupStatus &gs = _groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)]; |
|
|
|
|
MulticastGroupStatus &gs = _groups[Multicaster::Key(nwid,mg)]; |
|
|
|
|
while (p != e) { |
|
|
|
|
_add(now,nwid,mg,gs,Address(p,5)); |
|
|
|
|
p += 5; |
|
|
|
|
@ -64,11 +66,11 @@ void Multicaster::addMultiple(uint64_t now,uint64_t nwid,const MulticastGroup &m
|
|
|
|
|
void Multicaster::remove(uint64_t nwid,const MulticastGroup &mg,const Address &member) |
|
|
|
|
{ |
|
|
|
|
Mutex::Lock _l(_groups_m); |
|
|
|
|
std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus >::iterator g(_groups.find(std::pair<uint64_t,MulticastGroup>(nwid,mg))); |
|
|
|
|
if (g != _groups.end()) { |
|
|
|
|
for(std::vector<MulticastGroupMember>::iterator m(g->second.members.begin());m!=g->second.members.end();++m) { |
|
|
|
|
MulticastGroupStatus *s = _groups.get(Multicaster::Key(nwid,mg)); |
|
|
|
|
if (s) { |
|
|
|
|
for(std::vector<MulticastGroupMember>::iterator m(s->members.begin());m!=s->members.end();++m) { |
|
|
|
|
if (m->address == member) { |
|
|
|
|
g->second.members.erase(m); |
|
|
|
|
s->members.erase(m); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -102,18 +104,18 @@ unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const
|
|
|
|
|
|
|
|
|
|
Mutex::Lock _l(_groups_m); |
|
|
|
|
|
|
|
|
|
std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus >::const_iterator gs(_groups.find(std::pair<uint64_t,MulticastGroup>(nwid,mg))); |
|
|
|
|
if ((gs != _groups.end())&&(!gs->second.members.empty())) { |
|
|
|
|
totalKnown += (unsigned int)gs->second.members.size(); |
|
|
|
|
const MulticastGroupStatus *s = _groups.get(Multicaster::Key(nwid,mg)); |
|
|
|
|
if ((s)&&(!s->members.empty())) { |
|
|
|
|
totalKnown += (unsigned int)s->members.size(); |
|
|
|
|
|
|
|
|
|
// Members are returned in random order so that repeated gather queries
|
|
|
|
|
// will return different subsets of a large multicast group.
|
|
|
|
|
k = 0; |
|
|
|
|
while ((added < limit)&&(k < gs->second.members.size())&&((appendTo.size() + ZT_ADDRESS_LENGTH) <= ZT_UDP_DEFAULT_PAYLOAD_MTU)) { |
|
|
|
|
while ((added < limit)&&(k < s->members.size())&&((appendTo.size() + ZT_ADDRESS_LENGTH) <= ZT_UDP_DEFAULT_PAYLOAD_MTU)) { |
|
|
|
|
rptr = (unsigned int)RR->node->prng(); |
|
|
|
|
|
|
|
|
|
restart_member_scan: |
|
|
|
|
a = gs->second.members[rptr % (unsigned int)gs->second.members.size()].address.toInt(); |
|
|
|
|
a = s->members[rptr % (unsigned int)s->members.size()].address.toInt(); |
|
|
|
|
for(i=0;i<k;++i) { |
|
|
|
|
if (picked[i] == a) { |
|
|
|
|
++rptr; |
|
|
|
|
@ -146,10 +148,10 @@ std::vector<Address> Multicaster::getMembers(uint64_t nwid,const MulticastGroup
|
|
|
|
|
{ |
|
|
|
|
std::vector<Address> ls; |
|
|
|
|
Mutex::Lock _l(_groups_m); |
|
|
|
|
std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus >::const_iterator gs(_groups.find(std::pair<uint64_t,MulticastGroup>(nwid,mg))); |
|
|
|
|
if (gs == _groups.end()) |
|
|
|
|
const MulticastGroupStatus *s = _groups.get(Multicaster::Key(nwid,mg)); |
|
|
|
|
if (!s) |
|
|
|
|
return ls; |
|
|
|
|
for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs->second.members.rbegin());m!=gs->second.members.rend();++m) { |
|
|
|
|
for(std::vector<MulticastGroupMember>::const_reverse_iterator m(s->members.rbegin());m!=s->members.rend();++m) { |
|
|
|
|
ls.push_back(m->address); |
|
|
|
|
if (ls.size() >= limit) |
|
|
|
|
break; |
|
|
|
|
@ -173,7 +175,7 @@ void Multicaster::send(
|
|
|
|
|
unsigned long *indexes = idxbuf; |
|
|
|
|
|
|
|
|
|
Mutex::Lock _l(_groups_m); |
|
|
|
|
MulticastGroupStatus &gs = _groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)]; |
|
|
|
|
MulticastGroupStatus &gs = _groups[Multicaster::Key(nwid,mg)]; |
|
|
|
|
|
|
|
|
|
if (!gs.members.empty()) { |
|
|
|
|
// Allocate a memory buffer if group is monstrous
|
|
|
|
|
@ -291,18 +293,22 @@ void Multicaster::send(
|
|
|
|
|
void Multicaster::clean(uint64_t now) |
|
|
|
|
{ |
|
|
|
|
Mutex::Lock _l(_groups_m); |
|
|
|
|
for(std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) { |
|
|
|
|
for(std::list<OutboundMulticast>::iterator tx(mm->second.txQueue.begin());tx!=mm->second.txQueue.end();) { |
|
|
|
|
|
|
|
|
|
Multicaster::Key *k = (Multicaster::Key *)0; |
|
|
|
|
MulticastGroupStatus *s = (MulticastGroupStatus *)0; |
|
|
|
|
Hashtable<Multicaster::Key,MulticastGroupStatus>::Iterator mm(_groups); |
|
|
|
|
while (mm.next(k,s)) { |
|
|
|
|
for(std::list<OutboundMulticast>::iterator tx(s->txQueue.begin());tx!=s->txQueue.end();) { |
|
|
|
|
if ((tx->expired(now))||(tx->atLimit())) |
|
|
|
|
mm->second.txQueue.erase(tx++); |
|
|
|
|
s->txQueue.erase(tx++); |
|
|
|
|
else ++tx; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
unsigned long count = 0; |
|
|
|
|
{ |
|
|
|
|
std::vector<MulticastGroupMember>::iterator reader(mm->second.members.begin()); |
|
|
|
|
std::vector<MulticastGroupMember>::iterator reader(s->members.begin()); |
|
|
|
|
std::vector<MulticastGroupMember>::iterator writer(reader); |
|
|
|
|
while (reader != mm->second.members.end()) { |
|
|
|
|
while (reader != s->members.end()) { |
|
|
|
|
if ((now - reader->timestamp) < ZT_MULTICAST_LIKE_EXPIRE) { |
|
|
|
|
*writer = *reader; |
|
|
|
|
++writer; |
|
|
|
|
@ -313,13 +319,11 @@ void Multicaster::clean(uint64_t now)
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (count) { |
|
|
|
|
mm->second.members.resize(count); |
|
|
|
|
++mm; |
|
|
|
|
} else if (mm->second.txQueue.empty()) { |
|
|
|
|
_groups.erase(mm++); |
|
|
|
|
s->members.resize(count); |
|
|
|
|
} else if (s->txQueue.empty()) { |
|
|
|
|
_groups.erase(*k); |
|
|
|
|
} else { |
|
|
|
|
mm->second.members.clear(); |
|
|
|
|
++mm; |
|
|
|
|
s->members.clear(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|