Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.
The foundational technologies in Ballista are:
Ballista can be deployed as a standalone cluster and also supports Kubernetes. In either case, the scheduler can be configured to use etcd as a backing store to (eventually) provide redundancy in the case of a scheduler failing.
This crate is tested with the latest stable version of Rust. We do not currrently test against other, older versions of the Rust compiler.
There are numerous ways to start a Ballista cluster, including support for Docker and Kubernetes. For full documentation, refer to the DataFusion User Guide
A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.
bash
cargo install --locked ballista-scheduler
cargo install --locked ballista-executor
With these crates installed, it is now possible to start a scheduler process.
bash
RUST_LOG=info ballista-scheduler
The scheduler will bind to port 50050 by default.
Next, start an executor processes in a new terminal session with the specified concurrency level.
bash
RUST_LOG=info ballista-executor -c 4
The executor will bind to port 50051 by default. Additional executors can be started by manually specifying a bind port. For example:
bash
RUST_LOG=info ballista-executor --bind-port 50052 -c 4
Ballista provides a BallistaContext
as a starting point for creating queries. DataFrames can be created
by invoking the read_csv
, read_parquet
, and sql
methods.
To build a simple ballista example, add the following dependencies to your Cargo.toml
file:
toml
[dependencies]
ballista = "0.6"
datafusion = "7.0"
tokio = "1.0"
The following example runs a simple aggregate SQL query against a CSV file from the New York Taxi and Limousine Commission data set.
```rust,no_run use ballista::prelude::*; use datafusion::arrow::util::pretty; use datafusion::prelude::CsvReadOptions;
async fn main() -> Result<()> { // create configuration let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "4") .build()?;
// connect to Ballista scheduler let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
// register csv file with the execution context ctx.registercsv( "tripdata", "/path/to/yellowtripdata_2020-01.csv", CsvReadOptions::new(), ).await?;
// execute the query let df = ctx.sql( "SELECT passengercount, MIN(fareamount), MAX(fareamount), AVG(fareamount), SUM(fareamount) FROM tripdata GROUP BY passengercount ORDER BY passenger_count", ).await?;
// collect the results and print them to stdout let results = df.collect().await?; pretty::print_batches(&results)?; Ok(()) } ```
More examples can be found in the arrow-datafusion repository.