streamline
Reversible Stream-based state machine library for Rust
Here's an example of how to use a Streamline
to create a GitHub repo, Tweet about it, and reverse the whole process if something goes wrong.
```rust // recommended, but not required use asynctrait::asynctrait; // some example clients for communicating through third-party APIs use clients::{Github, Twitter}; use futures::StreamExt; use streamline::{State, Streamline};
const MY_USERNAME: &'static str = "my-github-username";
// clients are stored in context for shared access throughout the life of the streamline
struct Context { github: Github, twitter: Twitter, }
// you should create better errors than this
type MyError = Box
enum MyState { Github { reponame: String }, Twitter { repoid: i32, reponame: String } Done { repoid: i32, reponame: String, tweetid: i32 } }
impl State for MyState { type Context = Context; type Error = MyError;
// every state needs to be mapped to the next async fn next(&self, context: Option<&Self::Context>) -> Result
let next_state = match self {
MyState::Github { repo_name } => {
context
.github
.add_repo(&repo_name)
.await?
.map(|response| Some(MyState::Twitter { repo_id: &response.repo_id, repo_url: repo_name }))
},
MyState::Twitter { repo_name, .. } => {
context
.twitter
.tweet(&format("Look at my new Github repo at https://github.com/{}/{}!", &repo_name))
.await?
.map(|response| Some(MyState::Done { tweet_id: response.tweet_id }))
},
MyState::Done { .. } => None // returning Ok(None) stops the stream!
};
Ok(next_state)
}
// optionally, old states can be cleaned up if something goes wrong async fn revert(&self, context: Option<&Self::Context>) -> Result
let next_state = match self {
MyState::Done { tweet_id, repo_id, repo_name } => {
context
.twitter
.delete_tweet(tweet_id)
.await?;
Some(MyState::Twitter { repo_id, repo_name })
},
MyState::Twitter { repo_id, repo_name } => {
context
.github
.delete_repo(repo_id)
.await?;
Some(MyState::Github { repo_id, repo_name })
},
MyState::Github { .. } => None
};
Ok(next_state)
} }
async fn handletweetingrepo(repo_name: String) { let context = Context { github: Github::new(), twitter: Twitter::new(), };
Streamline::build(MyState { reponame }) .context(context) .run() .foreach(|state| println!("Next state {:?}", &state)) .await; } ```
If one wants to move from one state to the next within a process, it makes sense in Rust to look towards some of the many state machine patterns available through the type system. enum
s, in particular, are a great way of modeling the progress of a process in a way that excludes impossible states along the way. But there's less certainty around handling state for the following scenarios:
enum
, it's much trickier to know when to handle updates to state that is not directly attached to a single variant of the state machine. How does one, for example, handle updating a value in a database as a state machine progresses? What about interacting with third-party services? When should these parts of state be handled?Stream
is easy... just drop the Stream
! But cleaning up after a stream that represents some in-progress state is much more difficult.streamline
solves addresses those problems in the following ways:
futures::Stream
-compatibility: rather than using side effects during state machine execution, this library models every update to a state machine as an Item
in a std::futures::Stream
.Context
: all super-variant state can be accessed with a consistent Context
.Err
, streamline
will (optionally) revert all the states up to that point, returning the original error.run_preemptible
method returns a Stream
and a Cancel
handler that can be used to trigger the revert process of a working stream.