@ -18,168 +18,181 @@
# ifdef ZT_ENABLE_CLUSTER
# include <stdio.h>
# include <stdlib.h>
# include <string.h>
# include <stdint.h>
# include <unistd.h>
# include <sys/types.h>
# include <sys/stat.h>
# include <sys/wait.h>
# include <signal.h>
# include <errno.h>
# include <iostream>
# include <math.h>
# include <cmath>
# include "ClusterGeoIpService.hpp"
# include "../node/Utils.hpp"
# include "../node/InetAddress.hpp"
# include "../osdep/OSUtils.hpp"
// 120 days
# define ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL 10368000000ULL
# define ZT_CLUSTERGEOIPSERVICE_FILE_MODIFICATION_CHECK_EVERY 10000
namespace ZeroTier {
ClusterGeoIpService : : ClusterGeoIpService ( const char * pathToExe ) :
_pathToExe ( pathToExe ) ,
_sOutputFd ( - 1 ) ,
_sInputFd ( - 1 ) ,
_sPid ( 0 ) ,
_run ( true )
ClusterGeoIpService : : ClusterGeoIpService ( ) :
_pathToCsv ( ) ,
_ipStartColumn ( - 1 ) ,
_ipEndColumn ( - 1 ) ,
_latitudeColumn ( - 1 ) ,
_longitudeColumn ( - 1 ) ,
_lastFileCheckTime ( 0 ) ,
_csvModificationTime ( 0 ) ,
_csvFileSize ( 0 )
{
_thread = Thread : : start ( this ) ;
}
ClusterGeoIpService : : ~ ClusterGeoIpService ( )
{
_run = false ;
long p = _sPid ;
if ( p > 0 ) {
: : kill ( p , SIGTERM ) ;
Thread : : sleep ( 500 ) ;
: : kill ( p , SIGKILL ) ;
}
Thread : : join ( _thread ) ;
}
bool ClusterGeoIpService : : locate ( const InetAddress & ip , int & x , int & y , int & z )
{
InetAddress ipNoPort ( ip ) ;
ipNoPort . setPort ( 0 ) ; // we index cache by IP only
const uint64_t now = OSUtils : : now ( ) ;
bool r = false ;
{
Mutex : : Lock _l ( _cache_m ) ;
std : : map < InetAddress , _CE > : : iterator c ( _cache . find ( ipNoPort ) ) ;
if ( c ! = _cache . end ( ) ) {
x = c - > second . x ;
y = c - > second . y ;
z = c - > second . z ;
if ( ( now - c - > second . ts ) < ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL )
return true ;
else r = true ; // return true but refresh as well
}
Mutex : : Lock _l ( _lock ) ;
if ( ( _pathToCsv . length ( ) > 0 ) & & ( ( OSUtils : : now ( ) - _lastFileCheckTime ) > ZT_CLUSTERGEOIPSERVICE_FILE_MODIFICATION_CHECK_EVERY ) ) {
_lastFileCheckTime = OSUtils : : now ( ) ;
if ( ( _csvFileSize ! = OSUtils : : getFileSize ( _pathToCsv . c_str ( ) ) ) | | ( _csvModificationTime ! = OSUtils : : getLastModified ( _pathToCsv . c_str ( ) ) ) )
_load ( _pathToCsv . c_str ( ) , _ipStartColumn , _ipEndColumn , _latitudeColumn , _longitudeColumn ) ;
}
{
Mutex : : Lock _l ( _sOutputLock ) ;
if ( _sOutputFd > = 0 ) {
std : : string ips ( ipNoPort . toIpString ( ) ) ;
ips . push_back ( ' \n ' ) ;
//fprintf(stderr,"ClusterGeoIpService: << %s",ips.c_str());
: : write ( _sOutputFd , ips . data ( ) , ips . length ( ) ) ;
/* We search by looking up the upper bound of the sorted vXdb vectors
* and then iterating down for a matching IP range . We stop when we hit
* the beginning or an entry whose start and end are before the IP we
* are searching . */
if ( ( ip . ss_family = = AF_INET ) & & ( _v4db . size ( ) > 0 ) ) {
_V4E key ;
key . start = Utils : : ntoh ( ( uint32_t ) ( reinterpret_cast < const struct sockaddr_in * > ( & ip ) - > sin_addr . s_addr ) ) ;
std : : vector < _V4E > : : const_iterator i ( std : : upper_bound ( _v4db . begin ( ) , _v4db . end ( ) , key ) ) ;
while ( i ! = _v4db . begin ( ) ) {
- - i ;
if ( ( key - > start > = i - > start ) & & ( key - > start < = i - > end ) ) {
x = i - > x ;
y = i - > y ;
z = i - > z ;
return true ;
} else if ( ( key - > start > i - > start ) & & ( key - > start > i - > end ) )
break ;
}
} else if ( ( ip . ss_family = = AF_INET6 ) & & ( _v6db . size ( ) > 0 ) ) {
_V6E key ;
memcpy ( key . start , reinterpret_cast < const struct sockaddr_in6 * > ( & ip ) - > sin6_addr . s6_addr , 16 ) ;
std : : vector < _V6E > : : const_iterator i ( std : : upper_bound ( _v6db . begin ( ) , _v6db . end ( ) , key ) ) ;
while ( i ! = _v6db . begin ( ) ) {
- - i ;
const int s_vs_s = memcmp ( key - > start , i - > start , 16 ) ;
const int s_vs_e = memcmp ( key - > start , i - > end , 16 ) ;
if ( ( s_vs_s > = 0 ) & & ( s_vs_e < = 0 ) ) {
x = i - > x ;
y = i - > y ;
z = i - > z ;
return true ;
} else if ( ( s_vs_s > 0 ) & & ( s_vs_e > 0 ) )
break ;
}
}
return r ;
return false ;
}
void ClusterGeoIpService : : threadMain ( )
throw ( )
static void _parseLine ( const char * line , std : : vector < _V4E > & v4db , std : : vector < _V6E > & v6db , int ipStartColumn , int ipEndColumn , int latitudeColumn , int longitudeColumn )
{
char linebuf [ 65536 ] ;
char buf [ 65536 ] ;
long n , lineptr ;
while ( _run ) {
{
Mutex : : Lock _l ( _sOutputLock ) ;
_sOutputFd = - 1 ;
_sInputFd = - 1 ;
_sPid = 0 ;
int stdinfds [ 2 ] = { 0 , 0 } ; // sub-process's stdin, our output
int stdoutfds [ 2 ] = { 0 , 0 } ; // sub-process's stdout, our input
: : pipe ( stdinfds ) ;
: : pipe ( stdoutfds ) ;
long p = ( long ) : : vfork ( ) ;
if ( p < 0 ) {
Thread : : sleep ( 500 ) ;
continue ;
} else if ( p = = 0 ) {
: : close ( stdinfds [ 1 ] ) ;
: : close ( stdoutfds [ 0 ] ) ;
: : dup2 ( stdinfds [ 0 ] , STDIN_FILENO ) ;
: : dup2 ( stdoutfds [ 1 ] , STDOUT_FILENO ) ;
: : execl ( _pathToExe . c_str ( ) , _pathToExe . c_str ( ) , ( const char * ) 0 ) ;
: : exit ( 1 ) ;
} else {
: : close ( stdinfds [ 0 ] ) ;
: : close ( stdoutfds [ 1 ] ) ;
_sOutputFd = stdinfds [ 1 ] ;
_sInputFd = stdoutfds [ 0 ] ;
_sPid = p ;
std : : vector < std : : string > ls ( Utils : : split ( line , " , \t " , " \\ " , " \" ' " ) ) ;
if ( ( ( ipStartColumn > = 0 ) & & ( ipStartColumn < ( int ) ls . size ( ) ) ) & &
( ( ipEndColumn > = 0 ) & & ( ipEndColumn < ( int ) ls . size ( ) ) ) & &
( ( latitudeColumn > = 0 ) & & ( latitudeColumn < ( int ) ls . size ( ) ) ) & &
( ( longitudeColumn > = 0 ) & & ( longitudeColumn < ( int ) ls . size ( ) ) ) ) {
InetAddress ipStart ( ls [ ipStartColumn ] . c_str ( ) , 0 ) ;
InetAddress ipEnd ( ls [ ipEndColumn ] . c_str ( ) , 0 ) ;
const double lat = strtod ( ls [ latitudeColumn ] . c_str ( ) , ( char * * ) 0 ) ;
const double lon = strtod ( ls [ longitudeColumn ] . c_str ( ) , ( char * * ) 0 ) ;
if ( ( ipStart . ss_family = = ipEnd . ss_family ) & & ( ipStart ) & & ( ipEnd ) & & ( std : : isfinite ( lat ) ) & & ( std : : isfinite ( lon ) ) ) {
const double latRadians = lat * 0.01745329251994 ; // PI / 180
const double lonRadians = lon * 0.01745329251994 ; // PI / 180
const double cosLat = cos ( latRadians ) ;
const int x = ( int ) round ( ( - 6371.0 ) * cosLat * Math . cos ( lonRadians ) ) ; // 6371 == Earth's approximate radius in kilometers
const int y = ( int ) round ( 6371.0 * sin ( latRadians ) ) ;
const int z = ( int ) round ( 6371.0 * cosLat * Math . sin ( lonRadians ) ) ;
if ( ipStart . ss_family = = AF_INET ) {
v4db . push_back ( _V4E ( ) ) ;
v4db . back ( ) . start = Utils : : ntoh ( ( uint32_t ) ( reinterpret_cast < const struct sockaddr_in * > ( & ipStart ) - > sin_addr . s_addr ) ) ;
v4db . back ( ) . end = Utils : : ntoh ( ( uint32_t ) ( reinterpret_cast < const struct sockaddr_in * > ( & ipEnd ) - > sin_addr . s_addr ) ) ;
v4db . back ( ) . x = x ;
v4db . back ( ) . y = y ;
v4db . back ( ) . z = z ;
} else if ( ipStart . ss_family = = AF_INET6 ) {
v6db . push_back ( _V6E ( ) ) ;
memcpy ( v6db . back ( ) . start , reinterpret_cast < const struct sockaddr_in6 * > ( & ipStart ) - > sin6_addr . s6_addr , 16 ) ;
memcpy ( v6db . back ( ) . end , reinterpret_cast < const struct sockaddr_in6 * > ( & ipEnd ) - > sin6_addr . s6_addr , 16 ) ;
v6db . back ( ) . x = x ;
v6db . back ( ) . y = y ;
v6db . back ( ) . z = z ;
}
}
}
}
lineptr = 0 ;
while ( _run ) {
n = : : read ( _sInputFd , buf , sizeof ( buf ) ) ;
if ( n < = 0 ) {
if ( errno = = EINTR )
continue ;
else break ;
}
for ( long i = 0 ; i < n ; + + i ) {
if ( lineptr > ( long ) sizeof ( linebuf ) )
lineptr = 0 ;
if ( ( buf [ i ] = = ' \n ' ) | | ( buf [ i ] = = ' \r ' ) ) {
long ClusterGeoIpService : : _load ( const char * pathToCsv , int ipStartColumn , int ipEndColumn , int latitudeColumn , int longitudeColumn )
{
// assumes _lock is locked
FILE * f = fopen ( pathToCsv , " rb " ) ;
if ( ! f )
return - 1 ;
std : : vector < _V4E > v4db ;
std : : vector < _V6E > v6db ;
char buf [ 4096 ] ;
char linebuf [ 1024 ] ;
unsigned int lineptr = 0 ;
for ( ; ; ) {
int n = ( int ) fread ( buf , 1 , sizeof ( buf ) , f ) ;
if ( n < = 0 )
break ;
for ( int i = 0 ; i < n ; + + i ) {
if ( ( buf [ i ] = = ' \r ' ) | | ( buf [ i ] = = ' \n ' ) | | ( buf [ i ] = = ( char ) 0 ) ) {
if ( lineptr ) {
linebuf [ lineptr ] = ( char ) 0 ;
if ( lineptr > 0 ) {
//fprintf(stderr,"ClusterGeoIpService: >> %s\n",linebuf);
try {
std : : vector < std : : string > result ( Utils : : split ( linebuf , " , " , " " , " " ) ) ;
if ( ( result . size ( ) > = 7 ) & & ( result [ 1 ] = = " 1 " ) ) {
InetAddress rip ( result [ 0 ] , 0 ) ;
if ( ( rip . ss_family = = AF_INET ) | | ( rip . ss_family = = AF_INET6 ) ) {
_CE ce ;
ce . ts = OSUtils : : now ( ) ;
ce . x = ( int ) : : strtol ( result [ 4 ] . c_str ( ) , ( char * * ) 0 , 10 ) ;
ce . y = ( int ) : : strtol ( result [ 5 ] . c_str ( ) , ( char * * ) 0 , 10 ) ;
ce . z = ( int ) : : strtol ( result [ 6 ] . c_str ( ) , ( char * * ) 0 , 10 ) ;
//fprintf(stderr,"ClusterGeoIpService: %s is at %d,%d,%d\n",rip.toIpString().c_str(),ce.x,ce.y,ce.z);
{
Mutex : : Lock _l2 ( _cache_m ) ;
_cache [ rip ] = ce ;
}
}
}
} catch ( . . . ) { }
}
lineptr = 0 ;
} else linebuf [ lineptr + + ] = buf [ i ] ;
}
_parseLine ( linebuf , v4db , v6db , ipStartColumn , ipEndColumn , latitudeColumn , longitudeColumn ) ;
}
lineptr = 0 ;
} else if ( lineptr < ( unsigned int ) sizeof ( linebuf ) )
linebuf [ lineptr + + ] = buf [ i ] ;
}
}
if ( lineptr ) {
linebuf [ lineptr ] = ( char ) 0 ;
_parseLine ( linebuf , v4db , v6db , ipStartColumn , ipEndColumn , latitudeColumn , longitudeColumn ) ;
}
fclose ( f ) ;
if ( ( v4db . size ( ) > 0 ) | | ( v6db . size ( ) > 0 ) ) {
std : : sort ( v4db . begin ( ) , v4db . end ( ) ) ;
std : : sort ( v6db . begin ( ) , v6db . end ( ) ) ;
_pathToCsv = pathToCsv ;
_ipStartColumn = ipStartColumn ;
_ipEndColumn = ipEndColumn ;
_latitudeColumn = latitudeColumn ;
_longitudeColumn = longitudeColumn ;
_lastFileCheckTime = OSUtils : : now ( ) ;
_csvModificationTime = OSUtils : : getLastModified ( pathToCsv ) ;
_csvFileSize = OSUtils : : getFileSize ( pathToCsv ) ;
_v4db . swap ( v4db ) ;
_v6db . swap ( v6db ) ;
: : close ( _sOutputFd ) ;
: : close ( _sInputFd ) ;
: : kill ( _sPid , SIGTERM ) ;
Thread : : sleep ( 250 ) ;
: : kill ( _sPid , SIGKILL ) ;
: : waitpid ( _sPid , ( int * ) 0 , 0 ) ;
return ( long ) ( _v4db . size ( ) + _v6db . size ( ) ) ;
} else {
return 0 ;
}
}