join!
macro.join!
Macros that provide useful shortcut combinators, combine sync/async chains, support single- and multi-threaded (sync/async) step-by-step execution of branches, and transform a tuple of results into a result of a tuple.
join!
macros will just return final values. Use them if you are working with iterators/streams etc. try_join!
macros will transpose tuple of Option
s/Result
s in Option
/Result
of tuple. Use them when you are dealing with options or results. If one of the branches produces None
/Err
at the end of a step, execution of the following steps will be aborted. In the case of async
macro you can only provide Result
s because ::futures::try_join
doesn't support Option
s. Use these docs for development; they are more convenient.
Box::pin
]) - you can check it with cargo expand
.Rust
without macros.async
macros produce futures, so they can be used in non-async
functions.try_join!
- combines Result
s/Option
s, transposes tuple of Result
s/Option
s into Result
/Option
of tuple.
rust
assert_eq!(
try_join!(Ok::<_,()>(1), Ok::<_,()>("2"), Ok::<_,()>(3.0)),
Ok::<_,()>((1, "2", 3.0))
);
try_join_async!
- combines futures, transposes tuple of Result
s into Result
of tuple.
rust
assert_eq!(
try_join_async!(ok::<_,()>(1), ok::<_,()>("2"), ok::<_,()>(3.0)).await,
Ok::<_,()>((1, "2", 3.0))
);
try_join_spawn!
- spawns std::thread
per each branch and joins results, transposes tuple of Result
s/Option
s into Result
/Option
of tuple.
rust
assert_eq!(
try_join_spawn!(Ok::<_,()>(1), Ok::<_,()>("2"), Ok::<_,()>(3.0)),
Ok::<_,()>((1, "2", 3.0))
);
try_spawn!
- alias for try_join_spawn!
.try_join_async_spawn!
- spawns tokio task using tokio::spawn
per each branch, transposes tuple of Result
s into Result
of tuple.
rust
assert_eq!(
try_join_async_spawn!(ok::<_,()>(1), ok::<_,()>("2"), ok::<_,()>(3.0)).await,
Ok::<_,()>((1, "2", 3.0))
);
try_async_spawn!
- alias for try_join_async_spawn!
.join!
- combines values.
rust
assert_eq!(
join!(1, "2", 3.0), (1, "2", 3.0)
);
join_async!
- combines futures.
rust
assert_eq!(
join_async!(ready(1), ready("2"), ready(3.0)).await, (1, "2", 3.0)
);
join_spawn!
- spawns std::thread
per each branch.
rust
assert_eq!(
join_spawn!(1, "2", 3.0), (1, "2", 3.0)
);
spawn!
- alias for join_spawn!
.join_async_spawn!
- spawns tokio task using tokio::spawn
per each branch.
rust
assert_eq!(
join_async_spawn!(ready(1), ready("2"), ready(3.0)).await, (1, "2", 3.0)
);
async_spawn!
- alias for join_async_spawn!
.Then: ->
rust no_run
join! { value -> expr }; // => expr(value)
Map: |>
rust no_run
join! { value |> expr }; // => value.map(expr)
AndThen: =>
rust no_run
join! { value => expr }; // => value.and_then(expr)
Filter: ?>
rust no_run
join! { value ?> expr }; // => value.filter(expr)
Dot: ..
or >.
rust no_run
join! { value .. expr }; // => value.expr
join! { value >. expr }; // => value.expr
Or: <|
rust no_run
join! { value <| expr }; // => value.or(expr)
OrElse: <=
rust no_run
join! { value <= expr }; // => value.or_else(expr)
MapErr: !>
rust no_run
join! { value !> expr }; // => value.map_err(expr)
Collect: =>[]
(type is optional)
rust no_run
join! { value =>[] T }; // => value.collect::<T>()
join! { value =>[] }; // => value.collect()
Chain: >@>
rust no_run
join! { value >@> expr }; // => value.chain(expr)
FindMap: ?|>@
rust no_run
join! { value ?|>@ expr }; // => value.find_map(expr)
FilterMap: ?|>
rust no_run
join! { value ?|> expr }; // => value.filter_map(expr)
Enumerate: |n>
rust no_run
join! { value |n> }; // => value.enumerate()
Partition: ?&!>
rust no_run
join! { value ?&!> expr }; // => value.partition(expr)
Flatten: ^^>
rust no_run
join! { value ^^> }; // => value.flatten()
Fold: ^@
rust no_run
join! { value ^@ init_expr, fn_expr }; // => value.fold(init_expr, fn_expr)
TryFold: ?^@
rust no_run
join! { value ?^@ init_expr, fn_expr }; // => value.try_fold(init_expr, fn_expr)
Find: ?@
rust no_run
join! { value ?@ expr }; // => value.find(expr)
Zip: >^>
rust no_run
join! { value >^> expr }; // => value.zip(expr)
Unzip: <->
(types are optional)
rust no_run
join! { value <-> A, B, FromA, FromB }; // => value.unzip::<A, B, FromA, FromB>()
join! { value <-> }; // => value.unzip()
Inspect: ??
rust no_run
join! { value ?? expr }; // => (|value| { (expr)(&value); value })(value) // for sync
join_async! { value ?? expr }; // => value.inspect(expr) // for async
where value
is the previous value.
Every combinator prefixed by ~
will act as a deferred action (all actions in a step wait for completion, and only afterwards move on to the next step).
Wrap: combinator
>>>
combinator
(s)...
rust
try_join! { value => >>> |> |v| v + 2 } // => value.and_then(|value| value.map(|v| v + 2))
Use it to enter nested constructions like
rust
a.and_then(
// >>>
|b| b.and_then(
// >>>
|c| c.and_then(
|v| Ok(v + 2)
)
)
)
Unwrap: <<<
rust
try_join! {
value
=> >>>
|> |v| v + 2
<<<
|> |v| Some(v + 4)
} // => value.and_then(|value| value.map(|v| v + 2)).map(|v| Some(v + 4))
Use it to move out of nested constructions
rust
a.and_then(
// >>>
|b| b.and_then(
// >>>
|c| c.and_then(
|v| Ok(v + 2)
)
// <<<
)
// <<<
).map(
|v| v + 1
)
The final handler might be one of
map
=> Only valid for try
macros. Will act as results.map(|(result0, result1, ..)| handler(result0, result1, ..))
rust
assert_eq!(
try_join! {
Some(1),
Some(2),
Some(3),
map => |a, b, c| a + b + c
},
Some(6)
);
- and_then
=> Only valid for try
macros. Will act as results.and_then(|(result0, result1, ..)| handler(result0, result1, ..))
rust
assert_eq!(
try_join! {
Some(1),
Some(2),
Some(3),
and_then => |a, b, c| Some(a + b + c)
},
Some(6)
);
- then
=> Only valid for not try
macros. Will be executed in any case, act as handler(result0, result1, ..)
rust
assert_eq!(
join! {
Some(1),
Some(2),
Some(3),
then => |a: Option<u8>, b: Option<u8>, c: Option<u8>|
Some(a.unwrap() + b.unwrap() + c.unwrap())
},
Some(6)
);
or not specified - then Result<(result0, result1, ..), Error>
or Option<(result0, result1, ..)>
will be returned for try
macros and (result0, result1, ..)
for not try
macros.
You can specify any of the following params at the beginning of a macro call.
futures_crate_path
- specifies custom crate path for futures
crate, which will be used for all futures
-related items, used by async
join!
macros. Only valid for async
macros. custom_joiner
- specifies a custom joiner function or macro, which will join the active branches in a step if their count is greater than 1. transpose_results
- specifies whether the macro should transpose a tuple of Result
s/Option
s into Result
/Option
of tuple or not. Useful when the provided joiner already returns Result
of tuple and there's no need to transpose it. lazy_branches
- wraps every branch into move || {}
when passing values to the joiner. By default true
for try_join_spawn!
, try_spawn!
and join_spawn!
, spawn!
macros because they use thread::spawn
call. Only if active branch count > 1.
```rust
use join::try_join_async;
use futures::future::ok;

macro_rules! custom_futures_joiner {
    ($($futures: expr),+) => {
        ::futures::try_join!($($futures),*);
    }
}

async fn main() {
    let value = try_join_async! {
        futures_crate_path(::futures)
        custom_joiner(custom_futures_joiner!)
        transpose_results(false)
        ok::<_,()>(2u16), ok::<_,()>(3u16),
        map => |a, b| a + b
    }.await.unwrap();

    assert_eq!(value, 5);
}
```
Rayon demo
```rust use join::{try_join, join};
fn fib(num: u8) -> usize { let mut prev = 0; let mut cur = if num > 0 { 1 } else { 0 }; for _ in 1..num as usize { let tmp = cur; cur = prev + cur; prev = tmp; } cur }
fn main() {
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
let calculated = pool.install(||
try_join! {
custom_joiner(rayon::join)
|| Some(fib(50)),
|| Some(
join! {
custom_joiner(rayon::join)
lazy_branches(true)
fib(20) -> |v| v + 25,
fib(30) -> |v| vec![v; 10].into_iter() |n> |> |(index, value)| value + index ..sum::<usize>()
You can specify let
pattern for each branch in order to share its result with other branches, or in case you need to have a mut
value between steps.
rust
assert_eq!(
try_join! {
let mut branch_0 = Ok::<_,()>(1) ~|> |v| v + 1,
let branch_1 = Ok::<_,()>(2) ~|> { let value_0 = branch_0.as_ref().unwrap(); move |v| v + value_0 },
map => |b_0, b_1| b_0 * b_1
}.unwrap(),
6
);
In order to capture variables (for ex. values of other branches in example above) you can pass block statements instead of functions:
rust
let mut some_value = Some("capture me");
assert_eq!(try_join! {
Some(0) |> |v| {
// assign `None` to some_value in step expr
some_value = None;
v
} |> {
// capture value before step and get str len
let captured_len = some_value.as_ref().unwrap().len();
move |v| v + captured_len
}
}.unwrap(), 10);
These blocks will be placed before actual step expressions.
Using this macro you can write things like
```rust use rand::prelude::*; use std::sync::Arc; use join::try_join_spawn;
// Problem: generate vecs filled by random numbers in parallel, make some operations on them in parallel,
// find max of each vec in parallel and find final max of 3 vecs
// Solution:
fn main() {
// Branches will be executed in parallel, each in its own thread
let max = try_join_spawn! {
let branch_0 =
generate_random_vec(1000, 10000000u64)
.into_iter()
// .map(power2) (Multiply every element by itself)
|> power2
// .filter(is_even) (Filter even values)
?> is_even
// .collect::<Vec<_>>() (Collect values into `Vec<_>`)
=>[] Vec<_>
// Arc::new(Some(...))
// Use `Arc` to share data with branch 1
-> Arc::new -> Some
// Find max and clone its value
// .and_then(|v| v.iter().max().map(Clone::clone))
~=> >>> ..iter().max() |> Clone::clone,
generate_random_vec(10000, 100000000000000f64)
.into_iter()
// .map(get_sqrt) (Extract sqrt from every element)
|> get_sqrt
// Some(...)
-> Some
// .and_then(|v| v...)
~=> >>>
// .enumerate() (Add index in order to compare with the values of branch_0)
|n>
// .map(...)
|> {
// Get data from branch 0 by cloning arc
let branch_0 = branch_0.as_ref().unwrap().clone();
let len = branch_0.len();
// Compare every element of branch 1 with element of branch_0
// with the same index and take min
move |(index, value)|
if index < len && value as u64 > branch_0[index] {
branch_0[index]
} else {
value as u64
}
}..max(),
generate_random_vec(100000, 100000u32)
.into_iter()
-> Some
// .and_then(|v| v.max())
~=> >>> ..max(),
and_then => |max0, max1, max2|
// Find final max
[max0, max1, max2 as u64].iter().max().map(Clone::clone)
}
.unwrap();
println!("Max: {}", max);
}
fn generate_random_vec
fn is_even
fn get_sqrt
fn power2
```rust extern crate rand; extern crate join;
use rand::prelude::*; use join::try_join;
fn main() { let mut rng = rand::thread_rng();
let result = try_join! {
(0..10)
// .map(|index| { let value ... })
|> |index| { let value = rng.gen_range(0, index + 5); if rng.gen_range(0f32, 2.0) > 1.0 { Ok(value) } else { Err(value) }}
// .filter(|result| ...)
?> |result| match result { Ok(_) => true, Err(value) => *value > 2 }
// .map(|v| v.map(|value| value + 1))
|> >>> |> |value| value + 1
<<<
// .try_fold(0i32, |acc, cur| {...})
?^@ 0i32, |acc, cur| {
cur.map(|cur| acc + cur).or_else(|cur| Ok(acc - cur))
}
// .and_then(|value| if ...)
=> |value| if value > 0 { Ok(value as u8) } else { Err(0) }
// Wait for all branches to be successful and then calculate fib
~|> fib,
(0..6)
// .map(|index| { let value ... })
|> |index| { let value = rng.gen_range(0, index + 5); if rng.gen_range(0f32, 2.0) > 1.0 { Some(value) } else { None }}
// .filter_map(|v| v)
?|> >>>
<<<
..sum::<u16>()
// Return `Ok` only if value is less than 20
-> |value| if value < 20 { Ok(value as u8) } else { Err(0) }
// Wait for all branches to be successful and then calculate fib
~|> fib,
// In case of success, multiply fibs
map => |v_1, v_2| v_1 * v_2
};
result.map(|value| println!("Result: {}", value)).unwrap_or_else(|err| println!("Error: {:#?}", err));
}
fn fib(num: u8) -> usize { println!("CALLED FIB!"); let mut prev = 0; let mut cur = if num > 0 { 1 } else { 0 }; for _ in 1..num as usize { let tmp = cur; cur = prev + cur; prev = tmp; } cur } ```
Pay attention: this demo uses tokio = "0.2.0-alpha.6"
, however join!
macros are compatible with the latest tokio
.
Cargo.toml
```toml [dependencies] futures = { version = "=0.3.0-alpha.19", package = "futures-preview", features=["async-await"] } tokio = "0.2.0-alpha.6" failure = "0.1.6" futures-timer = "1.0.2" reqwest = "0.10.0-alpha.2" ```
And like this:
```rust use join::try_join_async; use futures::stream::{iter, Stream}; use reqwest::Client; use futures::future::{try_join_all, ok, ready}; use failure::{format_err, Error};
async fn main() {
println!(
"{} {}\n{}",
"Hello.\nThis's is the game where winner is player, which number is closest to",
"the max count of links (starting with `https://`) found on one of random pages.",
"You play against random generator (0-500)."
);
enum GameResult {
Won,
Lost,
Draw
}
let client = Client::new();
let game = try_join_async! {
// Make requests to several sites
// and calculate count of links starting from `https://`
get_urls_to_calculate_link_count()
|> {
// If pass block statement instead of fn, it will be placed before current step,
// so it will allow us to capture some variables from context
let ref client = client;
move |url|
// `try_join_async!` wraps its content into `Box::pin(async move { })`
try_join_async! {
client
.get(url).send()
=> |value| value.text()
=> |body| ok((url, body.matches("https://").collect::<Vec<_>>().len()))
}
}
// Collect values into `Vec<_>`
=>[] Vec<_>
|> Ok
=> try_join_all
!> |err| format_err!("Error retrieving pages to calculate links: {:#?}", err)
=> >>>
..into_iter()
.max_by_key(|(_, link_count)| *link_count)
.ok_or(format_err!("Failed to find max link count"))
-> ready
// It waits for input in stdin before log max links count
~?? >>>
..as_ref()
|> |(url, count)| {
let split = url.to_owned().split('/').collect::<Vec<_>>();
let domain_name = split.get(2).unwrap_or(&url);
println!("Max `https://` link count found on `{}`: {}", domain_name, count)
}
..unwrap_or(()),
// Concurrently it makes request to the site which generates random number
get_url_to_get_random_number()
-> ok
=> {
// If pass block statement instead of fn, it will be placed before current step,
// so it will allow us to capture some variables from context
let ref client = client;
let map_parse_error = |error, value| format_err!("Failed to parse random number: {:#?}, value: {}", error, value);
move |url|
try_join_async! {
client
.get(url)
.send()
=> |value| value.text()
!> |err| format_err!("Error retrieving random number: {:#?}", err)
=> |value| ok(value[..value.len() - 1].to_owned()) // remove \n from `154\n`
=> |value|
ready(
value
.parse::<u16>()
.map_err(|err| map_parse_error(err, value))
)
}
}
// It waits for input in stdin before log random value
~?? >>>
..as_ref()
|> |number| println!("Random: {}", number)
..unwrap_or(()),
// Concurrently it reads value from stdin
read_number_from_stdin() |> Ok,
// Finally, when we will have all results, we can decide, who is winner
map => |(_url, link_count), random_number, number_from_stdin| {
let random_diff = (link_count as i32 - random_number as i32).abs();
let stdin_diff = (link_count as i32 - number_from_stdin as i32).abs();
match () {
_ if random_diff > stdin_diff => GameResult::Won,
_ if random_diff < stdin_diff => GameResult::Lost,
_ => GameResult::Draw
}
}
};
let _ = game.await.map(
|result|
println!(
"You {}",
match result {
GameResult::Won => "won!",
GameResult::Lost => "lose...",
_ => "have the same result as random generator!"
}
)
).unwrap_or_else(|error| eprintln!("Error: {:#?}", error));
}
fn get_urls_to_calculate_link_count() -> impl Stream
}
fn get
loop {
println!("Please, enter number (`u16`)");
let next = reader.next_line();
let result = try_join_async! {
next
=> >>>
..ok_or(Error::new(ErrorKind::Other, "Failed to read value from stdin"))
=> >>>
..parse()
!> |err| Error::new(ErrorKind::Other, format!("Value from stdin isn't a correct `u16`: {:?}", err))
<<<
-> ready
}.await;
if let Ok(value) = result {
break value
}
}
} ```
Converts input into a series of chained results and joins them step by step.
```rust use std::error::Error; use join::try_join;
type Result
fn action_1() -> Result
fn action_2() -> Result
fn main() { let sum = try_join! {
// action_1(),
action_1(),
// action_2().map(|v| v as u16),
action_2() |> |v| v as u16,
// action_2().map(|v| v as u16 + 1).and_then(|v| Ok(v * 4)),
action_2() |> |v| v as u16 + 1 => |v| Ok(v * 4),
// action_1().and_then(|_| Err("5".into())).or(Ok(2)),
action_1() => |_| Err("5".into()) <| Ok(2),
map => |a, b, c, d| a + b + c + d
}.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
} ```
Each branch will represent a future chain. All branches will be joined using ::futures::join!
/::futures::try_join!
macro and join_async!
/try_join_async!
will return unpolled
future.
```rust use std::error::Error; use join::try_join_async; use futures::future::{ok, err};
type Result
async fn action_1() -> Result
async fn main() { let sum = try_join_async! {
// action_1(),
action_1(),
// action_2().and_then(|v| ok(v as u16)),
action_2() => |v| ok(v as u16),
// action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16)),
action_2() |> |v| v.map(|v| v as u16 + 1) => |v| ok(v * 4u16),
// action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16)),
action_1() => |_| err("5".into()) <= |_| ok(2u16),
and_then => |a, b, c, d| ok(a + b + c + d)
}.await.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
} ```
To execute several tasks in parallel you could use join_spawn!
(spawn!
) for sync tasks
and join_async_spawn!
(async_spawn!
) for futures. Since join_async
already provides concurrent futures execution in one thread, join_async_spawn!
spawns every branch into tokio
executor, so they will be evaluated in multi threaded executor.
join_spawn
spawns one ::std::thread
per each step of each branch (number of branches is the max thread count at the time).
```rust
use std::error::Error; use join::try_join_spawn;
type Result
fn action_1() -> Result
fn action_2() -> Result
fn main() {
// Branches will be executed in parallel
let sum = try_join_spawn! {
// thread::spawn(move || action_1()),
action_1(),
// thread::spawn(move || action_2().map(|v| v as usize)),
action_2() |> |v| v as usize,
// thread::spawn(move || action_2().map(|v| v as usize + 1).and_then(|v| Ok(v * 4))),
action_2() |> |v| v as usize + 1 => |v| Ok(v * 4),
// thread::spawn(move || action_1().and_then(|_| Err("5".into())).or(Ok(2))),
action_1() => |_| Err("5".into()) <| Ok(2),
map => |a, b, c, d| a + b + c + d
}.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
} ```
Thread names
At runtime, a thread's name is constructed from the name of its parent thread and join_%branch_index%.
Example with several branches:
```rust extern crate join;
use std::thread;
use join::try_join_spawn;
fn get_current_thread_name() -> String { thread::current().name().unwrap().to_owned() }
fn print_branch_thread_name(index: &Result
fn main() { let _ = try_join_spawn! { Ok(0) ?? print_branch_thread_name, Ok(1) ?? print_branch_thread_name, try_join_spawn! { Ok(2) ?? print_branch_thread_name, try_join_spawn! { Ok(3) ?? print_branch_thread_name, } } }.unwrap(); }
// Branch: 0. Thread name: main_join_0.
// Branch: 1. Thread name: main_join_1.
// Branch: 2. Thread name: main_join_2_join_0.
// Branch: 3. Thread name: main_join_2_join_1_join_0.
// Order could be different.
```
join_async_spawn!
uses ::tokio::spawn
function to spawn tasks so it should be done inside tokio
runtime
(number of branches is the max count of tokio
tasks at the time).
```rust use std::error::Error; use join::try_join_async_spawn; use futures::future::{ok, err};
type Result
async fn action_1() -> Result
async fn action_2() -> Result
async fn main() { let sum = try_join_async_spawn! {
// tokio::spawn(Box::pin(action_1()))
action_1(),
// tokio::spawn(Box::pin(action_2().and_then(|v| ok(v as u16))))
action_2() => |v| ok(v as u16),
// tokio::spawn(Box::pin(action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16))))
action_2() |> |v| v.map(|v| v as u16 + 1) => |v| ok(v * 4u16),
// tokio::spawn(Box::pin(action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16))))
action_1() => |_| err("5".into()) <= |_| ok(2u16),
and_then => |a, b, c, d| ok(a + b + c + d)
}.await.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
} ```
By splitting a chain into actions, you make the actions wait for all of them to complete in the current step before going on to the next step.
```rust use std::error::Error; use join::try_join;
type Result
fn action_1() -> Result
fn action_2() -> Result
fn main() {
let sum = try_join! {
action_1(),
let result_1 = action_2() ~|> |v| v as u16 + 1,
action_2() ~|> {
// `result_1` now is the result of `action_2()` [Ok(1u8)]
let result_1 = result_1.as_ref().ok().map(Clone::clone);
move |v| {
if result_1.is_some() {
v as u16 + 1
} else {
unreachable!()
}
}
} ~=> {
// `result_1` now is the result of `|v| v as u16 + 1` [Ok(2u16)]
let result_1 = result_1.as_ref().ok().map(Clone::clone);
move |v| {
if let Some(result_1) = result_1 {
Ok(v * 4 + result_1)
} else {
unreachable!()
}
}
},
action_1() ~=> |_| Err("5".into()) <| Ok(2),
map => |a, b, c, d| a + b + c + d
}.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
} ```