union!
macro.union!
union!
- one macro to rule them all. Provides useful shortcut combinators, combines sync/async chains, transforms tuple of results in result of tuple, supports single and multi thread (sync/async) step by step execution of branches.
Using this macro you can write things like
```rust
use rand::prelude::*; use std::sync::Arc; use union::{union_spawn};
fn generaterandomvec(size: usize) -> Vec
fn is_even(value: &usize) -> bool { value % 2 == 0 }
fn get_sqrt(value: usize) -> usize { { value as f64 }.sqrt() as usize }
fn power2
// Problem: generate vecs filled by random numbers in parallel, make some operations on them in parallel, // find max of each vec in parallel and find final max of 3 vecs
// Solution: fn main() { // Branches will be executed in parallel, each in its own thread let max = unionspawn! { let branch0 = generaterandomvec(1000).into_iter() // Multiply every element by himself |> power2
.filter(iseven).collect::
>() // To share data with branch 1 -> Arc::new // Extract raw vec after sharing ~-> |v| unsafe { &*Arc::into raw(v) } // Find max value .intoiter().max() // Clone value to make &usize => usize |> Clone::clone, let branch1 = generaterandomvec(10000) .intoiter() // Extract sqrt from every element |> getsqrt // Add index in order to compare with the values of branch 0 .enumerate() ~|> { // Get data from branch 0 by cloning arc let branch0 = branch0.clone(); let len = branch0.len(); // Compare every element of branch 1 with element of branch 0 // with the same index and take min move |(index, value)| if index < len && value > branch0[index] { branch0[index] } else { value } } .max(), let branch2 = generaterandomvec(100000).intoiter().max(), map => |max0, max1, max2| { // Find final max *[max0, max1, max2].intoiter().max().unwrap() } }.unwrap(); println!("Max: {}", max); } ```
And like this
```rust no_run
use union::unionasync; use futures::stream::{iter, Stream}; use reqwest::Client; use futures::future::{tryjoinall, ok, ready}; use failure::{formaterr, Error};
fn geturlstocalculatelink_count() -> impl Stream
}
fn geturltogetrandom_number() -> &'static str { "https://www.random.org/integers/?num=1&min=0&max=500&col=1&base=10&format=plain&rnd=new" }
async fn readnumberfrom_stdin() -> Result
let map_parse_error =
|value|
move |error|
format_err!("Value from stdin isn't a correct `u16`: {:?}, input: {}", error, value);
let mut result;
let mut reader = codec::FramedRead::new(io::BufReader::new(io::stdin()), codec::LinesCodec::new());
while {
println!("Please, enter number (`u16`)");
let next = reader.next();
result = union_async! {
next
|> |value| value.ok_or(format_err!("Unexpected end of input"))
=> |result| ready(result.map_err(|err| format_err!("Failed to apply codec: {:?}", err)))
=> |value|
ready(
value
.parse()
.map_err(map_parse_error(value))
)
!> |error| { eprintln!("Error: {:#?}", error); error}
}.await;
result.is_err()
} {}
result
}
async fn main() {
println!(
"{} {}\n{}",
"Hello.\nThis's is the game where winner is player, which abs(value) is closest to",
"the max count of links (starting with https://
) found on one of random pages.",
"You play against random generator (0-500)."
);
enum GameResult {
Won,
Lost,
Draw
}
let client = Client::new();
let game = union_async! {
// Make requests to several sites
// and calculate count of links starting from `https://`
get_urls_to_calculate_link_count()
|> {
// If pass block statement instead of fn, it will be placed before current step,
// so it will us allow to capture some variables from context
let ref client = client;
move |url|
// `union_async!` wraps its content into `async move { }`
union_async! {
client
.get(url).send()
=> |value| value.text()
=> |body| ok((url, body))
}
}
>.collect::<Vec<_>>()
|> Ok
=> try_join_all
!> |err| format_err!("Error retrieving pages to calculate links: {:#?}", err)
=> |results|
ok(
results
.into_iter()
.map(|(url, body)| (url, body.matches("https://").collect::<Vec<_>>().len()))
.max_by_key(|(_, link_count)| link_count.clone())
.unwrap()
)
// It waits for input in stdin before log max links count
~?> |result| {
result
.as_ref()
.map(
|(url, count)| {
let split = url.to_owned().split('/').collect::<Vec<_>>();
let domain_name = split.get(2).unwrap_or(&url);
println!("Max `https://` link count found on `{}`: {}", domain_name, count)
}
)
.unwrap_or(());
},
// In parallel it makes request to the site which generates random number
get_url_to_get_random_number()
-> ok
=> {
// If pass block statement instead of fn, it will be placed before current step,
// so it will allow us to capture some variables from context
let ref client = client;
let map_parse_error =
|value|
move |err|
format_err!("Failed to parse random number: {:#?}, value: {}", err, value);
move |url|
union_async! {
client
.get(url)
.send()
=> |value| value.text()
!> |err| format_err!("Error retrieving random number: {:#?}", err)
=> |value| ok(value[..value.len() - 1].to_owned()) // remove \n from `154\n`
=> |value|
ready(
value
.parse::<u16>()
.map_err(map_parse_error(value))
)
}
}
// It waits for input in stdin before log random value
~?> |random| {
random
.as_ref()
.map(|number| println!("Random: {}", number))
.unwrap_or(());
},
// In parallel it reads value from stdin
read_number_from_stdin(),
// Finally, when we will have all results, we can decide, who is winner
map => |(_url, link_count), random_number, number_from_stdin| {
let random_diff = (link_count as i32 - random_number as i32).abs();
let stdin_diff = (link_count as i32 - number_from_stdin as i32).abs();
match () {
_ if random_diff > stdin_diff => GameResult::Won,
_ if random_diff < stdin_diff => GameResult::Lost,
_ => GameResult::Draw
}
}
};
let _ = game.await.map(
|result|
println!(
"You {}",
match result {
GameResult::Won => "won!",
GameResult::Lost => "lose...",
_ => "have the same result as random generator!"
}
)
).unwrap();
} ```
Map: |>
expr - value
.map(expr
)
AndThen: =>
expr - value
.and_then(expr
),
Then: ->
expr - expr
(value
)
Dot: >.
expr - value
.expr
Or: <|
expr - value
.or(expr
)
OrElse: <=
expr - value
.or_else(expr
)
MapErr: !>
expr - value
.map_err(expr
)
Inspect: ?>
expr - (|value
| { expr
(&value
); value
})(value
) for sync chains and (|value
| value
.inspect(expr
))(value
) for futures
where value
is the previous value.
Every combinator prefixed by ~
will act as deferred action (all actions will wait until completion in every step and only after move to the next one).
might be one of
map
=> will act as results.map(|(result0, result1, ..)| handler(result0, result1, ..))
and_then
=> will act as results.and_then(|(result0, result1, ..)| handler(result0, result1, ..))
then
=> will act as handler(result0, result1, ..)
or not specified - then Result<(result0, result1, ..), Error> or Option<(result0, result1, ..)> will be returned.
You can specify custom path (futures_crate_path
) at the beginning of macro call
```rust use union::union_async; use futures::future::ok;
async fn main() { let value = unionasync! { futurescrate_path(::futures) ok::<_,u8>(2u16) }.await.unwrap();
println!("{}", value);
} ```
Converts input in series of chained results and joins them step by step.
```rust
use std::error::Error; use union::union;
type Result
fn action_1() -> Result
fn action_2() -> Result
fn main() { let sum = union! { action1(), action2().map(|v| v as u16), action2().map(|v| v as u16 + 1).andthen(|v| Ok(v * 4)), action1().andthen(|_| Err("5".into())).or(Ok(2)), map => |a, b, c, d| a + b + c + d }.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
} ```
Each branch will represent chain of tasks. All branches will be joined using ::futures::join!
macro and union_async!
will return unpolled
future.
```rust
use std::error::Error; use union::union_async; use futures::future::{ok, err};
type Result
async fn action1() -> Result
async fn main() { let sum = unionasync! { action1(), action2().andthen(|v| ok(v as u16)), action2().map(|v| v.map(|v| v as u16 + 1)).andthen(|v| ok(v * 4u16)), action1().andthen(|| err("5".into())).orelse(|| ok(2u16)), andthen => |a, b, c, d| ok(a + b + c + d) }.await.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
} ```
To execute several tasks in parallel you could use union_spawn!
(spawn!
) for sync tasks
and union_async_spawn!
(async_spawn!
) for futures. Since union_async
already provides parallel futures execution in one thread, union_async_spawn!
spawns every branch into tokio
executor so they will be evaluated in multi-threaded executor.
union_spawn
spawns one ::std::thread
per each step of each branch (number of branches is the max thread count at the time).
```rust
use std::error::Error; use union::union_spawn;
type Result
fn action_1() -> Result
fn action_2() -> Result
fn main() { // Branches will be executed in parallel let sum = unionspawn! { action1(), action2().map(|v| v as usize), action2().map(|v| v as usize + 1).andthen(|v| Ok(v * 4)), action1().andthen(|| Err("5".into())).or(Ok(2)), map => |a, b, c, d| a + b + c + d }.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
} ```
union_async_spawn!
uses ::tokio::spawn
function to spawn tasks so it should be done inside tokio
runtime
(number of branches is the max count of tokio
tasks at the time).
```rust
use std::error::Error; use union::unionasyncspawn; use futures::future::{ok, err};
type Result
async fn action_1() -> Result
async fn action_2() -> Result
async fn main() { let sum = unionasyncspawn! { action1(), action2().andthen(|v| ok(v as u16)), action2().map(|v| v.map(|v| v as u16 + 1)).andthen(|v| ok(v * 4u16)), action1().andthen(|| err("5".into())).orelse(|| ok(2u16)), and_then => |a, b, c, d| ok(a + b + c + d) }.await.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
} ```
Using combinators we can rewrite first sync example like
```rust
use std::error::Error; use union::union;
type Result
fn action_1() -> Result
fn action_2() -> Result
fn main() { let sum = union! { action1(), action2() |> |v| v as u16, action2() |> |v| v as u16 + 1 => |v| Ok(v * 4), action1() => |_| Err("5".into()) <| Ok(2), map => |a, b, c, d| a + b + c + d }.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
} ```
By separating chain in actions, you will make actions wait for completion of all of them in current step before go to the next step.
```rust
use std::error::Error; use union::union;
type Result
fn action_1() -> Result
fn action_2() -> Result
fn main() {
let sum = union! {
action1(),
let result1 = action2() ~|> |v| v as u16 + 1,
action2() ~|> {
let result1 = result1.asref().ok().map(Clone::clone);
move |v| {
// result_1
now is the result of action_2()
[Ok(1u8)]
if result1.issome() {
v as u16 + 1
} else {
unreachable!()
}
}
} ~=> {
let result1 = result1.asref().ok().map(Clone::clone);
move |v| {
// result_1
now is the result of |v| v as u16 + 1
[Ok(2u16)]
if let Some(result1) = result1 {
Ok(v * 4 + result1)
} else {
unreachable!()
}
}
},
action1() ~=> |_| Err("5".into()) <| Ok(2),
map => |a, b, c, d| a + b + c + d
}.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
}
```