A Rust implementation of the AMQP 1.0 protocol based on serde and tokio.
default: []

- `"rustls"`: enables TLS integration with `tokio-rustls` and `rustls`
- `"native-tls"`: enables TLS integration with `tokio-native-tls` and `native-tls`
- `"acceptor"`: enables `ConnectionAcceptor`, `SessionAcceptor`, and `LinkAcceptor`
- `"listener"`: TODO

Below is an example with a local broker (`TestAmqpBroker`) listening on localhost. The broker is started with the following command:
```powershell
./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1
```
The following code requires the [`tokio`](https://crates.io/crates/tokio) async runtime to be added to the dependencies.
```rust
use fe2o3_amqp::{Connection, Session, Sender, Receiver};

#[tokio::main]
async fn main() {
    let mut connection = Connection::open(
        "connection-1",                     // container id
        "amqp://guest:guest@localhost:5672" // url
    ).await.unwrap();

    let mut session = Session::begin(&mut connection).await.unwrap();

    // Create a sender
    let mut sender = Sender::attach(
        &mut session,           // Session
        "rust-sender-link-1",   // link name
        "q1"                    // target address
    ).await.unwrap();

    // Create a receiver
    let mut receiver = Receiver::attach(
        &mut session,
        "rust-receiver-link-1", // link name
        "q1"                    // source address
    ).await.unwrap();

    // Send a message to the broker
    sender.send("hello AMQP").await.unwrap();

    // Receive the message from the broker
    let delivery = receiver.recv::<String>().await.unwrap();
    receiver.accept(&delivery).await.unwrap();

    // Detach links with closing Detach performatives
    sender.close().await.unwrap();
    receiver.close().await.unwrap();

    // End the session
    session.end().await.unwrap();

    // Close the connection
    connection.close().await.unwrap();
}
```
```rust
use tokio::net::TcpListener;
use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};

#[tokio::main]
async fn main() {
    let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
    let connection_acceptor = ConnectionAcceptor::new("example-listener");

    while let Ok((stream, addr)) = tcp_listener.accept().await {
        let mut connection = connection_acceptor.accept(stream).await.unwrap();
        let handle = tokio::spawn(async move {
            let session_acceptor = SessionAcceptor::new();
            while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
                let handle = tokio::spawn(async move {
                    let link_acceptor = LinkAcceptor::new();
                    match link_acceptor.accept(&mut session).await.unwrap() {
                        LinkEndpoint::Sender(sender) => { },
                        LinkEndpoint::Receiver(recver) => { },
                    }
                });
            }
        });
    }
}
```
More examples of sending and receiving can be found in the GitHub repo. The examples have been tested against a local `TestAmqpBroker`.
License: MIT/Apache-2.0