Ballista: Distributed Scheduler for Apache Arrow DataFusion

Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.

The foundational technologies in Ballista are:

Ballista can be deployed as a standalone cluster and also supports Kubernetes. In either case, the scheduler can be configured to use etcd as a backing store to (eventually) provide redundancy in the case of a scheduler failing.

Rust Version Compatbility

This crate is tested with the latest stable version of Rust. We do not currrently test against other, older versions of the Rust compiler.

Starting a cluster

There are numerous ways to start a Ballista cluster, including support for Docker and Kubernetes. For full documentation, refer to the DataFusion User Guide

A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.

bash cargo install ballista-scheduler cargo install ballista-executor

With these crates installed, it is now possible to start a scheduler process.

bash RUST_LOG=info ballista-scheduler

The scheduler will bind to port 50050 by default.

Next, start an executor processes in a new terminal session with the specified concurrency level.

bash RUST_LOG=info ballista-executor -c 4

The executor will bind to port 50051 by default. Additional executors can be started by manually specifying a bind port. For example:

bash RUST_LOG=info ballista-executor --bind-port 50052 -c 4

Executing a query

Ballista provides a BallistaContext as a starting point for creating queries. DataFrames can be created by invoking the read_csv, read_parquet, and sql methods.

The following example runs a simple aggregate SQL query against a CSV file from the New York Taxi and Limousine Commission data set.

```rust,no_run use ballista::prelude::*; use datafusion::arrow::util::pretty; use datafusion::prelude::CsvReadOptions;

[tokio::main]

async fn main() -> Result<()> { // create configuration let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "4") .build()?;

// connect to Ballista scheduler let ctx = BallistaContext::remote("localhost", 50050, &config);

// register csv file with the execution context ctx.registercsv( "tripdata", "/path/to/yellowtripdata_2020-01.csv", CsvReadOptions::new(), ).await?;

// execute the query let df = ctx.sql( "SELECT passengercount, MIN(fareamount), MAX(fareamount), AVG(fareamount), SUM(fareamount) FROM tripdata GROUP BY passengercount ORDER BY passenger_count", ).await?;

// collect the results and print them to stdout let results = df.collect().await?; pretty::print_batches(&results)?; Ok(()) } ```