Skip to content

生产者

cpp
#ifndef MQ_PRODUCER_H
#define MQ_PRODUCER_H

#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <iostream>
#include <string>

class MqProducer
{
public:
    MqProducer(const std::string &host, const std::string &username, const std::string &password);
    ~MqProducer();
    bool init_exchange(const std::string &exchange_name, const std::string &exchange_type);

private:
    amqp_connection_state_t conn;
    amqp_socket_t *socket;
    std::string exchange_name;
};

#endif // MQ_PRODUCER_H
cpp
#include "mq_producer.h"

MqProducer::MqProducer(const std::string &host, const std::string &username, const std::string &password)
{
    conn = amqp_new_connection();
    socket = amqp_tcp_socket_new(conn);

    if (!socket)
    {
        throw std::runtime_error("creating TCP socket");
    }

    // 创建连接
    int status = amqp_socket_open(socket, host.c_str(), 5672);

    if (status)
    {
        throw std::runtime_error("打开SOCKET失败");
    }

    // 登录
    amqp_rpc_reply_t reply = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username.c_str(), password.c_str());

    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
    {
        throw std::runtime_error("登录失败");
    }

    // 打开通道
    amqp_channel_open(conn, 1);
    reply = amqp_get_rpc_reply(conn);

    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
    {
        throw std::runtime_error("打开通道失败");
    }

    std::cout << "MQ连接成功" << std::endl;
}

MqProducer::~MqProducer()
{
    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
}

bool MqProducer::init_exchange(const std::string &exchange_name, const std::string &exchange_type)
{
    this->exchange_name = exchange_name;
    amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange_name.c_str()), amqp_cstring_bytes(exchange_type.c_str()), 0, 1, 0, 0, amqp_empty_table);
    amqp_rpc_reply_t reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
    {
        std::cout << "声明交换机失败" << std::endl;
        return false;
    }
    return true;
}
cpp
#include "mq_producer.h"

int main()
{
    MqProducer producer("1.95.120.104", "zwh", "zwh123456");
    producer.init_exchange("iot_th", "topic");
    return 0;
}