![Latest Version] ![Docs badge]
Kafka driver for distributed systems built with tarantool-module.
Create a new consumer:

```rust
use picokafka::consumer;

let consumer = consumer::Builder::new(&*BROKER_ADDR)
    .with_group("group_1")
    .append_topic("topic_1")
    .start();
```
You can pass additional [configuration parameters for librdkafka](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the `Builder::with_opt` method:
```rust
let consumer = consumer::Builder::new(&*BROKER_ADDR)
    .with_opt("enable.auto.offset.store", "false")
    .with_session_timeout(Duration::from_secs(10))
    .start();
```
To handle consumer output, use the `Consumer::output` method:
```rust
let consumed = consumer.output().collect::<Vec<_>>();
consumed.iter().for_each(|received| {
    assert!(received.is_ok());
});
```
Note that the consumer prepares Kafka records for output on separate tokio threads.
Create a new producer:

```rust
use picokafka::producer;

let producer = producer::Builder::new(&*BROKER_ADDR)
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap();
```
You can pass additional [configuration parameters for librdkafka](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the `Builder::with_opt` method.
Send a message:
```rust
producer.send(
    Record::new("topic_1")
        .key(String::from("key_1"))
        .payload(String::from("payload_1")),
    Duration::from_secs(1),
    move |result| {
        println!("send result: {:?}", result);
    },
);
```
Note that the send callback is executed on separate tokio threads. If you want to use the tarantool API, use `TarantoolProducer` instead of `Producer`.
Create:

```rust
use picokafka::producer;

let producer = producer::Builder::new(&*BROKER_ADDR)
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap()
    .tarantool();
```
Send:
```rust
producer.send(
    IdentifiedRecord::new(
        1,
        Record::new("topic_1")
            .key(String::from("key_1"))
            .payload(String::from("payload_1")),
    ),
    Duration::from_secs(1),
);
```
Handle the send result:

```rust
producer.output().for_each(|(descriptor, _)| {
    let descriptor = descriptor.downcast::<i32>().unwrap();
    assert!(*descriptor == 1);
});
```
`TarantoolProducer` uses `IdentifiedRecord` instead of `Record` because we need a way to distinguish messages in the output from each other.
You need to start Kafka before testing. You can use the `tests/docker-compose.yml` file:
```bash
docker-compose -f tests/docker-compose.yml up -d
```
Or create your own environment (set `KAFKA_ADDR` and `KAFKA_REST_ADDR` if you do that).
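For example, a custom environment can be prepared by exporting both variables before running the tests. The addresses below are placeholders, not values taken from this repository; substitute the endpoints of your own Kafka broker and REST proxy:

```shell
# Placeholder addresses for a locally running Kafka setup; replace
# them with the endpoints of your own broker and REST proxy.
export KAFKA_ADDR=localhost:9092
export KAFKA_REST_ADDR=localhost:8082
```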
Then run `cargo test`.
After starting the Kafka environment, use the `cargo bench` command.