164 changed files with 1995 additions and 1410 deletions
@ -0,0 +1,6 @@
|
||||
CompileFlags: |
||||
Add: |
||||
- "-std=c++17" |
||||
- "-I../ext" |
||||
- "-I../ext/prometheus-cpp-lite-1.0/core/include" |
||||
- "-I../ext/prometheus-cpp-lite-1.0/simpleapi/include" |
||||
@ -0,0 +1,122 @@
|
||||
/*
|
||||
* Copyright (c)2013-2021 ZeroTier, Inc. |
||||
* |
||||
* Use of this software is governed by the Business Source License included |
||||
* in the LICENSE.TXT file in the project's root directory. |
||||
* |
||||
* Change Date: 2026-01-01 |
||||
* |
||||
* On the date above, in accordance with the Business Source License, use |
||||
* of this software will be governed by version 2.0 of the Apache License. |
||||
*/ |
||||
/****/ |
||||
|
||||
#include "PacketMultiplexer.hpp" |
||||
|
||||
#include "Node.hpp" |
||||
#include "RuntimeEnvironment.hpp" |
||||
#include "Constants.hpp" |
||||
|
||||
#include <stdio.h> |
||||
#include <stdlib.h> |
||||
|
||||
namespace ZeroTier { |
||||
|
||||
PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) |
||||
{ |
||||
RR = renv; |
||||
}; |
||||
|
||||
void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId) |
||||
{ |
||||
#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) |
||||
RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len); |
||||
return; |
||||
#endif |
||||
|
||||
if (!_enabled) { |
||||
RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len); |
||||
return; |
||||
} |
||||
|
||||
PacketRecord* packet; |
||||
_rxPacketVector_m.lock(); |
||||
if (_rxPacketVector.empty()) { |
||||
packet = new PacketRecord; |
||||
} |
||||
else { |
||||
packet = _rxPacketVector.back(); |
||||
_rxPacketVector.pop_back(); |
||||
} |
||||
_rxPacketVector_m.unlock(); |
||||
|
||||
packet->tPtr = tPtr; |
||||
packet->nwid = nwid; |
||||
packet->nuptr = nuptr; |
||||
packet->source = source.toInt(); |
||||
packet->dest = dest.toInt(); |
||||
packet->etherType = etherType; |
||||
packet->vlanId = vlanId; |
||||
packet->len = len; |
||||
packet->flowId = flowId; |
||||
memcpy(packet->data, data, len); |
||||
|
||||
int bucket = flowId % _concurrency; |
||||
_rxPacketQueues[bucket]->postLimit(packet, 2048); |
||||
} |
||||
|
||||
void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled) |
||||
{ |
||||
#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) |
||||
return; |
||||
#endif |
||||
_enabled = true; |
||||
_concurrency = concurrency; |
||||
bool _enablePinning = cpuPinningEnabled; |
||||
|
||||
for (unsigned int i = 0; i < _concurrency; ++i) { |
||||
fprintf(stderr, "Reserved queue for thread %d\n", i); |
||||
_rxPacketQueues.push_back(new BlockingQueue<PacketRecord*>()); |
||||
} |
||||
|
||||
// Each thread picks from its own queue to feed into the core
|
||||
for (unsigned int i = 0; i < _concurrency; ++i) { |
||||
_rxThreads.push_back(std::thread([this, i, _enablePinning]() { |
||||
fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i); |
||||
|
||||
PacketRecord* packet = nullptr; |
||||
for (;;) { |
||||
if (! _rxPacketQueues[i]->get(packet)) { |
||||
break; |
||||
} |
||||
if (! packet) { |
||||
break; |
||||
} |
||||
|
||||
// fprintf(stderr, "popped packet from queue %d\n", i);
|
||||
|
||||
MAC sourceMac = MAC(packet->source); |
||||
MAC destMac = MAC(packet->dest); |
||||
|
||||
RR->node->putFrame(packet->tPtr, packet->nwid, packet->nuptr, sourceMac, destMac, packet->etherType, 0, (const void*)packet->data, packet->len); |
||||
{ |
||||
Mutex::Lock l(_rxPacketVector_m); |
||||
_rxPacketVector.push_back(packet); |
||||
} |
||||
/*
|
||||
if (ZT_ResultCode_isFatal(err)) { |
||||
char tmp[256]; |
||||
OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); |
||||
Mutex::Lock _l(_termReason_m); |
||||
_termReason = ONE_UNRECOVERABLE_ERROR; |
||||
_fatalErrorMessage = tmp; |
||||
this->terminate(); |
||||
break; |
||||
} |
||||
*/ |
||||
} |
||||
})); |
||||
} |
||||
} |
||||
|
||||
} // namespace ZeroTier
|
||||
@ -0,0 +1,65 @@
|
||||
/*
|
||||
* Copyright (c)2013-2021 ZeroTier, Inc. |
||||
* |
||||
* Use of this software is governed by the Business Source License included |
||||
* in the LICENSE.TXT file in the project's root directory. |
||||
* |
||||
* Change Date: 2026-01-01 |
||||
* |
||||
* On the date above, in accordance with the Business Source License, use |
||||
* of this software will be governed by version 2.0 of the Apache License. |
||||
*/ |
||||
/****/ |
||||
|
||||
#ifndef ZT_PACKET_MULTIPLEXER_HPP |
||||
#define ZT_PACKET_MULTIPLEXER_HPP |
||||
|
||||
#include "../osdep/BlockingQueue.hpp" |
||||
#include "MAC.hpp" |
||||
#include "Mutex.hpp" |
||||
#include "RuntimeEnvironment.hpp" |
||||
|
||||
#include <thread> |
||||
#include <vector> |
||||
|
||||
namespace ZeroTier { |
||||
|
||||
struct PacketRecord { |
||||
void* tPtr; |
||||
uint64_t nwid; |
||||
void** nuptr; |
||||
uint64_t source; |
||||
uint64_t dest; |
||||
unsigned int etherType; |
||||
unsigned int vlanId; |
||||
uint8_t data[ZT_MAX_MTU]; |
||||
unsigned int len; |
||||
unsigned int flowId; |
||||
}; |
||||
|
||||
class PacketMultiplexer { |
||||
public: |
||||
const RuntimeEnvironment* RR; |
||||
|
||||
PacketMultiplexer(const RuntimeEnvironment* renv); |
||||
|
||||
void setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled); |
||||
|
||||
void putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId); |
||||
|
||||
std::vector<BlockingQueue<PacketRecord*>*> _rxPacketQueues; |
||||
|
||||
unsigned int _concurrency; |
||||
// pool
|
||||
std::vector<PacketRecord*> _rxPacketVector; |
||||
std::vector<std::thread> _rxPacketThreads; |
||||
Mutex _rxPacketVector_m, _rxPacketThreads_m; |
||||
|
||||
std::vector<std::thread> _rxThreads; |
||||
unsigned int _rxThreadCount; |
||||
bool _enabled; |
||||
}; |
||||
|
||||
} // namespace ZeroTier
|
||||
|
||||
#endif // ZT_PACKET_MULTIPLEXER_HPP
|
||||
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue