Apache IoTDB (Database for Internet of Things) is an IoT-native database with high performance for data management and analysis, deployable on the edge and in the cloud. Thanks to its lightweight architecture, high performance and rich feature set, together with its deep integration with Apache Hadoop, Spark and Flink, Apache IoTDB can meet the requirements of massive data storage, high-speed data ingestion and complex data analysis in industrial IoT fields.
This is the Rust client of Apache IoTDB.
Apache IoTDB website: https://iotdb.apache.org

Apache IoTDB GitHub: https://github.com/apache/iotdb
Put this in your `Cargo.toml`:

```toml
[dependencies]
iotdb-client-rs="0.1.0"
chrono="0.4.19"
prettytable-rs="0.8.0"
```

```rust
use std::error::Error;
use std::vec;

use chrono;
use chrono::Local;
use iotdb_client_rs::client::remote::{Config, RpcSession};
use iotdb_client_rs::client::{DataSet, MeasurementSchema, Session, Tablet, Value};
use iotdb_client_rs::protocal::{TSCompressionType, TSDataType, TSEncoding};
use prettytable::{cell, Cell, Row, Table};

fn print_dataset(dataset: Box
let mut title_cells: Vec<Cell> = Vec::new();
let is_ignore_timestamp = dataset.is_ignore_timestamp();
if !is_ignore_timestamp {
title_cells.push(cell!("Time"));
}
dataset
.get_column_names()
.iter()
.for_each(|name| title_cells.push(cell!(name)));
table.set_titles(Row::new(title_cells));
dataset.for_each(|record| {
let mut row_cells: Vec<Cell> = Vec::new();
if !is_ignore_timestamp {
row_cells.push(cell!(record.timestamp.to_string()));
}
record
.values
.iter()
.for_each(|v| row_cells.push(cell!(v.to_string())));
table.add_row(Row::new(row_cells));
});
table.printstd();
Ok(())
}
fn main() -> Result<(), Box
//rpc session
let session = RpcSession::new(&config)?;
run_example(session)?;
//Local filesystem session
// let session = DirectSession::new("/data/apache-iotdb-0.12.3-server-bin");
// run_example(session)?;
Ok(())
}
fn run_example
let tz = session.get_time_zone()?;
if tz != "Asia/Shanghai" {
session.set_time_zone("Asia/Shanghai")?;
}
session.set_storage_group("root.ln1")?;
session.delete_storage_group("root.ln1")?;
session.set_storage_group("root.ln1")?;
session.set_storage_group("root.ln2")?;
session.delete_storage_groups(vec!["root.ln1", "root.ln2"])?;
//create_timeseries
session.create_timeseries(
"root.sg1.dev2.status",
TSDataType::Float,
TSEncoding::Plain,
TSCompressionType::SNAPPY,
None,
None,
None,
None,
)?;
session.delete_timeseries(vec!["root.sg1.dev2.status"])?;
//insert_record
session.insert_record(
"root.sg1.dev5",
vec!["online", "desc"],
vec![Value::Bool(false), Value::Text("F4145".to_string())],
Local::now().timestamp_millis(),
false,
)?;
session.delete_timeseries(vec!["root.sg1.dev5.online", "root.sg1.dev5.desc"])?;
//insert_records
session.insert_records(
vec!["root.sg1.dev1"],
vec![vec![
"restart_count",
"tick_count",
"price",
"temperature",
"description",
"status",
]],
vec![vec![
Value::Int32(1),
Value::Int64(2018),
Value::Double(1988.1),
Value::Float(12.1),
Value::Text("Test Device 1".to_string()),
Value::Bool(false),
]],
vec![Local::now().timestamp_millis()],
)?;
session.delete_timeseries(vec![
"root.sg1.dev1.restart_count",
"root.sg1.dev1.tick_count",
"root.sg1.dev1.price",
"root.sg1.dev1.temperature",
"root.sg1.dev1.description",
"root.sg1.dev1.status",
])?;
//create_multi_timeseries
session.create_multi_timeseries(
vec!["root.sg3.dev1.temperature", "root.sg3.dev1.desc"],
vec![TSDataType::Float, TSDataType::Text],
vec![TSEncoding::Plain, TSEncoding::Plain],
vec![TSCompressionType::SNAPPY, TSCompressionType::SNAPPY],
None,
None,
None,
None,
)?;
session.delete_timeseries(vec!["root.sg3.dev1.temperature", "root.sg3.dev1.desc"])?;
//delete_timeseries
session.insert_string_record(
"root.ln.wf02.wt02",
vec!["id", "location"],
vec!["SN:001", "Beijing"],
Local::now().timestamp_millis(),
false,
)?;
session.delete_timeseries(vec!["root.ln.wf02.wt02.id", "root.ln.wf02.wt02.location"])?;
//insert_records_of_one_device
session.insert_records_of_one_device(
"root.sg1.dev0",
vec![
Local::now().timestamp_millis(),
Local::now().timestamp_millis() + 1,
],
vec![
vec!["restart_count", "tick_count", "price"],
vec!["temperature", "description", "status"],
],
vec![
vec![Value::Int32(1), Value::Int64(2018), Value::Double(1988.1)],
vec![
Value::Float(12.1),
Value::Text("Test Device 1".to_string()),
Value::Bool(false),
],
],
false,
)?;
//tablet
let mut ts = Local::now().timestamp_millis();
let tablet1 = create_tablet(5, ts);
ts += 5;
let tablet2 = create_tablet(10, ts);
ts += 10;
let tablet3 = create_tablet(2, ts);
session.insert_tablet(&tablet1, true)?;
session.insert_tablets(vec![&tablet2, &tablet3], true)?;
//delete_data
session.insert_records_of_one_device(
"root.sg1.dev1",
vec![1, 16],
vec![vec!["status"], vec!["status"]],
vec![vec![Value::Bool(true)], vec![Value::Bool(true)]],
true,
)?;
session.delete_data(vec!["root.sg1.dev1.status"], 1, 16)?;
let dataset = session.execute_query_statement("select * from root.ln.device2", None)?;
print_dataset(dataset)?;
// dataset.for_each(|r| println!("timestamp: {} {:?}", r.timestamp, r.values));
// let timestamps: Vec<i64> = dataset.map(|r| r.timestamp).collect();
// let count = dataset.count();
let ds = session.execute_statement("show timeseries", None)?;
print_dataset(ds)?;
session.execute_batch_statement(vec![
"insert into root.sg1.dev6(time,s5) values(1,true)",
"insert into root.sg1.dev6(time,s5) values(2,true)",
"insert into root.sg1.dev6(time,s5) values(3,true)",
])?;
let ds = session.execute_raw_data_query(
vec![
"root.ln.device2.restart_count",
"root.ln.device2.tick_count",
"root.ln.device2.description",
],
0,
i64::MAX,
)?;
print_dataset(ds)?;
if let Some(dataset) = session.execute_update_statement("delete timeseries root.sg1.dev1.*")? {
print_dataset(dataset)?;
}
session.close()?;
Ok(())
}
fn createtablet(rowcount: i32, starttimestamp: i64) -> Tablet { let mut tablet = Tablet::new( "root.ln.device2", vec![ MeasurementSchema::new( String::from("status"), TSDataType::Boolean, TSEncoding::Plain, TSCompressionType::SNAPPY, None, ), MeasurementSchema::new( String::from("restartcount"), TSDataType::Int32, TSEncoding::RLE, TSCompressionType::SNAPPY, None, ), MeasurementSchema::new( String::from("tickcount"), TSDataType::Int64, TSEncoding::RLE, TSCompressionType::SNAPPY, None, ), MeasurementSchema::new( String::from("temperature"), TSDataType::Float, TSEncoding::Plain, TSCompressionType::SNAPPY, None, ), MeasurementSchema::new( String::from("price"), TSDataType::Double, TSEncoding::Gorilla, TSCompressionType::SNAPPY, None, ), MeasurementSchema::new( String::from("description"), TSDataType::Text, TSEncoding::Plain, TSCompressionType::SNAPPY, None, ), ], ); (0..rowcount).foreach(|row| { let ts = starttimestamp + row as i64; tablet.addrow( vec![ Value::Bool(ts % 2 == 0), Value::Int32(row), Value::Int64(row as i64), Value::Float(row as f32 + 0.1), Value::Double(row as f64 + 0.2), Value::Text(format!("ts: {}", ts).tostring()), ], ts, ); }); tablet } ```