Nefertem runtime
The Nefertem runtime allows you to run Nefertem validation, profiling or inference on your data. It is a wrapper around the Nefertem library.
The runtime introduces a function of kind nefertem and four tasks of kind validate, profile, infer and metric.
Prerequisites
Python libraries:
- python >= 3.9
- digitalhub
- digitalhub-data-nefertem
- Nefertem plugins available in the Nefertem repository
If you want to execute Nefertem tasks locally, you need to install the digitalhub-data-nefertem package with the local extra:
git clone https://github.com/scc-digitalhub/digitalhub-sdk.git
cd digitalhub-sdk
pip install core/ data/ ./
pip install data/modules/nefertem[local]
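After installing, it can be useful to confirm that the prerequisites listed above are actually available. The following is only a minimal sketch using the Python standard library: the helper name missing_prerequisites is hypothetical, and the import names digitalhub and nefertem are assumptions (a package's import name may differ from its distribution name).

```python
import importlib.util
import sys


def missing_prerequisites(modules=("digitalhub", "nefertem"), min_python=(3, 9)):
    """Return a list of human-readable problems; an empty list means all checks passed."""
    problems = []
    # The documentation requires python >= 3.9
    if sys.version_info < min_python:
        wanted = ".".join(str(part) for part in min_python)
        problems.append(f"Python >= {wanted} required, found {sys.version.split()[0]}")
    # find_spec returns None when a top-level module cannot be located
    for name in modules:
        if importlib.util.find_spec(name) is None:
            problems.append(f"module not importable: {name}")
    return problems


if __name__ == "__main__":
    for problem in missing_prerequisites():
        print(problem)
```

The check only reports whether the modules can be located; it does not verify their versions or whether the local extra was installed.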
Function
The Nefertem runtime introduces a function of kind nefertem that allows you to execute various tasks on your data.
Nefertem function parameters
When you create a function of kind nefertem, you need to specify the following mandatory parameters:
- project: the name of the project the function belongs to. Required only if you do not create the function from the project context, e.g. with project.new_function().
- name: the name of the function
- kind: the kind of the function, must be nefertem
Optionally, you can specify the following parameters:
- uuid: the uuid of the function (automatically generated if not provided). Must be a valid uuid v4.
- description: the description of the function
- labels: the labels of the function
- git_source: the remote source of the function (git repository)
- source_code: pointer to the source code of the function
- constraints: the constraints to be applied on the data. Valid only for validate tasks.
- error_report: the error report output format. Valid only for validate tasks.
- embedded: whether the function is embedded or not. If True, the function is embedded (all the details are expressed) in the project. If False, the function is not embedded in the project.
For example:
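import digitalhub as dh
# Constraint applied to one field of the "ref-source" resource
constraint = {
    'constraint': 'type',
    'field': 'field-name',
    'field_type': 'string',
    'name': 'check_country_string',
    'resources': ['ref-source'],
    'title': '',
    'type': 'const-type',
    'value': 'string',
    'weight': 5
}
# Outside a project context the project name is mandatory ("my-project" is a placeholder)
function = dh.new_function(project="my-project",
                           name="nefertem-function",
                           kind="nefertem",
                           constraints=[constraint])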
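The uuid parameter described above must be a valid uuid v4. As a minimal sketch, assuming you want to generate or check such a value yourself before creating the function, the Python standard library is enough. The helper name is_uuid4 is hypothetical and is not part of digitalhub.

```python
import uuid


def is_uuid4(value):
    """Return True if value is the canonical string form of a version 4 UUID."""
    try:
        parsed = uuid.UUID(value)
    except (ValueError, TypeError, AttributeError):
        # Not parseable as a UUID at all
        return False
    # Require version 4 and the canonical lowercase, hyphenated spelling
    return parsed.version == 4 and str(parsed) == value.lower()


# Generate a fresh uuid v4 that could be passed as the uuid parameter
function_uuid = str(uuid.uuid4())
print(function_uuid, is_uuid4(function_uuid))
```

If you omit the uuid parameter, the documentation states that one is generated automatically, so this check is only relevant when you supply your own value.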
Task
The Nefertem runtime introduces three tasks of kind validate, profile and infer that allow you to run a Nefertem validation, profiling or inference on your data.
Validate task parameters
When you want to execute a task of kind validate, you need to pass the following mandatory parameters to the function method run():
- action: the action to perform. This must be validate.
- framework: the Nefertem framework to be used.
- inputs: a dictionary mapping the Nefertem resources referenced in the constraints to dataitem keys. The corresponding dataitem objects must be present in the backend, whether it is the local or the Core backend.
Optionally, you can pass the following task parameters, which are specific to remote execution:
- node_selector: a list of node selectors. The runtime will select the nodes to which the task will be scheduled.
- volumes: a list of volumes
- resources: a map of resources (CPU, memory, GPU)
- affinity: node affinity
- tolerations: tolerations
- env: environment variables to inject in the container
- secrets: list of secrets to inject in the container
- backoff_limit: the number of retries when a job fails
- schedule: the schedule of the job as a cron expression
- replicas: the number of replicas of the deployment
For example:
run = function.run("validate",
framework="frictionless",
inputs={"employees": di.key})
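Because the inputs dictionary must cover every resource named in the function's constraints, a quick pre-flight check can catch a missing mapping before the run is submitted. This is a minimal sketch, not part of the runtime: the helper name missing_inputs is hypothetical, and the dataitem key shown is a placeholder string rather than a real key.

```python
def missing_inputs(constraints, inputs):
    """Return the sorted resource names used in constraints that have no entry in inputs."""
    referenced = set()
    for constraint in constraints:
        # Each constraint lists the resources it applies to under 'resources'
        referenced.update(constraint.get("resources", []))
    return sorted(referenced - set(inputs))


constraint = {
    "constraint": "type",
    "field": "SALARY",
    "field_type": "number",
    "name": "check_value_integer",
    "title": "",
    "resources": ["employees"],
    "type": "frictionless",
    "value": "number",
    "weight": 5,
}

# "<dataitem-key>" stands in for di.key
print(missing_inputs([constraint], {"employees": "<dataitem-key>"}))
print(missing_inputs([constraint], {}))
```

An empty result means every referenced resource has a dataitem key; it does not confirm that the dataitem exists in the backend.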
Profile task parameters
When you want to execute a task of kind profile, you need to pass the following mandatory parameters to the function method run():
- action: the action to perform. This must be profile.
- framework: the Nefertem framework to be used.
- inputs: a dictionary mapping Nefertem resource names to dataitem keys. The corresponding dataitem objects must be present in the backend, whether it is the local or the Core backend.
Optionally, you can pass the following task parameters, which are specific to remote execution:
- node_selector: a list of node selectors. The runtime will select the nodes to which the task will be scheduled.
- volumes: a list of volumes
- resources: a map of resources (CPU, memory, GPU)
- affinity: node affinity
- tolerations: tolerations
- env: environment variables to inject in the container
- secrets: list of secrets to inject in the container
- backoff_limit: the number of retries when a job fails
- schedule: the schedule of the job as a cron expression
- replicas: the number of replicas of the deployment
For example:
run = function.run("profile",
framework="frictionless",
inputs={"employees": di.key})
Infer task parameters
When you want to execute a task of kind infer, you need to pass the following mandatory parameters to the function method run():
- action: the action to perform. This must be infer.
- framework: the Nefertem framework to be used.
- inputs: a dictionary mapping Nefertem resource names to dataitem keys. The corresponding dataitem objects must be present in the backend, whether it is the local or the Core backend.
Optionally, you can pass the following task parameters, which are specific to remote execution:
- node_selector: a list of node selectors. The runtime will select the nodes to which the task will be scheduled.
- volumes: a list of volumes
- resources: a map of resources (CPU, memory, GPU)
- affinity: node affinity
- tolerations: tolerations
- env: environment variables to inject in the container
- secrets: list of secrets to inject in the container
- backoff_limit: the number of retries when a job fails
- schedule: the schedule of the job as a cron expression
- replicas: the number of replicas of the deployment
For example:
run = function.run("infer",
framework="frictionless",
inputs={"employees": di.key})
Runtime workflow
The Nefertem runtime execution workflow is the following:
- The runtime fetches the input dataitems by downloading them locally. The runtime tries to get the file from the path attribute. At the moment, we support the following path types:
  - http(s)://<url>
  - s3://<bucket>/<path>
  - sql://<database>(/<schema-optional>)/<table>
  - <local-path>
- The runtime creates a Nefertem DataResource from the input dataitem. The DataResource is a Nefertem object that represents the data to be validated, profiled, inferred or measured.
- The runtime then creates a Nefertem run and executes it. The Nefertem run executes three methods based on the task, and produces a run_metadata report file:
  - If the task is validate:
    - run.validate()
    - run.log_report() -> produces a NefertemReport
    - run.persist_report() -> produces one or more validation framework reports
  - If the task is profile:
    - run.profile()
    - run.log_profile() -> produces a NefertemProfile
    - run.persist_profile() -> produces one or more profiling framework reports
  - If the task is infer:
    - run.infer()
    - run.log_schema() -> produces a NefertemSchema
    - run.persist_schema() -> produces one or more inference framework reports
- The runtime then creates an Artifact object for each file produced by Nefertem and saves it into the Core backend. It then uploads all the files to the default s3 storage provided. You can collect the artifacts with the run.outputs() method. In general, the saving path is s3://<bucket-from-env>/<project-name>/artifacts/ntruns/<nefertem-run-uuid>/<file>.
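The workflow above can be summarised in a few lines of plain Python. This is an illustrative sketch only, not the runtime's implementation: the names classify_path, TASK_METHODS and artifact_path are hypothetical, and treating every unrecognised scheme as a local path is an assumption.

```python
from urllib.parse import urlparse


def classify_path(path):
    """Map a dataitem path to one of the supported path types."""
    scheme = urlparse(path).scheme.lower()
    if scheme in ("http", "https"):
        return "http"
    if scheme in ("s3", "sql"):
        return scheme
    # Assumption: anything without a recognised scheme is a local path
    return "local"


# Methods executed on the Nefertem run for each task, in order
TASK_METHODS = {
    "validate": ("validate", "log_report", "persist_report"),
    "profile": ("profile", "log_profile", "persist_profile"),
    "infer": ("infer", "log_schema", "persist_schema"),
}


def artifact_path(bucket, project, run_uuid, filename):
    """Build the general saving path of an artifact produced by a Nefertem run."""
    return f"s3://{bucket}/{project}/artifacts/ntruns/{run_uuid}/{filename}"


print(classify_path("s3://my-bucket/data/employees.csv"))
print(TASK_METHODS["validate"])
print(artifact_path("my-bucket", "project-nefertem", "<nefertem-run-uuid>", "report.json"))
```

The bucket, project and file names in the usage lines are placeholders; in the real workflow the bucket comes from the environment and the run uuid is assigned by Nefertem.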
Snippet example
import digitalhub as dh
# Get or create project
project = dh.get_or_create_project("project-nefertem")
# Create dataitem
url = "https://gist.githubusercontent.com/kevin336/acbb2271e66c10a5b73aacf82ca82784/raw/e38afe62e088394d61ed30884dd50a6826eee0a8/employees.csv"
di = project.new_dataitem(name="employees",
                          kind="table",
                          path=url)
# Create function
constraint = {
    'constraint': 'type',
    'field': 'SALARY',
    'field_type': 'number',
    'name': 'check_value_integer',
    'title': '',
    'resources': ['employees'],
    'type': 'frictionless',
    'value': 'number',
    'weight': 5
}
function = project.new_function(name="function-nefertem",
                                kind="nefertem",
                                constraints=[constraint])
# Run validate task
run = function.run("validate",
                   framework="frictionless",
                   inputs={"employees": di.key})
# Refresh run
run.refresh()