Skip to content

Entity and methods

Workflow

Bases: ExecutableEntity

A class representing a workflow.

Source code in digitalhub_core/entities/workflow/entity.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
class Workflow(ExecutableEntity):
    """
    A class representing a workflow.

    Tasks attached to the workflow are tracked in ``self._tasks``, a
    mapping keyed by task kind. The mapping is not created in this class;
    presumably ``ExecutableEntity`` sets it up (TODO confirm).
    """

    ENTITY_TYPE = EntityTypes.WORKFLOW.value

    def __init__(
        self,
        project: str,
        name: str,
        uuid: str,
        kind: str,
        metadata: Metadata,
        spec: WorkflowSpec,
        status: WorkflowStatus,
        user: str | None = None,
    ) -> None:
        """
        Build a workflow by forwarding every argument to the parent class.
        """
        super().__init__(project, name, uuid, kind, metadata, spec, status, user)

        # Annotation-only declarations: nothing is assigned here, they
        # only narrow the attribute types for type checkers.
        self.spec: WorkflowSpec
        self.status: WorkflowStatus

    ##############################
    #  Workflow Methods
    ##############################

    def run(self, **kwargs) -> Run:
        """
        Run workflow.

        Parameters
        ----------
        **kwargs : dict
            Keyword arguments passed to Task and Run builders.

        Returns
        -------
        Run
            Run instance.

        Raises
        ------
        BackendError
            If the current context is local.
        """
        # Get kind registry
        kind_reg = get_kind_registry(self.kind)

        # Get task and run kind
        task_kind = kind_reg.get_task_kind_from_action(action="pipeline")
        run_kind = kind_reg.get_run_kind()

        # Fail fast: reject a local context *before* creating the task.
        # new_task() saves (or updates) the task, so checking afterwards
        # would leave a task written for a run that is never launched.
        if self._context().local:
            raise BackendError("Cannot run workflow with local backend.")

        # Create or update new task
        task = self.new_task(task_kind, **kwargs)

        return task.run(run_kind, local_execution=False, **kwargs)

    def _get_workflow_string(self) -> str:
        """
        Get workflow string.

        Returns
        -------
        str
            Workflow string, as returned by the parent class'
            ``_get_executable_string``.
        """
        return super()._get_executable_string()

    def import_tasks(self, tasks: list[dict] | None) -> None:
        """
        Import tasks from yaml.

        Parameters
        ----------
        tasks : list[dict] | None
            List of tasks to import. ``None`` is treated as an empty list;
            entries that are not dictionaries are skipped.

        Returns
        -------
        None
        """
        # Loop over tasks list, in the case where the workflow
        # is imported from local file. "or []" keeps a missing
        # task list (None) from raising TypeError.
        for task in tasks or []:
            # If task is not a dictionary, skip it
            if not isinstance(task, dict):
                continue

            # Create a new object from the dictionary form of the task.
            task_obj = task_from_dict(task)

            # Try to save it in backend to be able to use
            # it for launching runs. In fact, tasks must be
            # persisted in backend to be able to launch runs.
            # Deliberate best-effort: a BackendError is ignored on the
            # assumption that the task already exists.
            try:
                task_obj.save()
            except BackendError:
                pass

            # Register the task only if it points at this workflow,
            # overwriting any task already stored under the same kind.
            if task_obj.spec.function == self._get_workflow_string():
                self._tasks[task_obj.kind] = task_obj

    def new_task(self, task_kind: str, **kwargs) -> Task:
        """
        Create new task.

        The task is saved as new when no task with this workflow string
        and kind is listed by the backend; otherwise the listed task id is
        reused and the task is saved with ``update=True``.

        Parameters
        ----------
        task_kind : str
            Kind of the object.
        **kwargs : dict
            Keyword arguments. ``project``, ``function`` and ``kind`` are
            always overwritten.

        Returns
        -------
        Task
            New task.
        """
        # Override kwargs
        kwargs["project"] = self.project
        kwargs["function"] = self._get_workflow_string()
        kwargs["kind"] = task_kind

        # Create object instance
        task = task_from_parameters(**kwargs)

        exists, task_id = self._check_task_in_backend(task_kind)

        # Save or update task
        if not exists:
            task.save()
        else:
            task.id = task_id
            task.save(update=True)

        self._tasks[task_kind] = task
        return task

    def update_task(self, kind: str, **kwargs) -> None:
        """
        Update task.

        Parameters
        ----------
        kind : str
            Kind of the object.
        **kwargs : dict
            Keyword arguments. ``project``, ``kind``, ``function`` and
            ``uuid`` are always overwritten.

        Returns
        -------
        None

        Raises
        ------
        EntityError
            If task does not exist.
        """
        self._raise_if_not_exists(kind)

        # Update kwargs; the uuid of the registered task is reused so the
        # rebuilt task keeps the same id.
        kwargs["project"] = self.project
        kwargs["kind"] = kind
        kwargs["function"] = self._get_workflow_string()
        kwargs["uuid"] = self._tasks[kind].id

        # Update task
        task = task_from_parameters(**kwargs)
        task.save(update=True)
        self._tasks[kind] = task

    def get_task(self, kind: str) -> Task:
        """
        Get task.

        Parameters
        ----------
        kind : str
            Kind of the object.

        Returns
        -------
        Task
            Task.

        Raises
        ------
        EntityError
            If task is not created.
        """
        self._raise_if_not_exists(kind)
        return self._tasks[kind]

    def delete_task(self, kind: str, cascade: bool = True) -> None:
        """
        Delete task.

        Parameters
        ----------
        kind : str
            Kind of the object.
        cascade : bool
            Flag to determine if cascade deletion must be performed.

        Returns
        -------
        None

        Raises
        ------
        EntityError
            If task is not created.
        """
        self._raise_if_not_exists(kind)
        # NOTE: this is the module-level ``delete_task`` helper, not a
        # recursive call: method names are not visible as bare names
        # inside a method body.
        delete_task(self._tasks[kind].key, cascade=cascade)
        self._tasks.pop(kind, None)

    def _check_task_in_backend(self, kind: str) -> tuple[bool, str | None]:
        """
        Check if task exists in backend.

        Parameters
        ----------
        kind : str
            Kind of the object.

        Returns
        -------
        tuple[bool, str | None]
            Flag to determine if task exists in backend and ID if exists.
        """
        # List tasks from backend filtered by function and kind
        params = {"function": self._get_workflow_string(), "kind": kind}
        objs = list_entity_api_ctx(self.project, EntityTypes.TASK.value, params=params)
        # Only the first listed task is considered; an empty listing
        # means there is no such task.
        try:
            return True, objs[0]["id"]
        except IndexError:
            return False, None

    def _raise_if_not_exists(self, kind: str) -> None:
        """
        Raise error if task is not created.

        Parameters
        ----------
        kind : str
            Kind of the object.

        Returns
        -------
        None

        Raises
        ------
        EntityError
            If task does not exist.
        """
        # A kind mapped to None counts as missing too.
        if self._tasks.get(kind) is None:
            raise EntityError("Task does not exist.")

delete_task(kind, cascade=True)

Delete task.

Parameters:

Name Type Description Default
kind str

Kind of the object.

required
cascade bool

Flag to determine if cascade deletion must be performed.

True

Returns:

Type Description
None

Raises:

Type Description
EntityError

If task is not created.

Source code in digitalhub_core/entities/workflow/entity.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def delete_task(self, kind: str, cascade: bool = True) -> None:
    """
    Delete the task registered under the given kind.

    Parameters
    ----------
    kind : str
        Kind of the object.
    cascade : bool
        Flag to determine if cascade deletion must be performed.

    Returns
    -------
    None

    Raises
    ------
    EntityError
        If task is not created.
    """
    # Guard first: stop here when no task of this kind is registered.
    self._raise_if_not_exists(kind)

    # Hand the task key to the delete helper, forwarding the cascade flag.
    task_key = self._tasks[kind].key
    delete_task(task_key, cascade=cascade)

    # Drop the local reference once the delete call has returned.
    self._tasks.pop(kind, None)

get_task(kind)

Get task.

Parameters:

Name Type Description Default
kind str

Kind of the object.

required

Returns:

Type Description
Task

Task.

Raises:

Type Description
EntityError

If task is not created.

Source code in digitalhub_core/entities/workflow/entity.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def get_task(self, kind: str) -> Task:
    """
    Return the task registered under the given kind.

    Parameters
    ----------
    kind : str
        Kind of the object.

    Returns
    -------
    Task
        Task.

    Raises
    ------
    EntityError
        If task is not created.
    """
    # Guard first: stop here when no task of this kind is registered.
    self._raise_if_not_exists(kind)

    registered = self._tasks
    return registered[kind]

import_tasks(tasks)

Import tasks from yaml.

Parameters:

Name Type Description Default
tasks list[dict]

List of tasks to import.

required

Returns:

Type Description
None
Source code in digitalhub_core/entities/workflow/entity.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def import_tasks(self, tasks: list[dict] | None) -> None:
    """
    Import tasks from yaml.

    Parameters
    ----------
    tasks : list[dict] | None
        List of tasks to import. ``None`` is treated as an empty list;
        entries that are not dictionaries are skipped.

    Returns
    -------
    None
    """
    # Loop over tasks list, in the case where the workflow
    # is imported from local file. "or []" keeps a missing
    # task list (None) from raising TypeError.
    for task in tasks or []:
        # If task is not a dictionary, skip it
        if not isinstance(task, dict):
            continue

        # Create a new object from the dictionary form of the task.
        task_obj = task_from_dict(task)

        # Try to save it in backend to be able to use
        # it for launching runs. In fact, tasks must be
        # persisted in backend to be able to launch runs.
        # Deliberate best-effort: a BackendError is ignored on the
        # assumption that the task already exists.
        try:
            task_obj.save()
        except BackendError:
            pass

        # Register the task only if it points at this workflow,
        # overwriting any task already stored under the same kind.
        if task_obj.spec.function == self._get_workflow_string():
            self._tasks[task_obj.kind] = task_obj

new_task(task_kind, **kwargs)

Create new task.

Parameters:

Name Type Description Default
task_kind str

Kind of the object.

required
**kwargs dict

Keyword arguments.

{}

Returns:

Type Description
Task

New task.

Source code in digitalhub_core/entities/workflow/entity.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def new_task(self, task_kind: str, **kwargs) -> Task:
    """
    Build a task of the given kind for this workflow and persist it.

    The task is saved as new when no matching task is reported by
    ``_check_task_in_backend``; otherwise the reported id is reused and
    the task is saved with ``update=True``.

    Parameters
    ----------
    task_kind : str
        Kind of the object.
    **kwargs : dict
        Keyword arguments. ``project``, ``function`` and ``kind`` are
        always overwritten.

    Returns
    -------
    Task
        New task.
    """
    # Force the identifying fields, whatever the caller passed.
    kwargs.update(
        project=self.project,
        function=self._get_workflow_string(),
        kind=task_kind,
    )

    # Build the in-memory task object.
    task = task_from_parameters(**kwargs)

    # Decide between create and update.
    exists, task_id = self._check_task_in_backend(task_kind)
    if exists:
        task.id = task_id
        task.save(update=True)
    else:
        task.save()

    # Register the task under its kind.
    self._tasks[task_kind] = task
    return task

run(**kwargs)

Run workflow.

Parameters:

Name Type Description Default
**kwargs dict

Keyword arguments passed to Task and Run builders.

{}

Returns:

Type Description
Run

Run instance.

Source code in digitalhub_core/entities/workflow/entity.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def run(self, **kwargs) -> Run:
    """
    Run workflow.

    Parameters
    ----------
    **kwargs : dict
        Keyword arguments passed to Task and Run builders.

    Returns
    -------
    Run
        Run instance.

    Raises
    ------
    BackendError
        If the current context is local.
    """

    # Get kind registry
    kind_reg = get_kind_registry(self.kind)

    # Get task and run kind
    task_kind = kind_reg.get_task_kind_from_action(action="pipeline")
    run_kind = kind_reg.get_run_kind()

    # Fail fast: reject a local context *before* creating the task.
    # new_task() saves (or updates) the task, so checking afterwards
    # would leave a task written for a run that is never launched.
    if self._context().local:
        raise BackendError("Cannot run workflow with local backend.")

    # Create or update new task
    task = self.new_task(task_kind, **kwargs)

    return task.run(run_kind, local_execution=False, **kwargs)

update_task(kind, **kwargs)

Update task.

Parameters:

Name Type Description Default
kind str

Kind of the object.

required
**kwargs dict

Keyword arguments.

{}

Returns:

Type Description
None

Raises:

Type Description
EntityError

If task does not exist.

Source code in digitalhub_core/entities/workflow/entity.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def update_task(self, kind: str, **kwargs) -> None:
    """
    Rebuild the task registered under the given kind and save it as an update.

    Parameters
    ----------
    kind : str
        Kind of the object.
    **kwargs : dict
        Keyword arguments. ``project``, ``kind``, ``function`` and
        ``uuid`` are always overwritten.

    Returns
    -------
    None

    Raises
    ------
    EntityError
        If task does not exist.
    """
    # Guard first: stop here when no task of this kind is registered.
    self._raise_if_not_exists(kind)

    # Force the identifying fields; the uuid comes from the task that is
    # currently registered, so the rebuilt task keeps the same id.
    kwargs.update(
        project=self.project,
        kind=kind,
        function=self._get_workflow_string(),
        uuid=self._tasks[kind].id,
    )

    # Build the replacement, save it as an update, then swap it in.
    refreshed = task_from_parameters(**kwargs)
    refreshed.save(update=True)
    self._tasks[kind] = refreshed