生产者
cpp
#ifndef MQ_PRODUCER_H
#define MQ_PRODUCER_H
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <iostream>
#include <stdexcept>
#include <string>
/**
 * @brief Owns one RabbitMQ (AMQP 0-9-1) connection used for declaring exchanges.
 *
 * The constructor establishes the connection and the destructor tears it
 * down, so an instance represents exactly one live broker session.
 */
class MqProducer
{
public:
    /**
     * @brief Connects to the broker, logs in and opens channel 1.
     * @param host      Broker host name or IP address.
     * @param username  Login user name.
     * @param password  Login password.
     * @throws std::runtime_error if any connection step fails.
     */
    MqProducer(const std::string &host, const std::string &username, const std::string &password);

    /// Closes channel 1 and the connection, then destroys the connection state.
    ~MqProducer();

    // Non-copyable: the destructor destroys `conn`, so a copied object would
    // close and free the same connection a second time.
    MqProducer(const MqProducer &) = delete;
    MqProducer &operator=(const MqProducer &) = delete;

    /**
     * @brief Declares an exchange on channel 1.
     * @param exchange_name  Name of the exchange to declare.
     * @param exchange_type  Exchange type string passed to the broker.
     * @return true if the broker replied normally, false otherwise.
     */
    bool init_exchange(const std::string &exchange_name, const std::string &exchange_type);

private:
    amqp_connection_state_t conn = nullptr; ///< Connection state handle.
    amqp_socket_t *socket = nullptr;        ///< TCP socket created for `conn`.
    std::string exchange_name;              ///< Name passed to init_exchange().
};
#endif // MQ_PRODUCER_H
cpp
#include "mq_producer.h"
/**
 * @brief Connects to the broker at `host`:5672, logs in on vhost "/" with
 *        SASL PLAIN, and opens channel 1.
 *
 * @param host      Broker host name or IP address.
 * @param username  Login user name.
 * @param password  Login password.
 * @throws std::runtime_error if the connection object or socket cannot be
 *         created, the socket cannot be opened, the login is rejected, or
 *         channel 1 cannot be opened. The connection is released before the
 *         exception leaves the constructor.
 */
MqProducer::MqProducer(const std::string &host, const std::string &username, const std::string &password)
{
    conn = amqp_new_connection();
    if (!conn)
    {
        throw std::runtime_error("创建连接失败");
    }

    try
    {
        socket = amqp_tcp_socket_new(conn);
        if (!socket)
        {
            throw std::runtime_error("creating TCP socket");
        }

        // Open the TCP connection to the broker.
        const int status = amqp_socket_open(socket, host.c_str(), 5672);
        if (status)
        {
            throw std::runtime_error("打开SOCKET失败");
        }

        // Log in.
        amqp_rpc_reply_t reply = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
                                            username.c_str(), password.c_str());
        if (reply.reply_type != AMQP_RESPONSE_NORMAL)
        {
            throw std::runtime_error("登录失败");
        }

        // Open the channel.
        amqp_channel_open(conn, 1);
        reply = amqp_get_rpc_reply(conn);
        if (reply.reply_type != AMQP_RESPONSE_NORMAL)
        {
            throw std::runtime_error("打开通道失败");
        }

        std::cout << "MQ连接成功" << std::endl;
    }
    catch (...)
    {
        // The destructor does not run for an object whose constructor threw,
        // so the connection (and the socket attached to it) must be released
        // here to avoid leaking it.
        amqp_destroy_connection(conn);
        conn = nullptr;
        socket = nullptr;
        throw;
    }
}
/**
 * @brief Shuts the session down in order: close channel 1, close the
 *        connection, then destroy the connection state.
 *
 * The replies of the two close calls are not inspected.
 */
MqProducer::~MqProducer()
{
    const int channel_id = 1;
    const int close_code = AMQP_REPLY_SUCCESS;

    amqp_channel_close(conn, channel_id, close_code);
    amqp_connection_close(conn, close_code);
    amqp_destroy_connection(conn);
}
bool MqProducer::init_exchange(const std::string &exchange_name, const std::string &exchange_type)
{
this->exchange_name = exchange_name;
amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange_name.c_str()), amqp_cstring_bytes(exchange_type.c_str()), 0, 1, 0, 0, amqp_empty_table);
amqp_rpc_reply_t reply = amqp_get_rpc_reply(conn);
if (reply.reply_type != AMQP_RESPONSE_NORMAL)
{
std::cout << "声明交换机失败" << std::endl;
return false;
}
return true;
}
cpp
#include "mq_producer.h"
/**
 * @brief Connects to the broker and declares the "iot_th" topic exchange.
 * @return 0 on success, 1 if the connection or the exchange declaration fails.
 */
int main()
{
    try
    {
        // NOTE(review): the broker address and credentials are hard-coded in
        // source; consider loading them from configuration or the environment.
        MqProducer producer("1.95.120.104", "zwh", "zwh123456");
        if (!producer.init_exchange("iot_th", "topic"))
        {
            return 1;
        }
    }
    catch (const std::exception &e)
    {
        // The MqProducer constructor reports failures by throwing; without
        // this handler the process would end in std::terminate.
        std::cerr << "MQ初始化失败: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}