DBT runtime
The DBT runtime allows you to run DBT transformations on your data. It is a wrapper around the DBT CLI tool.
The runtime introduces a function of kind `dbt` and a task of kind `transform`.
Prerequisites
Python version and libraries:
- python >= 3.9
- digitalhub-runtime-dbt
The package is available on PyPI:
python -m pip install digitalhub-runtime-dbt # for remote execution only
python -m pip install digitalhub-runtime-dbt[local] # for local execution
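To confirm the prerequisites before running anything, a quick check could look like the sketch below. The helper names `meets_python_requirement` and `installed_version` are hypothetical and not part of the digitalhub packages; the snippet only uses the Python standard library.

```python
import sys
from importlib import metadata


def meets_python_requirement(version_info=sys.version_info) -> bool:
    """Check the documented minimum Python version (3.9)."""
    return tuple(version_info[:2]) >= (3, 9)


def installed_version(package: str = "digitalhub-runtime-dbt"):
    """Return the installed version of the package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


print("Python OK:", meets_python_requirement())
print("Runtime version:", installed_version())
```

If the second line prints `None`, the package is not installed in the current environment.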
HOW TO
With the DBT runtime you can use the function's `run()` method to execute a DBT query you have defined.
The DBT runtime execution workflow follows roughly these steps:
- The runtime fetches the input dataitems by downloading them locally. The runtime tries to get the file from the `path` attribute in the dataitem specification. At the moment, we support the following path types:
    - `http(s)://<url>`
    - `s3://<bucket>/<path>`
    - `sql://<database>(/<schema-optional>)/<table>`
    - `<local-path>`
- The runtime inserts the data into a temporary versioned table in the default postgres database. These tables are named `<dataitem-name>_v<dataitem-id>` and are deleted at the end of the execution.
- The runtime collects the source code of the DBT query, creates all the necessary DBT artifacts (profiles.yml, dbt_project.yml, etc.) and runs the DBT transformation.
- The runtime stores the output table in the default postgres database as the result of the DBT execution. The table name is built from the `outputs` parameter. Then, the runtime creates a dataitem with the name given in the `outputs` parameter and saves it into the Core backend. You can retrieve the dataitem with the `run.outputs()` method. In general, the versioned output table is named `<dataitem-output-name>_v<dataitem-output-id>` and is stored in the default postgres database passed to the runtime via env variable.
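The supported path types and the table naming convention from the steps above can be illustrated with a small sketch. This is not the runtime's actual implementation: the helper names `classify_path` and `versioned_table_name` are hypothetical, and the scheme checks simply mirror the list of supported path types.

```python
from urllib.parse import urlparse


def classify_path(path: str) -> str:
    """Return the path type of a dataitem path (hypothetical helper)."""
    scheme = urlparse(path).scheme.lower()
    if scheme in ("http", "https"):
        return "http"
    if scheme == "s3":
        return "s3"
    if scheme == "sql":
        return "sql"
    # Assumption: anything without one of the schemes above is a local path.
    return "local"


def versioned_table_name(name: str, entity_id: str) -> str:
    """Build a table name following the <name>_v<id> pattern."""
    return f"{name}_v{entity_id}"


print(classify_path("s3://my-bucket/data/table.parquet"))  # s3
print(versioned_table_name("my_dataitem", "1234"))  # my_dataitem_v1234
```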
Function
The DBT runtime introduces a function of kind `dbt` that allows you to execute SQL DBT queries on your data.
Function parameters
Name | Type | Description | Default |
---|---|---|---|
project | str | Project name. Required only if creating from library, otherwise MUST NOT be set | |
name | str | Name that identifies the object | required |
kind | str | Function kind | required |
uuid | str | ID of the object in form of UUID4 | None |
description | str | Description of the object | None |
labels | list[str] | List of labels | None |
embedded | bool | Flag to determine if object must be embedded in project | True |
code_src | str | URI pointer to source code | None |
code | str | Source code (plain text) | None |
base64 | str | Source code (base64 encoded) | None |
handler | str | Function entrypoint | None |
lang | str | Source code language (hint) | None |
Function kinds
The `kind` parameter must be:
- `dbt`
Function example
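import digitalhub as dh

project = dh.get_or_create_project("my_project")

# DBT model; "my_table" is the inputs key used in the task example
sql = """
SELECT * FROM {{ ref("my_table") }}
"""

# Input dataitem that the runtime loads as a table
dataitem = project.new_dataitem("my_dataitem", kind="table", path="path-to-some-data")

# The project name is required when creating the function from the library
function = dh.new_function(
    project="my_project",
    kind="dbt",
    name="my_function",
    code=sql,
)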
Task
The DBT runtime introduces a task of kind `transform` that allows you to run a DBT transformation on your data.
A `Task` is created with the `run()` method, so it's not managed directly by the user. The parameters for the task creation are passed directly to the `run()` method, and may vary depending on the kind of task.
Task parameters
Name | Type | Description | Default | Kind specific |
---|---|---|---|---|
action | str | Task action | required | |
node_selector | list[dict] | Node selector | None | |
volumes | list[dict] | List of volumes | None | |
resources | dict | Resources restrictions | None | |
affinity | dict | Affinity | None | |
tolerations | list[dict] | Tolerations | None | |
envs | list[dict] | Env variables | None | |
secrets | list[str] | List of secret names | None | |
profile | str | Profile template | None | |
Task actions
Actions must be one of the following:
- `transform`: to run a DBT transformation
Task example
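# "my_table" matches the ref() name in the SQL code;
# dataitem is the object created in the function example
run = function.run(
    action="transform",
    inputs={"my_table": dataitem.key},
    outputs={"output_table": "my_output_table"},
)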
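In the examples, the key used in `inputs` (`my_table`) matches the name passed to `ref()` in the SQL code. Assuming this is how inputs are resolved, a quick pre-flight check can catch mismatches before calling `run()`. The helper `missing_refs` is hypothetical and not part of the digitalhub API; it relies only on a regular expression.

```python
import re

# Matches {{ ref("name") }} or {{ ref('name') }} in a DBT model.
REF_PATTERN = re.compile(r"""\{\{\s*ref\(\s*["']([^"']+)["']\s*\)\s*\}\}""")


def missing_refs(sql: str, inputs: dict) -> list:
    """Return ref() names in the SQL that have no matching key in inputs."""
    referenced = REF_PATTERN.findall(sql)
    return sorted(set(referenced) - set(inputs))


sql = 'SELECT * FROM {{ ref("my_table") }}'
print(missing_refs(sql, {"my_table": "some-dataitem-key"}))  # []
print(missing_refs(sql, {"other_table": "some-dataitem-key"}))  # ['my_table']
```

An empty list means every referenced table has a corresponding input.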
Run
Like the `Task`, the `Run` object is created with the `run()` method.
The run's parameters are passed alongside the task's ones.
Run parameters
Name | Type | Description | Default |
---|---|---|---|
local_execution | bool | Flag to indicate if the run will be executed locally | False |
inputs | dict | Input entity key. | None |
outputs | dict | Outputs mapped. | None |
parameters | dict | Extra parameters for a function. | None |
Run example
run = function.run(
    action="transform",
    inputs={
        "dataitem": dataitem.key
    },
    outputs={
        "dataitem": "mapped-name",
        "label": "some-label"
    }
)
Run methods
output
Get run's output by name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_name | str | Key of the result. | required |
as_key | bool | If True, return result as key. | False |
as_dict | bool | If True, return result as dictionary. | False |
Returns:
Type | Description |
---|---|
Entity \| dict \| str \| None | Result. |
outputs
Get run's outputs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
as_key | bool | If True, return results as keys. | False |
as_dict | bool | If True, return results as dictionaries. | False |
Returns:
Type | Description |
---|---|
dict | List of output objects. |
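A short sketch of how `outputs()` might be used after a run. The helper `output_keys` is hypothetical; it only assumes what the tables above state, namely that `outputs(as_key=True)` returns a dict whose values are keys. The `FakeRun` class is a stand-in so the snippet runs without a backend; with a real run you would pass the object returned by `function.run()`.

```python
def output_keys(run) -> dict:
    """Map each output name to its key using the documented outputs() method."""
    results = run.outputs(as_key=True)
    # Assumption: a run without outputs yields an empty or None result.
    return dict(results or {})


class FakeRun:
    """Stand-in for a real Run object, for illustration only."""

    def outputs(self, as_key=False, as_dict=False):
        return {"output_table": "hypothetical-dataitem-key"}


print(output_keys(FakeRun()))
```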