@ -22,6 +22,7 @@
# include <libpq-fe.h>
# include <sstream>
# include <climits>
using json = nlohmann : : json ;
@ -113,11 +114,8 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
fprintf ( stderr , " Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance " , DB_MINIMUM_VERSION ) ;
exit ( 1 ) ;
}
PQclear ( res ) ;
res = NULL ;
PQfinish ( conn ) ;
conn = NULL ;
if ( _rc ! = NULL ) {
sw : : redis : : ConnectionOptions opts ;
@ -137,6 +135,16 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
}
_readyLock . lock ( ) ;
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download... " ZT_EOL_S , : : _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
_waitNoticePrinted = true ;
initializeNetworks ( conn ) ;
initializeMembers ( conn ) ;
PQfinish ( conn ) ;
conn = NULL ;
_heartbeatThread = std : : thread ( & PostgreSQL : : heartbeat , this ) ;
_membersDbWatcher = std : : thread ( & PostgreSQL : : membersDbWatcher , this ) ;
_networksDbWatcher = std : : thread ( & PostgreSQL : : networksDbWatcher , this ) ;
@ -165,10 +173,6 @@ PostgreSQL::~PostgreSQL()
bool PostgreSQL : : waitForReady ( )
{
while ( _ready < 2 ) {
if ( ! _waitNoticePrinted ) {
_waitNoticePrinted = true ;
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download... " ZT_EOL_S , : : _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
}
_readyLock . lock ( ) ;
_readyLock . unlock ( ) ;
}
@ -229,12 +233,15 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId)
tmp . first [ " objtype " ] = " _delete_network " ;
tmp . second = true ;
_commitQueue . post ( tmp ) ;
nlohmann : : json nullJson ;
_networkChanged ( tmp . first , nullJson , true ) ;
}
void PostgreSQL : : eraseMember ( const uint64_t networkId , const uint64_t memberId )
{
char tmp2 [ 24 ] ;
std : : pair < nlohmann : : json , bool > tmp ;
waitForReady ( ) ;
std : : pair < nlohmann : : json , bool > tmp , nw ;
Utils : : hex ( networkId , tmp2 ) ;
tmp . first [ " nwid " ] = tmp2 ;
Utils : : hex ( memberId , tmp2 ) ;
@ -242,6 +249,8 @@ void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
tmp . first [ " objtype " ] = " _delete_member " ;
tmp . second = true ;
_commitQueue . post ( tmp ) ;
nlohmann : : json nullJson ;
_memberChanged ( tmp . first , nullJson , true ) ;
}
void PostgreSQL : : nodeIsOnline ( const uint64_t networkId , const uint64_t memberId , const InetAddress & physicalAddress )
@ -261,11 +270,30 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
fprintf ( stderr , " Bad Database Connection: %s " , PQerrorMessage ( conn ) ) ;
exit ( 1 ) ;
}
std : : string setKey = " networks:{ " + _myAddressStr + " } " ;
if ( _rc ! = NULL ) {
try {
if ( _rc - > clusterMode ) {
_cluster - > del ( setKey ) ;
} else {
_redis - > del ( setKey ) ;
}
} catch ( sw : : redis : : Error & e ) {
// del can throw an error if the key doesn't exist
// swallow it and move along
}
}
std : : unordered_set < std : : string > networkSet ;
const char * params [ 1 ] = {
_myAddressStr . c_str ( )
} ;
fprintf ( stderr , " Initializing Networks... \n " ) ;
PGresult * res = PQexecParams ( conn , " SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, "
" enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, "
" remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network "
@ -291,9 +319,12 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
const char * nwidparam [ 1 ] = {
PQgetvalue ( res , i , 0 )
} ;
std : : string nwid = PQgetvalue ( res , i , 0 ) ;
networkSet . insert ( nwid ) ;
config [ " id " ] = PQgetvalue ( res , i , 0 ) ;
config [ " nwid " ] = PQgetvalue ( res , i , 0 ) ;
config [ " id " ] = nwid ;
config [ " nwid " ] = nwid ;
try {
config [ " creationTime " ] = std : : stoull ( PQgetvalue ( res , i , 1 ) ) ;
} catch ( std : : exception & e ) {
@ -406,14 +437,29 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
PQclear ( res ) ;
if ( ! networkSet . empty ( ) ) {
if ( _rc & & _rc - > clusterMode ) {
auto tx = _cluster - > transaction ( _myAddressStr , true ) ;
tx . sadd ( setKey , networkSet . begin ( ) , networkSet . end ( ) ) ;
tx . exec ( ) ;
} else if ( _rc & & ! _rc - > clusterMode ) {
auto tx = _redis - > transaction ( true ) ;
tx . sadd ( setKey , networkSet . begin ( ) , networkSet . end ( ) ) ;
tx . exec ( ) ;
}
}
if ( + + this - > _ready = = 2 ) {
if ( _waitNoticePrinted ) {
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL data download complete. " ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
}
_readyLock . unlock ( ) ;
}
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " ERROR: Error initializing networks in Redis: %s \n " , e . what ( ) ) ;
exit ( - 1 ) ;
} catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: Error initializing networks: %s " , e . what ( ) ) ;
fprintf ( stderr , " ERROR: Error initializing networks: %s \n " , e . what ( ) ) ;
exit ( - 1 ) ;
}
}
@ -425,11 +471,44 @@ void PostgreSQL::initializeMembers(PGconn *conn)
fprintf ( stderr , " Bad Database Connection: %s " , PQerrorMessage ( conn ) ) ;
exit ( 1 ) ;
}
std : : string setKeyBase = " network-nodes-all:{ " + _myAddressStr + " }: " ;
if ( _rc ! = NULL ) {
std : : lock_guard < std : : mutex > l ( _networks_l ) ;
std : : unordered_set < std : : string > deletes ;
for ( auto it : _networks ) {
uint64_t nwid_i = it . first ;
char nwidTmp [ 64 ] = { 0 } ;
OSUtils : : ztsnprintf ( nwidTmp , sizeof ( nwidTmp ) , " %.16llx " , nwid_i ) ;
std : : string nwid ( nwidTmp ) ;
std : : string key = setKeyBase + nwid ;
deletes . insert ( key ) ;
}
if ( ! deletes . empty ( ) ) {
if ( _rc - > clusterMode ) {
auto tx = _cluster - > transaction ( _myAddressStr , true ) ;
for ( std : : string k : deletes ) {
tx . del ( k ) ;
}
tx . exec ( ) ;
} else {
auto tx = _redis - > transaction ( true ) ;
for ( std : : string k : deletes ) {
tx . del ( k ) ;
}
tx . exec ( ) ;
}
}
}
const char * params [ 1 ] = {
_myAddressStr . c_str ( )
} ;
std : : unordered_map < std : : string , std : : string > networkMembers ;
fprintf ( stderr , " Initializing Members... \n " ) ;
PGresult * res = PQexecParams ( conn ,
" SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, "
" EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, "
@ -460,6 +539,9 @@ void PostgreSQL::initializeMembers(PGconn *conn)
std : : string memberId ( PQgetvalue ( res , i , 0 ) ) ;
std : : string networkId ( PQgetvalue ( res , i , 1 ) ) ;
networkMembers . insert ( std : : pair < std : : string , std : : string > ( setKeyBase + networkId , memberId ) ) ;
std : : string ctime = PQgetvalue ( res , i , 5 ) ;
config [ " id " ] = memberId ;
config [ " nwid " ] = networkId ;
@ -565,12 +647,31 @@ void PostgreSQL::initializeMembers(PGconn *conn)
PQclear ( res ) ;
if ( ! networkMembers . empty ( ) ) {
if ( _rc ! = NULL ) {
if ( _rc - > clusterMode ) {
auto tx = _cluster - > transaction ( _myAddressStr , true ) ;
for ( auto it : networkMembers ) {
tx . sadd ( it . first , it . second ) ;
}
tx . exec ( ) ;
} else {
auto tx = _redis - > transaction ( true ) ;
for ( auto it : networkMembers ) {
tx . sadd ( it . first , it . second ) ;
}
tx . exec ( ) ;
}
}
}
if ( + + this - > _ready = = 2 ) {
if ( _waitNoticePrinted ) {
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL data download complete. " ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
}
_readyLock . unlock ( ) ;
}
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " ERROR: Error initializing members (redis): %s \n " , e . what ( ) ) ;
} catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: Error initializing members: %s \n " , e . what ( ) ) ;
exit ( - 1 ) ;
@ -608,12 +709,13 @@ void PostgreSQL::heartbeat()
PQfinish ( conn ) ;
exit ( 6 ) ;
}
int64_t ts = OSUtils : : now ( ) ;
if ( conn ) {
std : : string major = std : : to_string ( ZEROTIER_ONE_VERSION_MAJOR ) ;
std : : string minor = std : : to_string ( ZEROTIER_ONE_VERSION_MINOR ) ;
std : : string rev = std : : to_string ( ZEROTIER_ONE_VERSION_REVISION ) ;
std : : string build = std : : to_string ( ZEROTIER_ONE_VERSION_BUILD ) ;
std : : string now = std : : to_string ( OSU tils : : now ( ) ) ;
std : : string now = std : : to_string ( ts ) ;
std : : string host_port = std : : to_string ( _listenPort ) ;
std : : string use_redis = ( _rc ! = NULL ) ? " true " : " false " ;
const char * values [ 10 ] = {
@ -630,7 +732,7 @@ void PostgreSQL::heartbeat()
} ;
PGresult * res = PQexecParams ( conn ,
" INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port,use_redis) "
" INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis) "
" VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) "
" ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
" public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
@ -648,6 +750,13 @@ void PostgreSQL::heartbeat()
}
PQclear ( res ) ;
}
if ( _rc ! = NULL ) {
if ( _rc - > clusterMode ) {
_cluster - > zadd ( " controllers " , controllerId , ts ) ;
} else {
_redis - > zadd ( " controllers " , controllerId , ts ) ;
}
}
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 1000 ) ) ;
}
@ -666,8 +775,6 @@ void PostgreSQL::membersDbWatcher()
exit ( 1 ) ;
}
initializeMembers ( conn ) ;
if ( _rc ) {
PQfinish ( conn ) ;
conn = NULL ;
@ -731,50 +838,54 @@ void PostgreSQL::_membersWatcher_Redis() {
std : : string key = " member-stream:{ " + std : : string ( _myAddress . toString ( buf ) ) + " } " ;
while ( _run = = 1 ) {
json tmp ;
std : : unordered_map < std : : string , ItemStream > result ;
if ( _rc - > clusterMode ) {
_cluster - > xread ( key , " $ " , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
} else {
_redis - > xread ( key , " $ " , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
}
if ( ! result . empty ( ) ) {
for ( auto element : result ) {
# ifdef ZT_TRACE
fprintf ( stdout , " Received notification from: %s \n " , element . first . c_str ( ) ) ;
# endif
for ( auto rec : element . second ) {
std : : string id = rec . first ;
auto attrs = rec . second ;
# ifdef ZT_TRACE
fprintf ( stdout , " Record ID: %s \n " , id . c_str ( ) ) ;
fprintf ( stdout , " attrs len: %lu \n " , attrs . size ( ) ) ;
# endif
for ( auto a : attrs ) {
# ifdef ZT_TRACE
fprintf ( stdout , " key: %s \n value: %s \n " , a . first . c_str ( ) , a . second . c_str ( ) ) ;
# endif
try {
tmp = json : : parse ( a . second ) ;
json & ov = tmp [ " old_val " ] ;
json & nv = tmp [ " new_val " ] ;
json oldConfig , newConfig ;
if ( ov . is_object ( ) ) oldConfig = ov ;
if ( nv . is_object ( ) ) newConfig = nv ;
if ( oldConfig . is_object ( ) | | newConfig . is_object ( ) ) {
_memberChanged ( oldConfig , newConfig , ( this - > _ready > = 2 ) ) ;
try {
json tmp ;
std : : unordered_map < std : : string , ItemStream > result ;
if ( _rc - > clusterMode ) {
_cluster - > xread ( key , " $ " , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
} else {
_redis - > xread ( key , " $ " , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
}
if ( ! result . empty ( ) ) {
for ( auto element : result ) {
# ifdef ZT_TRACE
fprintf ( stdout , " Received notification from: %s \n " , element . first . c_str ( ) ) ;
# endif
for ( auto rec : element . second ) {
std : : string id = rec . first ;
auto attrs = rec . second ;
# ifdef ZT_TRACE
fprintf ( stdout , " Record ID: %s \n " , id . c_str ( ) ) ;
fprintf ( stdout , " attrs len: %lu \n " , attrs . size ( ) ) ;
# endif
for ( auto a : attrs ) {
# ifdef ZT_TRACE
fprintf ( stdout , " key: %s \n value: %s \n " , a . first . c_str ( ) , a . second . c_str ( ) ) ;
# endif
try {
tmp = json : : parse ( a . second ) ;
json & ov = tmp [ " old_val " ] ;
json & nv = tmp [ " new_val " ] ;
json oldConfig , newConfig ;
if ( ov . is_object ( ) ) oldConfig = ov ;
if ( nv . is_object ( ) ) newConfig = nv ;
if ( oldConfig . is_object ( ) | | newConfig . is_object ( ) ) {
_memberChanged ( oldConfig , newConfig , ( this - > _ready > = 2 ) ) ;
}
} catch ( . . . ) {
fprintf ( stderr , " json parse error in networkWatcher_Redis \n " ) ;
}
} catch ( . . . ) {
fprintf ( stderr , " json parse error in networkWatcher_Redis \n " ) ;
}
}
if ( _rc - > clusterMode ) {
_cluster - > xdel ( key , id ) ;
} else {
_redis - > xdel ( key , id ) ;
if ( _rc - > clusterMode ) {
_cluster - > xdel ( key , id ) ;
} else {
_redis - > xdel ( key , id ) ;
}
}
}
}
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " Error in Redis members watcher: %s \n " , e . what ( ) ) ;
}
}
fprintf ( stderr , " membersWatcher ended \n " ) ;
@ -789,8 +900,6 @@ void PostgreSQL::networksDbWatcher()
exit ( 1 ) ;
}
initializeNetworks ( conn ) ;
if ( _rc ) {
PQfinish ( conn ) ;
conn = NULL ;
@ -852,51 +961,55 @@ void PostgreSQL::_networksWatcher_Redis() {
std : : string key = " network-stream:{ " + std : : string ( _myAddress . toString ( buf ) ) + " } " ;
while ( _run = = 1 ) {
json tmp ;
std : : unordered_map < std : : string , ItemStream > result ;
if ( _rc - > clusterMode ) {
_cluster - > xread ( key , " $ " , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
} else {
_redis - > xread ( key , " $ " , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
}
if ( ! result . empty ( ) ) {
for ( auto element : result ) {
try {
json tmp ;
std : : unordered_map < std : : string , ItemStream > result ;
if ( _rc - > clusterMode ) {
_cluster - > xread ( key , " $ " , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
} else {
_redis - > xread ( key , " $ " , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
}
if ( ! result . empty ( ) ) {
for ( auto element : result ) {
# ifdef ZT_TRACE
fprintf ( stdout , " Received notification from: %s \n " , element . first . c_str ( ) ) ;
fprintf ( stdout , " Received notification from: %s \n " , element . first . c_str ( ) ) ;
# endif
for ( auto rec : element . second ) {
std : : string id = rec . first ;
auto attrs = rec . second ;
for ( auto rec : element . second ) {
std : : string id = rec . first ;
auto attrs = rec . second ;
# ifdef ZT_TRACE
fprintf ( stdout , " Record ID: %s \n " , id . c_str ( ) ) ;
fprintf ( stdout , " attrs len: %lu \n " , attrs . size ( ) ) ;
fprintf ( stdout , " Record ID: %s \n " , id . c_str ( ) ) ;
fprintf ( stdout , " attrs len: %lu \n " , attrs . size ( ) ) ;
# endif
for ( auto a : attrs ) {
for ( auto a : attrs ) {
# ifdef ZT_TRACE
fprintf ( stdout , " key: %s \n value: %s \n " , a . first . c_str ( ) , a . second . c_str ( ) ) ;
fprintf ( stdout , " key: %s \n value: %s \n " , a . first . c_str ( ) , a . second . c_str ( ) ) ;
# endif
try {
tmp = json : : parse ( a . second ) ;
json & ov = tmp [ " old_val " ] ;
json & nv = tmp [ " new_val " ] ;
json oldConfig , newConfig ;
if ( ov . is_object ( ) ) oldConfig = ov ;
if ( nv . is_object ( ) ) newConfig = nv ;
if ( oldConfig . is_object ( ) | | newConfig . is_object ( ) ) {
_networkChanged ( oldConfig , newConfig , ( this - > _ready > = 2 ) ) ;
try {
tmp = json : : parse ( a . second ) ;
json & ov = tmp [ " old_val " ] ;
json & nv = tmp [ " new_val " ] ;
json oldConfig , newConfig ;
if ( ov . is_object ( ) ) oldConfig = ov ;
if ( nv . is_object ( ) ) newConfig = nv ;
if ( oldConfig . is_object ( ) | | newConfig . is_object ( ) ) {
_networkChanged ( oldConfig , newConfig , ( this - > _ready > = 2 ) ) ;
}
} catch ( . . . ) {
fprintf ( stderr , " json parse error in networkWatcher_Redis \n " ) ;
}
} catch ( . . . ) {
fprintf ( stderr , " json parse error in networkWatcher_Redis \n " ) ;
}
}
if ( _rc - > clusterMode ) {
_cluster - > xdel ( key , id ) ;
} else {
_redis - > xdel ( key , id ) ;
if ( _rc - > clusterMode ) {
_cluster - > xdel ( key , id ) ;
} else {
_redis - > xdel ( key , id ) ;
}
}
}
}
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " Error in Redis networks watcher: %s \n " , e . what ( ) ) ;
}
}
fprintf ( stderr , " networksWatcher ended \n " ) ;
@ -1332,6 +1445,20 @@ void PostgreSQL::commitThread()
} catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: Error updating member: %s \n " , e . what ( ) ) ;
}
if ( _rc ! = NULL ) {
try {
std : : string id = ( * config ) [ " id " ] ;
std : : string controllerId = _myAddressStr . c_str ( ) ;
std : : string key = " networks:{ " + controllerId + " } " ;
if ( _rc - > clusterMode ) {
_cluster - > sadd ( key , id ) ;
} else {
_redis - > sadd ( key , id ) ;
}
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " ERROR: Error adding network to Redis: %s \n " , e . what ( ) ) ;
}
}
} else if ( objtype = = " _delete_network " ) {
try {
std : : string networkId = ( * config ) [ " nwid " ] ;
@ -1355,6 +1482,22 @@ void PostgreSQL::commitThread()
} catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: Error deleting network: %s \n " , e . what ( ) ) ;
}
if ( _rc ! = NULL ) {
try {
std : : string id = ( * config ) [ " id " ] ;
std : : string controllerId = _myAddressStr . c_str ( ) ;
std : : string key = " networks:{ " + controllerId + " } " ;
if ( _rc - > clusterMode ) {
_cluster - > srem ( key , id ) ;
_cluster - > del ( " network-nodes-online:{ " + controllerId + " }: " + id ) ;
} else {
_redis - > srem ( key , id ) ;
_redis - > del ( " network-nodes-online:{ " + controllerId + " }: " + id ) ;
}
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " ERROR: Error adding network to Redis: %s \n " , e . what ( ) ) ;
}
}
} else if ( objtype = = " _delete_member " ) {
try {
std : : string memberId = ( * config ) [ " id " ] ;
@ -1382,6 +1525,23 @@ void PostgreSQL::commitThread()
} catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: Error deleting member: %s \n " , e . what ( ) ) ;
}
if ( _rc ! = NULL ) {
try {
std : : string memberId = ( * config ) [ " id " ] ;
std : : string networkId = ( * config ) [ " nwid " ] ;
std : : string controllerId = _myAddressStr . c_str ( ) ;
std : : string key = " network-nodes-all:{ " + controllerId + " }: " + networkId ;
if ( _rc - > clusterMode ) {
_cluster - > srem ( key , memberId ) ;
_cluster - > del ( " member:{ " + controllerId + " }: " + networkId + " : " + memberId ) ;
} else {
_redis - > srem ( key , memberId ) ;
_redis - > del ( " member:{ " + controllerId + " }: " + networkId + " : " + memberId ) ;
}
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " ERROR: Error deleting member from Redis: %s \n " , e . what ( ) ) ;
}
}
} else {
fprintf ( stderr , " ERROR: unknown objtype " ) ;
}
@ -1401,6 +1561,17 @@ void PostgreSQL::commitThread()
}
void PostgreSQL : : onlineNotificationThread ( )
{
waitForReady ( ) ;
if ( _rc ! = NULL ) {
onlineNotification_Redis ( ) ;
} else {
onlineNotification_Postgres ( ) ;
}
}
void PostgreSQL : : onlineNotification_Postgres ( )
{
PGconn * conn = getPgConn ( ) ;
if ( PQstatus ( conn ) = = CONNECTION_BAD ) {
@ -1410,9 +1581,7 @@ void PostgreSQL::onlineNotificationThread()
}
_connected = 1 ;
//int64_t lastUpdatedNetworkStatus = 0;
std : : unordered_map < std : : pair < uint64_t , uint64_t > , int64_t , _PairHasher > lastOnlineCumulative ;
nlohmann : : json jtmp1 , jtmp2 ;
while ( _run = = 1 ) {
if ( PQstatus ( conn ) ! = CONNECTION_OK ) {
fprintf ( stderr , " ERROR: Online Notification thread lost connection to Postgres. " ) ;
@ -1420,9 +1589,6 @@ void PostgreSQL::onlineNotificationThread()
exit ( 5 ) ;
}
// map used to send notifications to front end
std : : unordered_map < std : : string , std : : vector < std : : string > > updateMap ;
std : : unordered_map < std : : pair < uint64_t , uint64_t > , std : : pair < int64_t , InetAddress > , _PairHasher > lastOnline ;
{
std : : lock_guard < std : : mutex > l ( _lastOnline_l ) ;
@ -1443,20 +1609,13 @@ void PostgreSQL::onlineNotificationThread()
OSUtils : : ztsnprintf ( nwidTmp , sizeof ( nwidTmp ) , " %.16llx " , nwid_i ) ;
OSUtils : : ztsnprintf ( memTmp , sizeof ( memTmp ) , " %.10llx " , i - > first . second ) ;
auto found = _networks . find ( nwid_i ) ;
if ( found = = _networks . end ( ) ) {
continue ; // skip members trying to join non-existant networks
if ( ! get ( nwid_i , jtmp1 , i - > first . second , jtmp2 ) ) {
continue ; // skip non existent networks/members
}
std : : string networkId ( nwidTmp ) ;
std : : string memberId ( memTmp ) ;
std : : vector < std : : string > & members = updateMap [ networkId ] ;
members . push_back ( memberId ) ;
lastOnlineCumulative [ i - > first ] = i - > second . first ;
const char * qvals [ 2 ] = {
networkId . c_str ( ) ,
memberId . c_str ( )
@ -1516,7 +1675,7 @@ void PostgreSQL::onlineNotificationThread()
PQclear ( res ) ;
}
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 10 ) ) ;
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 100 ) ) ;
}
fprintf ( stderr , " %s: Fell out of run loop in onlineNotificationThread \n " , _myAddressStr . c_str ( ) ) ;
PQfinish ( conn ) ;
@ -1526,6 +1685,95 @@ void PostgreSQL::onlineNotificationThread()
}
}
void PostgreSQL : : onlineNotification_Redis ( )
{
_connected = 1 ;
char buf [ 11 ] = { 0 } ;
std : : string controllerId = std : : string ( _myAddress . toString ( buf ) ) ;
while ( _run = = 1 ) {
std : : unordered_map < std : : pair < uint64_t , uint64_t > , std : : pair < int64_t , InetAddress > , _PairHasher > lastOnline ;
{
std : : lock_guard < std : : mutex > l ( _lastOnline_l ) ;
lastOnline . swap ( _lastOnline ) ;
}
try {
if ( ! lastOnline . empty ( ) ) {
if ( _rc - > clusterMode ) {
auto tx = _cluster - > transaction ( controllerId , true ) ;
_doRedisUpdate ( tx , controllerId , lastOnline ) ;
} else {
auto tx = _redis - > transaction ( true ) ;
_doRedisUpdate ( tx , controllerId , lastOnline ) ;
}
}
} catch ( sw : : redis : : Error & e ) {
# ifdef ZT_TRACE
fprintf ( stderr , " Error in online notification thread (redis): %s \n " , e . what ( ) ) ;
# endif
}
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 100 ) ) ;
}
}
void PostgreSQL : : _doRedisUpdate ( sw : : redis : : Transaction & tx , std : : string & controllerId ,
std : : unordered_map < std : : pair < uint64_t , uint64_t > , std : : pair < int64_t , InetAddress > , _PairHasher > & lastOnline )
{
nlohmann : : json jtmp1 , jtmp2 ;
for ( auto i = lastOnline . begin ( ) ; i ! = lastOnline . end ( ) ; + + i ) {
uint64_t nwid_i = i - > first . first ;
uint64_t memberid_i = i - > first . second ;
char nwidTmp [ 64 ] ;
char memTmp [ 64 ] ;
char ipTmp [ 64 ] ;
OSUtils : : ztsnprintf ( nwidTmp , sizeof ( nwidTmp ) , " %.16llx " , nwid_i ) ;
OSUtils : : ztsnprintf ( memTmp , sizeof ( memTmp ) , " %.10llx " , memberid_i ) ;
if ( ! get ( nwid_i , jtmp1 , memberid_i , jtmp2 ) ) {
continue ; // skip non existent members/networks
}
std : : string networkId ( nwidTmp ) ;
std : : string memberId ( memTmp ) ;
int64_t ts = i - > second . first ;
std : : string ipAddr = i - > second . second . toIpString ( ipTmp ) ;
std : : string timestamp = std : : to_string ( ts ) ;
std : : unordered_map < std : : string , std : : string > record = {
{ " id " , memberId } ,
{ " address " , ipAddr } ,
{ " last_updated " , std : : to_string ( ts ) }
} ;
tx . zadd ( " nodes-online:{ " + controllerId + " } " , memberId , ts )
. zadd ( " nodes-online2:{ " + controllerId + " } " , networkId + " - " + memberId , ts )
. zadd ( " network-nodes-online:{ " + controllerId + " }: " + networkId , memberId , ts )
. zadd ( " active-networks:{ " + controllerId + " } " , networkId , ts )
. sadd ( " network-nodes-all:{ " + controllerId + " }: " + networkId , memberId )
. hmset ( " member:{ " + controllerId + " }: " + networkId + " : " + memberId , record . begin ( ) , record . end ( ) ) ;
}
// expire records from all-nodes and network-nodes member list
uint64_t expireOld = OSUtils : : now ( ) - 300000 ;
tx . zremrangebyscore ( " nodes-online:{ " + controllerId + " } " , sw : : redis : : RightBoundedInterval < double > ( expireOld , sw : : redis : : BoundType : : LEFT_OPEN ) ) ;
tx . zremrangebyscore ( " nodes-online2:{ " + controllerId + " } " , sw : : redis : : RightBoundedInterval < double > ( expireOld , sw : : redis : : BoundType : : LEFT_OPEN ) ) ;
tx . zremrangebyscore ( " active-networks:{ " + controllerId + " } " , sw : : redis : : RightBoundedInterval < double > ( expireOld , sw : : redis : : BoundType : : LEFT_OPEN ) ) ;
{
std : : lock_guard < std : : mutex > l ( _networks_l ) ;
for ( const auto & it : _networks ) {
uint64_t nwid_i = it . first ;
char nwidTmp [ 64 ] ;
OSUtils : : ztsnprintf ( nwidTmp , sizeof ( nwidTmp ) , " %.16llx " , nwid_i ) ;
tx . zremrangebyscore ( " network-nodes-online:{ " + controllerId + " }: " + nwidTmp ,
sw : : redis : : RightBoundedInterval < double > ( expireOld , sw : : redis : : BoundType : : LEFT_OPEN ) ) ;
}
}
tx . exec ( ) ;
}
PGconn * PostgreSQL : : getPgConn ( OverrideMode m )
{
if ( m = = ALLOW_PGBOUNCER_OVERRIDE ) {