Process the data

Raw data, as ingested from the remote API, is usually not suitable for consumption. We'll define a set of functions to process it.

Define a function that derives the dataset by grouping information about spires (id, geolocation, address, name...) and saves the result in the store:

%%writefile "src/process-spire.py"

from digitalhub_runtime_python import handler

# Columns describing each spire: identifier, location, road, status...
KEYS = ['codice spira','longitudine','latitudine','Livello','tipologia','codice','codice arco','codice via','Nome via','stato','direzione','angolo','geopoint']

@handler(outputs=["dataset-spire"])
def process(project, di):
    df = di.as_df()
    # Keep one row per spire, restricted to the descriptive columns
    sdf = df.groupby(['codice spira']).first().reset_index()[KEYS]
    return sdf

Register the function in Core:

process_func = project.new_function(
                         name="process-spire",
                         kind="python",
                         python_version="PYTHON3_9",
                         code_src="src/process-spire.py",
                         handler="process")

Run it locally:

process_run = process_func.run(action="job", inputs={'di': dataset_di.key}, outputs={'dataset-spire': 'dataset-spire'}, local_execution=True)

The result has been saved as an artifact in the data store:

spire_di = project.get_dataitem('dataset-spire')
spire_df = spire_di.as_df()

Now you can view the results with spire_df.head().
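
For example, you can preview the first rows and count how many distinct spires were extracted; this is just a quick check, and the actual numbers depend on the data you ingested:

print(spire_df.head())
print("Distinct spires:", spire_df['codice spira'].nunique())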

Let's transform the data. We will extract a new data frame, where each record contains the identifier of the spire and how much traffic it detected on a specific date and time slot.

A record that looks like this:

| data | codice spira | 00:00-01:00 | 01:00-02:00 | ... | Nodo a | ordinanza | stato | codimpsem | direzione | angolo | longitudine | latitudine | geopoint | giorno settimana |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2023-03-25 | 0.127 3.88 4 1 90 | 58 | ... | ... | 15108 | 4000/343434 | A | 125 | NO | 355.0 | 11.370234 | 44.509137 | 44.5091367043883, 11.3702339463537 | Sabato |

will become 24 records, each containing the spire's code and the traffic recorded within each time slot on that date:

| time | codice spira | value |
| --- | --- | --- |
| 2023-03-25 00:00 | 0.127 3.88 4 1 90 | 58 |
| ... | ... | ... |

Load the data item into a data frame and remove all columns except for date, spire identifier and recorded values for each time slot:

# The 24 hourly time-slot columns of the raw dataset
keys = ['00:00-01:00', '01:00-02:00', '02:00-03:00', '03:00-04:00', '04:00-05:00', '05:00-06:00', '06:00-07:00', '07:00-08:00', '08:00-09:00', '09:00-10:00', '10:00-11:00', '11:00-12:00', '12:00-13:00', '13:00-14:00', '14:00-15:00', '15:00-16:00', '16:00-17:00', '17:00-18:00', '18:00-19:00', '19:00-20:00', '20:00-21:00', '21:00-22:00', '22:00-23:00', '23:00-24:00']
# Keep only the date, the spire identifier and the time-slot columns
columns = ['data','codice spira'] + keys
rdf = dataset_df[columns]

Derive dataset for recorded traffic within each time slot for each spire:

import pandas as pd

ls = []

for key in keys:
    # Start time of the slot, e.g. '00:00' from '00:00-01:00'
    k = key.split("-")[0]
    # Copy the slice before adding columns to avoid pandas warnings
    xdf = rdf[['data', 'codice spira', key]].copy()
    xdf['time'] = xdf.data.apply(lambda x: x + ' ' + k)
    xdf['value'] = xdf[key]
    vdf = xdf[['time', 'codice spira', 'value']]
    ls.append(vdf)

edf = pd.concat(ls)

You can verify with edf.head() that the derived dataset matches our goal.
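
As a side note, the same wide-to-long reshape can also be expressed with pandas' melt. The following is only a sketch of an equivalent approach, using the rdf and keys defined above (row order may differ from edf):

# Equivalent reshape with melt: one row per (date, spire, time slot)
mdf = rdf.melt(
    id_vars=['data', 'codice spira'],  # columns repeated on every output row
    value_vars=keys,                   # the 24 hourly columns
    var_name='slot',
    value_name='value')
# Build the 'time' column from the date and the slot start time
mdf['time'] = mdf['data'] + ' ' + mdf['slot'].str.split('-').str[0]
mdf = mdf[['time', 'codice spira', 'value']]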

Let's put this into a function:

%%writefile "src/process-measures.py"

from digitalhub_runtime_python import handler
import pandas as pd

KEYS = ['00:00-01:00', '01:00-02:00', '02:00-03:00', '03:00-04:00', '04:00-05:00', '05:00-06:00', '06:00-07:00', '07:00-08:00', '08:00-09:00', '09:00-10:00', '10:00-11:00', '11:00-12:00', '12:00-13:00', '13:00-14:00', '14:00-15:00', '15:00-16:00', '16:00-17:00', '17:00-18:00', '18:00-19:00', '19:00-20:00', '20:00-21:00', '21:00-22:00', '22:00-23:00', '23:00-24:00']
COLUMNS=['data','codice spira']

@handler(outputs=["dataset-measures"])
def process(project, di):
    df = di.as_df()
    rdf = df[COLUMNS+KEYS]
    ls = []
    for key in KEYS:
        # Start time of the slot, e.g. '00:00' from '00:00-01:00'
        k = key.split("-")[0]
        # Copy the slice before adding columns to avoid pandas warnings
        xdf = rdf[COLUMNS + [key]].copy()
        xdf['time'] = xdf.data.apply(lambda x: x + ' ' + k)
        xdf['value'] = xdf[key]
        ls.append(xdf[['time', 'codice spira', 'value']])
    edf = pd.concat(ls)
    return edf

Register it:

process_measures_func = project.new_function(
                         name="process-measures",
                         kind="python",
                         python_version="PYTHON3_9",
                         code_src="src/process-measures.py",
                         handler="process")

Run it locally:

process_measures_run = process_measures_func.run(action="job", inputs={'di': dataset_di.key}, outputs={'dataset-measures': 'dataset-measures'}, local_execution=True)

Inspect the resulting data artifact:

measures_di = project.get_dataitem('dataset-measures')
measures_df = measures_di.as_df()
measures_df.head()
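
As a quick sanity check, each raw record should have produced 24 rows, one per hourly slot. A minimal sketch, assuming dataset_df (the raw data frame from the previous step) is still available:

# Every raw record expands into 24 hourly measures
print(len(dataset_df), "raw records ->", len(measures_df), "measures")
assert len(measures_df) == 24 * len(dataset_df)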

Now that we have defined three functions to collect data, process it and extract information, let's put them in a pipeline.