PICOKAFKA


Kafka driver for distributed systems built with tarantool-module.

Consumer

Create a new consumer:

```rust
use picokafka::consumer;

let consumer = consumer::Builder::new("kafka:29092")
    .with_group("group_1")
    .append_topic("topic_1")
    .start();
```

You can pass additional [librdkafka configuration parameters](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the `Builder::with_opts` method:

```rust
use std::time::Duration;

let consumer = consumer::Builder::new("kafka:29092")
    .with_opt("enable.auto.offset.store", "false")
    .with_session_timeout(Duration::from_secs(10))
    .start();
```

To handle consumer output, use the `Consumer::output` method:

```rust
let consumed = consumer.output().collect::<Vec<_>>();
consumed.iter().for_each(|received| {
    assert!(received.is_ok());
});
```
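Outside of tests you will usually want to separate successful records from errors rather than assert on every item. The following is a minimal sketch using only the standard library; `split_results` is a hypothetical helper, not part of picokafka, and plain `String` values stand in for the consumer's real record and error types.

```rust
/// Splits a batch of results into successful records and errors.
/// Hypothetical helper for illustration; not a picokafka function.
fn split_results<T, E>(results: Vec<Result<T, E>>) -> (Vec<T>, Vec<E>) {
    let mut records = Vec::new();
    let mut errors = Vec::new();
    for result in results {
        match result {
            Ok(record) => records.push(record),
            Err(error) => errors.push(error),
        }
    }
    (records, errors)
}

fn main() {
    // Stand-in data: in real code this would come from the consumer output.
    let consumed: Vec<Result<String, String>> = vec![
        Ok("payload_1".to_string()),
        Err("broker unavailable".to_string()),
        Ok("payload_2".to_string()),
    ];
    let (records, errors) = split_results(consumed);
    println!("records: {:?}, errors: {:?}", records, errors);
}
```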

Note that the consumer prepares Kafka records for output in separate tokio threads.

Producer

Create a new producer:

```rust
use picokafka::producer;
use std::time::Duration;

let producer = producer::Builder::new("kafka:29092")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap();
```

You can pass additional [librdkafka configuration parameters](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the `Builder::with_opts` method.

Send a message:

```rust
producer.send(
    Record::new("topic_1")
        .key(String::from("key_1"))
        .payload(String::from("payload_1")),
    Duration::from_secs(1),
    move |result| {
        println!("send result: {:?}", result);
    },
);
```

Note that the send callback is executed in separate tokio threads. If you want to use the tarantool API, use `TarantoolProducer` instead of `Producer`.
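Because the callback runs on another thread, one common way to get results back to the calling thread is to have the callback only forward them through a channel. The sketch below illustrates that pattern with the standard library alone. `send_all`, `collect_results`, and `SendResult` are hypothetical names used to simulate an asynchronous producer; they are not picokafka APIs.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a delivery result; not a picokafka type.
type SendResult = Result<String, String>;

/// Simulates an asynchronous producer: invokes `callback` on a separate
/// thread for each key, then waits for all callback threads to finish.
fn send_all<F>(keys: &[&str], callback: F)
where
    F: Fn(SendResult) + Send + Clone + 'static,
{
    let handles: Vec<_> = keys
        .iter()
        .map(|key| {
            let key = key.to_string();
            let callback = callback.clone();
            thread::spawn(move || callback(Ok(key)))
        })
        .collect();
    for handle in handles {
        handle.join().expect("callback thread panicked");
    }
}

/// Gathers every callback result on the calling thread via a channel.
fn collect_results(keys: &[&str]) -> Vec<SendResult> {
    let (tx, rx) = mpsc::channel();
    send_all(keys, move |result| {
        // The callback only forwards the result to the channel.
        tx.send(result).expect("receiver dropped");
    });
    // Every sender is dropped once `send_all` returns, so the iterator ends.
    let mut results: Vec<SendResult> = rx.iter().collect();
    // Threads finish in arbitrary order; sort for a stable result.
    results.sort();
    results
}

fn main() {
    let results = collect_results(&["key_1", "key_2"]);
    println!("send results: {:?}", results);
}
```

The callback stays small and never touches state owned by the calling thread, which is the property you need when that state is only safe to use from one thread.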

TarantoolProducer

Create:

```rust
use picokafka::producer;
use std::time::Duration;

let producer = producer::Builder::new("kafka:29092")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap()
    .tarantool();
```

Send:

```rust
producer.send(
    IdentifiedRecord::new(
        2,
        Record::new("topic_1")
            .key(String::from("key_1"))
            .payload(String::from("payload_1")),
    ),
    Duration::from_secs(1),
);
```

Handle send results:

```rust
producer.output().for_each(|(descriptor, _)| {
    let descriptor = descriptor.downcast::<i32>().unwrap();
    assert!(*descriptor == 2);
});
```

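`TarantoolProducer` uses `IdentifiedRecord` instead of `Record` because the messages in the output need to be distinguishable from each other. The descriptor passed on send (`2` in the example above) comes back with the corresponding result.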
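The `downcast::<i32>()` call above suggests that the descriptor travels as a type-erased value. As a rough illustration of that idea, and not of picokafka's actual implementation, the sketch below stores a descriptor as `Box<dyn Any>` and recovers it with `downcast_ref`. The `Identified` struct and `payloads_for` function are hypothetical names.

```rust
use std::any::Any;

/// Hypothetical wrapper: a type-erased descriptor plus a payload.
/// Illustrative only; this is not picokafka's `IdentifiedRecord`.
struct Identified {
    descriptor: Box<dyn Any>,
    payload: String,
}

impl Identified {
    fn new<D: Any>(descriptor: D, payload: &str) -> Self {
        Identified {
            descriptor: Box::new(descriptor),
            payload: payload.to_string(),
        }
    }
}

/// Returns the payloads whose descriptor is an `i32` equal to `id`.
/// Descriptors of any other type are skipped rather than causing a panic.
fn payloads_for(records: &[Identified], id: i32) -> Vec<&str> {
    records
        .iter()
        .filter(|r| r.descriptor.downcast_ref::<i32>() == Some(&id))
        .map(|r| r.payload.as_str())
        .collect()
}

fn main() {
    let records = vec![
        Identified::new(1, "payload_1"),
        Identified::new(2, "payload_2"),
        Identified::new("not a number", "payload_3"),
    ];
    println!("{:?}", payloads_for(&records, 2));
}
```

Using `downcast_ref` and comparing against `Some(&id)` avoids the `unwrap()` on a failed downcast, which matters if descriptors of more than one type can appear in the same output.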

SSL and SASL

To enable the SSL and SASL protocols, use the `ssl` feature. See the auth test to get familiar with it.

Tests

You need to start Kafka before testing. You can use the `tests/docker-compose.yml` file:

```bash
docker run --rm -v $(pwd)/tests:/opt/kafka confluentinc/cp-kafka:latest /opt/kafka/setup_ssl.sh
docker-compose -f tests/docker-compose.yml up -d
```

Or create your own environment (set `KAFKA_ADDR` and `KAFKA_REST_ADDR` if you do that).
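If you write your own tests against a custom environment, a small helper can resolve the broker address from the environment with a fallback. This is a sketch under assumptions: the variable name is written here as `KAFKA_ADDR`, the fallback `kafka:29092` is taken from the examples above, and `resolve_addr` is a hypothetical helper rather than anything picokafka's test suite is known to use.

```rust
use std::env;

/// Picks the broker address: the override if present and non-blank,
/// otherwise the fallback. Hypothetical helper for illustration.
fn resolve_addr(override_value: Option<String>, fallback: &str) -> String {
    match override_value {
        Some(value) if !value.trim().is_empty() => value,
        _ => fallback.to_string(),
    }
}

fn main() {
    // The variable name and the fallback address are both assumptions.
    let addr = resolve_addr(env::var("KAFKA_ADDR").ok(), "kafka:29092");
    println!("using broker at {}", addr);
}
```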

Then run `cargo test`, or `cargo test --features "ssl"` if you need the `ssl` feature.

Benchmarks

After starting the Kafka environment, run `cargo bench`.