|
|
|
|
@ -1684,6 +1684,7 @@ void PostgreSQL::onlineNotification_Redis()
|
|
|
|
|
while (_run == 1) { |
|
|
|
|
fprintf(stderr, "onlineNotification tick\n"); |
|
|
|
|
auto start = std::chrono::high_resolution_clock::now(); |
|
|
|
|
uint64_t count = 0; |
|
|
|
|
|
|
|
|
|
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline; |
|
|
|
|
{ |
|
|
|
|
@ -1694,10 +1695,10 @@ void PostgreSQL::onlineNotification_Redis()
|
|
|
|
|
if (!lastOnline.empty()) { |
|
|
|
|
if (_rc->clusterMode) { |
|
|
|
|
auto tx = _cluster->transaction(controllerId, true); |
|
|
|
|
_doRedisUpdate(tx, controllerId, lastOnline); |
|
|
|
|
count = _doRedisUpdate(tx, controllerId, lastOnline); |
|
|
|
|
} else { |
|
|
|
|
auto tx = _redis->transaction(true); |
|
|
|
|
_doRedisUpdate(tx, controllerId, lastOnline); |
|
|
|
|
count = _doRedisUpdate(tx, controllerId, lastOnline); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} catch (sw::redis::Error &e) { |
|
|
|
|
@ -1710,11 +1711,15 @@ void PostgreSQL::onlineNotification_Redis()
|
|
|
|
|
|
|
|
|
|
fprintf(stderr, "onlineNotification ran in %llu ms\n", total); |
|
|
|
|
|
|
|
|
|
std::this_thread::yield(); |
|
|
|
|
if (count > 0) { |
|
|
|
|
std::this_thread::yield(); |
|
|
|
|
} else { |
|
|
|
|
std::this_thread::sleep_for(std::chrono::seconds(1)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &controllerId,
|
|
|
|
|
uint64_t PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &controllerId, |
|
|
|
|
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline)
|
|
|
|
|
|
|
|
|
|
{ |
|
|
|
|
@ -1778,6 +1783,8 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control
|
|
|
|
|
} |
|
|
|
|
tx.exec(); |
|
|
|
|
fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), count); |
|
|
|
|
|
|
|
|
|
return count; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|