# Preprocess - Pipeline

## BasePreProcessor (OrderedProcessor)
Shared methods to transform the datasets
The following example demonstrates how to use it.
```python
import pandas as pd

from haferml.preprocess.ingredients import attributes
from haferml.preprocess.pipeline import BasePreProcessor


class DemoPreProcessor(BasePreProcessor):
    def __init__(self, config, columns, cutoff_timestamp=None):
        super(DemoPreProcessor, self).__init__(config=config, columns=columns)
        self.cutoff_timestamp = cutoff_timestamp

    def merge_datasets(self, datasets):
        df_a = datasets["a"]
        df_b = datasets["b"]
        # 1. We only take data that is later than a certain date
        if self.cutoff_timestamp:
            filter_a_mask = df_a.req_created_at > self.cutoff_timestamp
            df_a = df_a.loc[filter_a_mask]
        # combine dataset a and b
        dataset = pd.merge(
            df_a,
            df_b,
            how="left",
            on="request_id",
        )
        return dataset

    @attributes(order=1)
    def _fix_names(self, dataset):
        # fix the typo "Tima" wherever it appears in the names column
        dataset["names"] = dataset.names.str.replace("Tima", "Tim", regex=False)
        return dataset

    @attributes(order=2)
    def _convert_requirement_to_bool(self, dataset):
        dataset["requirements"] = dataset.requirements.apply(
            lambda x: False if pd.isnull(x) else True
        )
        return dataset

    @attributes(order=12)
    def _filter_columns_and_crossing(self, dataset):
        # _filter_columns_and_crossing removes unnecessary columns and appends crossings
        # only keep the specified columns
        # the following code also deals with feature crossings
        if self.columns:
            columns = list(set(self.columns))
            crossing = []
            features_ex = []
            for col in columns:
                if "__" in col:
                    crossing.append(col)
                else:
                    features_ex.append(col)
            dataset = dataset[features_ex].copy()
            for fc in crossing:
                fc_cols = fc.split("__")
                fc_series = dataset[fc_cols[0]]
                for fc_col in fc_cols[1:]:
                    fc_series = fc_series * dataset[fc_col]
                dataset[fc] = fc_series
        return dataset


dp = DemoPreProcessor(
    config={}, columns=["names", "requirements", "names__requirements"]
)
datasets = {
    "a": pd.DataFrame([{"names": "Tima Cook", "requirements": "I need it"}]),
    "b": pd.DataFrame([{"names": "Time Cook", "requirements": None}]),
}
dp.run(datasets)
```
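If the raw frames share a join key, passing `merge=True` routes them through the `merge_datasets` override instead of the default auto-concatenation. A minimal sketch, assuming hypothetical input frames that carry a `request_id` column:

```python
keyed_datasets = {
    "a": pd.DataFrame(
        [{"request_id": 1, "names": "Tima Cook", "requirements": "I need it"}]
    ),
    "b": pd.DataFrame([{"request_id": 1, "origin": "web"}]),
}

# merge=True calls DemoPreProcessor.merge_datasets before the ordered transforms
dp.run(keyed_datasets, merge=True)
```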
### merge_datasets(self, datasets)

`merge_datasets` merges the datasets into a single dataframe to be processed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| datasets | dict | dictionary that contains the dataframes | required |
Source code in haferml/preprocess/pipeline.py
```python
def merge_datasets(self, datasets):
    """
    merge_datasets merges the datasets into one singular dataframe to be processed.

    :param datasets: dictionary that contains the dataframes
    :type datasets: dict
    """
    raise NotImplementedError("Please implement this method!")
```
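The base implementation is abstract, so calling `run(..., merge=True)` on a subclass that has not overridden this method raises `NotImplementedError`. An override only has to return one combined dataframe; a minimal sketch, assuming the frames share a hypothetical `request_id` key:

```python
class KeyedPreProcessor(BasePreProcessor):
    def merge_datasets(self, datasets):
        # left-join the remaining frames onto the first one on the shared key
        frames = list(datasets.values())
        merged = frames[0]
        for frame in frames[1:]:
            merged = pd.merge(merged, frame, how="left", on="request_id")
        return merged
```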
### run(self, datasets, **params)

`run` connects the transforms into a pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| datasets | | input datasets as a list or dict of dataframes, or a single dataframe | required |
| params | | for example, `merge=True` indicates that the datasets should be merged by calling `merge_datasets(datasets)` | {} |
Source code in haferml/preprocess/pipeline.py
```python
def run(self, datasets, **params):
    """
    run connects the transforms into pipelines

    :param datasets: input datasets as list or dict of single dataframe
    :param params: for example, `merge=True` can be used to indicate whether and how to merge datasets which will run the method `merge_datasets(datasets)`
    """

    if params.get("merge") is True:
        dataframe = self.merge_datasets(datasets)
    elif isinstance(datasets, dict):
        logger.warning(
            "No specific merge methods specified\n"
            "Auto concating all the datasets"
        )
        dataframe = pd.concat(datasets.values())
    elif isinstance(datasets, list):
        logger.warning(
            "No specific merge methods specified\n"
            "Auto concating all the datasets"
        )
        dataframe = pd.concat(datasets)
    elif isinstance(datasets, pd.DataFrame):
        logger.info(
            "Input dataset is a single dataframe, " "making a copy for safety"
        )
        dataframe = datasets.copy()
    else:
        raise TypeError("Input datasets should be dataframe or list of dataframes")

    # Go through the transforms
    for t in self.transforms:
        logger.info(f"Performing {t} ...")
        dataframe = self.transforms[t](dataframe)
        logger.info(f"{t} is done.")

    return dataframe
```
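For reference, a minimal sketch of the accepted input forms, reusing the `DemoPreProcessor` instance from the example above; without `merge=True`, dict and list inputs are simply concatenated before the ordered transforms run:

```python
single = pd.DataFrame([{"names": "Tima Cook", "requirements": None}])

dp.run(single)                              # single dataframe: copied, then transformed
dp.run([single, single.copy()])             # list: concatenated, then transformed
dp.run({"a": single, "b": single.copy()})   # dict: values concatenated, then transformed
```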