Dataflow



Dataflow is a data processing library, primarily for machine learning. It provides efficient pipeline primitives for building a directed acyclic dataflow graph, a dataloader to run the graph in a separate thread, and common tokenizers and batching tools for working with textual data.

Usage

To build a pipeline, first start with a loader Node:

```rust
use dataflow::pipeline::RandomLoader;

fn main() {
    let pipeline = RandomLoader::new(vec!["file1.txt".to_string(), "file2.txt".to_string()]);
}
```

By default, the RandomLoader loads individual lines randomly from files. Next, add a transformation to it with the map() function:

```rust
let pipeline = RandomLoader::new(vec!["file1.txt".to_string(), "file2.txt".to_string()])
    .map(|line| format!("Hello {}", line));
```

This creates a new Node, which processes the data one sample at a time. Important note: **all functions and closures are also Nodes!** This means that whenever we want to add a node, we can simply use a function. In this case, the closure takes in a single datapoint and outputs a single datapoint. If we want to do batch processing, we can use `.node()`, which takes a Node that processes a batch at a time.
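Since functions are Nodes too, the closure above could just as well be a named function. A small sketch mirroring the snippet above (it assumes map() accepts any function of one datapoint to one datapoint, as the closure usage suggests):

```rust
// A free function used in place of the closure: one datapoint in, one out.
fn greet(line: String) -> String {
    format!("Hello {}", line)
}

let pipeline = RandomLoader::new(vec!["file1.txt".to_string(), "file2.txt".to_string()])
    .map(greet);
```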

Now that we've added "Hello " to every line, let's create a tokenizer and put it in our pipeline:

```rust
// Our tokenizer
let tokenizer = dataflow::tokenization::WordpieceTokenizer::load();

// Our pipeline
let pipeline = RandomLoader::new(vec!["file1.txt".to_string(), "file2.txt".to_string()])
    .map(|line| format!("Hello {}", line))
    .node(tokenizer); // This will tokenize the strings in batches
```

Great! Now our data gets efficiently tokenized in batches. Right now, we get single tokenized sentences out of the pipeline one at a time. But what if we wanted to get batches out? Let's use a Batch node:

```rust
use dataflow::pipeline::Batch;

// Our tokenizer
let tokenizer = dataflow::tokenization::WordpieceTokenizer::load();

// Our pipeline
let pipeline = RandomLoader::new(vec!["file1.txt".to_string(), "file2.txt".to_string()])
    .map(|line| format!("Hello {}", line))
    .node(tokenizer)       // This will tokenize the strings in batches
    .node(Batch::new(64)); // We'll use 64 as the batch size
```

That's it! We'll now get batches of 64 tokenized sentences.

Loader Nodes

So far it seems we've only used two types of Nodes: Stateless and Stateful (a Stateless Node was generated when we used .map(), and Batch is a Stateful Node). Actually we used three, because RandomLoader is a Node as well! It takes Vec<()> as input, which is what the pipeline starts with, and produces data (Vec<String>) to send through the pipeline.

Custom Nodes

In fact, you can implement your own Nodes as well, by implementing the Node trait! Just implement fn process(Vec<Input>) -> Vec<Output>, and optionally fn reset(&mut self), which gets called at the beginning of each epoch, and fn data_remaining(&self) -> usize, which should return how much data remains available to the Node (the number of lines we haven't loaded yet for RandomLoader, or usize::MAX for a non-loader Node). With that, you have your own Node to integrate into the pipeline!
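As a concrete illustration, here is a minimal custom Node. This is a sketch based on the three methods described above; the exact trait definition (whether Input/Output are associated types, whether process takes &mut self, and the import path) is an assumption, so check the docs for the real signatures:

```rust
use dataflow::pipeline::Node; // assumed import path

/// A toy batch-processing Node that uppercases each string.
struct Uppercase;

impl Node for Uppercase {
    type Input = String;  // assumed associated types
    type Output = String;

    // Process a whole batch at once.
    fn process(&mut self, input: Vec<Self::Input>) -> Vec<Self::Output> {
        input.into_iter().map(|s| s.to_uppercase()).collect()
    }

    // Called at the beginning of each epoch; nothing to reset here.
    fn reset(&mut self) {}

    // A non-loader Node never runs out of data.
    fn data_remaining(&self) -> usize {
        usize::MAX
    }
}
```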

Dataloader

Since we've built this cool pipeline, what can we do with it? Well, for starters, we could simply call process() and feed in some data, but let's do something cooler. Let's put it in a Dataloader and use it in an ML training loop:

```rust
// Make the dataloader
let mut dataloader = dataflow::dataloader::Dataloader::new(pipeline);

// Training loop
for example in &mut dataloader {
    // Now example is a vector of tokenized strings!
    // Do with them what you please...
}
```
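For reference, the manual route mentioned above would look roughly like this. This is a sketch only: the exact shape of process() and its unit-input convention are assumptions based on the Loader Nodes section.

```rust
// Drive the pipeline by hand: the loader starts from Vec<()>,
// so we feed it units and collect what comes out the far end
// (hypothetical call; check the pipeline API for the real signature).
let examples = pipeline.process(vec![(); 64]);
```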

To Do:
- [ ] Spin out all NLP-related stuff into a dataflow-nlp crate.
- [ ] Make the dataloader use a multiqueue instead of draining all examples into a buffer on the main thread.
- [ ] Make an auto-parallel pipeline Node using rayon.
- [ ] Add async ability and remote sources.
- [ ] Simplify the type magic used to make functions and closures work as Nodes. Allow direct implementation of Node. Remove the duplicate impls of Node and ExplicitNode.

Codebase Visualization

(Image: visualization of the codebase)