AMQP

RecipeCratesCategories
Lapinlapincat-database

Lapin

lapin lapin-crates.io lapin-github lapin-lib.rs cat-database

lapin is a AMQP client library. It is a pure Rust AMQP 0.9.1 client implementation, that is feature complete, fast, and easy to use.

use futures::stream::StreamExt; /* or: use futures_lite::stream::StreamExt; */
use lapin::BasicProperties;
use lapin::Channel;
use lapin::Connection;
use lapin::ConnectionProperties;
use lapin::Queue;
use lapin::options::*;
use lapin::types::FieldTable;

// AMQP client library for e.g. RabbitMQ

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Connect to RabbitMQ server
    let addr = std::env::var("AMQP_ADDR")
        .unwrap_or_else(|_| "amqp://127.0.0.1:5672".into());
    let conn =
        Connection::connect(&addr, ConnectionProperties::default()).await?;
    // Main entry point for most AMQP operations.
    // Channel serves as a "lightweight connection"
    // and can be obtained from a Connection
    let channel: Channel = conn.create_channel().await?;

    // Declare a queue
    let _queue: Queue = channel
        .queue_declare(
            "my_queue",
            QueueDeclareOptions::default(), /* Whether the queue is passive,
                                             * durable, exclusive,
                                             * auto_delete... */
            FieldTable::default(), // a Map<String, AMQPValue>
        )
        .await?;

    // Publish a message to the queue
    let message = "Hello from Rust!";
    channel
        .basic_publish(
            "",         // exchange
            "my_queue", // routing key
            BasicPublishOptions::default(),
            message.as_bytes(),
            BasicProperties::default(),
        )
        .await?
        .await?; // Wait for confirmation

    println!("Sent message: {}", message);

    // Consume messages from the queue
    let mut consumer = channel
        .basic_consume(
            "my_queue",
            "consumer_tag",
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await?;

    // Process messages
    println!("Waiting for messages...");
    if let Some(delivery) = consumer.next().await {
        let delivery = delivery?;
        println!(
            "Received message: {:?}",
            String::from_utf8_lossy(&delivery.data)
        );

        // Acknowledge the message (i.e., the message is removed from the queue)
        delivery.ack(BasicAckOptions::default()).await?;
        // In this example, we quit after the first and only message.
        // In real life, use `while let`.
    }

    Ok(())
}