Entity and methods

Dataitem

Bases: MaterialEntity

A class representing a dataitem.

Source code in digitalhub_data/entities/dataitem/entity/_base.py
class Dataitem(MaterialEntity):
    """
    A class representing a dataitem.
    """

    ENTITY_TYPE = EntityTypes.DATAITEM.value

    def __init__(
        self,
        project: str,
        name: str,
        uuid: str,
        kind: str,
        metadata: Metadata,
        spec: DataitemSpec,
        status: DataitemStatus,
        user: str | None = None,
    ) -> None:
        super().__init__(project, name, uuid, kind, metadata, spec, status, user)
        self.spec: DataitemSpec
        self.status: DataitemStatus

    ##############################
    #  Private helper methods
    ##############################

    @staticmethod
    def _get_extension(path: str, file_format: str | None = None) -> str:
        """
        Get extension of path.

        Parameters
        ----------
        path : str
            Path to get extension from.
        file_format : str
            File format.

        Returns
        -------
        str
            File extension.

        Raises
        ------
        EntityError
            If file format is not supported.
        """
        if file_format is not None:
            return file_format

        scheme = map_uri_scheme(path)
        if scheme == "sql":
            return "parquet"

        ext = Path(path).suffix[1:]
        if ext:
            return ext
        raise EntityError("Unknown file format. Only csv and parquet are supported.")
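
The helper resolves the format in three steps: an explicit file_format wins, SQL sources are read back as parquet, and otherwise the path suffix decides. A behavior sketch with hypothetical paths (and assuming map_uri_scheme resolves sql:// to the sql scheme):

# Illustrative only; these paths are hypothetical.
Dataitem._get_extension("s3://bucket/data.parquet")            # -> "parquet" (from the suffix)
Dataitem._get_extension("sql://database/schema/table")         # -> "parquet" (SQL sources map to parquet)
Dataitem._get_extension("./data/file.csv", file_format="csv")  # -> "csv" (explicit format wins)
Dataitem._get_extension("./data/noext")                        # raises EntityError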

DataitemDataitem

Bases: Dataitem

Dataitem dataitem.

Source code in digitalhub_data/entities/dataitem/entity/dataitem.py
class DataitemDataitem(Dataitem):
    """
    Dataitem dataitem.
    """

DataitemIceberg

Bases: Dataitem

Iceberg dataitem.

Source code in digitalhub_data/entities/dataitem/entity/iceberg.py
class DataitemIceberg(Dataitem):
    """
    Iceberg dataitem.
    """

DataitemTable

Bases: Dataitem

Table dataitem.

Source code in digitalhub_data/entities/dataitem/entity/table.py
class DataitemTable(Dataitem):
    """
    Table dataitem.
    """

    def as_df(
        self,
        file_format: str | None = None,
        engine: str | None = None,
        clean_tmp_path: bool = True,
        **kwargs,
    ) -> Any:
        """
        Read the dataitem file (csv or parquet) as a DataFrame from spec.path.
        If the dataitem is not local, it is downloaded to a temporary
        folder named tmp_data in the project context folder.
        If clean_tmp_path is True, the temporary folder is deleted after the
        method is executed.
        Additional keyword arguments are passed to the DataFrame reader
        function, such as pandas' read_csv or read_parquet.

        Parameters
        ----------
        file_format : str
            Format of the file (csv and parquet are supported).
        engine : str
            Dataframe framework, by default pandas.
        clean_tmp_path : bool
            If True, the temporary folder will be deleted.
        **kwargs : dict
            Keyword arguments passed to the read_df function.

        Returns
        -------
        Any
            DataFrame.
        """
        if engine is None:
            engine = "pandas"
        tmp_dir: Path | None = None
        try:
            if check_local_path(self.spec.path):
                data_path = self.spec.path
            else:
                tmp_dir = self._context().root / "tmp_data"
                tmp_dir.mkdir(parents=True, exist_ok=True)
                data_path = self.download(destination=str(tmp_dir), overwrite=True)

            if Path(data_path).is_dir():
                files = [str(i) for i in Path(data_path).rglob("*") if i.is_file()]
                checker = files[0]
            else:
                checker = data_path

            extension = self._get_extension(checker, file_format)
            datastore = get_datastore("")

            return datastore.read_df(data_path, extension, engine, **kwargs)

        finally:
            # Delete the temporary download folder if requested
            self._clean_tmp_path(tmp_dir, clean_tmp_path)

    def write_df(
        self,
        df: Any,
        extension: str | None = None,
        **kwargs,
    ) -> str:
        """
        Write a DataFrame as parquet/csv/table into the dataitem spec.path.
        Keyword arguments are passed to the DataFrame writer function, such as
        pandas' to_csv or to_parquet.

        Parameters
        ----------
        df : Any
            DataFrame to write.
        extension : str
            Extension of the file.
        **kwargs : dict
            Keyword arguments passed to the write_df function.

        Returns
        -------
        str
            Path to the written dataframe.
        """
        datastore = get_datastore(self.spec.path)
        return datastore.write_df(df, self.spec.path, extension=extension, **kwargs)

    @staticmethod
    def _clean_tmp_path(pth: Path | None, clean: bool) -> None:
        """
        Clean temporary path.

        Parameters
        ----------
        pth : Path | None
            Path to clean.
        clean : bool
            If True, the path will be cleaned.

        Returns
        -------
        None
        """
        if pth is not None and clean:
            shutil.rmtree(pth)

as_df(file_format=None, engine=None, clean_tmp_path=True, **kwargs)

Read the dataitem file (csv or parquet) as a DataFrame from spec.path. If the dataitem is not local, it is downloaded to a temporary folder named tmp_data in the project context folder. If clean_tmp_path is True, the temporary folder is deleted after the method is executed. Additional keyword arguments are passed to the DataFrame reader function, such as pandas' read_csv or read_parquet.

Parameters:

    file_format : str, default None
        Format of the file (csv and parquet are supported).
    engine : str, default None
        DataFrame framework; defaults to pandas.
    clean_tmp_path : bool, default True
        If True, the temporary folder is deleted after reading.
    **kwargs : dict
        Keyword arguments passed to the read_df function.

Returns:

    Any
        DataFrame.
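
A minimal usage sketch, assuming di is an existing DataitemTable whose spec.path points to a CSV file; the sep keyword is forwarded to pandas' read_csv:

df = di.as_df(file_format="csv", sep=";")
print(df.head())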


write_df(df, extension=None, **kwargs)

Write a DataFrame as parquet/csv/table into the dataitem spec.path. Keyword arguments are passed to the DataFrame writer function, such as pandas' to_csv or to_parquet.

Parameters:

    df : Any, required
        DataFrame to write.
    extension : str, default None
        Extension of the file.
    **kwargs : dict
        Keyword arguments passed to the write_df function.

Returns:

    str
        Path to the written dataframe.
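
A minimal usage sketch, assuming di is an existing DataitemTable; index=False is forwarded to pandas' to_parquet:

import pandas as pd

df = pd.DataFrame({"id": [1, 2], "value": ["a", "b"]})
written_path = di.write_df(df, extension="parquet", index=False)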


dataitem_from_dict(obj)

Create a new object from dictionary.

Parameters:

    obj : dict, required
        Dictionary to create object from.

Returns:

    Dataitem
        Object instance.

Source code in digitalhub_data/entities/dataitem/builder.py
def dataitem_from_dict(obj: dict) -> Dataitem:
    """
    Create a new object from dictionary.

    Parameters
    ----------
    obj : dict
        Dictionary to create object from.

    Returns
    -------
    Dataitem
        Object instance.
    """
    kind = obj.get("kind")
    cls = _choose_dataitem_type(kind)
    return cls.from_dict(obj)
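
A round-trip sketch, assuming di is an existing Dataitem and that entities expose a to_dict() serializer:

obj = di.to_dict()
restored = dataitem_from_dict(obj)
assert restored.name == di.name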

dataitem_from_parameters(project, name, kind, uuid=None, description=None, labels=None, embedded=True, path=None, **kwargs)

Create a new object.

Parameters:

Name Type Description Default
project str

Project name.

required
name str

Object name.

required
kind str

Kind the object.

required
uuid str

ID of the object (UUID4, e.g. 40f25c4b-d26b-4221-b048-9527aff291e2).

None
description str

Description of the object (human readable).

None
labels list[str]

List of labels.

None
embedded bool

Flag to determine if object spec must be embedded in project spec.

True
path str

Object path on local file system or remote storage. It is also the destination path of upload() method.

None
**kwargs dict

Spec keyword arguments.

{}

Returns:

    Dataitem
        Object instance.

Source code in digitalhub_data/entities/dataitem/builder.py
def dataitem_from_parameters(
    project: str,
    name: str,
    kind: str,
    uuid: str | None = None,
    description: str | None = None,
    labels: list[str] | None = None,
    embedded: bool = True,
    path: str | None = None,
    **kwargs,
) -> Dataitem:
    """
    Create a new object.

    Parameters
    ----------
    project : str
        Project name.
    name : str
        Object name.
    kind : str
        Kind of the object.
    uuid : str
        ID of the object (UUID4, e.g. 40f25c4b-d26b-4221-b048-9527aff291e2).
    description : str
        Description of the object (human readable).
    labels : list[str]
        List of labels.
    embedded : bool
        Flag to determine if the object spec must be embedded in the project spec.
    path : str
        Object path on local file system or remote storage. It is also the destination path of the upload() method.
    **kwargs : dict
        Spec keyword arguments.

    Returns
    -------
    Dataitem
        Object instance.
    """
    if path is None:
        raise EntityError("Dataitem path must be provided")
    name = build_name(name)
    uuid = build_uuid(uuid)
    metadata = build_metadata(
        kind,
        project=project,
        name=name,
        version=uuid,
        description=description,
        labels=labels,
        embedded=embedded,
    )
    spec = build_spec(
        kind,
        path=path,
        **kwargs,
    )
    status = build_status(kind)
    cls = _choose_dataitem_type(kind)
    return cls(
        project=project,
        name=name,
        uuid=uuid,
        kind=kind,
        metadata=metadata,
        spec=spec,
        status=status,
    )
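
A minimal creation sketch; the project name and path are illustrative, and the kind string "table" is assumed to map to the DataitemTable class shown above:

di = dataitem_from_parameters(
    project="my-project",
    name="my-table",
    kind="table",
    path="./data/my-table.csv",
)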