A flexible, easy-to-configure Event Bus for event-driven systems in Rust.
Create your own structs and integrate your services / microservices by sending strongly typed payloads.
A message received from the queue will be sent to a Handler.
Your Message Handler must implement the trait MessageHandler and specify the type of Message it handles.
```
struct NotifyUserEventHandler;

impl MessageHandler<NotifyUserMessage> for NotifyUserEventHandler {
    // ... (message-handling method omitted here)

    // Necessary only for broadcasting.
    fn get_handler_action(&self) -> String {
        todo!()
    }
}
```

The handler receives messages of the configured type from the queue the subscriber is listening on.
The HandleError struct is used to signal that processing did not complete correctly, even though the message was received.
With the Error object you can also tell the Bus whether this message should be requeued in order to try the process again.
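The requeue decision described above can be sketched without the library. In this minimal, std-only model, the `HandleError` fields, the `MessageHandler` trait shape, the `service_available` flag, the `Outcome` enum and the `dispatch` helper are all hypothetical stand-ins rather than Crosstown Bus's actual definitions, so check the crate documentation for the real signatures.

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for the library's error type (field names are assumed).
#[derive(Debug)]
pub struct HandleError {
    pub message: String,
    pub requeue: bool,
}

// Hypothetical stand-in for the handler trait, generic over the message type.
pub trait MessageHandler<T> {
    fn handle(&self, message: &T) -> Result<(), HandleError>;
}

#[derive(Debug, Clone, PartialEq)]
pub struct NotifyUserMessage {
    pub user_id: String,
    pub user_name: String,
    pub email: String,
}

// The flag is illustrative only: it simulates a downstream outage.
pub struct NotifyUserEventHandler {
    pub service_available: bool,
}

impl MessageHandler<NotifyUserMessage> for NotifyUserEventHandler {
    fn handle(&self, message: &NotifyUserMessage) -> Result<(), HandleError> {
        if message.email.is_empty() {
            // Permanent failure: retrying cannot help, so do not requeue.
            return Err(HandleError { message: "missing email".to_owned(), requeue: false });
        }
        if !self.service_available {
            // Transient failure: ask for the message to be delivered again.
            return Err(HandleError { message: "notifier unavailable".to_owned(), requeue: true });
        }
        Ok(())
    }
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    Handled,
    Requeued,
    Dropped,
}

// Takes one message off an in-memory queue and applies the handler's verdict.
pub fn dispatch<T, H: MessageHandler<T>>(queue: &mut VecDeque<T>, handler: &H) -> Option<Outcome> {
    let message = queue.pop_front()?;
    match handler.handle(&message) {
        Ok(()) => Some(Outcome::Handled),
        Err(err) if err.requeue => {
            queue.push_back(message);
            Some(Outcome::Requeued)
        }
        Err(_) => Some(Outcome::Dropped),
    }
}
```

In this model, a message that fails with `requeue: true` goes back on the queue and is retried on a later `dispatch` call, while one that fails with `requeue: false` is discarded.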
Notice that the Message type we want to send and receive between services is NotifyUserMessage.
When we create the Event Message struct and the Message Handler, we are defining:

- the format of the event message we are sending between services.
- the struct/method that will handle the incoming messages.

Here is the Message struct used in the current example:
```
use borsh::{BorshDeserialize, BorshSerialize};

#[derive(BorshDeserialize, BorshSerialize)]
pub struct NotifyUserMessage {
    pub user_id: String,
    pub user_name: String,
    pub email: String,
}
```

Notice that your struct must derive BorshDeserialize and BorshSerialize, so that Crosstown Bus can serialize the struct you defined to send it to RabbitMQ and deserialize the messages coming from RabbitMQ into your custom format.
So, don't forget to add the dependencies to your Cargo.toml file:
borsh = "0.9.3"
borsh-derive = "0.9.1"
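To make the serialization step more concrete, here is a std-only sketch of the kind of byte layout involved. It hand-encodes each `String` field as a little-endian `u32` length followed by the UTF-8 bytes, which is how Borsh lays out strings. The `encode`, `decode`, `write_string` and `read_string` helpers are hypothetical and purely illustrative; in real code the derives generate the equivalent logic for you.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct NotifyUserMessage {
    pub user_id: String,
    pub user_name: String,
    pub email: String,
}

// Appends a string as: u32 little-endian byte length, then the UTF-8 bytes.
fn write_string(buf: &mut Vec<u8>, value: &str) {
    buf.extend_from_slice(&(value.len() as u32).to_le_bytes());
    buf.extend_from_slice(value.as_bytes());
}

// Reads one length-prefixed string and advances `input` past it.
fn read_string(input: &mut &[u8]) -> Option<String> {
    let data = *input;
    if data.len() < 4 {
        return None;
    }
    let len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
    let rest = &data[4..];
    if rest.len() < len {
        return None;
    }
    let (text, remaining) = rest.split_at(len);
    *input = remaining;
    String::from_utf8(text.to_vec()).ok()
}

// Fields are written in declaration order.
pub fn encode(message: &NotifyUserMessage) -> Vec<u8> {
    let mut buf = Vec::new();
    write_string(&mut buf, &message.user_id);
    write_string(&mut buf, &message.user_name);
    write_string(&mut buf, &message.email);
    buf
}

// Returns None if the bytes are truncated, malformed, or have trailing data.
pub fn decode(bytes: &[u8]) -> Option<NotifyUserMessage> {
    let mut input = bytes;
    let message = NotifyUserMessage {
        user_id: read_string(&mut input)?,
        user_name: read_string(&mut input)?,
        email: read_string(&mut input)?,
    };
    if input.is_empty() {
        Some(message)
    } else {
        None
    }
}
```

A message passed through `encode` and then `decode` comes back unchanged, while a payload that is cut short is rejected instead of producing a partial struct.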
First, let's create a Receiver object:
let receiver = CrosstownBus::new_receiver("amqp://guest:guest@localhost:5672".to_owned())?;
After that, call `receive` on it, passing the event name / queue name that you want to subscribe to and the handler. If the queue does not yet exist on RabbitMQ, it will be created when the receiver subscribes to it.
_ = receiver.receive("notify_user".to_owned(), NotifyUserEventHandler,
QueueProperties { auto_delete: false, durable: false, use_dead_letter: true });
Note that this method is async, so it must be awaited when invoked.
Another option is to block on it, using the following notation:
futures::executor::block_on(receiver.receive("notify_user".to_owned(), NotifyUserEventHandler, None));
The parameter queue_properties is optional and holds specific queue configurations, such as whether the queue should be auto deleted and durable.
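As a rough illustration of an optional configuration parameter like this one, the sketch below models it with `Option`. The three field names come from the example above; the `ASSUMED_DEFAULTS` values and the `effective_properties` helper are hypothetical, since the library's real fallback behavior is not documented here.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct QueueProperties {
    pub auto_delete: bool,
    pub durable: bool,
    pub use_dead_letter: bool,
}

// Assumed fallback values, chosen only for this sketch.
pub const ASSUMED_DEFAULTS: QueueProperties = QueueProperties {
    auto_delete: false,
    durable: false,
    use_dead_letter: false,
};

// Uses the caller's settings when given, otherwise the assumed defaults.
pub fn effective_properties(queue_properties: Option<QueueProperties>) -> QueueProperties {
    queue_properties.unwrap_or(ASSUMED_DEFAULTS)
}
```

Passing `None` selects the fallback, and passing `Some(QueueProperties { .. })` overrides every field at once.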
Creating the publisher works much the same way; only the creation method differs.
```
let mut publisher = CrosstownBus::new_queue_publisher("amqp://guest:guest@localhost:5672".to_owned())?;

_ = publisher.publish_event("notify_user".to_owned(), NotifyUserMessage {
    user_id: "asdf".to_owned(),
    user_name: "Billy Gibbons".to_owned(),
    email: "bg@test.com".to_owned(),
});
```

Since the method publish_event receives a generic parameter as the Message, we can use the same publisher object to publish multiple object types to multiple queues.

Warning: if the message type you publish to a queue doesn't match what the subscriber's handler expects, the message cannot be parsed and a message will be logged.
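The two points above (one publisher for several message types, and parse failures on a type mismatch) can be modeled with a small in-memory sketch. Everything here is hypothetical: the `Wire` trait stands in for what the Borsh derives provide, `InMemoryPublisher` replaces the RabbitMQ connection, and `OrderShippedMessage` is an invented second message type. A mismatch is not guaranteed to fail for every pair of types and values; with these two layouts it does.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the (de)serialization the Borsh derives provide.
pub trait Wire: Sized {
    fn encode(&self) -> Vec<u8>;
    fn decode(bytes: &[u8]) -> Option<Self>;
}

#[derive(Debug, PartialEq)]
pub struct NotifyUserMessage {
    pub user_name: String,
}

#[derive(Debug, PartialEq)]
pub struct OrderShippedMessage {
    pub order_id: u64,
}

impl Wire for NotifyUserMessage {
    // Layout: u32 little-endian length, then the UTF-8 bytes.
    fn encode(&self) -> Vec<u8> {
        let mut buf = (self.user_name.len() as u32).to_le_bytes().to_vec();
        buf.extend_from_slice(self.user_name.as_bytes());
        buf
    }

    fn decode(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < 4 {
            return None;
        }
        let (prefix, text) = bytes.split_at(4);
        let len = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;
        if text.len() != len {
            return None;
        }
        Some(Self { user_name: String::from_utf8(text.to_vec()).ok()? })
    }
}

impl Wire for OrderShippedMessage {
    // Layout: exactly eight little-endian bytes.
    fn encode(&self) -> Vec<u8> {
        self.order_id.to_le_bytes().to_vec()
    }

    fn decode(bytes: &[u8]) -> Option<Self> {
        if bytes.len() != 8 {
            return None;
        }
        let mut raw = [0u8; 8];
        raw.copy_from_slice(bytes);
        Some(Self { order_id: u64::from_le_bytes(raw) })
    }
}

// In-memory replacement for the broker: queue name -> encoded payloads.
#[derive(Default)]
pub struct InMemoryPublisher {
    queues: HashMap<String, Vec<Vec<u8>>>,
}

impl InMemoryPublisher {
    // Generic over the message type, so one publisher serves many queues.
    pub fn publish_event<T: Wire>(&mut self, queue: String, message: T) {
        self.queues.entry(queue).or_default().push(message.encode());
    }

    pub fn first_payload(&self, queue: &str) -> Option<&[u8]> {
        self.queues.get(queue)?.first().map(Vec::as_slice)
    }
}
```

Decoding a payload with the type it was published as succeeds, while decoding it as the other type returns `None`, which corresponds to the case where the handler cannot parse the message.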
You can also manually close the connection, if needed:
_ = publisher.close_connection();