Implementation of the union! macro.

union!

`union!` - one macro to rule them all. It provides useful shortcut combinators, combines sync/async chains, transforms a tuple of results into a result of a tuple, and supports single- and multi-threaded (sync/async) step-by-step execution of branches.

Using this macro you could do things like

```rust

#![recursion_limit="1024"]

extern crate futures; extern crate tokio; extern crate union; extern crate reqwest; extern crate failure;

use union::{union_async, union};
use futures::stream::{iter, Stream};
use reqwest::Client;
use futures::future::{try_join_all, ok, ready};
use failure::{format_err, Error};

fn get_urls_to_calculate_link_count() -> impl Stream<Item = &'static str> {
    iter(
        vec![
            "https://en.wikipedia.org/w/api.php?format=json&action=query&generator=random&grnnamespace=0&prop=revisions|images&rvprop=content&grnlimit=100",
            "https://github.com/explore",
            "https://twitter.com/search?f=tweets&vertical=news&q=%23news&src=unkn"
        ]
    )
}

fn get_url_to_get_random_number() -> String {
    "https://www.random.org/integers/?num=1&min=0&max=500&col=1&base=10&format=plain&rnd=new".to_owned()
}

async fn read_number_from_stdin() -> Result<usize, Error> {
    use tokio::*;
    use futures::stream::StreamExt;

let map_parse_error =
    |value|
        move |error|
            format_err!("Value from stdin isn't a correct `usize`: {:?}, input: {}", error, value);

let mut result;
let mut reader = codec::FramedRead::new(io::BufReader::new(io::stdin()), codec::LinesCodec::new());

while {
    println!("Please, enter number (`usize`)");

    let next = reader.next();

    result = union_async! {
        next
            |> |value| value.ok_or(format_err!("Unexpected end of input"))
            => |result| ready(result.map_err(|err| format_err!("Failed to apply codec: {:?}", err)))
            => |value|
                ready(
                    value
                        .parse()
                        .map_err(map_parse_error(value))
                )
            !> |error| { eprintln!("Error: {:#?}", error); error}
    }.await;

    result.is_err()
} {}

result

}

fn main() { let rt = ::tokio::runtime::Runtime::new().unwrap();

rt.block_on(async {

    println!(
        "{} {}\n{}",
        "Hello.\nThis's is the game where winner is player, which abs(value) is closest to",
        "the max count of links (starting with `https://`) found on one of random pages.",
        "You play against random generator (0-500)."
    );

    enum GameResult {
        Won,
        Lost,
        Draw
    }

    let client = Client::new();

    let task = union_async! {
        // This program makes requests to several sites
        // and calculates count of links starting from `https://`
        get_urls_to_calculate_link_count()
            |> {
                // If pass block statement instead of fn, it will be placed before current step,
                // so it will allow us to capture some variables from context
                let ref client = client;
                move |url| {
                    let client = client.clone();
                    async move {
                        client
                            .get(url)
                            .send()
                            .and_then(|value| value.text())
                            .await
                            .and_then(|body| Ok((url, body)))
                    }
                }
            }
            >.collect::<Vec<_>>()
            |> Ok
            => try_join_all
            !> |err| format_err!("Error retrieving pages to calculate links: {:#?}", err)
            => |results|
                ok(
                    results
                        .into_iter()
                        .map(|(url, body)| (url.to_owned(), body.matches("https://").collect::<Vec<_>>().len()))
                        .max_by_key(|(_, link_count)| link_count.clone())
                        .unwrap()
                )
            // It waits for input in stdin before log max links count
            ~?> |result| {
                result
                    .as_ref()
                    .map(
                        |(url, count)|
                            println!("Max `https://` link count found on `{}`: {}", url, count)
                    )
                    .unwrap_or(());
            },
        // In parallel it makes request to the site which generates random number
        get_url_to_get_random_number()
            -> ok
            => {
                // If pass block statement instead of fn, it will be placed before current step,
                // so it will allow us to capture some variables from context
                let client = client.clone();
                let map_parse_error =
                    |value|
                        move |err|
                            format_err!("Failed to parse random number: {:#?}, value: {}", err, value);
                move |url| {
                    union_async! {
                        client
                            .get(&url)
                            .send()
                            => |value| value.text()
                            !> |err| format_err!("Error retrieving random number: {:#?}", err)
                            => |value| ok(value[..value.len() - 1].to_owned()) // remove \n from `154\n`
                            => |value|  
                                ready(
                                    value
                                        .parse::<usize>()
                                        .map_err(map_parse_error(value))
                                )
                    }
                }
            }
            // It waits for input in stdin before log random value
            ~?> |random| {
                random
                    .as_ref()
                    .map(|number| println!("Random: {:?}", number))
                    .unwrap_or(());
            },
        // In parallel it reads value from stdin
        read_number_from_stdin(),
        // Finally, when we will have all results, we can decide, who is winner
        map => |(_url, link_count), random_number, number_from_stdin| {
            let random_diff = (link_count as i32 - random_number as i32).abs();
            let stdin_diff = (link_count as i32 - number_from_stdin as i32).abs();
            match () {
                _ if random_diff > stdin_diff => GameResult::Won,
                _ if random_diff < stdin_diff => GameResult::Lost,
                _ => GameResult::Draw
            }
        }    
    };

    // Use sync union because we don't need parallel execution and sync map
    // is more convenient
    let _ = union! {
        task
            .await
            |> |result|
                println!(
                    "You {}",
                    match result {
                        GameResult::Won => "won!",
                        GameResult::Lost => "lose...",
                        _ => "have the same result as random generator!"
                    }
                )
    }.unwrap();     

});

}
```

Combinators

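Combinators that appear in the examples in this document:

- Map: `|>` expr - `value.map(expr)`
- AndThen: `=>` expr - `value.and_then(expr)`
- Then: `->` expr - `expr(value)`
- Dot: `>.` expr - `value.expr`
- Or: `<|` expr - `value.or(expr)`
- MapErr: `!>` expr - `value.map_err(expr)`
- Inspect: `?>` expr - calls `expr` with a reference to `value` and passes `value` on unchanged

where `value` is the previous value.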

Every combinator prefixed by `~` acts as a deferred action: in each step, all actions wait until every one of them has completed, and only then do they move on to the next step.

Handler

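might be one of

- `map` - acts as `results.map(|(result0, result1, ..)| handler(result0, result1, ..))`
- `and_then` - acts as `results.and_then(|(result0, result1, ..)| handler(result0, result1, ..))`
- `then` - acts as `handler(result0, result1, ..)`

or not specified - then `Result<(result0, result1, ..), Error>` or `Option<(result0, result1, ..)>` will be returned.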

Single thread combinations

Simple results combination

Converts the input into a series of chained results and joins them step by step.

```rust
extern crate union;

use std::error::Error;
use union::union;

type Result<T> = std::result::Result<T, Box<dyn Error>>;

fn action_1() -> Result<u16> {
    Ok(1)
}

fn action_2() -> Result<u8> {
    Ok(2)
}

fn main() {
    let sum = union! {
        action_1(),
        action_2().map(|v| v as u16),
        action_2().map(|v| v as u16 + 1).and_then(|v| Ok(v * 4)),
        action_1().and_then(|_| Err("5".into())).or(Ok(2)),
        map => |a, b, c, d| a + b + c + d
    }.expect("Failed to calculate sum");

    println!("Calculated: {}", sum);
}
```

Futures combination

Each branch represents a chain of tasks. All branches are joined using the `join!` macro, and the macro returns an unpolled future.

```rust
#![recursion_limit="256"]

extern crate union;
extern crate futures;
extern crate tokio;

use std::error::Error;
use union::union_async;
use futures::future::{ok, err};

type Result<T> = std::result::Result<T, Box<dyn Error>>;

async fn action_1() -> Result<u16> {
    Ok(1)
}

async fn action_2() -> Result<u8> {
    Ok(2)
}

#[tokio::main]
async fn main() {
    let sum = union_async! {
        action_1(),
        action_2().and_then(|v| ok(v as u16)),
        action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16)),
        action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16)),
        and_then => |a, b, c, d| ok(a + b + c + d)
    }.await.expect("Failed to calculate sum");

    println!("Calculated: {}", sum);
}
```

Multi-thread combinations

To execute several tasks in parallel you could use union_spawn! (spawnion!) for sync tasks and union_async_spawn! (async_spawn!) for futures. Since union_async already provides parallel futures execution in one thread, union_async_spawn! spawns every branch into tokio executor so they will be evaluated in multi-threaded executor.

Multi-thread sync branches

union_spawn spawns one ::std::thread per each step of each branch (number of branches is the max thread count at the time).

```rust
extern crate union;

use std::error::Error;
use union::union_spawn;

type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;

fn action_1() -> Result<usize> {
    Ok(1)
}

fn action_2() -> Result<u16> {
    Ok(2)
}

fn main() {
    // Branches will be executed in parallel
    let sum = union_spawn! {
        action_1(),
        action_2().map(|v| v as usize),
        action_2().map(|v| v as usize + 1).and_then(|v| Ok(v * 4)),
        action_1().and_then(|_| Err("5".into())).or(Ok(2)),
        map => |a, b, c, d| a + b + c + d
    }.expect("Failed to calculate sum");

    println!("Calculated: {}", sum);
}
```

union_async_spawn! uses ::tokio::spawn function to spawn tasks so it should be done inside tokio runtime (number of branches is the max count of tokio tasks at the time).

Multi-thread futures

```rust
#![recursion_limit="256"]

extern crate union;
extern crate futures;
extern crate tokio;

use std::error::Error;
use union::union_async_spawn;
use futures::future::{ok, err};

type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;

async fn action_1() -> Result<u16> {
    Ok(1)
}

async fn action_2() -> Result<u8> {
    Ok(2)
}

#[tokio::main]
async fn main() {
    let sum = union_async_spawn! {
        action_1(),
        action_2().and_then(|v| ok(v as u16)),
        action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16)),
        action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16)),
        and_then => |a, b, c, d| ok(a + b + c + d)
    }.await.expect("Failed to calculate sum");

    println!("Calculated: {}", sum);
}
```

Using combinators, we can rewrite the first single-thread example (simple results combination) like this:

```rust
extern crate union;

use std::error::Error;
use union::union;

type Result<T> = std::result::Result<T, Box<dyn Error>>;

fn action_1() -> Result<u16> {
    Ok(1)
}

fn action_2() -> Result<u8> {
    Ok(2)
}

fn main() {
    let sum = union! {
        action_1(),
        action_2() |> |v| v as u16,
        action_2() |> |v| v as u16 + 1 => |v| Ok(v * 4),
        action_1() => |_| Err("5".into()) <| Ok(2),
        map => |a, b, c, d| a + b + c + d
    }.expect("Failed to calculate sum");

    println!("Calculated: {}", sum);
}
```

By separating a chain into actions with the `~` prefix, you make every action wait for all the others to complete the current step before any of them moves on to the next step.

```rust
#![recursion_limit="256"]

extern crate union;

use std::error::Error;
use union::union;

type Result<T> = std::result::Result<T, Box<dyn Error>>;

fn action_1() -> Result<u16> {
    Ok(1)
}

fn action_2() -> Result<u8> {
    Ok(2)
}

fn main() {
    let sum = union! {
        action_1(),
        let result_1 = action_2() ~|> |v| v as u16 + 1,
        action_2() ~|> {
            let result_1 = result_1.as_ref().ok().map(Clone::clone);
            move |v| {
                // `result_1` now is the result of `action_2()` [Ok(2u8)]
                if result_1.is_some() {
                    v as u16 + 1
                } else {
                    unreachable!()
                }
            }
        } ~=> {
            let result_1 = result_1.as_ref().ok().map(Clone::clone);
            move |v| {
                // `result_1` now is the result of `|v| v as u16 + 1` [Ok(3u16)]
                if let Some(result_1) = result_1 {
                    Ok(v * 4 + result_1)
                } else {
                    unreachable!()
                }
            }
        },
        action_1() ~=> |_| Err("5".into()) <| Ok(2),
        map => |a, b, c, d| a + b + c + d
    }.expect("Failed to calculate sum");
    println!("Calculated: {}", sum);
}
```