Kafka Rust Client Documentation
The documentation includes some examples too.
This crate works with Cargo and is on crates.io. I will be updating the package frequently till we move out of pre-release. So add this to your Cargo.toml
(instead of a specific version):
toml
[dependencies]
kafka = "*"
[Load Metadata] (http://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.loadmetadataall)
rust
extern crate kafka;
use kafka::client::KafkaClient;
fn main() {
let mut client = KafkaClient::new(&vec!("localhost:9092".to_string()));
client.load_metadata_all();
// OR
// client.load_metadata(&vec!("my-topic".to_string())); // Loads metadata for vector of topics
}
[For one topic] (http://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.fetchtopicoffset)
rust
extern crate kafka;
use kafka::client::KafkaClient;
fn main() {
let mut client = KafkaClient::new(&vec!("localhost:9092".to_string()));
client.load_metadata_all();
let offsets = client.fetch_topic_offset(&"my-topic".to_string());
}
[For multiple topics] (http://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.fetch_offsets)
rust
extern crate kafka;
use kafka::client::KafkaClient;
fn main() {
let mut client = KafkaClient::new(&vec!("localhost:9092".to_string()));
client.load_metadata_all();
let topics = client.topic_partitions.keys().cloned().collect();
let offsets = client.fetch_offsets(topics);
}
[Single Message] (http://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.send_message)
rust
extern crate kafka;
use kafka::client::KafkaClient;
fn main() {
let mut client = KafkaClient::new(&vec!("localhost:9092".to_string()));
client.load_metadata_all();
client.send_message(
1, // Required Acks
0, // Timeout
&"my-topic".to_string(), // Topic
0, // Partition
&"b".to_string().into_bytes() // Message
)
}
[Multiple Messages] (http://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.send_messages)
rust
extern crate kafka;
use kafka::client::KafkaClient;
use kafka::utils;
fn main() {
let mut client = KafkaClient::new(&vec!("localhost:9092".to_string()));
client.load_metadata_all();
let m1 = "a".to_string().into_bytes();
let m2 = "b".to_string().into_bytes();
let req = vec!(utils::ProduceMessage{topic: "my-topic".to_string(), message: m1},
utils::ProduceMessage{topic: "my-topic-2".to_string(), message: m2});
client.send_messages(1, 100, req); // required acks, timeout, messages
}
[Single (topic, partition, offset)] (http://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.fetch_messages)
rust
extern crate kafka;
use kafka::client::KafkaClient;
fn main() {
let mut client = KafkaClient::new(&vec!("localhost:9092".to_string()));
client.load_metadata_all();
// Topic, partition, offset
let msgs = client.fetch_messages(&"my-topic".to_string(), 0, 0);
}
[Multiple (topic, partition, offset)] (http://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.fetchmessagesmulti)
rust
extern crate kafka;
use kafka::client::KafkaClient;
use kafka::utils;
fn main() {
let mut client = KafkaClient::new(&vec!("localhost:9092".to_string()));
client.load_metadata_all();
let msgs = client.fetch_messages_multi(vec!(utils::TopicPartitionOffset{
topic: "my-topic".to_string(),
partition: 0,
offset: 0
},
utils::TopicPartitionOffset{
topic: "my-topic-2".to_string(),
partition: 0,
offset: 0
})));
}