Skip to content

etl.transform.pipeline

ETL - Transform - Pipeline¤

Transformer ¤

Base Transformer of a single data record.

This transformer can use a schema specification schema and also built-in methods to transform the data.

A schema can have the following structure.

schema = [
    {
        "column_name": "abc",
        "type": "string"
    },
    {
        "column_name": "def",
        "type": "list"
    }
]

The "type" values will be used by the universal transformer method _universal_transformer.

A method with the name like _transformer__abc will be used to transform the column abc. If such method exists, this method will be used otherwise the universal transformer will be used and the transformation will be done based on the "type" of the column specified in the schema.

_get_transformers(self) private ¤

_get_transformers extracts the list of transformers

Source code in haferml/etl/transform/pipeline.py
def _get_transformers(self):
    """
    _get_transformers extracts the list of transformers
    """
    re_transformer_name = re.compile("^_transformer__(.*?)$")

    transformers = {}
    all_methods = dict(inspect.getmembers(self))
    for i in all_methods:
        if i.startswith("_transformer__"):
            transformer_name = re_transformer_name.findall(i)[0]
            transformer_method_i = all_methods.get(i)
            transformers[transformer_name] = transformer_method_i

    logger.debug("All methods: {}".format(all_methods))
    logger.info("All predefined transformers: {}".format(transformers))

    for i in self.transformer_schema:
        logger.info("this transformer schema: {}".format(i))
        i_val = self.transformer_schema.get(i, {})
        if i in transformers:
            i_val["transformer"] = transformers.get(i)
            logger.info("Has predefined transformer for {}".format(i))
        else:
            logger.info(
                "Using default transformer for {}; format: {}".format(
                    i, i_val.get("type")
                )
            )
            to_format_type = i_val.get("type")
            i_val["transformer"] = self._universal_transformer(
                to_format=to_format_type
            )

        self.transformer_schema[i] = i_val

_schema_to_utils(self) private ¤

_schema_to_utils converts some utility schemas using the input full schema

Source code in haferml/etl/transform/pipeline.py
def _schema_to_utils(self):
    """
    _schema_to_utils converts some utility schemas using the input full schema
    """
    self.column_rename_schema = {
        i.get("column_name"): i.get("column_name") for i in self.schema
    }

    # build transformer schema
    self.transformer_schema = {
        i.get("column_name"): {"type": i.get("type")} for i in self.schema
    }
    self._get_transformers()  # enhances the schema with the transformer function

_universal_transformer(to_format, from_format=None) private staticmethod ¤

_general_transformer is to be used if a specific transformer of the column is not found

Source code in haferml/etl/transform/pipeline.py
@staticmethod
def _universal_transformer(to_format, from_format=None):
    """
    _general_transformer is to be used if a specific transformer of the column is not found
    """

    def transformer(data):
        """
        transformer is the actual transformer
        """
        if pd.isnull(data):
            return None

        logger.debug(f"Transforming {data} to format {to_format}")
        if to_format.lower() in ("str", "string"):
            try:
                res = str(data)
                res = res.strip()
            except Exception as e:
                raise Exception("Could not convert {} to str".format(data))
        elif to_format.lower() == "int":
            if isinstance(data, str):
                if ("." in data) or ("," in data):
                    data = wlg.misc.eu_float_string_to_float(data)
            try:
                res = int(float(data))
            except Exception as e:
                raise Exception("Could not convert {} to float->int".format(data))
        elif to_format.lower() == "float":
            if isinstance(data, str):
                if ("." in data) or ("," in data):
                    data = wlg.misc.eu_float_string_to_float(data)
            try:
                res = float(data)
            except Exception as e:
                raise Exception("Could not convert {} to float".format(data))
        elif to_format.lower() == "datetime":
            res = wlg.datetime.convert_to_datetime(data, dayfirst=False)
        elif to_format.lower() == "date":
            res = wlg.datetime.convert_to_date(data)
        elif to_format.lower() == "bool":
            res = wlg.misc.convert_to_bool(data)
        elif to_format.lower() == "list":
            res = wlg.misc.convert_str_repr_to_list(data)
        else:
            raise Exception(
                f"Can not transform {data}; No transformer defined for the format: {to_format}"
            )

        return res

    return transformer

transform(self, record) ¤

transform transforms the json (list of dict data) into standardized format

Source code in haferml/etl/transform/pipeline.py
def transform(self, record):
    """
    transform transforms the json (list of dict data) into standardized format
    """

    # sometime the transformations requires other fields
    # we need to set self.record to access all the fields
    self.record = record.copy()
    try:
        for key, val in record.items():
            try:
                val = self.transformer_schema[key]["transformer"](val)
                record[key] = val
            except Exception as e:
                logger.error(
                    "Failed to transform key, val: {}, {}; schema is {}; e {}".format(
                        key, val, self.transformer_schema[key], e
                    )
                )
    except Exception as e:
        raise Exception(
            "Failed to transform record: {};error is {}".format(record, e)
        )

    return record