@ -73,19 +73,6 @@ CV2::CV2(const Identity &myId, const char *path, int listenPort)
Utils : : unhex ( ssoPskHex , _ssoPsk , sizeof ( _ssoPsk ) ) ;
}
auto c = _pool - > borrow ( ) ;
pqxx : : work txn { * c - > c } ;
pqxx : : row r { txn . exec1 ( " SELECT version FROM ztc_database " ) } ;
int dbVersion = r [ 0 ] . as < int > ( ) ;
txn . commit ( ) ;
// if (dbVersion < DB_MINIMUM_VERSION) {
// fprintf(stderr, "Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION);
// exit(1);
// }
_pool - > unborrow ( c ) ;
_readyLock . lock ( ) ;
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download... " ZT_EOL_S , : : _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
@ -129,7 +116,7 @@ bool CV2::waitForReady()
bool CV2 : : isReady ( )
{
return ( _ready = = 2 ) & & _cldemote ;
return ( _ready = = 2 ) & & _connected ;
}
bool CV2 : : save ( nlohmann : : json & record , bool notifyListeners )
@ -388,9 +375,105 @@ AuthInfo CV2::getSSOAuthInfo(const nlohmann::json &member, const std::string &re
}
void CV2 : : initializeNetworks ( )
{
{ fprintf ( stderr , " Initializing networks... \n " ) ;
try {
// TODO: Update for CV2
char qbuf [ 2048 ] ;
sprintf ( qbuf , " SELECT id, name, description, configuration , created_at, last_modified, revision FROM networks WHERE controller_id = '%s' " , _myAddressStr . c_str ( ) ) ;
auto c = _pool - > borrow ( ) ;
pqxx : : work w ( * c - > c ) ;
fprintf ( stderr , " Load networks from psql... \n " ) ;
auto stream = pqxx : : stream_from : : query ( w , qbuf ) ;
std : : tuple <
std : : string // network ID
, std : : optional < std : : string > // name
, std : : optional < std : : string > // description
, std : : string // configuration
, std : : optional < uint64_t > // created_at
, std : : optional < uint64_t > // last_modified
, std : : optional < uint64_t > // revision
> row ;
uint64_t count = 0 ;
uint64_t total = 0 ;
while ( stream > > row ) {
auto start = std : : chrono : : high_resolution_clock : : now ( ) ;
json empty ;
json config ;
initNetwork ( config ) ;
std : : string nwid = std : : get < 0 > ( row ) ;
std : : string name = std : : get < 1 > ( row ) . value_or ( " " ) ;
std : : string description = std : : get < 2 > ( row ) . value_or ( " " ) ;
json cfgtmp = json : : parse ( std : : get < 3 > ( row ) ) ;
std : : optional < uint64_t > created_at = std : : get < 4 > ( row ) ;
std : : optional < uint64_t > last_modified = std : : get < 5 > ( row ) ;
std : : optional < uint64_t > revision = std : : get < 6 > ( row ) ;
config [ " id " ] = nwid ;
config [ " name " ] = name ;
config [ " description " ] = description ;
config [ " creationTime " ] = created_at . value_or ( 0 ) ;
config [ " lastModified " ] = last_modified . value_or ( 0 ) ;
config [ " revision " ] = revision . value_or ( 0 ) ;
config [ " capabilities " ] = cfgtmp [ " capabilities " ] . is_array ( ) ? cfgtmp [ " capabilities " ] : json : : array ( ) ;
config [ " enableBroadcast " ] = cfgtmp [ " enableBroadcast " ] . is_boolean ( ) ? cfgtmp [ " enableBroadcast " ] . get < bool > ( ) : false ;
config [ " mtu " ] = cfgtmp [ " mtu " ] . is_number ( ) ? cfgtmp [ " mtu " ] . get < int32_t > ( ) : 2800 ;
config [ " multicastLimit " ] = cfgtmp [ " multicastLimit " ] . is_number ( ) ? cfgtmp [ " multicastLimit " ] . get < int32_t > ( ) : 64 ;
config [ " private " ] = cfgtmp [ " private " ] . is_boolean ( ) ? cfgtmp [ " private " ] . get < bool > ( ) : true ;
config [ " remoteTraceLevel " ] = cfgtmp [ " remoteTraceLevel " ] . is_number ( ) ? cfgtmp [ " remoteTraceLevel " ] . get < int32_t > ( ) : 0 ;
config [ " remoteTraceTarget " ] = cfgtmp [ " remoteTraceTarget " ] . is_string ( ) ? cfgtmp [ " remoteTraceTarget " ] . get < std : : string > ( ) : nullptr ;
config [ " revision " ] = revision . value_or ( 0 ) ;
config [ " rules " ] = cfgtmp [ " rules " ] . is_array ( ) ? cfgtmp [ " rules " ] : json : : array ( ) ;
config [ " tags " ] = cfgtmp [ " tags " ] . is_array ( ) ? cfgtmp [ " tags " ] : json : : array ( ) ;
if ( cfgtmp [ " v4AssignMode " ] . is_object ( ) ) {
config [ " v4AssignMode " ] = cfgtmp [ " v4AssignMode " ] ;
} else {
config [ " v4AssignMode " ] = json : : object ( ) ;
config [ " v4AssignMode " ] [ " zt " ] = true ;
}
if ( cfgtmp [ " v6AssignMode " ] . is_object ( ) ) {
config [ " v6AssignMode " ] = cfgtmp [ " v6AssignMode " ] ;
} else {
config [ " v6AssignMode " ] = json : : object ( ) ;
config [ " v6AssignMode " ] [ " zt " ] = true ;
config [ " v6AssignMode " ] [ " 6plane " ] = true ;
config [ " v6AssignMode " ] [ " rfc4193 " ] = false ;
}
config [ " ssoEnabled " ] = cfgtmp [ " ssoEnabled " ] . is_boolean ( ) ? cfgtmp [ " ssoEnabled " ] . get < bool > ( ) : false ;
config [ " objtype " ] = " network " ;
config [ " routes " ] = cfgtmp [ " routes " ] . is_array ( ) ? cfgtmp [ " routes " ] : json : : array ( ) ;
config [ " clientId " ] = cfgtmp [ " clientId " ] . is_string ( ) ? cfgtmp [ " clientId " ] . get < std : : string > ( ) : nullptr ;
config [ " authorizationEndpoint " ] = cfgtmp [ " authorizationEndpoint " ] . is_string ( ) ? cfgtmp [ " authorizationEndpoint " ] . get < std : : string > ( ) : nullptr ;
config [ " provider " ] = cfgtmp [ " ssoProvider " ] . is_string ( ) ? cfgtmp [ " ssoProvider " ] . get < std : : string > ( ) : nullptr ;
if ( ! cfgtmp [ " dns " ] . is_object ( ) ) {
cfgtmp [ " dns " ] = json : : object ( ) ;
cfgtmp [ " dns " ] [ " domain " ] = " " ;
cfgtmp [ " dns " ] [ " servers " ] = json : : array ( ) ;
} else {
config [ " dns " ] = cfgtmp [ " dns " ] ;
}
config [ " ipAssignmentPools " ] = cfgtmp [ " assignmentPools " ] . is_array ( ) ? cfgtmp [ " assignmentPools " ] : json : : array ( ) ;
Metrics : : network_count + + ;
_networkChanged ( empty , config , false ) ;
auto end = std : : chrono : : high_resolution_clock : : now ( ) ;
auto dur = std : : chrono : : duration_cast < std : : chrono : : microseconds > ( end - start ) ; ;
total + = dur . count ( ) ;
+ + count ;
if ( count > 0 & & count % 10000 = = 0 ) {
fprintf ( stderr , " Averaging %lu us per network \n " , ( total / count ) ) ;
}
}
w . commit ( ) ;
_pool - > unborrow ( c ) ;
fprintf ( stderr , " done. \n " ) ;
if ( + + this - > _ready = = 2 ) {
if ( _waitNoticePrinted ) {
@ -411,8 +494,123 @@ void CV2::initializeMembers()
std : : string memberId ;
std : : string networkId ;
try {
// TODO: Update for CV2
char qbuf [ 2048 ] ;
sprintf ( qbuf ,
" SELECT dn.device_id, dn.network_id, dn.authorized, dn.active_bridge, dn.ip_assignments, dn.no_auto_assign_ips, "
" dn.sso_exempt, authentication_expiry_time, dn.creation_time, dn.identity, dn.last_authorized_credential, "
" dn.last_authorized_time, dn.last_deauthorized_time, dn.remote_trace_level, dn.remote_trace_target, "
" dn.revision, dn.capabilities, dn.tags "
" FROM device_networks dn "
" INNER JOIN devices d "
" ON dn.device_id = d.id "
" INNER JOIN networks n "
" ON dn.network_id = n.id "
" WHERE n.controller_id = '%s' " , _myAddressStr . c_str ( ) ) ;
auto c = _pool - > borrow ( ) ;
pqxx : : work w ( * c - > c ) ;
fprintf ( stderr , " Load members from psql... \n " ) ;
auto stream = pqxx : : stream_from : : query ( w , qbuf ) ;
std : : tuple <
std : : string // device ID
, std : : string // network ID
, bool // authorized
, std : : optional < bool > // active_bridge
, std : : optional < std : : string > // ip_assignments
, std : : optional < bool > // no_auto_assign_ips
, std : : optional < bool > // sso_exempt
, std : : optional < uint64_t > // authentication_expiry_time
, std : : optional < uint64_t > // creation_time
, std : : string // identity
, std : : optional < std : : string > // last_authorized_credential
, std : : optional < uint64_t > // last_authorized_time
, std : : optional < uint64_t > // last_deauthorized_time
, std : : optional < int32_t > // remote_trace_level
, std : : optional < std : : string > // remote_trace_target
, std : : optional < uint64_t > // revision
, std : : optional < std : : string > // capabilities
, std : : optional < std : : string > // tags
> row ;
uint64_t count = 0 ;
uint64_t total = 0 ;
while ( stream > > row ) {
auto start = std : : chrono : : high_resolution_clock : : now ( ) ;
json empty ;
json config ;
initMember ( config ) ;
memberId = std : : get < 0 > ( row ) ;
networkId = std : : get < 1 > ( row ) ;
bool authorized = std : : get < 2 > ( row ) ;
std : : optional < bool > active_bridge = std : : get < 3 > ( row ) ;
std : : string ip_assignments = std : : get < 4 > ( row ) . value_or ( " " ) ;
std : : optional < bool > no_auto_assign_ips = std : : get < 5 > ( row ) ;
std : : optional < bool > sso_exempt = std : : get < 6 > ( row ) ;
std : : optional < uint64_t > authentication_expiry_time = std : : get < 7 > ( row ) ;
std : : optional < uint64_t > creation_time = std : : get < 8 > ( row ) ;
std : : string identity = std : : get < 9 > ( row ) ;
std : : optional < std : : string > last_authorized_credential = std : : get < 10 > ( row ) ;
std : : optional < uint64_t > last_authorized_time = std : : get < 11 > ( row ) ;
std : : optional < uint64_t > last_deauthorized_time = std : : get < 12 > ( row ) ;
std : : optional < int32_t > remote_trace_level = std : : get < 13 > ( row ) ;
std : : optional < std : : string > remote_trace_target = std : : get < 14 > ( row ) ;
std : : optional < uint64_t > revision = std : : get < 15 > ( row ) ;
std : : optional < std : : string > capabilities = std : : get < 16 > ( row ) ;
std : : optional < std : : string > tags = std : : get < 17 > ( row ) ;
config [ " objtype " ] = " member " ;
config [ " id " ] = memberId ;
config [ " address " ] = identity ;
config [ " nwid " ] = networkId ;
config [ " authorized " ] = authorized ;
config [ " activeBridge " ] = active_bridge . value_or ( false ) ;
config [ " ipAssignments " ] = json : : array ( ) ;
if ( ip_assignments ! = " {} " ) {
std : : string tmp = ip_assignments . substr ( 1 , ip_assignments . length ( ) - 2 ) ;
std : : vector < std : : string > addrs = split ( tmp , ' , ' ) ;
for ( auto it = addrs . begin ( ) ; it ! = addrs . end ( ) ; + + it ) {
config [ " ipAssignments " ] . push_back ( * it ) ;
}
}
config [ " capabilities " ] = json : : parse ( capabilities . value_or ( " [] " ) ) ;
config [ " creationTime " ] = creation_time . value_or ( 0 ) ;
config [ " lastAuthorizedCredential " ] = last_authorized_credential . value_or ( " " ) ;
config [ " lastAuthorizedTime " ] = last_authorized_time . value_or ( 0 ) ;
config [ " lastDeauthorizedTime " ] = last_deauthorized_time . value_or ( 0 ) ;
config [ " noAutoAssignIPs " ] = no_auto_assign_ips . value_or ( false ) ;
config [ " remoteTraceLevel " ] = remote_trace_level . value_or ( 0 ) ;
config [ " remoteTraceTarget " ] = remote_trace_target . value_or ( nullptr ) ;
config [ " revision " ] = revision . value_or ( 0 ) ;
config [ " ssoExempt " ] = sso_exempt . value_or ( false ) ;
config [ " authenticationExpiryTime " ] = authentication_expiry_time . value_or ( 0 ) ;
config [ " tags " ] = json : : parse ( tags . value_or ( " [] " ) ) ;
Metrics : : member_count + + ;
_memberChanged ( empty , config , false ) ;
memberId = " " ;
networkId = " " ;
auto end = std : : chrono : : high_resolution_clock : : now ( ) ;
auto dur = std : : chrono : : duration_cast < std : : chrono : : microseconds > ( end - start ) ;
total + = dur . count ( ) ;
+ + count ;
if ( count > 0 & & count % 10000 = = 0 ) {
fprintf ( stderr , " Averaging %lu us per member \n " , ( total / count ) ) ;
}
}
if ( count > 0 ) {
fprintf ( stderr , " Took %lu us per member to load \n " , ( total / count ) ) ;
}
stream . complete ( ) ;
w . commit ( ) ;
_pool - > unborrow ( c ) ;
fprintf ( stderr , " done. \n " ) ;
if ( + + this - > _ready = = 2 ) {
if ( _waitNoticePrinted ) {
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL data download complete. " ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
@ -445,10 +643,37 @@ void CV2::heartbeat()
const char * publicIdentity = publicId ;
const char * hostname = hostnameTmp ;
// TODO: Update for CV2
while ( _run = = 1 ) {
std : : this_thread : : sleep_for ( std : : chrono : : seconds ( 5 ) ) ;
auto c = _pool - > borrow ( ) ;
int64_t ts = OSUtils : : now ( ) ;
if ( c - > c ) {
std : : string major = std : : to_string ( ZEROTIER_ONE_VERSION_MAJOR ) ;
std : : string minor = std : : to_string ( ZEROTIER_ONE_VERSION_MINOR ) ;
std : : string rev = std : : to_string ( ZEROTIER_ONE_VERSION_REVISION ) ;
std : : string version = major + " . " + minor + " . " + rev ;
std : : string versionStr = " v " + version ;
try {
pqxx : : work w { * c - > c } ;
w . exec_params0 ( " INSERT INTO controller (id, hostname, last_heartbeat, public_identity, version) VALUES "
" ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5) "
" ON CONFLICT (id) DO UPDATE SET hostname = EXCLUDED.hostname, last_heartbeat = EXCLUDED.last_heartbeat, "
" public_identity = EXCLUDED.public_identity, version = EXCLUDED.version " ,
controllerId , hostname , ts , publicIdentity , versionStr ) ;
w . commit ( ) ;
} catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: Error in heartbeat: %s \n " , e . what ( ) ) ;
continue ;
} catch ( . . . ) {
fprintf ( stderr , " ERROR: Unknown error in heartbeat \n " ) ;
continue ;
}
}
_pool - > unborrow ( c ) ;
std : : this_thread : : sleep_for ( std : : chrono : : seconds ( 1 ) ) ;
}
fprintf ( stderr , " Exited heartbeat thread \n " ) ;
}
@ -490,7 +715,223 @@ void CV2::networksDbWatcher()
void CV2 : : commitThread ( )
{
// TODO: Update for CV2
fprintf ( stderr , " %s: commitThread start \n " , _myAddressStr . c_str ( ) ) ;
std : : pair < nlohmann : : json , bool > qitem ;
while ( _commitQueue . get ( qitem ) & & ( _run = = 1 ) ) {
//fprintf(stderr, "commitThread tick\n");
if ( ! qitem . first . is_object ( ) ) {
fprintf ( stderr , " not an object \n " ) ;
continue ;
}
std : : shared_ptr < PostgresConnection > c ;
try {
c = _pool - > borrow ( ) ;
} catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: %s \n " , e . what ( ) ) ;
continue ;
}
if ( ! c ) {
fprintf ( stderr , " Error getting database connection \n " ) ;
continue ;
}
Metrics : : pgsql_commit_ticks + + ;
try {
nlohmann : : json & config = ( qitem . first ) ;
const std : : string objtype = config [ " objtype " ] ;
if ( objtype = = " member " ) {
// fprintf(stderr, "%s: commitThread: member\n", _myAddressStr.c_str());
std : : string memberId ;
std : : string networkId ;
try {
pqxx : : work w ( * c - > c ) ;
memberId = config [ " id " ] ;
networkId = config [ " nwid " ] ;
std : : string target = " NULL " ;
if ( ! config [ " remoteTraceTarget " ] . is_null ( ) ) {
target = config [ " remoteTraceTarget " ] ;
}
pqxx : : row nwrow = w . exec_params1 ( " SELECT COUNT(id) FROM networks WHERE id = $1 " , networkId ) ;
int nwcount = nwrow [ 0 ] . as < int > ( ) ;
if ( nwcount ! = 1 ) {
fprintf ( stderr , " network %s does not exist. skipping member upsert \n " , networkId . c_str ( ) ) ;
w . abort ( ) ;
_pool - > unborrow ( c ) ;
continue ;
}
// only needed for hooks, and no hooks for now
// pqxx::row mrow = w.exec_params1("SELECT COUNT(id) FROM device_networks WHERE device_id = $1 AND network_id = $2", memberId, networkId);
// int membercount = mrow[0].as<int>();
// bool isNewMember = (membercount == 0);
pqxx : : result res = w . exec_params0 (
" INSERT INTO devices (id, version_major, version_minor, version_revision, version_protocol) "
" VALUES ($1, $2, $3, $4, $5) ON CONFLICT (id) DO UPDATE SET "
" version_major = EXCLUDED.version_major, version_minor = EXCLUDED.version_minor, "
" version_revision = EXCLUDED.version_revision, version_protocol = EXCLUDED.version_protocol " ,
memberId ,
( int ) config [ " vMajor " ] ,
( int ) config [ " vMinor " ] ,
( int ) config [ " vRev " ] ,
( int ) config [ " vProto " ] ) ;
res = w . exec_params0 (
" INSERT INTO device_networks (device_id, network_id, authorized, active_bridge, ip_assignments, "
" no_auto_assign_ips, sso_exempt, authentication_expiry_time, capabilities, creation_time, "
" identity, last_authorized_credential, last_authorized_time, last_deauthorized_time, "
" remote_trace_level, remote_trace_target, revision, tags) "
" VALUES ($1, $2, $3, $4, $5, $6, $7, TO_TIMESTAMP($8::double precision/1000), $9, "
" TO_TIMESTAMP($10::double precision/1000), $11, 12, TO_TIMESTAMP($13::double precision/1000), "
" TO_TIMESTAMP($14::double precision/1000), $15, $16, $17, $18) "
" ON CONFLICT (device_id, network_id) DO UPDATE SET "
" authorized = EXCLUDED.authorized, active_bridge = EXCLUDED.active_bridge, "
" ip_assignments = EXCLUDED.ip_assignments, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
" sso_exempt = EXCLUDED.sso_exempt, authentication_expiry_time = EXCLUDED.authentication_expiry_time, "
" capabilities = EXCLUDED.capabilities, creation_time = EXCLUDED.creation_time, "
" identity = EXCLUDED.identity, last_authorized_credential = EXCLUDED.last_authorized_credential, "
" last_authorized_time = EXCLUDED.last_authorized_time, last_deauthorized_time = EXCLUDED.last_deauthorized_time, "
" remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
" revision = EXCLUDED.revision, tags = EXCLUDED.tags " ,
memberId ,
networkId ,
( bool ) config [ " authorized " ] ,
( bool ) config [ " activeBridge " ] ,
config [ " ipAssignments " ] . get < std : : vector < std : : string > > ( ) ,
( bool ) config [ " noAutoAssignIps " ] ,
( bool ) config [ " ssoExempt " ] ,
( uint64_t ) config [ " authenticationExpiryTime " ] ,
OSUtils : : jsonDump ( config [ " capabilities " ] , - 1 ) ,
( uint64_t ) config [ " creationTime " ] ,
OSUtils : : jsonString ( config [ " identity " ] , " " ) ,
OSUtils : : jsonString ( config [ " lastAuthorizedCredential " ] , " " ) ,
( uint64_t ) config [ " lastAuthorizedTime " ] ,
( uint64_t ) config [ " lastDeauthorizedTime " ] ,
( int ) config [ " remoteTraceLevel " ] ,
target ,
( uint64_t ) config [ " revision " ] ,
OSUtils : : jsonDump ( config [ " tags " ] , - 1 ) ) ;
w . commit ( ) ;
// No hooks for now
// if (_smee != NULL && isNewMember) {
// pqxx::row row = w.exec_params1(
// "SELECT "
// " count(h.hook_id) "
// "FROM "
// " ztc_hook h "
// " INNER JOIN ztc_org o ON o.org_id = h.org_id "
// " INNER JOIN ztc_network n ON n.owner_id = o.owner_id "
// " WHERE "
// "n.id = $1 ",
// networkId
// );
// int64_t hookCount = row[0].as<int64_t>();
// if (hookCount > 0) {
// notifyNewMember(networkId, memberId);
// }
// }
const uint64_t nwidInt = OSUtils : : jsonIntHex ( config [ " nwid " ] , 0ULL ) ;
const uint64_t memberidInt = OSUtils : : jsonIntHex ( config [ " id " ] , 0ULL ) ;
if ( nwidInt & & memberidInt ) {
nlohmann : : json nwOrig ;
nlohmann : : json memOrig ;
nlohmann : : json memNew ( config ) ;
get ( nwidInt , nwOrig , memberidInt , memOrig ) ;
_memberChanged ( memOrig , memNew , qitem . second ) ;
} else {
fprintf ( stderr , " %s: Can't notify of change. Error parsing nwid or memberid: %llu-%llu \n " , _myAddressStr . c_str ( ) , ( unsigned long long ) nwidInt , ( unsigned long long ) memberidInt ) ;
}
} catch ( std : : exception & e ) {
fprintf ( stderr , " %s ERROR: Error updating member %s-%s: %s \n " , _myAddressStr . c_str ( ) , networkId . c_str ( ) , memberId . c_str ( ) , e . what ( ) ) ;
}
} else if ( objtype = = " network " ) {
try {
// fprintf(stderr, "%s: commitThread: network\n", _myAddressStr.c_str());
pqxx : : work w ( * c - > c ) ;
std : : string id = config [ " id " ] ;
// network must already exist
pqxx : : result res = w . exec_params0 (
" UPDATE networks SET configuration = $1, revision = $2 WHERE id = $3 " ,
OSUtils : : jsonDump ( config , - 1 ) ,
( uint64_t ) config [ " revision " ] ,
id
) ;
w . commit ( ) ;
const uint64_t nwidInt = OSUtils : : jsonIntHex ( config [ " nwid " ] , 0ULL ) ;
if ( nwidInt ) {
nlohmann : : json nwOrig ;
nlohmann : : json nwNew ( config ) ;
get ( nwidInt , nwOrig ) ;
_networkChanged ( nwOrig , nwNew , qitem . second ) ;
} else {
fprintf ( stderr , " %s: Can't notify network changed: %llu \n " , _myAddressStr . c_str ( ) , ( unsigned long long ) nwidInt ) ;
}
} catch ( std : : exception & e ) {
fprintf ( stderr , " %s ERROR: Error updating network: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
}
} else if ( objtype = = " _delete_network " ) {
// fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str());
try {
// don't think we need this. Deletion handled by CV2 API
// pqxx::work w(*c->c);
// std::string networkId = config["nwid"];
// pqxx::result res = w.exec_params0("UPDATE ztc_network SET deleted = true WHERE id = $1",
// networkId);
// w.commit();
} catch ( std : : exception & e ) {
fprintf ( stderr , " %s ERROR: Error deleting network: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
}
} else if ( objtype = = " _delete_member " ) {
// fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str());
try {
// don't think we need this. Deletion handled by CV2 API
// pqxx::work w(*c->c);
// std::string memberId = config["id"];
// std::string networkId = config["nwid"];
// pqxx::result res = w.exec_params0(
// "UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2",
// memberId, networkId);
// w.commit();
} catch ( std : : exception & e ) {
fprintf ( stderr , " %s ERROR: Error deleting member: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
}
} else {
fprintf ( stderr , " %s ERROR: unknown objtype \n " , _myAddressStr . c_str ( ) ) ;
}
} catch ( std : : exception & e ) {
fprintf ( stderr , " %s ERROR: Error getting objtype: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
}
_pool - > unborrow ( c ) ;
c . reset ( ) ;
}
fprintf ( stderr , " %s commitThread finished \n " , _myAddressStr . c_str ( ) ) ;
}
void CV2 : : onlineNotificationThread ( ) {
@ -500,7 +941,85 @@ void CV2::onlineNotificationThread() {
nlohmann : : json jtmp1 , jtmp2 ;
while ( _run = = 1 ) {
// TODO: Update for CV2
auto c = _pool - > borrow ( ) ;
auto c2 = _pool - > borrow ( ) ;
try {
fprintf ( stderr , " %s onlineNotificationThread \n " , _myAddressStr . c_str ( ) ) ;
std : : unordered_map < std : : pair < uint64_t , uint64_t > , std : : pair < int64_t , InetAddress > , _PairHasher > lastOnline ;
{
std : : lock_guard < std : : mutex > l ( _lastOnline_l ) ;
lastOnline . swap ( _lastOnline ) ;
}
pqxx : : work w ( * c - > c ) ;
pqxx : : work w2 ( * c2 - > c ) ;
bool firstRun = true ;
bool memberAdded = false ;
uint64_t updateCount = 0 ;
pqxx : : pipeline pipe ( w ) ;
for ( auto i = lastOnline . begin ( ) ; i ! = lastOnline . end ( ) ; + + i ) {
updateCount + + ;
uint64_t nwid_i = i - > first . first ;
char nwidTmp [ 64 ] ;
char memTmp [ 64 ] ;
char ipTmp [ 64 ] ;
OSUtils : : ztsnprintf ( nwidTmp , sizeof ( nwidTmp ) , " %.16llx " , nwid_i ) ;
OSUtils : : ztsnprintf ( memTmp , sizeof ( memTmp ) , " %.10llx " , i - > first . second ) ;
if ( ! get ( nwid_i , jtmp1 , i - > first . second , jtmp2 ) ) {
continue ; // skip non existent networks/members
}
std : : string networkId ( nwidTmp ) ;
std : : string memberId ( memTmp ) ;
try {
pqxx : : row r = w2 . exec_params1 ( " SELECT device_id, network_id FROM device_networks WHERE network_id = $1 AND device_id = $2 " ,
networkId , memberId ) ;
} catch ( pqxx : : unexpected_rows & e ) {
continue ;
}
int64_t ts = i - > second . first ;
std : : string ipAddr = i - > second . second . toIpString ( ipTmp ) ;
std : : string timestamp = std : : to_string ( ts ) ;
json record = {
{ ipAddr , ts } ,
} ;
// upsert into devices table
std : : string device_insert = " INSERT INTO devices (id, last_seen) VALUES (' " + w2 . esc ( memberId ) + " ', ' " + w2 . esc ( record . dump ( ) ) + " '::JSONB) "
" ON CONFLICT (id) DO UPDATE SET last_seen = last_seen || EXCLUDED.last_seen " ;
pipe . insert ( device_insert ) ;
std : : string device_network_insert = " INSERT INTO device_networks (device_id, network_id, last_seen) "
" VALUES (' " + w2 . esc ( memberId ) + " ', ' " + w2 . esc ( networkId ) + " ', ' " + w2 . esc ( record . dump ( ) ) + " '::JSONB) "
" ON CONFLICT (device_id, network_id) DO UPDATE SET last_seen = last_seen || EXCLUDED.last_seen " ;
pipe . insert ( device_network_insert ) ;
Metrics : : pgsql_node_checkin + + ;
}
pipe . complete ( ) ; ;
w2 . commit ( ) ;
w . commit ( ) ;
fprintf ( stderr , " %s: Updated online status of %lu members \n " , _myAddressStr . c_str ( ) , updateCount ) ;
} catch ( std : : exception & e ) {
fprintf ( stderr , " %s ERROR: Error in onlineNotificationThread: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
} catch ( . . . ) {
fprintf ( stderr , " %s ERROR: Unknown error in onlineNotificationThread \n " , _myAddressStr . c_str ( ) ) ;
}
_pool - > unborrow ( c2 ) ;
_pool - > unborrow ( c ) ;
std : : this_thread : : sleep_for ( std : : chrono : : seconds ( 10 ) ) ;
}
fprintf ( stderr , " %s: Fell out of run loop in onlineNotificationThread \n " , _myAddressStr . c_str ( ) ) ;