Message Queues (AMQP)

Recipe: Connect to RabbitMQ with lapin
Crates: lapin
Categories: cat-database

AMQP stands for Advanced Message Queuing Protocol. It is an open standard messaging protocol for reliably exchanging messages between applications or systems, even when they are written in different programming languages and run on different platforms. Acknowledgements and message persistence allow messages to survive network issues or system failures. Depending on the protocol version and the configuration, AMQP offers different levels of delivery guarantees, from "at-most-once" to "exactly-once" delivery.
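The practical difference between delivery guarantees comes down to when the broker forgets a message: as soon as it is sent to a consumer (at-most-once) or only after the consumer acknowledges it (at-least-once). The sketch below is a toy, in-memory model that uses only the standard library. `ToyQueue`, `DeliveryMode`, and `deliveries_after_crash` are hypothetical names chosen for illustration; they are not part of AMQP or of any client library.

```rust
use std::collections::VecDeque;

/// When the toy broker forgets a message (illustrative only).
#[derive(Clone, Copy, PartialEq)]
enum DeliveryMode {
    /// Forget on delivery: a consumer crash loses the message.
    AtMostOnce,
    /// Forget on acknowledgement: a consumer crash causes redelivery.
    AtLeastOnce,
}

/// Hypothetical in-memory stand-in for a broker queue.
struct ToyQueue {
    mode: DeliveryMode,
    ready: VecDeque<String>,
    unacked: Option<String>,
}

impl ToyQueue {
    fn new(mode: DeliveryMode) -> Self {
        Self { mode, ready: VecDeque::new(), unacked: None }
    }

    fn publish(&mut self, msg: &str) {
        self.ready.push_back(msg.to_string());
    }

    /// Hand the next ready message to a consumer.
    fn deliver(&mut self) -> Option<String> {
        let msg = self.ready.pop_front()?;
        if self.mode == DeliveryMode::AtLeastOnce {
            // Keep a copy until the consumer acknowledges it.
            self.unacked = Some(msg.clone());
        }
        Some(msg)
    }

    /// The consumer confirms that it finished processing.
    fn ack(&mut self) {
        self.unacked = None;
    }

    /// The consumer died before acknowledging: requeue what was in flight.
    fn consumer_crashed(&mut self) {
        if let Some(msg) = self.unacked.take() {
            self.ready.push_front(msg);
        }
    }
}

/// Counts how often one message is delivered when the first consumer
/// crashes before acknowledging and a second consumer takes over.
fn deliveries_after_crash(mode: DeliveryMode) -> usize {
    let mut queue = ToyQueue::new(mode);
    queue.publish("job-1");
    let mut count = 0;
    if queue.deliver().is_some() {
        count += 1; // first consumer receives the message...
    }
    queue.consumer_crashed(); // ...and crashes without acknowledging
    if queue.deliver().is_some() {
        count += 1; // second consumer receives the redelivery, if any
        queue.ack();
    }
    count
}

fn main() {
    println!("at-most-once:  {}", deliveries_after_crash(DeliveryMode::AtMostOnce));
    println!("at-least-once: {}", deliveries_after_crash(DeliveryMode::AtLeastOnce));
}
```

In this model the at-most-once queue delivers the message once and then loses it, while the at-least-once queue delivers it twice. This is why consumers that rely on acknowledgements should be prepared to see duplicates.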

RabbitMQ is a popular message broker that implements the Advanced Message Queuing Protocol. Its use cases include microservices communication, task queues, background job processing, and publish-subscribe (pub-sub).
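Task queues and pub-sub differ mainly in how messages are routed. In AMQP 0.9.1, publishers send messages to an exchange, and the exchange copies them into queues according to bindings. A direct exchange delivers to queues whose binding key equals the message's routing key, which suits task queues. A fanout exchange delivers to every bound queue, which suits pub-sub. The following is a minimal in-memory sketch of that routing rule, not broker code. `ToyExchange`, the two demo functions, and the queue names are all hypothetical.

```rust
/// Two of the exchange types defined by AMQP 0.9.1.
enum ExchangeKind {
    /// Routes to queues whose binding key equals the routing key.
    Direct,
    /// Routes to every bound queue and ignores the routing key.
    Fanout,
}

/// Hypothetical in-memory stand-in for a broker-side exchange.
struct ToyExchange {
    kind: ExchangeKind,
    /// (binding key, queue name) pairs.
    bindings: Vec<(String, String)>,
}

impl ToyExchange {
    fn new(kind: ExchangeKind) -> Self {
        Self { kind, bindings: Vec::new() }
    }

    fn bind(&mut self, queue: &str, binding_key: &str) {
        self.bindings.push((binding_key.to_string(), queue.to_string()));
    }

    /// Names of the queues that would receive a message with this routing key.
    fn route(&self, routing_key: &str) -> Vec<String> {
        self.bindings
            .iter()
            .filter(|(key, _)| match self.kind {
                ExchangeKind::Direct => key.as_str() == routing_key,
                ExchangeKind::Fanout => true,
            })
            .map(|(_, queue)| queue.clone())
            .collect()
    }
}

/// Task-queue style: each job type lands in exactly one queue.
fn task_queue_demo() -> Vec<String> {
    let mut exchange = ToyExchange::new(ExchangeKind::Direct);
    exchange.bind("resize_jobs", "resize");
    exchange.bind("email_jobs", "email");
    exchange.route("resize")
}

/// Pub-sub style: every subscriber queue gets a copy.
fn pub_sub_demo() -> Vec<String> {
    let mut exchange = ToyExchange::new(ExchangeKind::Fanout);
    exchange.bind("audit_log", "");
    exchange.bind("notifications", "");
    exchange.route("ignored")
}

fn main() {
    println!("direct: {:?}", task_queue_demo());
    println!("fanout: {:?}", pub_sub_demo());
}
```

The empty exchange name `""` selects the default exchange, which behaves like a direct exchange to which every queue is implicitly bound under its own name. Publishing to `""` with a queue's name as the routing key therefore delivers straight to that queue.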

Connect to RabbitMQ with lapin

lapin is an AMQP client library: a pure Rust implementation of AMQP 0.9.1 that is feature complete, fast, and easy to use.

use futures::stream::StreamExt; /* or: use futures_lite::stream::StreamExt; */
use lapin::BasicProperties;
use lapin::Channel;
use lapin::Connection;
use lapin::ConnectionProperties;
use lapin::Queue;
use lapin::options::*;
use lapin::types::FieldTable;

// Publishes one message to a queue on an AMQP broker (e.g. RabbitMQ)
// and consumes it again, using the lapin client library.

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Connect to RabbitMQ server
    let addr = std::env::var("AMQP_ADDR")
        .unwrap_or_else(|_| "amqp://127.0.0.1:5672".into());
    let conn =
        Connection::connect(&addr, ConnectionProperties::default()).await?;
    // Main entry point for most AMQP operations.
    // Channel serves as a "lightweight connection"
    // and can be obtained from a Connection
    let channel: Channel = conn.create_channel().await?;

    // Declare a queue
    let _queue: Queue = channel
        .queue_declare(
            "my_queue",
            // Options: whether the queue is passive, durable, exclusive,
            // auto_delete...
            QueueDeclareOptions::default(),
            // Arguments: a map of names to `AMQPValue`s
            FieldTable::default(),
        )
        .await?;

    // Publish a message to the queue
    let message = "Hello from Rust!";
    channel
        .basic_publish(
            "",         // exchange
            "my_queue", // routing key
            BasicPublishOptions::default(),
            message.as_bytes(),
            BasicProperties::default(),
        )
        .await? // Send the message
        .await?; // Wait for the broker's confirmation (if confirms are enabled)

    println!("Sent message: {}", message);

    // Consume messages from the queue
    let mut consumer = channel
        .basic_consume(
            "my_queue",
            "consumer_tag",
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await?;

    // Process messages
    println!("Waiting for messages...");
    if let Some(delivery) = consumer.next().await {
        let delivery = delivery?;
        println!(
            "Received message: {:?}",
            String::from_utf8_lossy(&delivery.data)
        );

        // Acknowledge the message (i.e., the message is removed from the queue)
        delivery.ack(BasicAckOptions::default()).await?;
        // In this example, we quit after the first and only message.
        // In real life, use `while let`.
    }

    Ok(())
}