You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
134 lines
3.7 KiB
134 lines
3.7 KiB
/* |
|
* ZeroTier One - Network Virtualization Everywhere |
|
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ |
|
* |
|
* This program is free software: you can redistribute it and/or modify |
|
* it under the terms of the GNU General Public License as published by |
|
* the Free Software Foundation, either version 3 of the License, or |
|
* (at your option) any later version. |
|
* |
|
* This program is distributed in the hope that it will be useful, |
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
* GNU General Public License for more details. |
|
* |
|
* You should have received a copy of the GNU General Public License |
|
* along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
* |
|
* -- |
|
* |
|
* You can be released from the requirements of the license by purchasing |
|
* a commercial license. Buying such a license is mandatory as soon as you |
|
* develop commercial closed-source software that incorporates or links |
|
* directly against ZeroTier software without disclosing the source code |
|
* of your own application. |
|
*/ |
|
|
|
|
|
#include "RabbitMQ.hpp" |
|
|
|
#ifdef ZT_CONTROLLER_USE_LIBPQ |
|
|
|
#include <amqp.h> |
|
#include <amqp_tcp_socket.h> |
|
#include <stdexcept> |
|
#include <cstring> |
|
|
|
namespace ZeroTier |
|
{ |
|
|
|
RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName) |
|
: _mqc(cfg) |
|
, _qName(queueName) |
|
, _socket(NULL) |
|
, _status(0) |
|
{ |
|
} |
|
|
|
RabbitMQ::~RabbitMQ() |
|
{ |
|
amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS); |
|
amqp_connection_close(_conn, AMQP_REPLY_SUCCESS); |
|
amqp_destroy_connection(_conn); |
|
} |
|
|
|
void RabbitMQ::init() |
|
{ |
|
struct timeval tval; |
|
memset(&tval, 0, sizeof(struct timeval)); |
|
tval.tv_sec = 5; |
|
|
|
fprintf(stderr, "Initializing RabbitMQ %s\n", _qName); |
|
_conn = amqp_new_connection(); |
|
_socket = amqp_tcp_socket_new(_conn); |
|
if (!_socket) { |
|
throw std::runtime_error("Can't create socket for RabbitMQ"); |
|
} |
|
|
|
_status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval); |
|
if (_status) { |
|
throw std::runtime_error("Can't connect to RabbitMQ"); |
|
} |
|
|
|
amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, |
|
_mqc->username, _mqc->password); |
|
if (r.reply_type != AMQP_RESPONSE_NORMAL) { |
|
throw std::runtime_error("RabbitMQ Login Error"); |
|
} |
|
|
|
static int chan = 0; |
|
{ |
|
Mutex::Lock l(_chan_m); |
|
_channel = ++chan; |
|
} |
|
amqp_channel_open(_conn, _channel); |
|
r = amqp_get_rpc_reply(_conn); |
|
if(r.reply_type != AMQP_RESPONSE_NORMAL) { |
|
throw std::runtime_error("Error opening communication channel"); |
|
} |
|
|
|
_q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); |
|
r = amqp_get_rpc_reply(_conn); |
|
if (r.reply_type != AMQP_RESPONSE_NORMAL) { |
|
throw std::runtime_error("Error declaring queue " + std::string(_qName)); |
|
} |
|
|
|
amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); |
|
r = amqp_get_rpc_reply(_conn); |
|
if (r.reply_type != AMQP_RESPONSE_NORMAL) { |
|
throw std::runtime_error("Error consuming queue " + std::string(_qName)); |
|
} |
|
fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); |
|
} |
|
|
|
std::string RabbitMQ::consume() |
|
{ |
|
amqp_rpc_reply_t res; |
|
amqp_envelope_t envelope; |
|
amqp_maybe_release_buffers(_conn); |
|
|
|
struct timeval timeout; |
|
timeout.tv_sec = 1; |
|
timeout.tv_usec = 0; |
|
|
|
res = amqp_consume_message(_conn, &envelope, &timeout, 0); |
|
if (res.reply_type != AMQP_RESPONSE_NORMAL) { |
|
if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) { |
|
// timeout waiting for message. Return empty string |
|
return ""; |
|
} else { |
|
throw std::runtime_error("Error getting message"); |
|
} |
|
} |
|
|
|
std::string msg( |
|
(const char*)envelope.message.body.bytes, |
|
envelope.message.body.len |
|
); |
|
amqp_destroy_envelope(&envelope); |
|
return msg; |
|
} |
|
|
|
} |
|
|
|
#endif // ZT_CONTROLLER_USE_LIBPQ
|
|
|