Ballista: Distributed Scheduler for Apache Arrow DataFusion

Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.

The foundational technologies in Ballista are:

Ballista can be deployed as a standalone cluster and also supports Kubernetes. In either case, the scheduler can be configured to use etcd as a backing store to (eventually) provide redundancy in the case of a scheduler failing.

Rust Version Compatibility

This crate is tested with the latest stable version of Rust. We do not currrently test against other, older versions of the Rust compiler.

Starting a cluster

There are numerous ways to start a Ballista cluster, including support for Docker and Kubernetes. For full documentation, refer to the DataFusion User Guide

A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.

bash cargo install --locked ballista-scheduler cargo install --locked ballista-executor

With these crates installed, it is now possible to start a scheduler process.

bash RUST_LOG=info ballista-scheduler

The scheduler will bind to port 50050 by default.

Next, start an executor processes in a new terminal session with the specified concurrency level.

bash RUST_LOG=info ballista-executor -c 4

The executor will bind to port 50051 by default. Additional executors can be started by manually specifying a bind port. For example:

bash RUST_LOG=info ballista-executor --bind-port 50052 -c 4

Executing a query

Ballista provides a BallistaContext as a starting point for creating queries. DataFrames can be created by invoking the read_csv, read_parquet, and sql methods.

To build a simple ballista example, add the following dependencies to your Cargo.toml file:

toml [dependencies] ballista = "0.8" datafusion = "12.0.0" tokio = "1.0"

The following example runs a simple aggregate SQL query against a Parquet file (yellow_tripdata_2022-01.parquet) from the New York Taxi and Limousine Commission data set. Download the file and add it to the testdata folder before running the example.

```rust,no_run use ballista::prelude::*; use datafusion::prelude::{col, min, max, avg, sum, ParquetReadOptions}; use datafusion::arrow::util::pretty; use datafusion::prelude::CsvReadOptions;

[tokio::main]

async fn main() -> Result<()> { // create configuration let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "4") .build()?;

// connect to Ballista scheduler
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

let filename = "testdata/yellow_tripdata_2022-01.parquet";

// define the query using the DataFrame trait
let df = ctx
    .read_parquet(filename, ParquetReadOptions::default())
    .await?
    .select_columns(&["passenger_count", "fare_amount"])?
    .aggregate(vec![col("passenger_count")], vec![min(col("fare_amount")), max(col("fare_amount")), avg(col("fare_amount")), sum(col("fare_amount"))])?
    .sort(vec![col("passenger_count").sort(true,true)])?;

// this is equivalent to the following SQL
// SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
//     FROM tripdata
//     GROUP BY passenger_count
//     ORDER BY passenger_count

// print the results
df.show().await?;

Ok(())

} ```

The output should look similar to the following table.

{r eval=FALSE} +-----------------+--------------------------+--------------------------+--------------------------+--------------------------+ | passenger_count | MIN(?table?.fare_amount) | MAX(?table?.fare_amount) | AVG(?table?.fare_amount) | SUM(?table?.fare_amount) | +-----------------+--------------------------+--------------------------+--------------------------+--------------------------+ | | -159.5 | 285.2 | 17.60577640099004 | 1258865.829999991 | | 0 | -115 | 500 | 11.794859107585335 | 614052.1600000001 | | 1 | -480 | 401092.32 | 12.61028389876563 | 22623542.879999973 | | 2 | -250 | 640.5 | 13.79501011585127 | 4732047.139999998 | | 3 | -130 | 480 | 13.473184817311106 | 1139427.2400000002 | | 4 | -250 | 464 | 14.232650547832726 | 502711.4499999997 | | 5 | -52 | 668 | 12.160378472086954 | 624289.51 | | 6 | -52 | 252.5 | 12.576583325529857 | 402916 | | 7 | 7 | 79 | 61.77777777777778 | 556 | | 8 | 8.3 | 115 | 79.9125 | 639.3 | | 9 | 9.3 | 96.5 | 65.26666666666667 | 195.8 | +-----------------+--------------------------+--------------------------+--------------------------+--------------------------+

More examples can be found in the arrow-datafusion repository.