![Latest Version] ![Docs badge]
Kafka driver for distributed systems built with tarantool-module. This driver use cbus channels for communication between tokio and tarantool threads. Please familiarize with it first.
Create new consumer:
```rust use std::rc::Rc; use picokafka::consumer; use tarantool::fiber::{Fiber, Mutex}; use tarantool::cbus::Endpoint;
pub fn main() { // create cbus endpoint in separate fiber let mut fiber = Fiber::new("f1", &mut |: Box<()>| { let cbusendpoint = Endpoint::new("myendpoint").expect("error on start cbus endpoint"); cbusendpoint.cbus_loop(); 0 }); fiber.start(());
// buffer for consumed messages
let consumed = Rc::new(Mutex::new(vec![]));
// create handler for received messages, this handler will executed in
// tarantool TX thread, so any tarantool API's can used
let message_handler = {
let consumed = consumed.clone();
move |msg, _ctrl| consumed.lock().push(msg)
};
// create consumer and set the callback for consumed messages
let consumer =
consumer::Builder::new("kafka:29092")
.with_group("group_1")
.append_topic("topic_1")
.start("my_endpoint", message_handler);
} ```
You can pass additional configuration parameters for librdkafka, see: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md with Builder::with_opts
method:
```rust use picokafka::consumer;
let consumer = consumer::Builder::new("kafka:29092") .withopt("enable.auto.offset.store", "false") .withsessiontimeout(Duration::fromsecs(10)) .start("myendpoint", |, _| {}); ```
Note that the callback executed in tarantool TX thread, in special fiber.
Create new producer:
```rust use picokafka::producer;
let producer = producer::Builder::new(&*BROKERADDR) .withmessagetimeout(Duration::fromsecs(1)) .build("my_endpoint") .unwrap(); ```
You can pass additional configuration parameters for librdkafka https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md with Builder::with_opts
method.
Send a message: ```rust static SEEN_RESULT: AtomicBool = AtomicBool::new(false);
producer.send( Record::new("topic1") .key(String::from("key1")) .payload(String::from("payload1")), Duration::fromsecs(1), |res, | { assert!(res.result.isok()); SEEN_RESULT.store(true, Ordering::SeqCst); }, ); ```
Note that sent callback executed in tarantool TX thread, in special fiber.
For enabling ssl and sasl protocols use a "ssl" feature. See auth test for familiarize with it.
You need start kafka before testing. You can use tests/docker-compose.yml file:
bash
docker run --rm -v $(pwd)/tests:/opt/kafka confluentinc/cp-kafka:latest /opt/kafka/setup_ssl.sh
docker-compose -f tests/docker-compose.yml up -d
Or create your own environment (set KAFKAADDR and KAFKAREST_ADDR if you do that).
Then use tarantool-test utilit:
cargo build
tarantool-test -p ./target/debug/libtests.so
Run benchmarks (using tarantool-runner util):
cargo build
tarantool-runner run -p ./target/debug/libbenches.so -e entrypoint
Result of produce 1000 messages:
producer_sync 10000 messages (1 samples)
[ave.] 32.461472ms
32.461472ms (>50%), 32.461472ms (>95%), 32.461472ms (>99%)
Result of consume 1000000 messages:
consume 1000000 messages (1 samples)
[ave.] 6.182463391s
6.182463391s (>50%), 6.182463391s (>95%), 6.182463391s (>99%)