AMQP
Recipe | Crates | Categories |
---|---|---|
Lapin |
Lapin
lapin
is a AMQP client library. It is a pure Rust AMQP 0.9.1 client implementation, that is feature complete, fast, and easy to use.
use futures::stream::StreamExt; /* or: use futures_lite::stream::StreamExt; */ use lapin::BasicProperties; use lapin::Channel; use lapin::Connection; use lapin::ConnectionProperties; use lapin::Queue; use lapin::options::*; use lapin::types::FieldTable; // AMQP client library for e.g. RabbitMQ #[tokio::main] async fn main() -> anyhow::Result<()> { // Connect to RabbitMQ server let addr = std::env::var("AMQP_ADDR") .unwrap_or_else(|_| "amqp://127.0.0.1:5672".into()); let conn = Connection::connect(&addr, ConnectionProperties::default()).await?; // Main entry point for most AMQP operations. // Channel serves as a "lightweight connection" // and can be obtained from a Connection let channel: Channel = conn.create_channel().await?; // Declare a queue let _queue: Queue = channel .queue_declare( "my_queue", QueueDeclareOptions::default(), /* Whether the queue is passive, * durable, exclusive, * auto_delete... */ FieldTable::default(), // a Map<String, AMQPValue> ) .await?; // Publish a message to the queue let message = "Hello from Rust!"; channel .basic_publish( "", // exchange "my_queue", // routing key BasicPublishOptions::default(), message.as_bytes(), BasicProperties::default(), ) .await? .await?; // Wait for confirmation println!("Sent message: {}", message); // Consume messages from the queue let mut consumer = channel .basic_consume( "my_queue", "consumer_tag", BasicConsumeOptions::default(), FieldTable::default(), ) .await?; // Process messages println!("Waiting for messages..."); if let Some(delivery) = consumer.next().await { let delivery = delivery?; println!( "Received message: {:?}", String::from_utf8_lossy(&delivery.data) ); // Acknowledge the message (i.e., the message is removed from the queue) delivery.ack(BasicAckOptions::default()).await?; // In this example, we quit after the first and only message. // In real life, use `while let`. } Ok(()) }