/*
* Copyright ( c ) 2025 ZeroTier , Inc .
*
* Use of this software is governed by the Business Source License included
* in the LICENSE . TXT file in the project ' s root directory .
*
* Change Date : 2026 - 01 - 01
*
* On the date above , in accordance with the Business Source License , use
* of this software will be governed by version 2.0 of the Apache License .
*/
/****/
# include "CV2.hpp"
# ifdef ZT_CONTROLLER_USE_LIBPQ
# include "../node/Constants.hpp"
# include "../node/SHA512.hpp"
# include "../version.h"
# include "CtlUtil.hpp"
# include "EmbeddedNetworkController.hpp"
# include <chrono>
# include <climits>
# include <iomanip>
# include <libpq-fe.h>
# include <sstream>
using json = nlohmann : : json ;
namespace {
}
using namespace ZeroTier ;
CV2 : : CV2 ( const Identity & myId , const char * path , int listenPort ) : DB ( ) , _pool ( ) , _myId ( myId ) , _myAddress ( myId . address ( ) ) , _ready ( 0 ) , _connected ( 1 ) , _run ( 1 ) , _waitNoticePrinted ( false ) , _listenPort ( listenPort )
{
fprintf ( stderr , " CV2::CV2 \n " ) ;
char myAddress [ 64 ] ;
_myAddressStr = myId . address ( ) . toString ( myAddress ) ;
_connString = std : : string ( path ) ;
auto f = std : : make_shared < PostgresConnFactory > ( _connString ) ;
_pool = std : : make_shared < ConnectionPool < PostgresConnection > > ( 15 , 5 , std : : static_pointer_cast < ConnectionFactory > ( f ) ) ;
memset ( _ssoPsk , 0 , sizeof ( _ssoPsk ) ) ;
char * const ssoPskHex = getenv ( " ZT_SSO_PSK " ) ;
# ifdef ZT_TRACE
fprintf ( stderr , " ZT_SSO_PSK: %s \n " , ssoPskHex ) ;
# endif
if ( ssoPskHex ) {
// SECURITY: note that ssoPskHex will always be null-terminated if libc actually
// returns something non-NULL. If the hex encodes something shorter than 48 bytes,
// it will be padded at the end with zeroes. If longer, it'll be truncated.
Utils : : unhex ( ssoPskHex , _ssoPsk , sizeof ( _ssoPsk ) ) ;
}
_readyLock . lock ( ) ;
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download... " ZT_EOL_S , : : _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
_waitNoticePrinted = true ;
initializeNetworks ( ) ;
initializeMembers ( ) ;
_heartbeatThread = std : : thread ( & CV2 : : heartbeat , this ) ;
_membersDbWatcher = std : : thread ( & CV2 : : membersDbWatcher , this ) ;
_networksDbWatcher = std : : thread ( & CV2 : : networksDbWatcher , this ) ;
for ( int i = 0 ; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS ; + + i ) {
_commitThread [ i ] = std : : thread ( & CV2 : : commitThread , this ) ;
}
_onlineNotificationThread = std : : thread ( & CV2 : : onlineNotificationThread , this ) ;
}
CV2 : : ~ CV2 ( )
{
_run = 0 ;
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 100 ) ) ;
_heartbeatThread . join ( ) ;
_membersDbWatcher . join ( ) ;
_networksDbWatcher . join ( ) ;
_commitQueue . stop ( ) ;
for ( int i = 0 ; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS ; + + i ) {
_commitThread [ i ] . join ( ) ;
}
_onlineNotificationThread . join ( ) ;
}
bool CV2 : : waitForReady ( )
{
while ( _ready < 2 ) {
_readyLock . lock ( ) ;
_readyLock . unlock ( ) ;
}
return true ;
}
bool CV2 : : isReady ( )
{
return ( _ready = = 2 ) & & _connected ;
}
bool CV2 : : save ( nlohmann : : json & record , bool notifyListeners )
{
bool modified = false ;
try {
if ( ! record . is_object ( ) ) {
fprintf ( stderr , " record is not an object?!? \n " ) ;
return false ;
}
const std : : string objtype = record [ " objtype " ] ;
if ( objtype = = " network " ) {
// fprintf(stderr, "network save\n");
const uint64_t nwid = OSUtils : : jsonIntHex ( record [ " id " ] , 0ULL ) ;
if ( nwid ) {
nlohmann : : json old ;
get ( nwid , old ) ;
if ( ( ! old . is_object ( ) ) | | ( ! _compareRecords ( old , record ) ) ) {
record [ " revision " ] = OSUtils : : jsonInt ( record [ " revision " ] , 0ULL ) + 1ULL ;
_commitQueue . post ( std : : pair < nlohmann : : json , bool > ( record , notifyListeners ) ) ;
modified = true ;
}
}
}
else if ( objtype = = " member " ) {
std : : string networkId = record [ " nwid " ] ;
std : : string memberId = record [ " id " ] ;
const uint64_t nwid = OSUtils : : jsonIntHex ( record [ " nwid " ] , 0ULL ) ;
const uint64_t id = OSUtils : : jsonIntHex ( record [ " id " ] , 0ULL ) ;
// fprintf(stderr, "member save %s-%s\n", networkId.c_str(), memberId.c_str());
if ( ( id ) & & ( nwid ) ) {
nlohmann : : json network , old ;
get ( nwid , network , id , old ) ;
if ( ( ! old . is_object ( ) ) | | ( ! _compareRecords ( old , record ) ) ) {
// fprintf(stderr, "commit queue post\n");
record [ " revision " ] = OSUtils : : jsonInt ( record [ " revision " ] , 0ULL ) + 1ULL ;
_commitQueue . post ( std : : pair < nlohmann : : json , bool > ( record , notifyListeners ) ) ;
modified = true ;
}
else {
// fprintf(stderr, "no change\n");
}
}
}
else {
fprintf ( stderr , " uhh waaat \n " ) ;
}
}
catch ( std : : exception & e ) {
fprintf ( stderr , " Error on PostgreSQL::save: %s \n " , e . what ( ) ) ;
}
catch ( . . . ) {
fprintf ( stderr , " Unknown error on PostgreSQL::save \n " ) ;
}
return modified ;
}
void CV2 : : eraseNetwork ( const uint64_t networkId )
{
fprintf ( stderr , " PostgreSQL::eraseNetwork \n " ) ;
char tmp2 [ 24 ] ;
waitForReady ( ) ;
Utils : : hex ( networkId , tmp2 ) ;
std : : pair < nlohmann : : json , bool > tmp ;
tmp . first [ " id " ] = tmp2 ;
tmp . first [ " objtype " ] = " _delete_network " ;
tmp . second = true ;
_commitQueue . post ( tmp ) ;
nlohmann : : json nullJson ;
_networkChanged ( tmp . first , nullJson , true ) ;
}
void CV2 : : eraseMember ( const uint64_t networkId , const uint64_t memberId )
{
fprintf ( stderr , " PostgreSQL::eraseMember \n " ) ;
char tmp2 [ 24 ] ;
waitForReady ( ) ;
std : : pair < nlohmann : : json , bool > tmp , nw ;
Utils : : hex ( networkId , tmp2 ) ;
tmp . first [ " nwid " ] = tmp2 ;
Utils : : hex ( memberId , tmp2 ) ;
tmp . first [ " id " ] = tmp2 ;
tmp . first [ " objtype " ] = " _delete_member " ;
tmp . second = true ;
_commitQueue . post ( tmp ) ;
nlohmann : : json nullJson ;
_memberChanged ( tmp . first , nullJson , true ) ;
}
void CV2 : : nodeIsOnline ( const uint64_t networkId , const uint64_t memberId , const InetAddress & physicalAddress , const char * osArch )
{
std : : lock_guard < std : : mutex > l ( _lastOnline_l ) ;
NodeOnlineRecord & i = _lastOnline [ std : : pair < uint64_t , uint64_t > ( networkId , memberId ) ] ;
i . lastSeen = OSUtils : : now ( ) ;
if ( physicalAddress ) {
i . physicalAddress = physicalAddress ;
}
i . osArch = std : : string ( osArch ) ;
}
void CV2 : : nodeIsOnline ( const uint64_t networkId , const uint64_t memberId , const InetAddress & physicalAddress )
{
this - > nodeIsOnline ( networkId , memberId , physicalAddress , " unknown/unknown " ) ;
}
AuthInfo CV2 : : getSSOAuthInfo ( const nlohmann : : json & member , const std : : string & redirectURL )
{
// TODO: Redo this for CV2
Metrics : : db_get_sso_info + + ;
// NONCE is just a random character string. no semantic meaning
// state = HMAC SHA384 of Nonce based on shared sso key
//
// need nonce timeout in database? make sure it's used within X time
// X is 5 minutes for now. Make configurable later?
//
// how do we tell when a nonce is used? if auth_expiration_time is set
std : : string networkId = member [ " nwid " ] ;
std : : string memberId = member [ " id " ] ;
char authenticationURL [ 4096 ] = { 0 } ;
AuthInfo info ;
info . enabled = true ;
// if (memberId == "a10dccea52" && networkId == "8056c2e21c24673d") {
// fprintf(stderr, "invalid authinfo for grant's machine\n");
// info.version=1;
// return info;
// }
// fprintf(stderr, "PostgreSQL::updateMemberOnLoad: %s-%s\n", networkId.c_str(), memberId.c_str());
std : : shared_ptr < PostgresConnection > c ;
try {
// c = _pool->borrow();
// pqxx::work w(*c->c);
// char nonceBytes[16] = {0};
// std::string nonce = "";
// // check if the member exists first.
// pqxx::row count = w.exec_params1("SELECT count(id) FROM ztc_member WHERE id = $1 AND network_id = $2 AND deleted = false", memberId, networkId);
// if (count[0].as<int>() == 1) {
// // get active nonce, if exists.
// pqxx::result r = w.exec_params("SELECT nonce FROM ztc_sso_expiry "
// "WHERE network_id = $1 AND member_id = $2 "
// "AND ((NOW() AT TIME ZONE 'UTC') <= authentication_expiry_time) AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration)",
// networkId, memberId);
// if (r.size() == 0) {
// // no active nonce.
// // find an unused nonce, if one exists.
// pqxx::result r = w.exec_params("SELECT nonce FROM ztc_sso_expiry "
// "WHERE network_id = $1 AND member_id = $2 "
// "AND authentication_expiry_time IS NULL AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration)",
// networkId, memberId);
// if (r.size() == 1) {
// // we have an existing nonce. Use it
// nonce = r.at(0)[0].as<std::string>();
// Utils::unhex(nonce.c_str(), nonceBytes, sizeof(nonceBytes));
// } else if (r.empty()) {
// // create a nonce
// Utils::getSecureRandom(nonceBytes, 16);
// char nonceBuf[64] = {0};
// Utils::hex(nonceBytes, sizeof(nonceBytes), nonceBuf);
// nonce = std::string(nonceBuf);
// pqxx::result ir = w.exec_params0("INSERT INTO ztc_sso_expiry "
// "(nonce, nonce_expiration, network_id, member_id) VALUES "
// "($1, TO_TIMESTAMP($2::double precision/1000), $3, $4)",
// nonce, OSUtils::now() + 300000, networkId, memberId);
// w.commit();
// } else {
// // > 1 ?!? Thats an error!
// fprintf(stderr, "> 1 unused nonce!\n");
// exit(6);
// }
// } else if (r.size() == 1) {
// nonce = r.at(0)[0].as<std::string>();
// Utils::unhex(nonce.c_str(), nonceBytes, sizeof(nonceBytes));
// } else {
// // more than 1 nonce in use? Uhhh...
// fprintf(stderr, "> 1 nonce in use for network member?!?\n");
// exit(7);
// }
// r = w.exec_params(
// "SELECT oc.client_id, oc.authorization_endpoint, oc.issuer, oc.provider, oc.sso_impl_version "
// "FROM ztc_network AS n "
// "INNER JOIN ztc_org o "
// " ON o.owner_id = n.owner_id "
// "LEFT OUTER JOIN ztc_network_oidc_config noc "
// " ON noc.network_id = n.id "
// "LEFT OUTER JOIN ztc_oidc_config oc "
// " ON noc.client_id = oc.client_id AND oc.org_id = o.org_id "
// "WHERE n.id = $1 AND n.sso_enabled = true", networkId);
// std::string client_id = "";
// std::string authorization_endpoint = "";
// std::string issuer = "";
// std::string provider = "";
// uint64_t sso_version = 0;
// if (r.size() == 1) {
// client_id = r.at(0)[0].as<std::optional<std::string>>().value_or("");
// authorization_endpoint = r.at(0)[1].as<std::optional<std::string>>().value_or("");
// issuer = r.at(0)[2].as<std::optional<std::string>>().value_or("");
// provider = r.at(0)[3].as<std::optional<std::string>>().value_or("");
// sso_version = r.at(0)[4].as<std::optional<uint64_t>>().value_or(1);
// } else if (r.size() > 1) {
// fprintf(stderr, "ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s\n", networkId.c_str());
// } else {
// fprintf(stderr, "No client or auth endpoint?!?\n");
// }
// info.version = sso_version;
// // no catch all else because we don't actually care if no records exist here. just continue as normal.
// if ((!client_id.empty())&&(!authorization_endpoint.empty())) {
// uint8_t state[48];
// HMACSHA384(_ssoPsk, nonceBytes, sizeof(nonceBytes), state);
// char state_hex[256];
// Utils::hex(state, 48, state_hex);
// if (info.version == 0) {
// char url[2048] = {0};
// OSUtils::ztsnprintf(url, sizeof(authenticationURL),
// "%s?response_type=id_token&response_mode=form_post&scope=openid+email+profile&redirect_uri=%s&nonce=%s&state=%s&client_id=%s",
// authorization_endpoint.c_str(),
// url_encode(redirectURL).c_str(),
// nonce.c_str(),
// state_hex,
// client_id.c_str());
// info.authenticationURL = std::string(url);
// } else if (info.version == 1) {
// info.ssoClientID = client_id;
// info.issuerURL = issuer;
// info.ssoProvider = provider;
// info.ssoNonce = nonce;
// info.ssoState = std::string(state_hex) + "_" +networkId;
// info.centralAuthURL = redirectURL;
// #ifdef ZT_DEBUG
// fprintf(
// stderr,
// "ssoClientID: %s\nissuerURL: %s\nssoNonce: %s\nssoState: %s\ncentralAuthURL: %s\nprovider: %s\n",
// info.ssoClientID.c_str(),
// info.issuerURL.c_str(),
// info.ssoNonce.c_str(),
// info.ssoState.c_str(),
// info.centralAuthURL.c_str(),
// provider.c_str());
// #endif
// }
// } else {
// fprintf(stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(), authorization_endpoint.c_str());
// }
// }
// _pool->unborrow(c);
}
catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: Error updating member on load for network %s: %s \n " , networkId . c_str ( ) , e . what ( ) ) ;
}
return info ; // std::string(authenticationURL);
}
void CV2 : : initializeNetworks ( )
{
fprintf ( stderr , " Initializing networks... \n " ) ;
try {
char qbuf [ 2048 ] ;
sprintf (
qbuf ,
" SELECT id, name, configuration , (EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000)::bigint, "
" (EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000)::bigint, revision "
" FROM networks_ctl WHERE controller_id = '%s' " ,
_myAddressStr . c_str ( ) ) ;
auto c = _pool - > borrow ( ) ;
pqxx : : work w ( * c - > c ) ;
fprintf ( stderr , " Load networks from psql... \n " ) ;
auto stream = pqxx : : stream_from : : query ( w , qbuf ) ;
std : : tuple <
std : : string // network ID
,
std : : optional < std : : string > // name
,
std : : string // configuration
,
std : : optional < uint64_t > // creation_time
,
std : : optional < uint64_t > // last_modified
,
std : : optional < uint64_t > // revision
>
row ;
uint64_t count = 0 ;
uint64_t total = 0 ;
while ( stream > > row ) {
auto start = std : : chrono : : high_resolution_clock : : now ( ) ;
json empty ;
json config ;
initNetwork ( config ) ;
std : : string nwid = std : : get < 0 > ( row ) ;
std : : string name = std : : get < 1 > ( row ) . value_or ( " " ) ;
json cfgtmp = json : : parse ( std : : get < 2 > ( row ) ) ;
std : : optional < uint64_t > created_at = std : : get < 3 > ( row ) ;
std : : optional < uint64_t > last_modified = std : : get < 4 > ( row ) ;
std : : optional < uint64_t > revision = std : : get < 5 > ( row ) ;
config [ " id " ] = nwid ;
config [ " name " ] = name ;
config [ " creationTime " ] = created_at . value_or ( 0 ) ;
config [ " lastModified " ] = last_modified . value_or ( 0 ) ;
config [ " revision " ] = revision . value_or ( 0 ) ;
config [ " capabilities " ] = cfgtmp [ " capabilities " ] . is_array ( ) ? cfgtmp [ " capabilities " ] : json : : array ( ) ;
config [ " enableBroadcast " ] = cfgtmp [ " enableBroadcast " ] . is_boolean ( ) ? cfgtmp [ " enableBroadcast " ] . get < bool > ( ) : false ;
config [ " mtu " ] = cfgtmp [ " mtu " ] . is_number ( ) ? cfgtmp [ " mtu " ] . get < int32_t > ( ) : 2800 ;
config [ " multicastLimit " ] = cfgtmp [ " multicastLimit " ] . is_number ( ) ? cfgtmp [ " multicastLimit " ] . get < int32_t > ( ) : 64 ;
config [ " private " ] = cfgtmp [ " private " ] . is_boolean ( ) ? cfgtmp [ " private " ] . get < bool > ( ) : true ;
config [ " remoteTraceLevel " ] = cfgtmp [ " remoteTraceLevel " ] . is_number ( ) ? cfgtmp [ " remoteTraceLevel " ] . get < int32_t > ( ) : 0 ;
config [ " remoteTraceTarget " ] = cfgtmp [ " remoteTraceTarget " ] . is_string ( ) ? cfgtmp [ " remoteTraceTarget " ] . get < std : : string > ( ) : " " ;
config [ " revision " ] = revision . value_or ( 0 ) ;
config [ " rules " ] = cfgtmp [ " rules " ] . is_array ( ) ? cfgtmp [ " rules " ] : json : : array ( ) ;
config [ " tags " ] = cfgtmp [ " tags " ] . is_array ( ) ? cfgtmp [ " tags " ] : json : : array ( ) ;
if ( cfgtmp [ " v4AssignMode " ] . is_object ( ) ) {
config [ " v4AssignMode " ] = cfgtmp [ " v4AssignMode " ] ;
}
else {
config [ " v4AssignMode " ] = json : : object ( ) ;
config [ " v4AssignMode " ] [ " zt " ] = true ;
}
if ( cfgtmp [ " v6AssignMode " ] . is_object ( ) ) {
config [ " v6AssignMode " ] = cfgtmp [ " v6AssignMode " ] ;
}
else {
config [ " v6AssignMode " ] = json : : object ( ) ;
config [ " v6AssignMode " ] [ " zt " ] = true ;
config [ " v6AssignMode " ] [ " 6plane " ] = true ;
config [ " v6AssignMode " ] [ " rfc4193 " ] = false ;
}
config [ " ssoEnabled " ] = cfgtmp [ " ssoEnabled " ] . is_boolean ( ) ? cfgtmp [ " ssoEnabled " ] . get < bool > ( ) : false ;
config [ " objtype " ] = " network " ;
config [ " routes " ] = cfgtmp [ " routes " ] . is_array ( ) ? cfgtmp [ " routes " ] : json : : array ( ) ;
config [ " clientId " ] = cfgtmp [ " clientId " ] . is_string ( ) ? cfgtmp [ " clientId " ] . get < std : : string > ( ) : " " ;
config [ " authorizationEndpoint " ] = cfgtmp [ " authorizationEndpoint " ] . is_string ( ) ? cfgtmp [ " authorizationEndpoint " ] . get < std : : string > ( ) : nullptr ;
config [ " provider " ] = cfgtmp [ " ssoProvider " ] . is_string ( ) ? cfgtmp [ " ssoProvider " ] . get < std : : string > ( ) : " " ;
if ( ! cfgtmp [ " dns " ] . is_object ( ) ) {
cfgtmp [ " dns " ] = json : : object ( ) ;
cfgtmp [ " dns " ] [ " domain " ] = " " ;
cfgtmp [ " dns " ] [ " servers " ] = json : : array ( ) ;
}
else {
config [ " dns " ] = cfgtmp [ " dns " ] ;
}
config [ " ipAssignmentPools " ] = cfgtmp [ " ipAssignmentPools " ] . is_array ( ) ? cfgtmp [ " ipAssignmentPools " ] : json : : array ( ) ;
Metrics : : network_count + + ;
_networkChanged ( empty , config , false ) ;
auto end = std : : chrono : : high_resolution_clock : : now ( ) ;
auto dur = std : : chrono : : duration_cast < std : : chrono : : microseconds > ( end - start ) ;
;
total + = dur . count ( ) ;
+ + count ;
if ( count > 0 & & count % 10000 = = 0 ) {
fprintf ( stderr , " Averaging %lu us per network \n " , ( total / count ) ) ;
}
}
w . commit ( ) ;
_pool - > unborrow ( c ) ;
fprintf ( stderr , " done. \n " ) ;
if ( + + this - > _ready = = 2 ) {
if ( _waitNoticePrinted ) {
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL data download complete. " ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
}
_readyLock . unlock ( ) ;
}
fprintf ( stderr , " network init done \n " ) ;
}
catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: Error initializing networks: %s \n " , e . what ( ) ) ;
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 5000 ) ) ;
exit ( - 1 ) ;
}
}
void CV2 : : initializeMembers ( )
{
std : : string memberId ;
std : : string networkId ;
try {
char qbuf [ 2048 ] ;
sprintf (
qbuf ,
" SELECT nm.device_id, nm.network_id, nm.authorized, nm.active_bridge, nm.ip_assignments, nm.no_auto_assign_ips, "
" nm.sso_exempt, (EXTRACT(EPOCH FROM nm.authentication_expiry_time AT TIME ZONE 'UTC')*1000)::bigint, "
" (EXTRACT(EPOCH FROM nm.creation_time AT TIME ZONE 'UTC')*1000)::bigint, nm.identity, "
" (EXTRACT(EPOCH FROM nm.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
" (EXTRACT(EPOCH FROM nm.last_deauthorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
" nm.remote_trace_level, nm.remote_trace_target, nm.revision, nm.capabilities, nm.tags "
" FROM network_memberships_ctl nm "
" INNER JOIN networks_ctl n "
" ON nm.network_id = n.id "
" WHERE n.controller_id = '%s' " ,
_myAddressStr . c_str ( ) ) ;
auto c = _pool - > borrow ( ) ;
pqxx : : work w ( * c - > c ) ;
fprintf ( stderr , " Load members from psql... \n " ) ;
auto stream = pqxx : : stream_from : : query ( w , qbuf ) ;
std : : tuple <
std : : string // device ID
,
std : : string // network ID
,
bool // authorized
,
std : : optional < bool > // active_bridge
,
std : : optional < std : : string > // ip_assignments
,
std : : optional < bool > // no_auto_assign_ips
,
std : : optional < bool > // sso_exempt
,
std : : optional < uint64_t > // authentication_expiry_time
,
std : : optional < uint64_t > // creation_time
,
std : : optional < std : : string > // identity
,
std : : optional < uint64_t > // last_authorized_time
,
std : : optional < uint64_t > // last_deauthorized_time
,
std : : optional < int32_t > // remote_trace_level
,
std : : optional < std : : string > // remote_trace_target
,
std : : optional < uint64_t > // revision
,
std : : optional < std : : string > // capabilities
,
std : : optional < std : : string > // tags
>
row ;
uint64_t count = 0 ;
uint64_t total = 0 ;
while ( stream > > row ) {
auto start = std : : chrono : : high_resolution_clock : : now ( ) ;
json empty ;
json config ;
initMember ( config ) ;
memberId = std : : get < 0 > ( row ) ;
networkId = std : : get < 1 > ( row ) ;
bool authorized = std : : get < 2 > ( row ) ;
std : : optional < bool > active_bridge = std : : get < 3 > ( row ) ;
std : : string ip_assignments = std : : get < 4 > ( row ) . value_or ( " " ) ;
std : : optional < bool > no_auto_assign_ips = std : : get < 5 > ( row ) ;
std : : optional < bool > sso_exempt = std : : get < 6 > ( row ) ;
std : : optional < uint64_t > authentication_expiry_time = std : : get < 7 > ( row ) ;
std : : optional < uint64_t > creation_time = std : : get < 8 > ( row ) ;
std : : optional < std : : string > identity = std : : get < 9 > ( row ) ;
std : : optional < uint64_t > last_authorized_time = std : : get < 10 > ( row ) ;
std : : optional < uint64_t > last_deauthorized_time = std : : get < 11 > ( row ) ;
std : : optional < int32_t > remote_trace_level = std : : get < 12 > ( row ) ;
std : : optional < std : : string > remote_trace_target = std : : get < 13 > ( row ) ;
std : : optional < uint64_t > revision = std : : get < 14 > ( row ) ;
std : : optional < std : : string > capabilities = std : : get < 15 > ( row ) ;
std : : optional < std : : string > tags = std : : get < 16 > ( row ) ;
config [ " objtype " ] = " member " ;
config [ " id " ] = memberId ;
config [ " address " ] = identity . value_or ( " " ) ;
config [ " nwid " ] = networkId ;
config [ " authorized " ] = authorized ;
config [ " activeBridge " ] = active_bridge . value_or ( false ) ;
config [ " ipAssignments " ] = json : : array ( ) ;
if ( ip_assignments ! = " {} " ) {
std : : string tmp = ip_assignments . substr ( 1 , ip_assignments . length ( ) - 2 ) ;
std : : vector < std : : string > addrs = split ( tmp , ' , ' ) ;
for ( auto it = addrs . begin ( ) ; it ! = addrs . end ( ) ; + + it ) {
config [ " ipAssignments " ] . push_back ( * it ) ;
}
}
config [ " capabilities " ] = json : : parse ( capabilities . value_or ( " [] " ) ) ;
config [ " creationTime " ] = creation_time . value_or ( 0 ) ;
config [ " lastAuthorizedTime " ] = last_authorized_time . value_or ( 0 ) ;
config [ " lastDeauthorizedTime " ] = last_deauthorized_time . value_or ( 0 ) ;
config [ " noAutoAssignIPs " ] = no_auto_assign_ips . value_or ( false ) ;
config [ " remoteTraceLevel " ] = remote_trace_level . value_or ( 0 ) ;
config [ " remoteTraceTarget " ] = remote_trace_target . value_or ( nullptr ) ;
config [ " revision " ] = revision . value_or ( 0 ) ;
config [ " ssoExempt " ] = sso_exempt . value_or ( false ) ;
config [ " authenticationExpiryTime " ] = authentication_expiry_time . value_or ( 0 ) ;
config [ " tags " ] = json : : parse ( tags . value_or ( " [] " ) ) ;
Metrics : : member_count + + ;
_memberChanged ( empty , config , false ) ;
memberId = " " ;
networkId = " " ;
auto end = std : : chrono : : high_resolution_clock : : now ( ) ;
auto dur = std : : chrono : : duration_cast < std : : chrono : : microseconds > ( end - start ) ;
total + = dur . count ( ) ;
+ + count ;
if ( count > 0 & & count % 10000 = = 0 ) {
fprintf ( stderr , " Averaging %lu us per member \n " , ( total / count ) ) ;
}
}
if ( count > 0 ) {
fprintf ( stderr , " Took %lu us per member to load \n " , ( total / count ) ) ;
}
stream . complete ( ) ;
w . commit ( ) ;
_pool - > unborrow ( c ) ;
fprintf ( stderr , " done. \n " ) ;
if ( + + this - > _ready = = 2 ) {
if ( _waitNoticePrinted ) {
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL data download complete. " ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
}
_readyLock . unlock ( ) ;
}
fprintf ( stderr , " member init done \n " ) ;
}
catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: Error initializing member: %s-%s %s \n " , networkId . c_str ( ) , memberId . c_str ( ) , e . what ( ) ) ;
exit ( - 1 ) ;
}
}
void CV2 : : heartbeat ( )
{
char publicId [ 1024 ] ;
char hostnameTmp [ 1024 ] ;
_myId . toString ( false , publicId ) ;
if ( gethostname ( hostnameTmp , sizeof ( hostnameTmp ) ) ! = 0 ) {
hostnameTmp [ 0 ] = ( char ) 0 ;
}
else {
for ( int i = 0 ; i < ( int ) sizeof ( hostnameTmp ) ; + + i ) {
if ( ( hostnameTmp [ i ] = = ' . ' ) | | ( hostnameTmp [ i ] = = 0 ) ) {
hostnameTmp [ i ] = ( char ) 0 ;
break ;
}
}
}
const char * controllerId = _myAddressStr . c_str ( ) ;
const char * publicIdentity = publicId ;
const char * hostname = hostnameTmp ;
while ( _run = = 1 ) {
auto c = _pool - > borrow ( ) ;
int64_t ts = OSUtils : : now ( ) ;
if ( c - > c ) {
std : : string major = std : : to_string ( ZEROTIER_ONE_VERSION_MAJOR ) ;
std : : string minor = std : : to_string ( ZEROTIER_ONE_VERSION_MINOR ) ;
std : : string rev = std : : to_string ( ZEROTIER_ONE_VERSION_REVISION ) ;
std : : string version = major + " . " + minor + " . " + rev ;
std : : string versionStr = " v " + version ;
try {
pqxx : : work w { * c - > c } ;
w . exec_params0 (
" INSERT INTO controllers_ctl (id, hostname, last_heartbeat, public_identity, version) VALUES "
" ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5) "
" ON CONFLICT (id) DO UPDATE SET hostname = EXCLUDED.hostname, last_heartbeat = EXCLUDED.last_heartbeat, "
" public_identity = EXCLUDED.public_identity, version = EXCLUDED.version " ,
controllerId ,
hostname ,
ts ,
publicIdentity ,
versionStr ) ;
w . commit ( ) ;
}
catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: Error in heartbeat: %s \n " , e . what ( ) ) ;
continue ;
}
catch ( . . . ) {
fprintf ( stderr , " ERROR: Unknown error in heartbeat \n " ) ;
continue ;
}
}
_pool - > unborrow ( c ) ;
std : : this_thread : : sleep_for ( std : : chrono : : seconds ( 1 ) ) ;
}
fprintf ( stderr , " Exited heartbeat thread \n " ) ;
}
void CV2 : : membersDbWatcher ( )
{
auto c = _pool - > borrow ( ) ;
std : : string stream = " member_ " + _myAddressStr ;
fprintf ( stderr , " Listening to member stream: %s \n " , stream . c_str ( ) ) ;
MemberNotificationReceiver m ( this , * c - > c , stream ) ;
while ( _run = = 1 ) {
c - > c - > await_notification ( 5 , 0 ) ;
}
_pool - > unborrow ( c ) ;
fprintf ( stderr , " Exited membersDbWatcher \n " ) ;
}
void CV2 : : networksDbWatcher ( )
{
std : : string stream = " network_ " + _myAddressStr ;
fprintf ( stderr , " Listening to member stream: %s \n " , stream . c_str ( ) ) ;
auto c = _pool - > borrow ( ) ;
NetworkNotificationReceiver n ( this , * c - > c , stream ) ;
while ( _run = = 1 ) {
c - > c - > await_notification ( 5 , 0 ) ;
}
_pool - > unborrow ( c ) ;
fprintf ( stderr , " Exited networksDbWatcher \n " ) ;
}
void CV2 : : commitThread ( )
{
fprintf ( stderr , " %s: commitThread start \n " , _myAddressStr . c_str ( ) ) ;
std : : pair < nlohmann : : json , bool > qitem ;
while ( _commitQueue . get ( qitem ) & & ( _run = = 1 ) ) {
// fprintf(stderr, "commitThread tick\n");
if ( ! qitem . first . is_object ( ) ) {
fprintf ( stderr , " not an object \n " ) ;
continue ;
}
std : : shared_ptr < PostgresConnection > c ;
try {
c = _pool - > borrow ( ) ;
}
catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: %s \n " , e . what ( ) ) ;
continue ;
}
if ( ! c ) {
fprintf ( stderr , " Error getting database connection \n " ) ;
continue ;
}
Metrics : : pgsql_commit_ticks + + ;
try {
nlohmann : : json & config = ( qitem . first ) ;
const std : : string objtype = config [ " objtype " ] ;
if ( objtype = = " member " ) {
// fprintf(stderr, "%s: commitThread: member\n", _myAddressStr.c_str());
std : : string memberId ;
std : : string networkId ;
try {
pqxx : : work w ( * c - > c ) ;
memberId = config [ " id " ] ;
networkId = config [ " nwid " ] ;
std : : string target = " NULL " ;
if ( ! config [ " remoteTraceTarget " ] . is_null ( ) ) {
target = config [ " remoteTraceTarget " ] ;
}
pqxx : : row nwrow = w . exec_params1 ( " SELECT COUNT(id) FROM networks WHERE id = $1 " , networkId ) ;
int nwcount = nwrow [ 0 ] . as < int > ( ) ;
if ( nwcount ! = 1 ) {
fprintf ( stderr , " network %s does not exist. skipping member upsert \n " , networkId . c_str ( ) ) ;
w . abort ( ) ;
_pool - > unborrow ( c ) ;
continue ;
}
// only needed for hooks, and no hooks for now
// pqxx::row mrow = w.exec_params1("SELECT COUNT(id) FROM device_networks WHERE device_id = $1 AND network_id = $2", memberId, networkId);
// int membercount = mrow[0].as<int>();
// bool isNewMember = (membercount == 0);
pqxx : : result res = w . exec_params0 (
" INSERT INTO network_memberships_ctl (device_id, network_id, authorized, active_bridge, ip_assignments, "
" no_auto_assign_ips, sso_exempt, authentication_expiry_time, capabilities, creation_time, "
" identity, last_authorized_time, last_deauthorized_time, "
" remote_trace_level, remote_trace_target, revision, tags, version_major, version_minor, "
" version_revision, version_protocol) "
" VALUES ($1, $2, $3, $4, $5, $6, $7, TO_TIMESTAMP($8::double precision/1000), $9, "
" TO_TIMESTAMP($10::double precision/1000), $11, TO_TIMESTAMP($12::double precision/1000), "
" TO_TIMESTAMP($13::double precision/1000), $14, $15, $16, $17, $18, $19, $20, $21) "
" ON CONFLICT (device_id, network_id) DO UPDATE SET "
" authorized = EXCLUDED.authorized, active_bridge = EXCLUDED.active_bridge, "
" ip_assignments = EXCLUDED.ip_assignments, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
" sso_exempt = EXCLUDED.sso_exempt, authentication_expiry_time = EXCLUDED.authentication_expiry_time, "
" capabilities = EXCLUDED.capabilities, creation_time = EXCLUDED.creation_time, "
" identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
" last_deauthorized_time = EXCLUDED.last_deauthorized_time, "
" remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
" revision = EXCLUDED.revision, tags = EXCLUDED.tags, version_major = EXCLUDED.version_major, "
" version_minor = EXCLUDED.version_minor, version_revision = EXCLUDED.version_revision, "
" version_protocol = EXCLUDED.version_protocol " ,
memberId ,
networkId ,
( bool ) config [ " authorized " ] ,
( bool ) config [ " activeBridge " ] ,
config [ " ipAssignments " ] . get < std : : vector < std : : string > > ( ) ,
( bool ) config [ " noAutoAssignIps " ] ,
( bool ) config [ " ssoExempt " ] ,
( uint64_t ) config [ " authenticationExpiryTime " ] ,
OSUtils : : jsonDump ( config [ " capabilities " ] , - 1 ) ,
( uint64_t ) config [ " creationTime " ] ,
OSUtils : : jsonString ( config [ " identity " ] , " " ) ,
( uint64_t ) config [ " lastAuthorizedTime " ] ,
( uint64_t ) config [ " lastDeauthorizedTime " ] ,
( int ) config [ " remoteTraceLevel " ] ,
target ,
( uint64_t ) config [ " revision " ] ,
OSUtils : : jsonDump ( config [ " tags " ] , - 1 ) ,
( int ) config [ " vMajor " ] ,
( int ) config [ " vMinor " ] ,
( int ) config [ " vRev " ] ,
( int ) config [ " vProto " ] ) ;
w . commit ( ) ;
// No hooks for now
// if (_smee != NULL && isNewMember) {
// pqxx::row row = w.exec_params1(
// "SELECT "
// " count(h.hook_id) "
// "FROM "
// " ztc_hook h "
// " INNER JOIN ztc_org o ON o.org_id = h.org_id "
// " INNER JOIN ztc_network n ON n.owner_id = o.owner_id "
// " WHERE "
// "n.id = $1 ",
// networkId
// );
// int64_t hookCount = row[0].as<int64_t>();
// if (hookCount > 0) {
// notifyNewMember(networkId, memberId);
// }
// }
const uint64_t nwidInt = OSUtils : : jsonIntHex ( config [ " nwid " ] , 0ULL ) ;
const uint64_t memberidInt = OSUtils : : jsonIntHex ( config [ " id " ] , 0ULL ) ;
if ( nwidInt & & memberidInt ) {
nlohmann : : json nwOrig ;
nlohmann : : json memOrig ;
nlohmann : : json memNew ( config ) ;
get ( nwidInt , nwOrig , memberidInt , memOrig ) ;
_memberChanged ( memOrig , memNew , qitem . second ) ;
}
else {
fprintf ( stderr , " %s: Can't notify of change. Error parsing nwid or memberid: %llu-%llu \n " , _myAddressStr . c_str ( ) , ( unsigned long long ) nwidInt , ( unsigned long long ) memberidInt ) ;
}
}
catch ( pqxx : : data_exception & e ) {
std : : string cfgDump = OSUtils : : jsonDump ( config , 2 ) ;
fprintf ( stderr , " Member save %s-%s: %s \n " , networkId . c_str ( ) , memberId . c_str ( ) , cfgDump . c_str ( ) ) ;
const pqxx : : sql_error * s = dynamic_cast < const pqxx : : sql_error * > ( & e ) ;
fprintf ( stderr , " %s ERROR: Error updating member: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
if ( s ) {
fprintf ( stderr , " %s ERROR: SQL error: %s \n " , _myAddressStr . c_str ( ) , s - > query ( ) . c_str ( ) ) ;
}
}
catch ( std : : exception & e ) {
std : : string cfgDump = OSUtils : : jsonDump ( config , 2 ) ;
fprintf ( stderr , " %s ERROR: Error updating member %s-%s: %s \n jsonDump: %s \n " , _myAddressStr . c_str ( ) , networkId . c_str ( ) , memberId . c_str ( ) , e . what ( ) , cfgDump . c_str ( ) ) ;
}
}
else if ( objtype = = " network " ) {
try {
// fprintf(stderr, "%s: commitThread: network\n", _myAddressStr.c_str());
pqxx : : work w ( * c - > c ) ;
std : : string id = config [ " id " ] ;
// network must already exist
pqxx : : result res = w . exec_params0 (
" INSERT INTO networks_ctl (id, name, configuration, controller_id, revision) "
" VALUES ($1, $2, $3, $4, $5) "
" ON CONFLICT (id) DO UPDATE SET "
" name = EXCLUDED.name, configuration = EXCLUDED.configuration, revision = EXCLUDED.revision+1 " ,
id ,
OSUtils : : jsonString ( config [ " name " ] , " " ) ,
OSUtils : : jsonDump ( config , - 1 ) ,
_myAddressStr ,
( ( uint64_t ) config [ " revision " ] ) ) ;
w . commit ( ) ;
const uint64_t nwidInt = OSUtils : : jsonIntHex ( config [ " nwid " ] , 0ULL ) ;
if ( nwidInt ) {
nlohmann : : json nwOrig ;
nlohmann : : json nwNew ( config ) ;
get ( nwidInt , nwOrig ) ;
_networkChanged ( nwOrig , nwNew , qitem . second ) ;
}
else {
fprintf ( stderr , " %s: Can't notify network changed: %llu \n " , _myAddressStr . c_str ( ) , ( unsigned long long ) nwidInt ) ;
}
}
catch ( pqxx : : data_exception & e ) {
const pqxx : : sql_error * s = dynamic_cast < const pqxx : : sql_error * > ( & e ) ;
fprintf ( stderr , " %s ERROR: Error updating network: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
if ( s ) {
fprintf ( stderr , " %s ERROR: SQL error: %s \n " , _myAddressStr . c_str ( ) , s - > query ( ) . c_str ( ) ) ;
}
}
catch ( std : : exception & e ) {
fprintf ( stderr , " %s ERROR: Error updating network: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
}
}
else if ( objtype = = " _delete_network " ) {
// fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str());
try {
// don't think we need this. Deletion handled by CV2 API
pqxx : : work w ( * c - > c ) ;
std : : string networkId = config [ " id " ] ;
w . exec_params0 ( " DELETE FROM network_memberships_ctl WHERE network_id = $1 " , networkId ) ;
w . exec_params0 ( " DELETE FROM networks_ctl WHERE id = $1 " , networkId ) ;
w . commit ( ) ;
}
catch ( std : : exception & e ) {
fprintf ( stderr , " %s ERROR: Error deleting network: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
}
}
else if ( objtype = = " _delete_member " ) {
// fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str());
try {
pqxx : : work w ( * c - > c ) ;
std : : string memberId = config [ " id " ] ;
std : : string networkId = config [ " nwid " ] ;
pqxx : : result res = w . exec_params0 ( " DELETE FROM network_memberships_ctl WHERE device_id = $1 AND network_id = $2 " , memberId , networkId ) ;
w . commit ( ) ;
}
catch ( std : : exception & e ) {
fprintf ( stderr , " %s ERROR: Error deleting member: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
}
}
else {
fprintf ( stderr , " %s ERROR: unknown objtype \n " , _myAddressStr . c_str ( ) ) ;
}
}
catch ( std : : exception & e ) {
fprintf ( stderr , " %s ERROR: Error getting objtype: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
}
_pool - > unborrow ( c ) ;
c . reset ( ) ;
}
fprintf ( stderr , " %s commitThread finished \n " , _myAddressStr . c_str ( ) ) ;
}
void CV2 : : onlineNotificationThread ( )
{
waitForReady ( ) ;
_connected = 1 ;
nlohmann : : json jtmp1 , jtmp2 ;
while ( _run = = 1 ) {
auto c = _pool - > borrow ( ) ;
auto c2 = _pool - > borrow ( ) ;
try {
fprintf ( stderr , " %s onlineNotificationThread \n " , _myAddressStr . c_str ( ) ) ;
std : : unordered_map < std : : pair < uint64_t , uint64_t > , NodeOnlineRecord , _PairHasher > lastOnline ;
{
std : : lock_guard < std : : mutex > l ( _lastOnline_l ) ;
lastOnline . swap ( _lastOnline ) ;
}
pqxx : : work w ( * c - > c ) ;
pqxx : : work w2 ( * c2 - > c ) ;
bool firstRun = true ;
bool memberAdded = false ;
uint64_t updateCount = 0 ;
pqxx : : pipeline pipe ( w ) ;
for ( auto i = lastOnline . begin ( ) ; i ! = lastOnline . end ( ) ; + + i ) {
updateCount + + ;
uint64_t nwid_i = i - > first . first ;
char nwidTmp [ 64 ] ;
char memTmp [ 64 ] ;
char ipTmp [ 64 ] ;
OSUtils : : ztsnprintf ( nwidTmp , sizeof ( nwidTmp ) , " %.16llx " , nwid_i ) ;
OSUtils : : ztsnprintf ( memTmp , sizeof ( memTmp ) , " %.10llx " , i - > first . second ) ;
if ( ! get ( nwid_i , jtmp1 , i - > first . second , jtmp2 ) ) {
continue ; // skip non existent networks/members
}
std : : string networkId ( nwidTmp ) ;
std : : string memberId ( memTmp ) ;
try {
pqxx : : row r = w2 . exec_params1 ( " SELECT device_id, network_id FROM network_memberships_ctl WHERE network_id = $1 AND device_id = $2 " , networkId , memberId ) ;
}
catch ( pqxx : : unexpected_rows & e ) {
continue ;
}
int64_t ts = i - > second . lastSeen ;
std : : string ipAddr = i - > second . physicalAddress . toIpString ( ipTmp ) ;
std : : string timestamp = std : : to_string ( ts ) ;
std : : string osArch = i - > second . osArch ;
std : : vector < std : : string > osArchSplit = split ( osArch , ' / ' ) ;
std : : string os = osArchSplit [ 0 ] ;
std : : string arch = osArchSplit [ 1 ] ;
if ( ipAddr . empty ( ) ) {
ipAddr = " relayed " ;
}
json record = {
{ ipAddr , ts } ,
} ;
std : : string device_network_insert = " INSERT INTO network_memberships_ctl (device_id, network_id, last_seen, os, arch) "
" VALUES (' "
+ w2 . esc ( memberId ) + " ', ' " + w2 . esc ( networkId ) + " ', ' " + w2 . esc ( record . dump ( ) )
+ " '::JSONB, "
" ' "
+ w2 . esc ( os ) + " ', ' " + w2 . esc ( arch )
+ " ') "
" ON CONFLICT (device_id, network_id) DO UPDATE SET os = EXCLUDED.os, arch = EXCLUDED.arch, "
" last_seen = network_memberships_ctl.last_seen || EXCLUDED.last_seen " ;
pipe . insert ( device_network_insert ) ;
Metrics : : pgsql_node_checkin + + ;
}
pipe . complete ( ) ;
;
w2 . commit ( ) ;
w . commit ( ) ;
fprintf ( stderr , " %s: Updated online status of %lu members \n " , _myAddressStr . c_str ( ) , updateCount ) ;
}
catch ( std : : exception & e ) {
fprintf ( stderr , " %s ERROR: Error in onlineNotificationThread: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
}
catch ( . . . ) {
fprintf ( stderr , " %s ERROR: Unknown error in onlineNotificationThread \n " , _myAddressStr . c_str ( ) ) ;
}
_pool - > unborrow ( c2 ) ;
_pool - > unborrow ( c ) ;
std : : this_thread : : sleep_for ( std : : chrono : : seconds ( 10 ) ) ;
}
fprintf ( stderr , " %s: Fell out of run loop in onlineNotificationThread \n " , _myAddressStr . c_str ( ) ) ;
if ( _run = = 1 ) {
fprintf ( stderr , " ERROR: %s onlineNotificationThread should still be running! Exiting Controller. \n " , _myAddressStr . c_str ( ) ) ;
exit ( 6 ) ;
}
}
# endif // ZT_CONTROLLER_USE_LIBPQ