|
|
|
|
@ -273,18 +273,18 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
|
|
|
|
|
|
|
|
|
|
std::string setKey = "networks:{" + _myAddressStr + "}"; |
|
|
|
|
|
|
|
|
|
if (_rc != NULL) { |
|
|
|
|
try { |
|
|
|
|
if (_rc->clusterMode) { |
|
|
|
|
_cluster->del(setKey); |
|
|
|
|
} else { |
|
|
|
|
_redis->del(setKey); |
|
|
|
|
} |
|
|
|
|
} catch (sw::redis::Error &e) { |
|
|
|
|
// del can throw an error if the key doesn't exist
|
|
|
|
|
// swallow it and move along
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// if (_rc != NULL) {
|
|
|
|
|
// try {
|
|
|
|
|
// if (_rc->clusterMode) {
|
|
|
|
|
// _cluster->del(setKey);
|
|
|
|
|
// } else {
|
|
|
|
|
// _redis->del(setKey);
|
|
|
|
|
// }
|
|
|
|
|
// } catch (sw::redis::Error &e) {
|
|
|
|
|
// // del can throw an error if the key doesn't exist
|
|
|
|
|
// // swallow it and move along
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
std::unordered_set<std::string> networkSet; |
|
|
|
|
|
|
|
|
|
@ -475,17 +475,17 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
|
|
|
|
|
|
|
|
|
|
PQclear(res); |
|
|
|
|
|
|
|
|
|
if(!networkSet.empty()) { |
|
|
|
|
if (_rc && _rc->clusterMode) { |
|
|
|
|
auto tx = _cluster->transaction(_myAddressStr, true); |
|
|
|
|
tx.sadd(setKey, networkSet.begin(), networkSet.end()); |
|
|
|
|
tx.exec(); |
|
|
|
|
} else if (_rc && !_rc->clusterMode) { |
|
|
|
|
auto tx = _redis->transaction(true); |
|
|
|
|
tx.sadd(setKey, networkSet.begin(), networkSet.end()); |
|
|
|
|
tx.exec(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// if(!networkSet.empty()) {
|
|
|
|
|
// if (_rc && _rc->clusterMode) {
|
|
|
|
|
// auto tx = _cluster->transaction(_myAddressStr, true);
|
|
|
|
|
// tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
|
|
|
|
// tx.exec();
|
|
|
|
|
// } else if (_rc && !_rc->clusterMode) {
|
|
|
|
|
// auto tx = _redis->transaction(true);
|
|
|
|
|
// tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
|
|
|
|
// tx.exec();
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
if (++this->_ready == 2) { |
|
|
|
|
if (_waitNoticePrinted) { |
|
|
|
|
@ -509,36 +509,36 @@ void PostgreSQL::initializeMembers(PGconn *conn)
|
|
|
|
|
fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); |
|
|
|
|
exit(1); |
|
|
|
|
} |
|
|
|
|
std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:"; |
|
|
|
|
// std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:";
|
|
|
|
|
|
|
|
|
|
if (_rc != NULL) { |
|
|
|
|
std::lock_guard<std::mutex> l(_networks_l); |
|
|
|
|
std::unordered_set<std::string> deletes; |
|
|
|
|
for ( auto it : _networks) { |
|
|
|
|
uint64_t nwid_i = it.first; |
|
|
|
|
char nwidTmp[64] = {0}; |
|
|
|
|
OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); |
|
|
|
|
std::string nwid(nwidTmp); |
|
|
|
|
std::string key = setKeyBase + nwid; |
|
|
|
|
deletes.insert(key); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (!deletes.empty()) { |
|
|
|
|
if (_rc->clusterMode) { |
|
|
|
|
auto tx = _cluster->transaction(_myAddressStr, true); |
|
|
|
|
for (std::string k : deletes) { |
|
|
|
|
tx.del(k); |
|
|
|
|
} |
|
|
|
|
tx.exec(); |
|
|
|
|
} else { |
|
|
|
|
auto tx = _redis->transaction(true); |
|
|
|
|
for (std::string k : deletes) { |
|
|
|
|
tx.del(k); |
|
|
|
|
} |
|
|
|
|
tx.exec(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// if (_rc != NULL) {
|
|
|
|
|
// std::lock_guard<std::mutex> l(_networks_l);
|
|
|
|
|
// std::unordered_set<std::string> deletes;
|
|
|
|
|
// for ( auto it : _networks) {
|
|
|
|
|
// uint64_t nwid_i = it.first;
|
|
|
|
|
// char nwidTmp[64] = {0};
|
|
|
|
|
// OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
|
|
|
|
|
// std::string nwid(nwidTmp);
|
|
|
|
|
// std::string key = setKeyBase + nwid;
|
|
|
|
|
// deletes.insert(key);
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// if (!deletes.empty()) {
|
|
|
|
|
// if (_rc->clusterMode) {
|
|
|
|
|
// auto tx = _cluster->transaction(_myAddressStr, true);
|
|
|
|
|
// for (std::string k : deletes) {
|
|
|
|
|
// tx.del(k);
|
|
|
|
|
// }
|
|
|
|
|
// tx.exec();
|
|
|
|
|
// } else {
|
|
|
|
|
// auto tx = _redis->transaction(true);
|
|
|
|
|
// for (std::string k : deletes) {
|
|
|
|
|
// tx.del(k);
|
|
|
|
|
// }
|
|
|
|
|
// tx.exec();
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
const char *params[1] = { |
|
|
|
|
_myAddressStr.c_str() |
|
|
|
|
@ -578,7 +578,7 @@ void PostgreSQL::initializeMembers(PGconn *conn)
|
|
|
|
|
std::string memberId(PQgetvalue(res, i, 0)); |
|
|
|
|
std::string networkId(PQgetvalue(res, i, 1)); |
|
|
|
|
|
|
|
|
|
networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId)); |
|
|
|
|
// networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId));
|
|
|
|
|
|
|
|
|
|
std::string ctime = PQgetvalue(res, i, 5); |
|
|
|
|
config["id"] = memberId; |
|
|
|
|
@ -685,23 +685,23 @@ void PostgreSQL::initializeMembers(PGconn *conn)
|
|
|
|
|
|
|
|
|
|
PQclear(res); |
|
|
|
|
|
|
|
|
|
if (!networkMembers.empty()) { |
|
|
|
|
if (_rc != NULL) { |
|
|
|
|
if (_rc->clusterMode) { |
|
|
|
|
auto tx = _cluster->transaction(_myAddressStr, true); |
|
|
|
|
for (auto it : networkMembers) { |
|
|
|
|
tx.sadd(it.first, it.second); |
|
|
|
|
} |
|
|
|
|
tx.exec(); |
|
|
|
|
} else { |
|
|
|
|
auto tx = _redis->transaction(true); |
|
|
|
|
for (auto it : networkMembers) { |
|
|
|
|
tx.sadd(it.first, it.second); |
|
|
|
|
} |
|
|
|
|
tx.exec(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// if (!networkMembers.empty()) {
|
|
|
|
|
// if (_rc != NULL) {
|
|
|
|
|
// if (_rc->clusterMode) {
|
|
|
|
|
// auto tx = _cluster->transaction(_myAddressStr, true);
|
|
|
|
|
// for (auto it : networkMembers) {
|
|
|
|
|
// tx.sadd(it.first, it.second);
|
|
|
|
|
// }
|
|
|
|
|
// tx.exec();
|
|
|
|
|
// } else {
|
|
|
|
|
// auto tx = _redis->transaction(true);
|
|
|
|
|
// for (auto it : networkMembers) {
|
|
|
|
|
// tx.sadd(it.first, it.second);
|
|
|
|
|
// }
|
|
|
|
|
// tx.exec();
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
if (++this->_ready == 2) { |
|
|
|
|
if (_waitNoticePrinted) { |
|
|
|
|
fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); |
|
|
|
|
@ -755,7 +755,7 @@ void PostgreSQL::heartbeat()
|
|
|
|
|
std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); |
|
|
|
|
std::string now = std::to_string(ts); |
|
|
|
|
std::string host_port = std::to_string(_listenPort); |
|
|
|
|
std::string use_redis = (_rc != NULL) ? "true" : "false"; |
|
|
|
|
std::string use_redis = "false"; // (_rc != NULL) ? "true" : "false";
|
|
|
|
|
const char *values[10] = { |
|
|
|
|
controllerId, |
|
|
|
|
hostname, |
|
|
|
|
@ -788,13 +788,13 @@ void PostgreSQL::heartbeat()
|
|
|
|
|
} |
|
|
|
|
PQclear(res); |
|
|
|
|
} |
|
|
|
|
if (_rc != NULL) { |
|
|
|
|
if (_rc->clusterMode) { |
|
|
|
|
_cluster->zadd("controllers", controllerId, ts); |
|
|
|
|
} else { |
|
|
|
|
_redis->zadd("controllers", controllerId, ts); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// if (_rc != NULL) {
|
|
|
|
|
// if (_rc->clusterMode) {
|
|
|
|
|
// _cluster->zadd("controllers", controllerId, ts);
|
|
|
|
|
// } else {
|
|
|
|
|
// _redis->zadd("controllers", controllerId, ts);
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); |
|
|
|
|
} |
|
|
|
|
@ -833,6 +833,7 @@ void PostgreSQL::membersDbWatcher()
|
|
|
|
|
void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { |
|
|
|
|
char buf[11] = {0}; |
|
|
|
|
std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); |
|
|
|
|
fprintf(stderr, "Listening to member stream: %s\n", cmd.c_str()); |
|
|
|
|
PGresult *res = PQexec(conn, cmd.c_str()); |
|
|
|
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { |
|
|
|
|
fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); |
|
|
|
|
@ -874,7 +875,7 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
|
|
|
|
|
void PostgreSQL::_membersWatcher_Redis() { |
|
|
|
|
char buf[11] = {0}; |
|
|
|
|
std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}"; |
|
|
|
|
|
|
|
|
|
fprintf(stderr, "Listening to member stream: %s\n", key.c_str()); |
|
|
|
|
while (_run == 1) { |
|
|
|
|
try { |
|
|
|
|
json tmp; |
|
|
|
|
@ -1515,20 +1516,20 @@ void PostgreSQL::commitThread()
|
|
|
|
|
} catch (std::exception &e) { |
|
|
|
|
fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); |
|
|
|
|
} |
|
|
|
|
if (_rc != NULL) { |
|
|
|
|
try { |
|
|
|
|
std::string id = (*config)["id"]; |
|
|
|
|
std::string controllerId = _myAddressStr.c_str(); |
|
|
|
|
std::string key = "networks:{" + controllerId + "}"; |
|
|
|
|
if (_rc->clusterMode) { |
|
|
|
|
_cluster->sadd(key, id); |
|
|
|
|
} else { |
|
|
|
|
_redis->sadd(key, id); |
|
|
|
|
} |
|
|
|
|
} catch (sw::redis::Error &e) { |
|
|
|
|
fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// if (_rc != NULL) {
|
|
|
|
|
// try {
|
|
|
|
|
// std::string id = (*config)["id"];
|
|
|
|
|
// std::string controllerId = _myAddressStr.c_str();
|
|
|
|
|
// std::string key = "networks:{" + controllerId + "}";
|
|
|
|
|
// if (_rc->clusterMode) {
|
|
|
|
|
// _cluster->sadd(key, id);
|
|
|
|
|
// } else {
|
|
|
|
|
// _redis->sadd(key, id);
|
|
|
|
|
// }
|
|
|
|
|
// } catch (sw::redis::Error &e) {
|
|
|
|
|
// fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
} else if (objtype == "_delete_network") { |
|
|
|
|
try { |
|
|
|
|
std::string networkId = (*config)["nwid"]; |
|
|
|
|
@ -1552,22 +1553,22 @@ void PostgreSQL::commitThread()
|
|
|
|
|
} catch (std::exception &e) { |
|
|
|
|
fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what()); |
|
|
|
|
} |
|
|
|
|
if (_rc != NULL) { |
|
|
|
|
try { |
|
|
|
|
std::string id = (*config)["id"]; |
|
|
|
|
std::string controllerId = _myAddressStr.c_str(); |
|
|
|
|
std::string key = "networks:{" + controllerId + "}"; |
|
|
|
|
if (_rc->clusterMode) { |
|
|
|
|
_cluster->srem(key, id); |
|
|
|
|
_cluster->del("network-nodes-online:{"+controllerId+"}:"+id); |
|
|
|
|
} else { |
|
|
|
|
_redis->srem(key, id); |
|
|
|
|
_redis->del("network-nodes-online:{"+controllerId+"}:"+id); |
|
|
|
|
} |
|
|
|
|
} catch (sw::redis::Error &e) { |
|
|
|
|
fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// if (_rc != NULL) {
|
|
|
|
|
// try {
|
|
|
|
|
// std::string id = (*config)["id"];
|
|
|
|
|
// std::string controllerId = _myAddressStr.c_str();
|
|
|
|
|
// std::string key = "networks:{" + controllerId + "}";
|
|
|
|
|
// if (_rc->clusterMode) {
|
|
|
|
|
// _cluster->srem(key, id);
|
|
|
|
|
// _cluster->del("network-nodes-online:{"+controllerId+"}:"+id);
|
|
|
|
|
// } else {
|
|
|
|
|
// _redis->srem(key, id);
|
|
|
|
|
// _redis->del("network-nodes-online:{"+controllerId+"}:"+id);
|
|
|
|
|
// }
|
|
|
|
|
// } catch (sw::redis::Error &e) {
|
|
|
|
|
// fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
} else if (objtype == "_delete_member") { |
|
|
|
|
try { |
|
|
|
|
std::string memberId = (*config)["id"]; |
|
|
|
|
@ -1595,23 +1596,23 @@ void PostgreSQL::commitThread()
|
|
|
|
|
} catch (std::exception &e) { |
|
|
|
|
fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what()); |
|
|
|
|
} |
|
|
|
|
if (_rc != NULL) { |
|
|
|
|
try { |
|
|
|
|
std::string memberId = (*config)["id"]; |
|
|
|
|
std::string networkId = (*config)["nwid"]; |
|
|
|
|
std::string controllerId = _myAddressStr.c_str(); |
|
|
|
|
std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; |
|
|
|
|
if (_rc->clusterMode) { |
|
|
|
|
_cluster->srem(key, memberId); |
|
|
|
|
_cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId); |
|
|
|
|
} else { |
|
|
|
|
_redis->srem(key, memberId); |
|
|
|
|
_redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId); |
|
|
|
|
} |
|
|
|
|
} catch (sw::redis::Error &e) { |
|
|
|
|
fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// if (_rc != NULL) {
|
|
|
|
|
// try {
|
|
|
|
|
// std::string memberId = (*config)["id"];
|
|
|
|
|
// std::string networkId = (*config)["nwid"];
|
|
|
|
|
// std::string controllerId = _myAddressStr.c_str();
|
|
|
|
|
// std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId;
|
|
|
|
|
// if (_rc->clusterMode) {
|
|
|
|
|
// _cluster->srem(key, memberId);
|
|
|
|
|
// _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
|
|
|
|
|
// } else {
|
|
|
|
|
// _redis->srem(key, memberId);
|
|
|
|
|
// _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
|
|
|
|
|
// }
|
|
|
|
|
// } catch (sw::redis::Error &e) {
|
|
|
|
|
// fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what());
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
} else { |
|
|
|
|
fprintf(stderr, "ERROR: unknown objtype"); |
|
|
|
|
} |
|
|
|
|
@ -1619,7 +1620,7 @@ void PostgreSQL::commitThread()
|
|
|
|
|
fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
PQfinish(conn); |
|
|
|
|
@ -1634,11 +1635,11 @@ void PostgreSQL::onlineNotificationThread()
|
|
|
|
|
{ |
|
|
|
|
waitForReady(); |
|
|
|
|
|
|
|
|
|
if (_rc != NULL) { |
|
|
|
|
onlineNotification_Redis(); |
|
|
|
|
} else { |
|
|
|
|
// if (_rc != NULL) {
|
|
|
|
|
// onlineNotification_Redis();
|
|
|
|
|
// } else {
|
|
|
|
|
onlineNotification_Postgres(); |
|
|
|
|
} |
|
|
|
|
// }
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void PostgreSQL::onlineNotification_Postgres() |
|
|
|
|
@ -1783,7 +1784,7 @@ void PostgreSQL::onlineNotification_Redis()
|
|
|
|
|
fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what()); |
|
|
|
|
#endif |
|
|
|
|
} |
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
|
|
|
|
std::this_thread::sleep_for(std::chrono::seconds(10)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|