Define a Hera Pipeline
This section describes how to define a Hera pipeline function. A pipeline function is a Python function that returns a Hera Workflow object. Any arguments in the function signature are passed as parameters during run(action="build").
Pipeline Function Anatomy
Define a pipeline by creating a Python function that returns a Hera Workflow object:
from hera.workflows import Workflow, DAG, Parameter
from digitalhub_runtime_hera.dsl import step
def pipeline(url: str):
# Create a new Workflow with an entrypoint DAG and parameters
with Workflow(entrypoint="dag", arguments=Parameter(name="url")) as w:
with DAG(name="dag"):
# Define workflow steps here
...
return w
Typical Pipeline Structure
- Create a
WorkflowHera object and set the entrypoint (usually a DAG). - Use a
DAGorStepscontext to define the workflow structure. - Add steps via
step(...), providing templates, function names, inputs/outputs and parameters. - Chain steps using Hera operators to define dependencies.
- Return the
WorkflowHera object.
DSL Components
The runtime provides DSL helpers in digitalhub_runtime_hera.dsl:
step function
step(**step_kwargs) creates a workflow step (a Hera Task) inside a DAG or Steps context. Main arguments:
| Parameter | Type | Example | Description |
|---|---|---|---|
| template | dict | {"action": "job"} | Parameters template to pass to function.run() or workflow.run(). The action key is always required. To pass inputs from other steps use the {{inputs.parameters.parameter_name}} template syntax. |
| function | str | "download-data" | Name of the digitalhub function to execute. |
| function_id | str | "abc123" | Function ID (optional). |
| name | str | "step1" | Step name. |
| inputs | dict | {"some-input": ANOTHER_STEP.get_parameter("some-output")} | Step inputs. Keys become Hera Parameters; values can reference other steps' outputs. |
| outputs | list | ["output1"] | Step outputs. These become Hera Outputs and Artifacts. |
Other keyword arguments are forwarded to the underlying container template. step must be called inside a DAG or Steps context.
Step Inputs and Outputs
Step inputs are defined via the inputs argument to step(...). This is a dictionary where keys are input names and values can reference outputs from other steps using the get_parameter(...) method. For example:
# Here the step A (let say a python function) produces an output named "data"
A = step(template={"action":"job"}, function="step-a", outputs=["data"])
# Step B consumes the output "data" from step A as its input. Because "data" is an output of step A, we use A.get_parameter("data") to reference it. Note that the input name "input_data" in step B can be different from the output name "data" in step A.
# In the template of step B, we use the template syntax {{inputs.parameters.data-from-b}} to refer to the input parameter.
# The "inputs" provided in template of step B belongs to the spec of the function "step-b", in this case a python function.
B = step(template={"action":"job", "inputs": {"parameter-name": "{{inputs.parameters.data-from-b}}"}}, function="step-b", inputs={"data-from-b": A.get_parameter("data")})
container_template function
container_template(...) builds a Hera container template for a workflow step. It returns a Hera Container object and accepts similar arguments to step (template, function, name, inputs, outputs, ...). Use it directly for advanced scenarios or custom templates.
Pipeline Definition Example
from hera.workflows import Workflow, DAG, Parameter
from digitalhub_runtime_hera.dsl import step
def pipeline():
# Create a new Workflow with an entrypoint DAG and a parameter
with Workflow(entrypoint="dag", arguments=Parameter(name="url")) as w:
with DAG(name="dag"):
# First step: takes the workflow parameter and outputs a dataset
A = step(template={"action":"job", "inputs": {"url": "{{workflow.parameters.url}}"}},
function="download-data",
outputs=["dataset"])
# Subsequent steps consume A's output
B = step(template={"action":"job", "inputs": {"di": "{{inputs.parameters.di}}"}},
function="process-spire",
inputs={"di": A.get_parameter("dataset")})
C = step(template={"action":"job", "inputs": {"di": "{{inputs.parameters.di}}"}},
function="process-measures",
inputs={"di": A.get_parameter("dataset")})
# Chain the steps
A >> [B, C]
return w