Flowmium is a workflow orchestrator that uses Kubernetes. You can define and run a YAML workflow of containers, or you can run a Python workflow where each function runs as a Kubernetes pod.

A Python workflow looks like this:
```python
from flowmium import Flow, FlowContext
from flowmium.serializers import plain_text, json_text, pkl

flow = Flow("testing")


@flow.task(serializer=json_text)
def foo() -> str:
    return "Hallo world"


@flow.task({"input_str": foo}, serializer=plain_text)
def replace_letter_a(input_str: str, flowctx: FlowContext) -> str:
    return input_str.replace("a", "e") + str(flowctx.task_id)


@flow.task({"input_str": foo}, serializer=pkl)
def replace_letter_t(input_str: str) -> str:
    return input_str.replace("t", "d")


@flow.task({"first": replace_letter_t, "second": replace_letter_a}, serializer=plain_text)
def concat(first: str, second: str) -> str:
    return f"{first} {second}"


if __name__ == "__main__":
    flow.run()
```
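The dependency mapping passed to `@flow.task` forms a directed acyclic graph that determines execution order: `foo` runs first, the two replace tasks run after it, and `concat` runs last. A sketch of how such a mapping resolves to an order, using only the standard library (independent of flowmium itself):

```python
from graphlib import TopologicalSorter

# Dependencies as declared in the example above: each task maps to the
# set of tasks whose outputs it consumes.
deps = {
    "foo": set(),
    "replace_letter_a": {"foo"},
    "replace_letter_t": {"foo"},
    "concat": {"replace_letter_t", "replace_letter_a"},
}

# static_order yields every task after all of its dependencies.
order = list(TopologicalSorter(deps).static_order())
print(order)  # "foo" first, "concat" last
```

Flowmium applies the same idea at the cluster level, scheduling each task as a pod once the pods it depends on have completed.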
## flowctl CLI

The `flowctl` CLI is used to monitor the current status of workflows, submit new workflows, and download artifacts.
| Action              | Command                                                     |
| ------------------- | ----------------------------------------------------------- |
| List workflows      | `flowctl list`                                              |
| Use explicit URL    | `flowctl --url http://localhost:8080 list`                  |
| Submit a YAML flow  | `flowctl submit flow.yaml`                                  |
| Download artifact   | `flowctl download <flow-id> <output-name> <local-dir-path>` |
| Subscribe to events | `flowctl subscribe`                                         |
| Describe a flow     | `flowctl describe <id>`                                     |
| Create secret       | `flowctl secret create <key> <value>`                       |
| Update secret       | `flowctl secret update <key> <value>`                       |
| Delete secret       | `flowctl secret delete <key>`                               |
NOTE: Secrets are stored in the server and can be referenced to set environment variable values in YAML definitions or Python workflows, so you don't have to commit secrets to your repository. They do not, however, use Kubernetes secrets; they are set as plain environment variables when workflow tasks are deployed as a Job.
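Because secrets arrive in the task's pod as ordinary environment variables, a task reads them with standard library calls; a minimal sketch, where `SLACK_TOKEN` is a hypothetical secret key used only for illustration:

```python
import os

# Secrets referenced in a flow definition are injected into the task's
# pod as ordinary environment variables (not Kubernetes secrets), so a
# plain os.environ lookup is all a task needs. The default below only
# keeps this sketch runnable outside a cluster.
os.environ.setdefault("SLACK_TOKEN", "dummy-value-for-local-runs")

token = os.environ["SLACK_TOKEN"]
print(f"token length: {len(token)}")
```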
These instructions will let you run an example Python flow (`framework/tests/example_flow.py`) entirely from local source, without pulling anything from upstream (including the executor). Use this to validate your changes. The instructions assume you are at the root of the repo.
Install the sqlx CLI

```
cargo install sqlx-cli
```
Run a test Kubernetes cluster, MinIO, and a container registry locally

```
cd flowmium/
make up
```
Watch the pods running in the local cluster

```
cd flowmium/
make watch
```
Run migrations

```
cd flowmium/
sqlx migrate run
```
Run the flowmium server from the root of this repo

```
cd flowmium/
export FLOWMIUM_POSTGRES_URL='postgres://flowmium:flowmium@localhost/flowmium'
export FLOWMIUM_STORE_URL='http://localhost:9000'
export FLOWMIUM_TASK_STORE_URL='http://172.16.238.4:9000'
export FLOWMIUM_BUCKET_NAME='flowmium-test'
export FLOWMIUM_ACCESS_KEY='minio'
export FLOWMIUM_SECRET_KEY='password'
export FLOWMIUM_INIT_CONTAINER_IMAGE='docker.io/shnoo28/flowmium:latest'
export FLOWMIUM_NAMESPACE=default
export KUBECONFIG=./kubeconfig.yaml
cargo run --bin flowmium -- server --port 8080
```
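Before starting the server, it can help to confirm that every variable from the export list above is actually set in your shell; a small sketch (the checker is an illustrative helper, not part of flowmium):

```python
import os

# The configuration variables the server run above expects to find.
REQUIRED = [
    "FLOWMIUM_POSTGRES_URL",
    "FLOWMIUM_STORE_URL",
    "FLOWMIUM_TASK_STORE_URL",
    "FLOWMIUM_BUCKET_NAME",
    "FLOWMIUM_ACCESS_KEY",
    "FLOWMIUM_SECRET_KEY",
    "FLOWMIUM_INIT_CONTAINER_IMAGE",
    "FLOWMIUM_NAMESPACE",
]


def missing_vars(env) -> list:
    """Return the names of required variables absent from the mapping."""
    return [name for name in REQUIRED if name not in env]


if __name__ == "__main__":
    for name in missing_vars(os.environ):
        print(f"missing: {name}")
```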
Watch flow status using `flowctl`

```
cd flowmium/
cargo build
watch ./target/debug/flowctl list
```
Build and push the example Python flow (NOTE: you might want to use a different image name if you are running the test a second time, or prune Docker images on your machine)

```
cd framework/
docker build . -t py-flow-test
docker tag py-flow-test localhost:5180/py-flow-test:latest
docker push localhost:5180/py-flow-test:latest
```
Submit the flow to the executor server

```
python3 -m tests --image registry:5000/py-flow-test:latest --cmd 'python3 -m tests' --flowmium-server http://localhost:8080
```
For running e2e tests with the init container from upstream:

```
make test
```

For running e2e tests with the init container built from source:

```
FLOWMIUM_INIT_CONTAINER_IMAGE_FROM_SOURCE=true make test
```

Run `make test` from the `framework/` path.