|
|
|
|
@ -223,10 +223,10 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
|
|
|
|
|
std::list<_SQE> q; |
|
|
|
|
{ |
|
|
|
|
Mutex::Lock _l(_sendViaClusterQueue_m); |
|
|
|
|
std::map< Address,std::list<_SQE> >::iterator qe(_sendViaClusterQueue.find(id.address())); |
|
|
|
|
if (qe != _sendViaClusterQueue.end()) { |
|
|
|
|
q.swap(qe->second); // just swap ptr instead of copying
|
|
|
|
|
_sendViaClusterQueue.erase(qe); |
|
|
|
|
for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) { |
|
|
|
|
if (qi->toPeerAddress == id.address()) |
|
|
|
|
q.splice(q.end(),_sendViaClusterQueue,qi++); |
|
|
|
|
else ++qi; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
for(std::list<_SQE>::iterator qi(q.begin());qi!=q.end();++qi) |
|
|
|
|
@ -368,16 +368,17 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
|
|
|
|
|
if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check
|
|
|
|
|
return; |
|
|
|
|
|
|
|
|
|
_sendViaClusterQueue_m.lock(); |
|
|
|
|
unsigned long queueCount; |
|
|
|
|
unsigned int queueCount = 0; |
|
|
|
|
{ |
|
|
|
|
std::map< Address,std::list<_SQE> >::const_iterator qe(_sendViaClusterQueue.find(fromPeerAddress)); |
|
|
|
|
queueCount = (qe == _sendViaClusterQueue.end()) ? 0 : (unsigned long)qe->second.size(); |
|
|
|
|
} |
|
|
|
|
_sendViaClusterQueue_m.unlock(); |
|
|
|
|
if (queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) { |
|
|
|
|
TRACE("dropping sendViaCluster for %s -> %s since queue for sender is full",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); |
|
|
|
|
return; |
|
|
|
|
Mutex::Lock _l(_sendViaClusterQueue_m); |
|
|
|
|
for(std::list<_SQE>::const_iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();++qi) { |
|
|
|
|
if (qi->fromPeerAddress == fromPeerAddress) { |
|
|
|
|
if (++queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) { |
|
|
|
|
TRACE("dropping sendViaCluster for %s -> %s since queue for sender is full",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const uint64_t now = RR->node->now(); |
|
|
|
|
@ -386,9 +387,9 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
|
|
|
|
|
unsigned int mostRecentMemberId = 0xffffffff; |
|
|
|
|
{ |
|
|
|
|
Mutex::Lock _l2(_remotePeers_m); |
|
|
|
|
std::map< std::pair<Address,unsigned int>,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(fromPeerAddress,0))); |
|
|
|
|
std::map< std::pair<Address,unsigned int>,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0))); |
|
|
|
|
for(;;) { |
|
|
|
|
if ((rpe == _remotePeers.end())||(rpe->first.first != fromPeerAddress)) |
|
|
|
|
if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress)) |
|
|
|
|
break; |
|
|
|
|
else if (rpe->second > mostRecentTs) { |
|
|
|
|
mostRecentTs = rpe->second; |
|
|
|
|
@ -420,7 +421,7 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
|
|
|
|
|
if (enqueueAndWait) { |
|
|
|
|
TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); |
|
|
|
|
Mutex::Lock _l(_sendViaClusterQueue_m); |
|
|
|
|
_sendViaClusterQueue[fromPeerAddress].push_back(_SQE(now,toPeerAddress,data,len,unite)); |
|
|
|
|
_sendViaClusterQueue.push_back(_SQE(now,fromPeerAddress,toPeerAddress,data,len,unite)); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -484,13 +485,8 @@ void Cluster::doPeriodicTasks()
|
|
|
|
|
|
|
|
|
|
{ |
|
|
|
|
Mutex::Lock _l2(_sendViaClusterQueue_m); |
|
|
|
|
for(std::map< Address,std::list<_SQE> >::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) { |
|
|
|
|
for(std::list<_SQE>::iterator qii(qi->second.begin());qii!=qi->second.end();) { |
|
|
|
|
if ((now - qii->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION) |
|
|
|
|
qi->second.erase(qii++); |
|
|
|
|
else ++qii; |
|
|
|
|
} |
|
|
|
|
if (qi->second.empty()) |
|
|
|
|
for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) { |
|
|
|
|
if ((now - qi->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION) |
|
|
|
|
_sendViaClusterQueue.erase(qi++); |
|
|
|
|
else ++qi; |
|
|
|
|
} |
|
|
|
|
|