KFP Pipelines Runtime
The kfp runtime allows you to run workflows within the platform.
The runtime introduces a function of kind kfp
and a task of kind pipeline
.
Prerequisites
Python version and libraries:
python >= 3.9
digitalhub-runtime-kfp
The package is available on PyPI:
HOW TO
With the kfp runtime you can use the function's run()
method to execute a workflow you have defined.
The kfp runtime execution workflow follows roughly these steps:
- Define one or more functions to be executed. These functions can be from other runtimes.
- Define somewhere a pipeline.
- Build the pipeline with the
run(action="build")
method. (Mandatory step!) - Execute the pipeline with
run(action="pipeline")
method. This calls a stepper that executes various KFP ContainerOP.
Pipeline definition
To define a pipeline you need to define function with the def
keyword. You can give the function a name and declare its arguments as usual.
From digitalhub_runtime_kfp.dsl
you must import pipeline_context
. Its a context manager object that allows you to order the various steps of execution and chain them together with inputs and outputs. Once you write the pipeline function, store it in a file .py.
When you define the steps inside the pipeline, you specify also inputs, outputs, parameters and values for the steps.
Step parameters
Parameter | Type | Example | Description |
---|---|---|---|
name | str | "download" | Name of the step |
function | str | "downloader-funct" | Name of the dh function to execute. It must exists in the dh project context |
action | str | "job" | Action to execute |
inputs | dict | {"url": "dataitem_key", "dataset": previous_step.outputs["some_key"]} | Input dh parameters keys (dataitems, artifacts, models). The syntax for the inputs is the same as in the kfp package when it comes to link an output step to an input. |
outputs | dict | {"dataset": "dataset"} | Dh outputs mapped |
parameters | dict | {"param": "value"} | Function generic parameters |
values | list | ["val1", "val2"] | List of non dh outputs referenced as strings |
Workflow definition example
from digitalhub_runtime_kfp.dsl import pipeline_context
def myhandler(url):
# Use pipeline_context() manager
with pipeline_context() as pc:
# Defaine first step
step1 = pc.step(name="download", # Name of the step 1
function="downloader-funct", # Name of the dh function to execute
action="job", # Action to execute
inputs={"url": url}, # Input parameters
outputs={"dataset": "dataset"}) # Mapped outputs
step2 = pc.step(name="extract_parking", # Name of the step 2
function="extract-parkings", # Name of the dh function to execute
action="job", # Action to execute
inputs={"di": step1.outputs['dataset']}, # Input parameters from previous step
outputs={"parkings": "parkings"}) # Mapped outputs
Workflow
The kfp runtime introduces a function of kind kfp
.
Workflow parameters
Name | Type | Description | Default |
---|---|---|---|
project | str | Project name. Required only if creating from library, otherwise MUST NOT be set | |
name | str | Name that identifies the object | required |
kind | str | Workflow kind | required |
uuid | str | ID of the object in form of UUID4 | None |
description | str | Description of the object | None |
labels | list[str] | List of labels | None |
embedded | bool | Flag to determine if object must be embedded in project | True |
code_src | str | URI pointer to source code | None |
code | str | Source code (plain text) | None |
base64 | str | Source code (base64 encoded) | None |
handler | str | Function entrypoint | None |
lang | str | Source code language (hint) | None |
image | str | Image where the workflow will be executed | None |
tag | str | Tag of the image where the workflow will be executed | None |
Workflow kinds
The kind
parameter must be one of the following:
kfp
Workflow example
# From project ...
workflow = project.new_workflow(name="workflow",
kind="kfp",
code_src="pipeline.py",
handler="handler")
# .. or from sdk
workflow = dh.new_workflow(project="my-project",
name="workflow",
kind="kfp",
code_src="pipeline.py",
handler="handler")
Task
The KFP runtime introduces a task of kind pipeline
that allows you to run a workflow.
A Task
is created with the run()
method, so it's not managed directly by the user. The parameters for the task creation are passed directly to the run()
method, and may vary depending on the kind of task.
Task parameters
Name | Type | Description | Default | Kind specific |
---|---|---|---|---|
action | str | Task action | required | |
node_selector | list[dict] | Node selector | None | |
volumes | list[dict] | List of volumes | None | |
resources | dict | Resources restrictions | None | |
affinity | dict | Affinity | None | |
tolerations | list[dict] | Tolerations | None | |
envs | list[dict] | Env variables | None | |
secrets | list[str] | List of secret names | None | |
profile | str | Profile template | None | |
schedule | str | Schedule for the job | None |
Task actions
Actions must be one of the following:
build
pipeline
Task example
Run
The Run
object is, similar to the Task
, created with the run()
method.
The run's parameters are passed alongside the task's ones.
Run parameters
Name | Type | Description | Default |
---|---|---|---|
inputs | dict | Inputs for the pipeline function. | None |
Run example
run_build = workflow.run(action="build")
run = workflow.run(
action="pipeline",
parameters={"dataitem": dataitem.key}
)
Run methods
output
Get run's output by name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_name
|
str
|
Key of the result. |
required |
as_key
|
bool
|
If True, return result as key. |
False
|
as_dict
|
bool
|
If True, return result as dictionary. |
False
|
Returns:
Type | Description |
---|---|
Entity | dict | str | None
|
Result. |
outputs
Get run's outputs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
as_key
|
bool
|
If True, return results as keys. |
False
|
as_dict
|
bool
|
If True, return results as dictionaries. |
False
|
Returns:
Type | Description |
---|---|
dict
|
List of output objects. |
result
Get result by name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
result_name
|
str
|
Name of the result. |
required |
Returns:
Type | Description |
---|---|
Any
|
The result. |
results
Get results.
Returns:
Type | Description |
---|---|
dict
|
The results. |
values
Get values.
Returns:
Type | Description |
---|---|
dict
|
The values. |