This crate aims to be a minimal Kafka implementation for simple workloads that wish to use Kafka as a distributed write-ahead log. This is a fork from the original RSKafka with support for WebAssembly compilation target. That allows Kafka apps to run inside the WasmEdge Runtime as a lightweight and secure alternative to natively compiled apps in Linux container.
It is not a general-purpose Kafka implementation, instead it is heavily optimised for simplicity, both in terms of implementation and its emergent operational characteristics. In particular, it aims to meet the needs of [IOx].
This crate has:
It will be a good fit for workloads that:
```rust,no_run
use rskafka::{ client::{ ClientBuilder, partition::{Compression, UnknownTopicHandling}, }, record::Record, }; use chrono::{TimeZone, Utc}; use std::collections::BTreeMap;
// setup client let connection = "localhost:9093".to_owned(); let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
// create a topic let topic = "mytopic"; let controllerclient = client.controllerclient().unwrap(); controllerclient.createtopic( topic, 2, // partitions 1, // replication factor 5000, // timeout (ms) ).await.unwrap();
// get a partition-bound client let partitionclient = client .partitionclient( topic.to_owned(), 0, // partition UnknownTopicHandling::Retry, ) .await .unwrap();
// produce some data let record = Record { key: None, value: Some(b"hello kafka".tovec()), headers: BTreeMap::from([ ("foo".toowned(), b"bar".tovec()), ]), timestamp: Utc.timestampmillis(42), }; partition_client.produce(vec![record], Compression::default()).await.unwrap();
// consume data let (records, highwatermark) = partitionclient .fetchrecords( 0, // offset 1..1000000, // min..max bytes 1000, // max wait time ) .await .unwrap();
```
For more advanced production and consumption, see [crate::client::producer
] and [crate::client::consumer
].
compression-gzip
(default): Support compression and decompression of messages using [gzip].compression-lz4
(default): Support compression and decompression of messages using [LZ4].compression-snappy
(default): Support compression and decompression of messages using [Snappy].compression-zstd
(default): Support compression and decompression of messages using [zstd].full
: Includes all stable features (compression-gzip
, compression-lz4
, compression-snappy
,
compression-zstd
, transport-socks5
, transport-tls
).transport-socks5
: Allow transport via SOCKS5 proxy.To run integration tests against [Redpanda], run:
console
$ docker-compose -f docker-compose-redpanda.yml up
in one session, and then run:
console
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test
in another session.
To run integration tests against [Apache Kafka], run:
console
$ docker-compose -f docker-compose-kafka.yml up
in one session, and then run:
console
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test
in another session. Note that Apache Kafka supports a different set of features then redpanda, so we pass other environment variables.
Licensed under either of these:
Unless you explicitly state otherwise, any contribution you intentionally submit for inclusion in the work, as defined in the Apache-2.0 license, shall be dual-licensed as above, without any additional terms or conditions.
independent write stream within the broker. However, this crate makes no attempt to mitigate per-partition overheads e.g. by batching writes to multiple partitions in a single ProduceRequest