@ -18,7 +18,10 @@
# ifdef ZT_CONTROLLER_USE_RETHINKDB
# ifdef ZT_CONTROLLER_USE_RETHINKDB
# include <stdio.h>
# include <stdlib.h>
# include <unistd.h>
# include <unistd.h>
# include <time.h>
# include "RethinkDB.hpp"
# include "RethinkDB.hpp"
# include "EmbeddedNetworkController.hpp"
# include "EmbeddedNetworkController.hpp"
@ -36,6 +39,23 @@ using json = nlohmann::json;
namespace ZeroTier {
namespace ZeroTier {
static const char * _timestr ( )
{
time_t t = time ( 0 ) ;
char * ts = ctime ( & t ) ;
if ( ! ts )
return " error " ;
char * p = ts ;
while ( * p ) {
if ( * p = = ' \n ' ) {
* p = ( char ) 0 ;
break ;
}
+ + p ;
}
return ts ;
}
RethinkDB : : RethinkDB ( EmbeddedNetworkController * const nc , const Identity & myId , const char * path ) :
RethinkDB : : RethinkDB ( EmbeddedNetworkController * const nc , const Identity & myId , const char * path ) :
DB ( nc , myId , path ) ,
DB ( nc , myId , path ) ,
_ready ( 2 ) , // two tables need to be synchronized before we're ready, so this is ready when it reaches 0
_ready ( 2 ) , // two tables need to be synchronized before we're ready, so this is ready when it reaches 0
@ -68,7 +88,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Identity &myId,co
if ( ( tmp [ " type " ] = = " state " ) & & ( tmp [ " state " ] = = " ready " ) ) {
if ( ( tmp [ " type " ] = = " state " ) & & ( tmp [ " state " ] = = " ready " ) ) {
if ( - - this - > _ready = = 0 ) {
if ( - - this - > _ready = = 0 ) {
if ( _waitNoticePrinted )
if ( _waitNoticePrinted )
fprintf ( stderr , " NOTICE: %.10llx controller RethinkDB data download complete. " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) ) ;
fprintf ( stderr , " [%s] NOTICE: %.10llx controller RethinkDB data download complete." ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
this - > _readyLock . unlock ( ) ;
this - > _readyLock . unlock ( ) ;
}
}
} else {
} else {
@ -84,11 +104,11 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Identity &myId,co
}
}
}
}
} catch ( std : : exception & e ) {
} catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: %.10llx controller RethinkDB (member change stream): %s " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) , e . what ( ) ) ;
fprintf ( stderr , " [%s] ERROR: %.10llx controller RethinkDB (member change stream): %s" ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) , e . what ( ) ) ;
} catch ( R : : Error & e ) {
} catch ( R : : Error & e ) {
fprintf ( stderr , " ERROR: %.10llx controller RethinkDB (member change stream): %s " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) , e . message . c_str ( ) ) ;
fprintf ( stderr , " [%s] ERROR: %.10llx controller RethinkDB (member change stream): %s" ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) , e . message . c_str ( ) ) ;
} catch ( . . . ) {
} catch ( . . . ) {
fprintf ( stderr , " ERROR: %.10llx controller RethinkDB (member change stream): unknown exception " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) ) ;
fprintf ( stderr , " [%s] ERROR: %.10llx controller RethinkDB (member change stream): unknown exception" ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
}
}
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 250 ) ) ;
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 250 ) ) ;
}
}
@ -109,7 +129,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Identity &myId,co
if ( ( tmp [ " type " ] = = " state " ) & & ( tmp [ " state " ] = = " ready " ) ) {
if ( ( tmp [ " type " ] = = " state " ) & & ( tmp [ " state " ] = = " ready " ) ) {
if ( - - this - > _ready = = 0 ) {
if ( - - this - > _ready = = 0 ) {
if ( _waitNoticePrinted )
if ( _waitNoticePrinted )
fprintf ( stderr , " NOTICE: %.10llx controller RethinkDB data download complete. " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) ) ;
fprintf ( stderr , " [%s] NOTICE: %.10llx controller RethinkDB data download complete." ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
this - > _readyLock . unlock ( ) ;
this - > _readyLock . unlock ( ) ;
}
}
} else {
} else {
@ -125,11 +145,11 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Identity &myId,co
}
}
}
}
} catch ( std : : exception & e ) {
} catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: %.10llx controller RethinkDB (network change stream): %s " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) , e . what ( ) ) ;
fprintf ( stderr , " [%s] ERROR: %.10llx controller RethinkDB (network change stream): %s" ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) , e . what ( ) ) ;
} catch ( R : : Error & e ) {
} catch ( R : : Error & e ) {
fprintf ( stderr , " ERROR: %.10llx controller RethinkDB (network change stream): %s " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) , e . message . c_str ( ) ) ;
fprintf ( stderr , " [%s] ERROR: %.10llx controller RethinkDB (network change stream): %s" ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) , e . message . c_str ( ) ) ;
} catch ( . . . ) {
} catch ( . . . ) {
fprintf ( stderr , " ERROR: %.10llx controller RethinkDB (network change stream): unknown exception " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) ) ;
fprintf ( stderr , " [%s] ERROR: %.10llx controller RethinkDB (network change stream): unknown exception" ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
}
}
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 250 ) ) ;
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 250 ) ) ;
}
}
@ -200,16 +220,16 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Identity &myId,co
}
}
break ;
break ;
} else {
} else {
fprintf ( stderr , " ERROR: %.10llx controller RethinkDB (insert/update): connect failed (will retry) " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) ) ;
fprintf ( stderr , " [%s] ERROR: %.10llx controller RethinkDB (insert/update): connect failed (will retry)" ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
}
}
} catch ( std : : exception & e ) {
} catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: %.10llx controller RethinkDB (insert/update): %s " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) , e . what ( ) ) ;
fprintf ( stderr , " [%s] ERROR: %.10llx controller RethinkDB (insert/update): %s" ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) , e . what ( ) ) ;
rdb . reset ( ) ;
rdb . reset ( ) ;
} catch ( R : : Error & e ) {
} catch ( R : : Error & e ) {
fprintf ( stderr , " ERROR: %.10llx controller RethinkDB (insert/update): %s " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) , e . message . c_str ( ) ) ;
fprintf ( stderr , " [%s] ERROR: %.10llx controller RethinkDB (insert/update): %s" ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) , e . message . c_str ( ) ) ;
rdb . reset ( ) ;
rdb . reset ( ) ;
} catch ( . . . ) {
} catch ( . . . ) {
fprintf ( stderr , " ERROR: %.10llx controller RethinkDB (insert/update): unknown exception " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) ) ;
fprintf ( stderr , " [%s] ERROR: %.10llx controller RethinkDB (insert/update): unknown exception" ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
rdb . reset ( ) ;
rdb . reset ( ) ;
}
}
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 250 ) ) ;
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 250 ) ) ;
@ -303,13 +323,13 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Identity &myId,co
}
}
}
}
} catch ( std : : exception & e ) {
} catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: %.10llx controller RethinkDB (node status update): %s " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) , e . what ( ) ) ;
fprintf ( stderr , " [%s] ERROR: %.10llx controller RethinkDB (node status update): %s" ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) , e . what ( ) ) ;
rdb . reset ( ) ;
rdb . reset ( ) ;
} catch ( R : : Error & e ) {
} catch ( R : : Error & e ) {
fprintf ( stderr , " ERROR: %.10llx controller RethinkDB (node status update): %s " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) , e . message . c_str ( ) ) ;
fprintf ( stderr , " [%s] ERROR: %.10llx controller RethinkDB (node status update): %s" ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) , e . message . c_str ( ) ) ;
rdb . reset ( ) ;
rdb . reset ( ) ;
} catch ( . . . ) {
} catch ( . . . ) {
fprintf ( stderr , " ERROR: %.10llx controller RethinkDB (node status update): unknown exception " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) ) ;
fprintf ( stderr , " [%s] ERROR: %.10llx controller RethinkDB (node status update): unknown exception" ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
rdb . reset ( ) ;
rdb . reset ( ) ;
}
}
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 250 ) ) ;
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 250 ) ) ;
@ -389,7 +409,7 @@ bool RethinkDB::waitForReady()
while ( _ready > 0 ) {
while ( _ready > 0 ) {
if ( ! _waitNoticePrinted ) {
if ( ! _waitNoticePrinted ) {
_waitNoticePrinted = true ;
_waitNoticePrinted = true ;
fprintf ( stderr , " NOTICE: %.10llx controller RethinkDB waiting for initial data download... " ZT_EOL_S , ( unsigned long long ) _myAddress . toInt ( ) ) ;
fprintf ( stderr , " [%s] NOTICE: %.10llx controller RethinkDB waiting for initial data download..." ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
}
}
_readyLock . lock ( ) ;
_readyLock . lock ( ) ;
_readyLock . unlock ( ) ;
_readyLock . unlock ( ) ;