# Source code for john_toolbox.preprocessing.pandas_transformers

from typing import Callable, Dict, List
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from scipy.sparse import issparse
from john_toolbox.preprocessing.utils import compute_in_parallel

import logging

logger = logging.getLogger(__name__)


class SelectColumnsTransformer(BaseEstimator, TransformerMixin):
    """Keep only the desired columns of a DataFrame inside a sklearn pipeline.

    See Also
    --------
    DropColumnsTransformer : Drop columns from DataFrame.
    EncoderTransformer : Apply a standard sklearn encoder to a column.
    FunctionTransformer : Apply a function to a column.
    DebugTransformer : Keep track of information about DataFrame between steps.
    """

    def __init__(self, columns: List[str] = None):
        """
        Parameters
        ----------
        columns : List[str], Optional
            List of column names to keep. If None, all columns are kept.
        """
        self.columns = columns

    def transform(self, X: pd.DataFrame, **transform_params) -> pd.DataFrame:
        """Return a copy of `X` restricted to `self.columns`.

        A copy is returned so the input DataFrame is never mutated.
        """
        # Robustness fix: previously `columns=None` crashed with `X[None]`;
        # treat None as "keep everything".
        if self.columns is None:
            return X.copy()
        return X[self.columns].copy()

    def fit(self, X, y=None, **fit_params):
        # Stateless transformer: nothing to learn.
        return self
class DebugTransformer(BaseEstimator, TransformerMixin):
    """Record DataFrame metadata between sklearn pipeline steps, for debugging.

    The transformer is a pass-through: `transform` returns `X` unchanged and
    stores its shape, columns and dtypes on the instance for later inspection.

    See Also
    --------
    SelectColumnsTransformer : Keep columns from DataFrame.
    DropColumnsTransformer : Drop columns from DataFrame.
    EncoderTransformer : Apply a standard sklearn encoder to a column.
    FunctionTransformer : Apply a function to a column.
    """

    def __init__(self):
        # Snapshot of the last DataFrame seen by `transform`.
        self.shape = None
        self.columns = None
        self.dtypes = None

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        logger.info(f"SHAPE : {X.shape}")
        logger.info(f"COLUMNS : {X.columns}")
        # Bug fix: `shape` was initialized in __init__ but never assigned here,
        # even though it was logged. Store it like `columns` and `dtypes`.
        self.shape = X.shape
        self.columns = X.columns
        self.dtypes = X.dtypes
        return X

    def fit(self, X, y=None, **fit_params):
        # Stateless transformer: nothing to learn.
        return self
class DropColumnsTransformer(BaseEstimator, TransformerMixin):
    """Remove columns from a DataFrame inside a sklearn pipeline.

    See Also
    --------
    SelectColumnsTransformer : Keep columns from DataFrame.
    EncoderTransformer : Apply a standard sklearn encoder to a column.
    FunctionTransformer : Apply a function to a column.
    DebugTransformer : Keep track of information about DataFrame between steps.
    """

    def __init__(self, columns_to_drop: List[str] = None):
        """
        Parameters
        ----------
        columns_to_drop : List[str], Optional
            List of columns to drop. If None, the DataFrame is returned
            unchanged (as a copy).
        """
        self.columns_to_drop = columns_to_drop

    def transform(self, X: pd.DataFrame, **transform_params) -> pd.DataFrame:
        """Return a copy of `X` without `self.columns_to_drop`."""
        copy_df = X.copy()
        # Robustness fix: previously `columns_to_drop=None` raised inside
        # `DataFrame.drop`; treat None as "drop nothing".
        if self.columns_to_drop is None:
            return copy_df
        return copy_df.drop(self.columns_to_drop, axis=1)

    def fit(self, X, y=None, **fit_params):
        # Stateless transformer: nothing to learn.
        return self
class EncoderTransformer(BaseEstimator, TransformerMixin):
    """Apply a standard sklearn encoder to a single DataFrame column.

    Attributes
    ----------
    encoder :
        Standard sklearn Encoder *class* (instantiated in ``__init__`` with
        ``encoder_args``). For example, you can provide ``OneHotEncoder``.
    column : str, Optional
        Column to transform with the encoder.
    encoder_args : Dict, Optional
        Arguments passed to the sklearn encoder constructor.
    new_cols_prefix : str, Optional
        If provided, all generated columns will have this value as prefix.
        If None, defaults to ``"<column>_<EncoderClassName>_"``.
    is_drop_input_col : bool, Optional, default True
        The input column is removed if
        ``self.column != new_cols_prefix and is_drop_input_col == True``
        or if ``self.column == new_cols_prefix``.

    See Also
    --------
    SelectColumnsTransformer : Keep columns from DataFrame.
    DropColumnsTransformer : Drop columns from DataFrame.
    FunctionTransformer : Apply function to a column.
    DebugTransformer : Keep track of information about DataFrame between steps.
    """

    def __init__(
        self,
        encoder,
        column: str = None,
        encoder_args: Dict = None,
        new_cols_prefix: str = None,
        is_drop_input_col: bool = True,
    ):
        """
        Parameters
        ----------
        encoder :
            Standard sklearn Encoder class (not an instance), e.g. OneHotEncoder.
        column : str, Optional
            Column to transform with the encoder.
        encoder_args : Dict, Optional
            Arguments to pass to the sklearn encoder constructor.
        new_cols_prefix : str, Optional
            Prefix for all generated columns.
        is_drop_input_col : bool, Optional, default True
            Whether to drop the input column (see class docstring for the
            exact rule).
        """
        # Avoid a mutable default: None means "no constructor arguments".
        self.encoder_args = encoder_args
        if self.encoder_args is None:
            self.encoder_args = {}
        self.column = column
        # `encoder` is a class; instantiate it here with the provided args.
        self.encoder = encoder(**self.encoder_args)
        self.new_cols_prefix = new_cols_prefix
        self.is_drop_input_col = is_drop_input_col
        if self.new_cols_prefix is None:
            # Default prefix, e.g. "city_OneHotEncoder_".
            self.new_cols_prefix = (
                f"{self.column}_{self.encoder.__class__.__name__}_"
            )

    def fit(self, X: pd.DataFrame, y=None, **fit_params):
        # sklearn encoders expect a 2-D array: reshape the column to (n, 1).
        self.encoder.fit(X[self.column].to_numpy().reshape(-1, 1))
        return self

    def transform(self, X: pd.DataFrame, **transform_params) -> pd.DataFrame:
        """Encode `self.column` and concatenate the result to a copy of `X`.

        Returns a new DataFrame; the input is not mutated.
        """
        logger.debug(f"name encoder : {self.encoder.__class__.__name__}")
        copy_df = X.copy()
        encoder_result = self.encoder.transform(
            copy_df[self.column].to_numpy().reshape(-1, 1)
        )
        # Some encoders (e.g. OneHotEncoder) return a sparse matrix; densify
        # so the result can be wrapped in a DataFrame.
        if issparse(encoder_result):
            encoder_result = encoder_result.toarray()
        logger.debug(f"SHAPE encoder_result_array : {encoder_result.shape}")
        # Number of output columns produced by the encoder.
        if len(encoder_result.shape) >= 2:
            new_cols_size = encoder_result.shape[1]
        else:
            new_cols_size = 1
        try:
            # Preferred path: ask the encoder for its output feature names.
            new_cols = self.encoder.get_feature_names_out([self.column])
            if len(new_cols) > 1:
                # Useful only when the encoder creates multiple columns,
                # like one-hot encoding; single-column names are kept as-is.
                new_cols = [f"{self.new_cols_prefix}_{col}" for col in new_cols]
            logger.debug(f"new_cols = {new_cols}")
        except AttributeError:
            # Fallback for encoders without get_feature_names_out:
            # generate "<prefix>_<idx>" names, or just the prefix when the
            # encoder produces a single column.
            new_cols = (
                [
                    f"{self.new_cols_prefix}_{idx}"
                    for idx in range(new_cols_size)
                ]
                if new_cols_size > 1
                else [self.new_cols_prefix]
            )
            logger.debug(f"new_cols = {new_cols}")
        encoder_result_df = pd.DataFrame(data=encoder_result, columns=new_cols)
        # Align on the original index so concat does not introduce NaN rows.
        encoder_result_df.index = copy_df.index
        # Drop the input column per the rule documented on the class.
        if (self.column != self.new_cols_prefix and self.is_drop_input_col) or (
            self.column == self.new_cols_prefix
        ):
            copy_df = copy_df.drop(self.column, axis=1)
        output_df = pd.concat([copy_df, encoder_result_df], axis=1)
        return output_df
class FunctionTransformer(BaseEstimator):
    """Apply an arbitrary function to a DataFrame column in a sklearn pipeline.

    For an example, please refer to :
    https://github.com/nguyenanht/john-toolbox/blob/develop/notebooks/tutorial1%20-%20PandasPipeline%20%26%20PandasTransformer.ipynb
    from https://stackoverflow.com/questions/42844457/scikit-learn-applying-an-arbitary-function-as-part-of-a-pipeline

    Attributes
    ----------
    column : str
        Column the function is applied to.
    func : Callable
        Function to apply.
    dict_args : Dict
        Keyword arguments passed to the function.
    mode : str, Optional, default "apply_by_multiprocessing"
        One of `apply_by_multiprocessing`, `apply` or `vectorized`.
        `apply_by_multiprocessing`: apply the function using multiple CPUs
        (delegates to `compute_in_parallel`).
        `apply`: apply the function row by row via `Series.apply`.
        `vectorized`: call the function once on the whole DataFrame,
        e.g. to add 2 columns.
    return_col : str, Optional, default=column
        Name of the output column. If None, the input column is overwritten.
    drop_input_col : bool, default=False
        Drop the input column after computing the result. Note: if
        `return_col` equals `column`, dropping removes the result as well
        (same as the original behavior).

    See Also
    --------
    SelectColumnsTransformer : Keep columns from DataFrame.
    DropColumnsTransformer : Drop columns from DataFrame.
    EncoderTransformer : Apply a standard sklearn encoder to a column.
    DebugTransformer : Keep track of information about DataFrame between steps.
    """

    def __init__(
        self,
        column: str,
        func: Callable,
        dict_args: Dict,
        mode: str = "apply_by_multiprocessing",
        return_col: str = None,
        drop_input_col: bool = False,
    ):
        self.column = column
        self.func = func
        self.dict_args = dict_args
        self.return_col = return_col
        self.mode = mode
        self.drop_input_col = drop_input_col
        # If None, we replace the value of the column where we apply the function.
        if return_col is None:
            self.return_col = self.column

    def fit(self, *args, **kwargs):
        # Stateless transformer: nothing to learn.
        return self

    def transform(self, X: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
        """Apply `self.func` per `self.mode` and return a new DataFrame.

        Raises
        ------
        ValueError
            If `self.mode` is not a supported mode.
        """
        copy_df = X.copy()
        if self.mode == "apply_by_multiprocessing":
            copy_df[self.return_col] = compute_in_parallel(
                series=X[self.column], func=self.func, **self.dict_args
            )
        elif self.mode == "apply":
            copy_df[self.return_col] = X[self.column].apply(
                lambda x: self.func(x, **self.dict_args)
            )
        elif self.mode == "vectorized":
            # The function receives the whole DataFrame, not a single value.
            copy_df[self.return_col] = self.func(X, **self.dict_args)
        else:
            raise ValueError(
                f"{self.mode} mode not implemented. It must be in `apply_by_multiprocessing`, `apply` or `vectorized`"
            )
        if self.drop_input_col:
            # Bug fix: the previous version discarded the result of `drop`
            # (it was neither assigned nor in-place), so `drop_input_col`
            # silently had no effect.
            copy_df = copy_df.drop(self.column, axis=1)
        return copy_df