PICOKAFKA

![Latest Version] ![Docs badge]

Kafka driver for distributed systems built with tarantool-module. This driver use cbus channels for communication between tokio and tarantool threads. Please familiarize with it first.

Consumer

Create new consumer:

```rust use std::rc::Rc; use picokafka::consumer; use tarantool::fiber::{Fiber, Mutex}; use tarantool::cbus::Endpoint;

pub fn main() { // create cbus endpoint in separate fiber let mut fiber = Fiber::new("f1", &mut |: Box<()>| { let cbusendpoint = Endpoint::new("myendpoint").expect("error on start cbus endpoint"); cbusendpoint.cbus_loop(); 0 }); fiber.start(());

// buffer for consumed messages
let consumed = Rc::new(Mutex::new(vec![]));

// create handler for received messages, this handler will executed in
// tarantool TX thread, so any tarantool API's can used
let message_handler = {
    let consumed = consumed.clone();
    move |msg, _ctrl| consumed.lock().push(msg)
};

// create consumer and set the callback for consumed messages
let consumer =
    consumer::Builder::new("kafka:29092")
        .with_group("group_1")
        .append_topic("topic_1")
        .start("my_endpoint", message_handler);

} ```

You can pass additional configuration parameters for librdkafka, see: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md with Builder::with_opts method:

```rust use picokafka::consumer;

let consumer = consumer::Builder::new("kafka:29092") .withopt("enable.auto.offset.store", "false") .withsessiontimeout(Duration::fromsecs(10)) .start("myendpoint", |, _| {}); ```

Note that the callback executed in tarantool TX thread, in special fiber.

Producer

Create new producer:

```rust use picokafka::producer;

let producer = producer::Builder::new(&*BROKERADDR) .withmessagetimeout(Duration::fromsecs(1)) .build("my_endpoint") .unwrap(); ```

You can pass additional configuration parameters for librdkafka https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md with Builder::with_opts method.

Send a message: ```rust static SEEN_RESULT: AtomicBool = AtomicBool::new(false);

producer.send( Record::new("topic1") .key(String::from("key1")) .payload(String::from("payload1")), Duration::fromsecs(1), |res, | { assert!(res.result.isok()); SEEN_RESULT.store(true, Ordering::SeqCst); }, ); ```

Note that sent callback executed in tarantool TX thread, in special fiber.

SSL and SASL

For enabling ssl and sasl protocols use a "ssl" feature. See auth test for familiarize with it.

Tests

You need start kafka before testing. You can use tests/docker-compose.yml file:

bash docker run --rm -v $(pwd)/tests:/opt/kafka confluentinc/cp-kafka:latest /opt/kafka/setup_ssl.sh docker-compose -f tests/docker-compose.yml up -d

Or create your own environment (set KAFKAADDR and KAFKAREST_ADDR if you do that).

Then use tarantool-test utilit:

cargo build tarantool-test -p ./target/debug/libtests.so

Benchmarks

Run benchmarks (using tarantool-runner util): cargo build tarantool-runner run -p ./target/debug/libbenches.so -e entrypoint

Result of produce 1000 messages: producer_sync 10000 messages (1 samples) [ave.] 32.461472ms 32.461472ms (>50%), 32.461472ms (>95%), 32.461472ms (>99%)

Result of consume 1000000 messages: consume 1000000 messages (1 samples) [ave.] 6.182463391s 6.182463391s (>50%), 6.182463391s (>95%), 6.182463391s (>99%)