![Latest Version] ![Docs badge]
Kafka driver for distributed systems built with tarantool-module.
Create new consumer:

```rust
use picokafka::consumer;

let consumer = consumer::Builder::new("kafka:29092")
    .with_group("group1")
    .append_topic("topic1")
    .start();
```
You can pass additional [librdkafka configuration parameters](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the `Builder::with_opt` method:
```rust
let consumer = consumer::Builder::new("kafka:29092")
    .with_opt("enable.auto.offset.store", "false")
    .with_session_timeout(Duration::from_secs(10))
    .start();
```
For handling consumer output, use the `Consumer::output` method:

```rust
let consumed = consumer.output().collect::<Vec<_>>();
consumed.iter().for_each(|received| {
    assert!(received.is_ok());
});
```
Note that the consumer prepares kafka records for output in separate tokio threads.
Create new producer:

```rust
use picokafka::producer;

let producer = producer::Builder::new("kafka:29092")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap();
```
You can pass additional [librdkafka configuration parameters](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the `Builder::with_opt` method.
Send message:

```rust
producer.send(
    Record::new("topic_1")
        .key(String::from("key_1"))
        .payload(String::from("payload_1")),
    Duration::from_secs(1),
    move |result| {
        println!("send result: {:?}", result);
    },
);
```
Note that the send callback is executed in separate tokio threads. If you want to use
the tarantool API, use `TarantoolProducer` instead of `Producer`.
Create:

```rust
use picokafka::producer;

let producer = producer::Builder::new("kafka:29092")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap()
    .tarantool();
```
Send:

```rust
producer.send(
    IdentifiedRecord::new(
        2,
        Record::new("topic_1")
            .key(String::from("key_1"))
            .payload(String::from("payload_1")),
    ),
    Duration::from_secs(1),
);
```
Sent result handling:

```rust
producer.output().for_each(|(descriptor, _)| {
    let descriptor = descriptor.downcast::<i32>().unwrap();
    assert!(*descriptor == 2);
});
```
`TarantoolProducer` uses `IdentifiedRecord` instead of `Record` because we need a way
to distinguish messages in the output from each other.
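Putting the pieces together, the descriptor is what lets you match each sent record to its result. Below is a minimal sketch, assuming the `TarantoolProducer` API shown above; the exact import paths, the topic name, and the ids are assumptions for illustration and may differ in your project:

```rust
use std::time::Duration;

use picokafka::producer::{self, IdentifiedRecord, Record};

// Assumed setup, as in the examples above.
let producer = producer::Builder::new("kafka:29092")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap()
    .tarantool();

// Tag each record with a distinct id (here 1 and 2) before sending.
for id in [1i32, 2] {
    producer.send(
        IdentifiedRecord::new(
            id,
            Record::new("topic_1")
                .key(format!("key_{id}"))
                .payload(format!("payload_{id}")),
        ),
        Duration::from_secs(1),
    );
}

// Each output entry carries the descriptor of the record it belongs to,
// so results can be told apart even if they arrive out of order.
producer.output().for_each(|(descriptor, result)| {
    let id = descriptor.downcast::<i32>().unwrap();
    println!("record {} finished: {:?}", id, result);
});
```

This only works because the descriptor survives the round trip; any type that can be downcast back (here `i32`) can serve as the id.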
To enable the ssl and sasl protocols, use the `ssl` feature. See the auth test to familiarize yourself with it.
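Since extra options are passed straight through to librdkafka, TLS/SASL can be configured with the `with_opt` method shown earlier. A sketch under that assumption; the option keys are standard librdkafka configuration parameters, while the paths and credentials below are placeholders:

```rust
use picokafka::consumer;

// librdkafka security options; values here are placeholders.
let consumer = consumer::Builder::new("kafka:29092")
    .with_opt("security.protocol", "sasl_ssl")
    .with_opt("ssl.ca.location", "/path/to/ca.crt")
    .with_opt("sasl.mechanism", "PLAIN")
    .with_opt("sasl.username", "user")
    .with_opt("sasl.password", "password")
    .with_group("group1")
    .append_topic("topic1")
    .start();
```

The same options can be passed to the producer builder, since both accept arbitrary librdkafka configuration keys.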
You need to start kafka before testing. You can use the tests/docker-compose.yml file:

```bash
docker run --rm -v $(pwd)/tests:/opt/kafka confluentinc/cp-kafka:latest /opt/kafka/setup_ssl.sh
docker-compose -f tests/docker-compose.yml up -d
```
Or create your own environment (set `KAFKA_ADDR` and `KAFKA_REST_ADDR` if you do that).
Then run `cargo test`, or `cargo test --features "ssl"` if you need the ssl feature.
After starting the kafka environment, use the `cargo bench` command.