|
|
|
|
@ -16,6 +16,8 @@
|
|
|
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
//#define ZT_CONTROLLER_USE_RETHINKDB
|
|
|
|
|
|
|
|
|
|
#ifdef ZT_CONTROLLER_USE_RETHINKDB |
|
|
|
|
|
|
|
|
|
#include <stdio.h> |
|
|
|
|
@ -164,7 +166,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Identity &myId,co
|
|
|
|
|
while ((this->_commitQueue.get(config))&&(_run == 1)) { |
|
|
|
|
if (!config) |
|
|
|
|
continue; |
|
|
|
|
json record; |
|
|
|
|
nlohmann::json record; |
|
|
|
|
const char *table = (const char *)0; |
|
|
|
|
std::string deleteId; |
|
|
|
|
try { |
|
|
|
|
@ -182,7 +184,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Identity &myId,co
|
|
|
|
|
const std::string id = (*config)["id"]; |
|
|
|
|
record["id"] = id; |
|
|
|
|
record["controllerId"] = this->_myAddressStr; |
|
|
|
|
record["config"] = *config; |
|
|
|
|
record["config"] = *config; |
|
|
|
|
table = "Network"; |
|
|
|
|
} else if (objtype == "trace") { |
|
|
|
|
record = *config; |
|
|
|
|
@ -221,7 +223,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Identity &myId,co
|
|
|
|
|
R::db(this->_db).table(table).get(deleteId).delete_().run(*rdb); |
|
|
|
|
} else { |
|
|
|
|
//printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str());
|
|
|
|
|
R::db(this->_db).table(table).insert(R::Datum::from_json(record.dump()),R::optargs("conflict","update","return_changes",false)).run(*rdb); |
|
|
|
|
R::db(this->_db).table(table).insert(R::Datum::from_json(OSUtils::jsonDump(record,-1)),R::optargs("conflict","update","return_changes",false)).run(*rdb); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
} else { |
|
|
|
|
@ -436,18 +438,8 @@ void RethinkDB::save(nlohmann::json *orig,nlohmann::json &record)
|
|
|
|
|
waitForReady(); |
|
|
|
|
if (orig) { |
|
|
|
|
if (*orig != record) { |
|
|
|
|
nlohmann::json *q = new nlohmann::json(); |
|
|
|
|
try { |
|
|
|
|
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1; |
|
|
|
|
for(auto kv=record.begin();kv!=record.end();++kv) { |
|
|
|
|
if ((kv.key() == "id")||(kv.key() == "nwid")||(kv.key() == "objtype")||((*q)[kv.key()] != kv.value())) |
|
|
|
|
(*q)[kv.key()] = kv.value(); |
|
|
|
|
} |
|
|
|
|
_commitQueue.post(new nlohmann::json(record)); |
|
|
|
|
} catch ( ... ) { |
|
|
|
|
delete q; |
|
|
|
|
throw; |
|
|
|
|
} |
|
|
|
|
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1; |
|
|
|
|
_commitQueue.post(new nlohmann::json(record)); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
record["revision"] = 1; |
|
|
|
|
|