RSKafka

CircleCI Crates.io Documentation License

This crate aims to be a minimal Kafka implementation for simple workloads that wish to use Kafka as a distributed write-ahead log.

It is not a general-purpose Kafka implementation, instead it is heavily optimised for simplicity, both in terms of implementation and its emergent operational characteristics. In particular, it aims to meet the needs of [IOx].

This crate has:

It will be a good fit for workloads that:

Usage

```rust,no_run

async fn test() {

use rskafka::{ client::{ ClientBuilder, partition::{Compression, UnknownTopicHandling}, }, record::Record, }; use chrono::{TimeZone, Utc}; use std::collections::BTreeMap;

// setup client let connection = "localhost:9093".to_owned(); let client = ClientBuilder::new(vec![connection]).build().await.unwrap();

// create a topic let topic = "mytopic"; let controllerclient = client.controllerclient().unwrap(); controllerclient.createtopic( topic, 2, // partitions 1, // replication factor 5000, // timeout (ms) ).await.unwrap();

// get a partition-bound client let partitionclient = client .partitionclient( topic.to_owned(), 0, // partition UnknownTopicHandling::Retry, ) .await .unwrap();

// produce some data let record = Record { key: None, value: Some(b"hello kafka".tovec()), headers: BTreeMap::from([ ("foo".toowned(), b"bar".tovec()), ]), timestamp: Utc.timestampmillis(42), }; partition_client.produce(vec![record], Compression::default()).await.unwrap();

// consume data let (records, highwatermark) = partitionclient .fetchrecords( 0, // offset 1..1000000, // min..max bytes 1000, // max wait time ) .await .unwrap();

}

```

For more advanced production and consumption, see [crate::client::producer] and [crate::client::consumer].

Features

Testing

Redpanda

To run integration tests against [Redpanda], run:

console $ docker-compose -f docker-compose-redpanda.yml up

in one session, and then run:

console $ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test

in another session.

Apache Kafka

To run integration tests against [Apache Kafka], run:

console $ docker-compose -f docker-compose-kafka.yml up

in one session, and then run:

console $ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test

in another session. Note that Apache Kafka supports a different set of features then redpanda, so we pass other environment variables.

Using a SOCKS5 Proxy

To run the integration test via a SOCKS5 proxy, you need to set the environment variable SOCKS_PROXY. The following command requires a running proxy on the local machine.

console $ KAFKA_CONNECT=0.0.0.0:9011,kafka-1:9021,redpanda-1:9021 SOCKS_PROXY=localhost:1080 cargo test --features full

The SOCKS5 proxy will automatically be started by the docker compose files. Note that KAFKA_CONNECT was extended by addresses that are reachable via the proxy.

Java Interopt

To test if RSKafka can produce/consume records to/from the official Java client, you need to have Java installed and the TEST_JAVA_INTEROPT=1 environment variable set.

Fuzzing

RSKafka offers fuzz targets for certain protocol parsing steps. To build them make sure you have [cargo-fuzz] installed. Select one of the following fuzzers:

Then run the fuzzer with:

console $ cargo +nightly fuzz run protocol_reader ...

Let it running for how long you wish or until it finds a crash:

```text ... Failing input:

    fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Output of std::fmt::Debug:

    [0, 18, 0, 3, 0, 0, 0, 0, 71, 88, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 18, 18, 0, 164, 0, 164, 164, 164, 30, 164, 164, 0, 0, 0, 0, 63]

Reproduce with:

    cargo fuzz run protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Minimize test case with:

    cargo fuzz tmin protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

```

Sadly the backtraces that you might get are not really helpful and you need a debugger to detect the exact source locations:

```console $ rust-lldb ./target/x8664-unknown-linux-gnu/release/protocolreader fuzz/artifacts/protocol_reader/crash-7b824dad6e26002e5488e8cc84ce16728222dcf5 ...

(lldb) r ... Process 177543 launched: '/home/mneumann/src/rskafka/target/x8664-unknown-linux-gnu/release/protocolreader' (x86_64) INFO: Running with entropic power schedule (0xFF, 100). INFO: Seed: 3549747846 ... ==177543==ABORTING (lldb) AddressSanitizer report breakpoint hit. Use 'thread info -s' to get extended information about the report. Process 177543 stopped ...

(lldb) bt * thread #1, name = 'protocolreader', stop reason = AddressSanitizer detected: allocation-size-too-big * frame #0: 0x0000555556c04f20 protocolreader::AsanDie() at asan_rtl.cpp:45:7 frame #1: 0x0000555556c1a33c protocol_reader_sanitizer::Die() at sanitizertermination.cpp:55:7 frame #2: 0x0000555556c01471 protocolreader::~ScopedInErrorReport() at asan_report.cpp:190:7 frame #3: 0x0000555556c021f4 protocol_reader::ReportAllocationSizeTooBig() at asanreport.cpp:313:1 ... ```

Then create a unit test and fix the bug.

For out-of-memory errors [LLDB] does not stop automatically. You can however set a breakpoint before starting the execution that hooks right into the place where it is about to exit:

console (lldb) b fuzzer::PrintStackTrace()

Benchmarks

Install [cargo-criterion], make sure you have some Kafka cluster running, and then you can run all benchmarks with:

console $ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo criterion --all-features

If you find a benchmark that is too slow, you can may want to profile it. Get [cargo-with], and [perf], then run (here for the parallel/rskafka benchmark):

console $ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \ bench --all-features --bench write_throughput -- \ --bench --noplot parallel/rskafka

Have a look at the report:

console $ perf report

License

Licensed under either of these:

Contributing

Unless you explicitly state otherwise, any contribution you intentionally submit for inclusion in the work, as defined in the Apache-2.0 license, shall be dual-licensed as above, without any additional terms or conditions.

independent write stream within the broker. However, this crate makes no attempt to mitigate per-partition overheads e.g. by batching writes to multiple partitions in a single ProduceRequest