Workflows
Workflows organize single operations into advanced management pipelines that perform a series of operations such as data processing, ML model training and serving. Workflows represent long-running procedures defined as Directed Acyclic Graphs (DAGs), where each node is a single unit of work performed by the platform (e.g., as a Kubernetes Job).
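To make the DAG idea concrete, the following minimal sketch models a pipeline as a dependency graph and derives a valid execution order with Python's standard graphlib module. It does not use the platform or Kubeflow Pipelines, and the step names are hypothetical, chosen only for illustration.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each key is a step, each value is the set of steps
# it depends on. The step names are illustrative, not platform identifiers.
pipeline = {
    "download": set(),
    "process": {"download"},
    "train": {"process"},
    "serve": {"train"},
}

# static_order() yields the steps so that every step comes after its
# dependencies; it raises graphlib.CycleError if the graph has a cycle.
execution_order = list(TopologicalSorter(pipeline).static_order())
print(execution_order)  # ['download', 'process', 'train', 'serve']
```

In a real workflow the ordering is handled by the workflow runtime; the sketch only shows why the graph must be acyclic for an execution order to exist.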
As with functions, the platform can support different workflow runtimes. Currently, the only workflow runtime implemented is the one based on the Kubeflow Pipelines infrastructure. See KFP Runtime for further details about how the workflow is defined and executed with the Kubeflow Pipelines component of the platform.
Like functions, workflows can be managed via the console UI or via the Python SDK.
Management via UI
Workflows can be created and managed as entities from the console. You can access them from the dashboard or the left menu. You can:
- create a new workflow
- expand a workflow to see its 5 latest versions
- show the details of a workflow
- edit a workflow
- delete a workflow
- filter workflows by name and kind
We will now see how to create, read, update and delete workflows using the UI, similarly to what is done with the SDK.
Create
Click CREATE and a form will be shown.
Mandatory fields are:
- Name: name and identifier of the workflow
- Kind: kind of workflow
Metadata fields are optional and may be updated later.
- Description: a human-readable description
- Labels: list of labels
- Name: name of the workflow
- Embedded: flag for embedded metadata
- Versioning: version of the workflow
- Openmetadata: flag to publish metadata
- Audit: author of creation and modification
For a kfp workflow, the source code and handler fields are required as well.
Read
Click SHOW to view a workflow's details.
On the right side, all versions of the resource are listed, with the current one highlighted. By clicking a different version, values displayed will change accordingly.
The INSPECTOR button will show a dialog containing the resource in JSON format.
The EXPORT button will download the resource's information as a YAML file.
For kfp workflows, the executions of the workflow instances can be monitored with the corresponding DAG viewer.
Update
You can update a workflow by clicking EDIT. Greyed-out fields cannot be updated.
Delete
You can delete a workflow from either its detail page or the list of workflows, by clicking DELETE.
Management via SDK
A workflow can be managed with the following methods.
- new_workflow: create a new workflow
- get_workflow: get a workflow
- update_workflow: update a workflow
- delete_workflow: delete a workflow
- list_workflows: list all workflows
This is done in two ways. The first is through the SDK and the second is through the Workflow object.
Example:
import digitalhub as dh
project = dh.get_or_create_project("my-project")
workflow = project.new_workflow(name="my-workflow",
                                kind="kfp",
                                source={"source": "src/pipeline.py"},
                                handler="pipeline")
The syntax is the same for all CRUD methods. The following sections describe how to create, read, update and delete a workflow, focusing on managing workflows through the library. If you want to manage workflows from the project, you can use the Project object and avoid having to specify the project parameter.
Create
To create a workflow you can use the new_workflow() method.
Mandatory parameters are:
- name: name of the workflow
- kind: kind of the workflow runtime (e.g., kfp)
- source: source code specification (e.g., file reference)
- handler: name of the pipeline method
Optional parameters are:
- uuid: uuid of the workflow (automatically generated if not provided). Must be a valid uuid v4.
- description: description of the workflow
- labels: labels of the workflow
- embedded: whether the workflow is embedded or not. If True, the workflow is embedded (all the spec details are expressed) in the project. If False, the workflow is not embedded in the project
- kwargs: keyword arguments passed to the spec constructor
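Since a user-supplied uuid must be a valid uuid v4, it can help to check a candidate value before passing it along. The sketch below uses only Python's standard uuid module; the helper name is_uuid4 is hypothetical and is not part of the SDK, which may perform its own validation.

```python
import uuid


def is_uuid4(value: str) -> bool:
    """Return True if value parses as a version-4 UUID (hypothetical helper)."""
    try:
        parsed = uuid.UUID(value)
    except (ValueError, AttributeError, TypeError):
        # Not a string, or not in any textual form that uuid.UUID accepts.
        return False
    return parsed.version == 4


# uuid.uuid4() generates a random version-4 UUID suitable for the uuid parameter.
candidate = str(uuid.uuid4())
print(is_uuid4(candidate))      # True
print(is_uuid4("my-workflow"))  # False
```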
Example:
workflow = project.new_workflow(name="my-workflow",
                                kind="kfp",
                                source={"source": "src/pipeline.py"},
                                handler="pipeline")
Read
To read a workflow you can use the get_workflow() or import_workflow() methods. The first one searches for the workflow in the backend, the second one loads it from a local YAML file.
Get
Mandatory parameters are:
- project: the project containing the workflow
Optional parameters are:
- entity_name: to use the name of the workflow as identifier. It returns the latest version of the workflow
- entity_id: to use the uuid of the workflow as identifier. It returns the specified version of the workflow
- kwargs: keyword arguments passed to the client that communicates with the backend
Example:
workflow = dh.get_workflow(project="my-project",
                           entity_name="my-workflow")
workflow = dh.get_workflow(project="my-project",
                           entity_id="uuid-of-my-workflow")
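The difference between the two identifiers can be illustrated with a small in-memory model. This is only a sketch of the lookup semantics described above, not the SDK or backend implementation: the STORE dictionary, the version ids and the resolve_version helper are all hypothetical, and the error raised when neither identifier is given is an assumption.

```python
from typing import Dict, List, Optional

# Hypothetical stand-in for the backend: workflow name -> version ids, oldest first.
STORE: Dict[str, List[str]] = {"my-workflow": ["v1-id", "v2-id", "v3-id"]}


def resolve_version(entity_name: Optional[str] = None,
                    entity_id: Optional[str] = None) -> str:
    """Return the version id that a lookup would select (illustrative only)."""
    if entity_id is not None:
        # An id pins one exact version.
        for ids in STORE.values():
            if entity_id in ids:
                return entity_id
        raise KeyError(entity_id)
    if entity_name is not None:
        # A name resolves to the most recent version.
        return STORE[entity_name][-1]
    # Assumption: at least one identifier is needed to select a workflow.
    raise ValueError("either entity_name or entity_id is required")


print(resolve_version(entity_name="my-workflow"))  # v3-id
print(resolve_version(entity_id="v1-id"))          # v1-id
```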
Import
Mandatory parameters are:
- file: file path to the workflow YAML
Example:
workflow = dh.import_workflow(file="./some-path/my-workflow.yaml")
Update
To update a workflow you can use the update_workflow() method.
Mandatory parameters are:
- workflow: workflow object to be updated
Optional parameters are:
- kwargs: keyword arguments passed to the client that communicates with the backend
Example:
workflow = dh.new_workflow(project="my-project",
                           name="my-workflow",
                           kind="kfp",
                           source={"source": "src/pipeline.py"},
                           handler="pipeline")
workflow.metadata.description = "My new description"
workflow = dh.update_workflow(workflow=workflow)
Delete
To delete a workflow you can use the delete_workflow() method.
Mandatory parameters are:
- project: the project in which the workflow exists
Optional parameters are:
- entity_name: to use the name of the workflow as identifier
- entity_id: to use the uuid of the workflow as identifier
- delete_all_versions: if True, all versions of the workflow will be deleted. Mutually exclusive with the entity_id parameter.
- kwargs: keyword arguments passed to the client that communicates with the backend
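Because delete_all_versions and entity_id are mutually exclusive, a caller may want to reject the invalid combination before contacting the backend. The following is a minimal sketch of such a check; check_delete_args is a hypothetical helper, not an SDK function, and how the SDK itself reports this conflict is not specified here.

```python
from typing import Optional


def check_delete_args(entity_name: Optional[str] = None,
                      entity_id: Optional[str] = None,
                      delete_all_versions: bool = False) -> None:
    """Raise ValueError if delete_all_versions is combined with entity_id.

    entity_name is accepted only to mirror the documented parameters;
    this hypothetical check does not inspect it.
    """
    if delete_all_versions and entity_id is not None:
        raise ValueError("delete_all_versions and entity_id are mutually exclusive")


# Deleting every version by name, or one version by id, passes the check.
check_delete_args(entity_name="my-workflow", delete_all_versions=True)
check_delete_args(entity_id="uuid-of-my-workflow")
```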
Example:
dh.delete_workflow(project="my-project",
                   entity_id=workflow.id)
List
To list all workflows you can use the list_workflows() method.
Mandatory parameters are:
- project: the project containing the workflows
Optional parameters are:
- kwargs: keyword arguments passed to the client that communicates with the backend
Example:
workflows = dh.list_workflows(project="my-project")
Workflow object
The Workflow object is built using the new_workflow() method. There are several variations of the Workflow object based on the kind of the workflow. The SDK supports the following kinds:
- kfp: represents a workflow implemented with the Kubeflow Pipelines runtime.
For each different kind, the Workflow object has a different set of methods and different spec, status and metadata.
All the Workflow kinds have a save() and an export() method to save the workflow entity into the backend or export it locally as YAML.
To create a specific workflow, you must use the desired kind in the new_workflow() method.