This crate aims to be a minimal Kafka implementation for simple workloads that wish to use Kafka as a distributed write-ahead log.
It is **not** a general-purpose Kafka implementation; instead, it is heavily optimised for simplicity, both in terms of implementation and its emergent operational characteristics. In particular, it aims to meet the needs of [IOx].
This crate has:
It will be a good fit for workloads that:
## Usage

```rust,no_run
use rskafka::{
    client::ClientBuilder,
    record::Record,
};
use std::collections::BTreeMap;
use time::OffsetDateTime;

// NOTE: the calls below are async and must run inside an async runtime,
// e.g. a `#[tokio::main] async fn main()`.

// setup client
let connection = "localhost:9093".to_owned();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();

// create a topic
let topic = "my_topic";
let controller_client = client.controller_client().await.unwrap();
controller_client.create_topic(
    topic,
    2,      // partitions
    1,      // replication factor
    5_000,  // timeout (ms)
).await.unwrap();

// get a partition-bound client
let partition_client = client
    .partition_client(
        topic.to_owned(),
        0,  // partition
    )
    .await
    .unwrap();

// produce some data
let record = Record {
    key: b"".to_vec(),
    value: b"hello kafka".to_vec(),
    headers: BTreeMap::from([
        ("foo".to_owned(), b"bar".to_vec()),
    ]),
    timestamp: OffsetDateTime::now_utc(),
};
partition_client.produce(vec![record]).await.unwrap();

// consume data
let (records, high_watermark) = partition_client
    .fetch_records(
        0,             // offset
        1..1_000_000,  // min..max bytes
        1_000,         // max wait time (ms)
    )
    .await
    .unwrap();
```
For more advanced production and consumption, see [crate::client::producer] and [crate::client::consumer].
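As a rough sketch of what those helpers look like: the names and signatures below (`BatchProducerBuilder`, `RecordAggregator`, `StreamConsumer`, `StartOffset`) are taken from the crate docs at the time of writing and vary between rskafka versions, so treat this as an assumption-laden illustration rather than verbatim API:

```rust,no_run
use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
use rskafka::client::{
    consumer::{StartOffset, StreamConsumer},
    producer::{aggregator::RecordAggregator, BatchProducerBuilder},
};

// `partition_client` and `record` are set up as in the example above.
let partition_client = Arc::new(partition_client);

// Batch producer: aggregates records and flushes a batch once it is full
// or the linger timeout expires.
let producer = BatchProducerBuilder::new(Arc::clone(&partition_client))
    .with_linger(Duration::from_millis(100))
    .build(RecordAggregator::new(1024 * 1024 /* max batch size in bytes */));
let _offset = producer.produce(record.clone()).await.unwrap();

// Stream consumer: an async stream yielding records plus the partition's
// high watermark.
let mut stream = StreamConsumer::new(Arc::clone(&partition_client), StartOffset::Earliest);
let (_record_and_offset, _high_watermark) =
    stream.next().await.expect("stream ended").unwrap();
```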
## Features

- `fuzzing`: Exposes some internal data structures so that they can be used by our fuzzers. This is NOT a stable feature / API!
- `transport-tls` (default): Allows TLS transport via [rustls].
- `transport-socks5`: Allows transport via a SOCKS5 proxy.
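These are Cargo features; for example, a `Cargo.toml` entry that drops the default TLS transport and enables SOCKS5 instead might look like this (the version is a placeholder, not a real pin):

```toml
[dependencies]
# Placeholder version; pin whichever rskafka release you actually use.
rskafka = { version = "*", default-features = false, features = ["transport-socks5"] }
```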
## Testing

### Redpanda

To run integration tests against [Redpanda], run:
```console
$ docker-compose -f docker-compose-redpanda.yml up
```
in one session, and then run:
```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9093 cargo test
```
in another session.
### Apache Kafka

To run integration tests against [Apache Kafka], run:
```console
$ docker-compose -f docker-compose-kafka.yml up
```
in one session, and then run:
```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9094 cargo test
```
in another session.
## Fuzzing

RSKafka offers fuzz targets for certain protocol parsing steps. To build them, make sure you have [cargo-fuzz] installed. Select one of the following fuzzers:
- `protocol_reader`: Selects an API key and API version and then reads message frames and tries to decode the response object. The message frames are read w/o the length marker for more efficient fuzzing.
- `record_batch_body_reader`: Reads the inner part of a record batch (w/o the prefix that contains length and CRC) and tries to decode it. In theory this is covered by `protocol_reader` as well, but the length fields and CRC make it hard for the fuzzer to traverse this data structure.
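For orientation, a libFuzzer target of this shape typically looks like the hypothetical sketch below. The real targets live under `fuzz/fuzz_targets/` and call rskafka's internal parsers (exposed via the `fuzzing` feature); since that entry point is internal, the decode call is only indicated as a comment:

```rust
// Hypothetical sketch of a cargo-fuzz target like `protocol_reader`;
// not the actual source.
#![no_main]
use libfuzzer_sys::fuzz_target;

fuzz_target!(|data: &[u8]| {
    // Treat the first bytes as API key and API version, then try to decode
    // the remainder as a response frame (no length marker, per the note above).
    if data.len() < 4 {
        return;
    }
    let _api_key = i16::from_be_bytes([data[0], data[1]]);
    let _api_version = i16::from_be_bytes([data[2], data[3]]);
    // Decoding untrusted bytes must return an error, never panic:
    // let _ = decode_response(_api_key, _api_version, &data[4..]); // hypothetical
});
```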
Then run the fuzzer with:
```console
$ cargo +nightly fuzz run protocol_reader
...
```
Let it run for as long as you wish, or until it finds a crash:
```text
...
Failing input:

        fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Output of `std::fmt::Debug`:

        [0, 18, 0, 3, 0, 0, 0, 0, 71, 88, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 18, 18, 0, 164, 0, 164, 164, 164, 30, 164, 164, 0, 0, 0, 0, 63]

Reproduce with:

        cargo fuzz run protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Minimize test case with:

        cargo fuzz tmin protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3
```
Sadly, the backtraces that you might get are not really helpful; you need a debugger to find the exact source locations:
```console
$ rust-lldb ./target/x86_64-unknown-linux-gnu/release/protocol_reader fuzz/artifacts/protocol_reader/crash-7b824dad6e26002e5488e8cc84ce16728222dcf5
...
(lldb) r
...
Process 177543 launched: '/home/mneumann/src/rskafka/target/x86_64-unknown-linux-gnu/release/protocol_reader' (x86_64)
INFO: Running with entropic power schedule (0xFF, 100).
INFO: Seed: 3549747846
...
==177543==ABORTING
(lldb) AddressSanitizer report breakpoint hit. Use 'thread info -s' to get extended information about the report.
Process 177543 stopped
...
(lldb) bt
* thread #1, name = 'protocol_reader', stop reason = AddressSanitizer detected: allocation-size-too-big
  * frame #0: 0x0000555556c04f20 protocol_reader`__asan::AsanDie() at asan_rtl.cpp:45:7
    frame #1: 0x0000555556c1a33c protocol_reader`__sanitizer::Die() at sanitizer_termination.cpp:55:7
    frame #2: 0x0000555556c01471 protocol_reader`__asan::ScopedInErrorReport::~ScopedInErrorReport() at asan_report.cpp:190:7
    frame #3: 0x0000555556c021f4 protocol_reader`__asan::ReportAllocationSizeTooBig() at asan_report.cpp:313:1
    ...
```
Then create a unit test and fix the bug.
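For example, a regression test can embed the failing input verbatim; `decode_response_frame` below is a hypothetical stand-in for whatever parser the fuzz target exercises:

```rust
#[test]
fn regression_crash_369f9787() {
    // The failing input, copied from the fuzzer's `std::fmt::Debug` output above.
    let data: &[u8] = &[
        0, 18, 0, 3, 0, 0, 0, 0, 71, 88, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164,
        164, 164, 164, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164,
        164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 164, 164, 164, 164, 164, 164,
        164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 18, 18, 0, 164, 0,
        164, 164, 164, 30, 164, 164, 0, 0, 0, 0, 63,
    ];
    // `decode_response_frame` is a hypothetical stand-in for the parser under
    // test; after the fix it must fail gracefully instead of panicking or
    // over-allocating.
    assert!(decode_response_frame(data).is_err());
}
```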
For out-of-memory errors, [LLDB] does not stop automatically. You can, however, set a breakpoint before starting the execution that hooks right into the place where it is about to exit:
```console
(lldb) b fuzzer::PrintStackTrace()
```
## License

Licensed under either of these:

- Apache License, Version 2.0
- MIT License
Unless you explicitly state otherwise, any contribution you intentionally submit for inclusion in the work, as defined in the Apache-2.0 license, shall be dual-licensed as above, without any additional terms or conditions.
Note that each partition is an independent write stream within the broker; however, this crate makes no attempt to mitigate per-partition overheads, e.g. by batching writes to multiple partitions in a single `ProduceRequest`.