A rust implementation of AMQP 1.0 protocol based on serde and tokio.
toml
default = []
| Feature | Description |
|---------|-------------|
|"rustls"
| enables TLS integration with tokio-rustls
and rustls
|
|"native-tls"
|enables TLS integration with tokio-native-tls
and native-tls
|
|"acceptor"
|enables ConnectionAcceptor
, SessionAcceptor
, and LinkAcceptor
|
|"transaction"
| enables Controller
, Transaction
, OwnedTransaction
and control_link_acceptor
|
More examples including one showing how to use it with Azure Serivce Bus can be found on the GitHub repo.
Below is an example with a local broker (TestAmqpBroker
)
listening on the localhost. The broker is executed with the following command
powershell
./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1
The following code requires the [tokio
] async runtime added to the dependencies.
```rust use fe2o3_amqp::{Connection, Session, Sender, Receiver};
async fn main() { let mut connection = Connection::open( "connection-1", // container id "amqp://guest:guest@localhost:5672" // url ).await.unwrap();
let mut session = Session::begin(&mut connection).await.unwrap();
// Create a sender
let mut sender = Sender::attach(
&mut session, // Session
"rust-sender-link-1", // link name
"q1" // target address
).await.unwrap();
// Create a receiver
let mut receiver = Receiver::attach(
&mut session,
"rust-receiver-link-1", // link name
"q1" // source address
).await.unwrap();
// Send a message to the broker and wait for outcome (Disposition)
let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
// Send a message with batchable field set to true
let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
let outcome: Outcome = fut.await.unwrap(); // Wait for outcome (Disposition)
outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
// Receive the message from the broker
let delivery = receiver.recv::<String>().await.unwrap();
receiver.accept(&delivery).await.unwrap();
sender.close().await.unwrap(); // Detach sender with closing Detach performatives
receiver.close().await.unwrap(); // Detach receiver with closing Detach performatives
session.end().await.unwrap(); // End the session
connection.close().await.unwrap(); // Close the connection
} ```
```rust use tokio::net::TcpListener; use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};
async fn main() { let tcplistener = TcpListener::bind("localhost:5672").await.unwrap(); let connectionacceptor = ConnectionAcceptor::new("example-listener");
while let Ok((stream, addr)) = tcp_listener.accept().await {
let mut connection = connection_acceptor.accept(stream).await.unwrap();
let handle = tokio::spawn(async move {
let session_acceptor = SessionAcceptor::new();
while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
let handle = tokio::spawn(async move {
let link_acceptor = LinkAcceptor::new();
match link_acceptor.accept(&mut session).await.unwrap() {
LinkEndpoint::Sender(sender) => { },
LinkEndpoint::Receiver(recver) => { },
}
});
}
});
}
} ```
fe2o3-amqp-ws
is needed for WebSocket binding
```rust use fe2o3amqp::{ types::{messaging::Outcome, primitives::Value}, Connection, Delivery, Receiver, Sender, Session, }; use fe2o3amqp_ws::WebSocketStream;
async fn main() { let (wsstream, _response) = WebSocketStream::connect("ws://localhost:5673") .await .unwrap(); let mut connection = Connection::builder() .containerid("connection-1") .openwithstream(ws_stream) .await .unwrap();
connection.close().await.unwrap();
} ```
More examples of sending and receiving can be found on the GitHub repo. Please note that most examples requires a local broker running. One broker that can be used on Windows is TestAmqpBroker.
| Name | Description |
|------|-------------|
|serde_amqp_derive
| Custom derive macro for described types as defined in AMQP1.0 protocol |
|serde_amqp
| AMQP1.0 serializer and deserializer as well as primitive types |
|fe2o3-amqp-types
| AMQP1.0 data types |
|fe2o3-amqp
| Implementation of AMQP1.0 Connection
, Session
, and Link
|
|fe2o3-amqp-ext
| Extension types and implementations |
|fe2o3-amqp-ws
| WebSocket binding for fe2o3-amqp
transport |
1.56.0 (ie. 2021 edition)
The items below are listed in the order of priority.
tokio-rustls
tokio-native-tls
fe2o3-amqp-ws
License: MIT/Apache-2.0