|
|
|
|
@ -1389,129 +1389,6 @@ void PostgreSQL::onlineNotificationThread()
|
|
|
|
|
PQclear(res); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const int64_t now = OSUtils::now(); |
|
|
|
|
if ((now - lastUpdatedNetworkStatus) > 10000) { |
|
|
|
|
lastUpdatedNetworkStatus = now; |
|
|
|
|
|
|
|
|
|
std::vector<std::pair<uint64_t, std::shared_ptr<_Network>>> networks; |
|
|
|
|
{ |
|
|
|
|
std::lock_guard<std::mutex> l(_networks_l); |
|
|
|
|
for (auto i = _networks.begin(); i != _networks.end(); ++i) { |
|
|
|
|
networks.push_back(*i); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
std::stringstream networkUpdate; |
|
|
|
|
networkUpdate << "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, online_member_count, total_member_count, last_modified) VALUES "; |
|
|
|
|
bool nwFirstRun = true; |
|
|
|
|
bool networkAdded = false; |
|
|
|
|
for (auto i = networks.begin(); i != networks.end(); ++i) { |
|
|
|
|
char tmp[64]; |
|
|
|
|
Utils::hex(i->first, tmp); |
|
|
|
|
|
|
|
|
|
std::string networkId(tmp); |
|
|
|
|
|
|
|
|
|
std::vector<std::string> &_notUsed = updateMap[networkId]; |
|
|
|
|
(void)_notUsed; |
|
|
|
|
|
|
|
|
|
uint64_t authMemberCount = 0; |
|
|
|
|
uint64_t totalMemberCount = 0; |
|
|
|
|
uint64_t onlineMemberCount = 0; |
|
|
|
|
uint64_t bridgeCount = 0; |
|
|
|
|
uint64_t ts = now; |
|
|
|
|
{ |
|
|
|
|
std::lock_guard<std::mutex> l2(i->second->lock); |
|
|
|
|
authMemberCount = i->second->authorizedMembers.size(); |
|
|
|
|
totalMemberCount = i->second->members.size(); |
|
|
|
|
bridgeCount = i->second->activeBridgeMembers.size(); |
|
|
|
|
for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) { |
|
|
|
|
auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first, m->first)); |
|
|
|
|
if (lo != lastOnlineCumulative.end()) { |
|
|
|
|
if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) { |
|
|
|
|
++onlineMemberCount; |
|
|
|
|
} else { |
|
|
|
|
lastOnlineCumulative.erase(lo); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const char *nvals[1] = { |
|
|
|
|
networkId.c_str() |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
res = PQexecParams(conn, |
|
|
|
|
"SELECT id FROM ztc_network WHERE id = $1", |
|
|
|
|
1, |
|
|
|
|
NULL, |
|
|
|
|
nvals, |
|
|
|
|
NULL, |
|
|
|
|
NULL, |
|
|
|
|
0); |
|
|
|
|
|
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK) { |
|
|
|
|
fprintf(stderr, "Network lookup failed: %s", PQerrorMessage(conn)); |
|
|
|
|
PQclear(res); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int nrows = PQntuples(res); |
|
|
|
|
PQclear(res); |
|
|
|
|
|
|
|
|
|
if (nrows == 1) { |
|
|
|
|
std::string bc = std::to_string(bridgeCount); |
|
|
|
|
std::string amc = std::to_string(authMemberCount); |
|
|
|
|
std::string omc = std::to_string(onlineMemberCount); |
|
|
|
|
std::string tmc = std::to_string(totalMemberCount); |
|
|
|
|
std::string timestamp = std::to_string(ts); |
|
|
|
|
|
|
|
|
|
if (nwFirstRun) { |
|
|
|
|
nwFirstRun = false; |
|
|
|
|
} else { |
|
|
|
|
networkUpdate << ", "; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
networkUpdate << "('" << networkId << "', " << bc << ", " << amc << ", " << omc << ", " << tmc << ", " |
|
|
|
|
<< "TO_TIMESTAMP(" << timestamp << "::double precision/1000))"; |
|
|
|
|
|
|
|
|
|
networkAdded = true; |
|
|
|
|
|
|
|
|
|
} else if (nrows > 1) { |
|
|
|
|
fprintf(stderr, "Number of networks > 1?!?!?"); |
|
|
|
|
continue; |
|
|
|
|
} else { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
networkUpdate << " ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, " |
|
|
|
|
<< "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, " |
|
|
|
|
<< "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified"; |
|
|
|
|
if (networkAdded) { |
|
|
|
|
res = PQexec(conn, networkUpdate.str().c_str()); |
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK) { |
|
|
|
|
fprintf(stderr, "Error during multiple network upsert: %s", PQresultErrorMessage(res)); |
|
|
|
|
} |
|
|
|
|
PQclear(res); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// for (auto it = updateMap.begin(); it != updateMap.end(); ++it) {
|
|
|
|
|
// std::string networkId = it->first;
|
|
|
|
|
// std::vector<std::string> members = it->second;
|
|
|
|
|
// std::stringstream queryBuilder;
|
|
|
|
|
|
|
|
|
|
// std::string membersStr = ::join(members, ",");
|
|
|
|
|
|
|
|
|
|
// queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'";
|
|
|
|
|
// std::string query = queryBuilder.str();
|
|
|
|
|
|
|
|
|
|
// PGresult *res = PQexec(conn,query.c_str());
|
|
|
|
|
// if (PQresultStatus(res) != PGRES_COMMAND_OK) {
|
|
|
|
|
// fprintf(stderr, "ERROR: Error sending NOTIFY: %s\n", PQresultErrorMessage(res));
|
|
|
|
|
// }
|
|
|
|
|
// PQclear(res);
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
|
|
|
|
} |
|
|
|
|
fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str()); |
|
|
|
|
|