
Data Pipeline

PandasDataPipeline

A data pipeline class that applies a series of steps to a pandas DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| steps | list | A list of functions or tuples (description, function) representing the steps to be applied. | required |
| name | str | The name of the pipeline. | 'pipeline' |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| steps | list | A list of functions or tuples (description, function) representing the steps to be applied. |
| name | str | The name of the pipeline. |

Methods:

| Name | Description |
| --- | --- |
| apply | apply(df: pd.DataFrame) -> pd.DataFrame: Applies the pipeline steps to the given DataFrame. |
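
The way steps are declared can be illustrated with a minimal sketch. The step functions `drop_missing` and `add_total`, the column names, and the `run_steps` helper are hypothetical; `run_steps` is only a stand-in for the pipeline's loop so that the example runs without the package installed. Each step annotates its first parameter as `pd.DataFrame`, since the pipeline checks for that annotation.

```python
import pandas as pd


def drop_missing(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical step: remove rows that contain missing values."""
    return df.dropna()


def add_total(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical step: add a column that sums 'a' and 'b'."""
    return df.assign(total=df["a"] + df["b"])


# Steps may be bare functions or (description, function) tuples.
steps = [drop_missing, ("add total column", add_total)]


def run_steps(df: pd.DataFrame, steps: list) -> pd.DataFrame:
    """Stand-in for the pipeline loop: unpack tuples, then apply each step in order."""
    for step in steps:
        step_func = step[1] if isinstance(step, tuple) else step
        df = step_func(df)
    return df


raw = pd.DataFrame({"a": [1.0, 2.0, None], "b": [10.0, 20.0, 30.0]})
result = run_steps(raw, steps)
```

With the class itself, the equivalent call would presumably be `PandasDataPipeline(steps, name="cleaning").apply(raw)`, assuming the class is importable from `model_forge.data.datapipeline` as the source path suggests.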

Source code in model_forge/data/datapipeline.py
import inspect

import pandas as pd


class PandasDataPipeline:
    """
    A data pipeline class that applies a series of steps to a pandas DataFrame.

    Args:
        steps (list): A list of functions or tuples (description, function) representing the steps to be applied.
        name (str, optional): The name of the pipeline. Defaults to "pipeline".

    Attributes:
        steps (list): A list of functions or tuples (description, function) representing the steps to be applied.
        name (str): The name of the pipeline.

    Methods:
        apply(df: pd.DataFrame) -> pd.DataFrame:
            Applies the pipeline steps to the given DataFrame.

    """

    def __init__(
        self,
        steps,
        name: str = "pipeline",
    ) -> None:
        self.steps = steps
        self.name = name

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies the pipeline steps to the given DataFrame.

        Args:
            df (pd.DataFrame): The DataFrame to apply the steps to.

        Returns:
            pd.DataFrame: The DataFrame after applying the steps.

        Raises:
            TypeError: If the first parameter of a step function is not annotated as pd.DataFrame.

        """
        for step_number, step in enumerate(self.steps, start=0):
            if isinstance(step, tuple):
                # If step is a tuple, assume it's (description, function)
                _, step_func = step
            else:
                step_func = step

            # Check if step_func expects a pandas DataFrame as its argument
            if not self._function_accepts_dataframe(step_func):
                raise TypeError(
                    f"The step function at step {step_number} does not accept a pandas DataFrame as an argument."
                )
            # Apply the step
            df = step_func(df)

        return df

    def _function_accepts_dataframe(self, func) -> bool:
        """Check whether the first parameter of func is annotated as pd.DataFrame."""
        sig = inspect.signature(func)
        params = sig.parameters.values()
        first_param = next(iter(params), None)
        # Return a real bool; an unannotated or missing first parameter yields False
        return first_param is not None and first_param.annotation is pd.DataFrame

    @safe
    def apply(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies the pipeline steps to the given DataFrame.

        Args:
            df (pd.DataFrame): The DataFrame to apply the steps to.

        Returns:
            pd.DataFrame: The DataFrame after applying the steps.

        """
        return self._apply(df)
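
The annotation check in `_function_accepts_dataframe` has a practical consequence: a step passes only if its first parameter is explicitly annotated as `pd.DataFrame`. The sketch below reproduces that check as a standalone function (the name `accepts_dataframe` and the sample functions are hypothetical) and runs it against an annotated function, an unannotated function, and a lambda.

```python
import inspect

import pandas as pd


def accepts_dataframe(func) -> bool:
    """Same test as the pipeline: is the first parameter annotated as pd.DataFrame?"""
    first_param = next(iter(inspect.signature(func).parameters.values()), None)
    return first_param is not None and first_param.annotation is pd.DataFrame


def annotated(df: pd.DataFrame) -> pd.DataFrame:
    return df


def unannotated(df):
    return df


checks = {
    "annotated": accepts_dataframe(annotated),
    "unannotated": accepts_dataframe(unannotated),
    "lambda": accepts_dataframe(lambda df: df),
}
# checks == {"annotated": True, "unannotated": False, "lambda": False}
```

Steps for which this check is false, such as unannotated functions and lambdas, cause `_apply` to raise `TypeError` even if they would handle a DataFrame correctly at runtime.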

apply(df)

Applies the pipeline steps to the given DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | The DataFrame to apply the steps to. | required |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | The DataFrame after applying the steps. |
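
Because `apply` is wrapped by `safe`, the steps operate on a deep copy of `df` rather than on the caller's object. The following sketch mimics that behavior directly with `copy.deepcopy`; the step `scale_in_place` and the column name are hypothetical, and the pipeline class is not used here.

```python
from copy import deepcopy

import pandas as pd


def scale_in_place(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical step that mutates its input instead of returning a new frame."""
    df["value"] = df["value"] * 10
    return df


original = pd.DataFrame({"value": [1, 2, 3]})

# Mimic what the @safe decorator does before the steps run: copy first.
result = scale_in_place(deepcopy(original))
```

After this runs, `result["value"]` holds the scaled values while `original` keeps its initial values, which is the guarantee the decorator is meant to provide for mutating steps.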

Source code in model_forge/data/datapipeline.py
@safe
def apply(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Applies the pipeline steps to the given DataFrame.

    Args:
        df (pd.DataFrame): The DataFrame to apply the steps to.

    Returns:
        pd.DataFrame: The DataFrame after applying the steps.

    """
    return self._apply(df)

safe(fn)

A decorator that creates a safe version of the decorated function. The safe version makes a deep copy of the positional and keyword arguments before calling the original function, so the caller's original arguments are not modified by the call. The wrapper passes self through uncopied as its first argument, so the decorator is intended for methods.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| fn | function | The function to be decorated. | required |

Returns:

| Type | Description |
| --- | --- |
| function | The safe version of the decorated function. |

Source code in model_forge/data/datapipeline.py
from copy import deepcopy
from functools import wraps


def safe(fn):
    """
    A decorator that creates a safe version of the decorated function.
    The safe version of the function makes a deep copy of the arguments
    and keyword arguments before calling the original function.
    This ensures that the original arguments are not modified during the function call.

    Args:
        fn (function): The function to be decorated.

    Returns:
        function: The safe version of the decorated function.
    """

    @wraps(fn)
    def wrapper(self, *args, **kwargs):
        cp_args = deepcopy(args)
        cp_kwargs = deepcopy(kwargs)
        res = fn(self, *cp_args, **cp_kwargs)
        return res

    return wrapper
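
The effect of the decorator can be seen in a small standalone sketch. The `safe` definition below is a condensed copy of the listing above; the `Recorder` class and its `append_marker` method are hypothetical and exist only to show a method that mutates its argument.

```python
from copy import deepcopy
from functools import wraps


def safe(fn):
    """Condensed copy of the decorator above: deep-copy arguments before the call."""

    @wraps(fn)
    def wrapper(self, *args, **kwargs):
        return fn(self, *deepcopy(args), **deepcopy(kwargs))

    return wrapper


class Recorder:
    """Hypothetical class with a method that mutates the list it receives."""

    @safe
    def append_marker(self, items: list) -> list:
        items.append("marker")
        return items


original = ["a", "b"]
returned = Recorder().append_marker(original)
# original is still ["a", "b"]; returned is ["a", "b", "marker"]
```

Note that only the arguments are copied. The instance passed as `self` is not, so any state the method changes on the instance itself remains visible to the caller.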