
Polars lazy mode bug and panic resolving columns properly #17427

Closed
2 tasks done
atigbadr opened this issue Jul 4, 2024 · 2 comments · Fixed by #17458
Labels
accepted Ready for implementation bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars

Comments


atigbadr commented Jul 4, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import polars as pl

def add_prio_safety(df):
    """Add column 'prio_safty' by mapping SAFETY_CLASS."""
    # define dict for mapping
    prio_dict = {
        "IPS": 1,
        "EIP": 2,
        "IPR": 3,
        "IPI": 4,
        "QS": 5,
        "ND": 6,
        "NON": 7,
        "?": 8,
    }
    df = df.with_columns(
        pl.col("SAFETY_CLASS")
        .replace(prio_dict)
        .fill_null(pl.lit(9))
        .alias("PRIO_SAFETY")
    )

    return df


def rename_col_suffix(df, suffix, col_key="", col_key_suffix=""):
    """
    Rename all columns with suffix excepct for column col_key that is renamed with suffix
    col_key_suffix.
    """
    new_col_names = {
        c: f"{c}_{suffix}_{col_key_suffix}" if c == col_key else f"{c}_{suffix}"
        for c in df.lazy().collect_schema().names()
    }
    df = df.rename(new_col_names)

    return df


def add_rf_complet(df: pl.DataFrame, suffix: str = ""):
    """Add rf complet as concatenation of exisiting columns, with suffix if given."""
    df = df.with_columns(
        [
            pl.concat_str(
                [
                    pl.col(f"FACILITY{suffix}"),
                    pl.col(f"UNIT{suffix}"),
                    pl.col(f"DIVISION{suffix}"),
                    pl.lit("_"),
                    pl.col(f"SYSTEM_CODE{suffix}"),
                    pl.col(f"EQUIPMENT_NUMBER{suffix}"),
                    pl.col(f"EQUIPMENT_TYPE{suffix}"),
                    pl.lit("/"),
                    pl.col(f"COMPONENT_TYPE{suffix}"),
                    pl.lit("/"),
                    pl.col(f"COMPONENT_NUMBER{suffix}"),
                ],
                ignore_nulls=True,
            ).alias(f"RF_COMPLET{suffix}"),
            pl.concat_str(
                [
                    pl.col(f"SYSTEM_CODE{suffix}"),
                    pl.col(f"EQUIPMENT_NUMBER{suffix}"),
                    pl.col(f"EQUIPMENT_TYPE{suffix}"),
                    pl.lit("/"),
                    pl.col(f"COMPONENT_TYPE{suffix}"),
                    pl.lit("/"),
                    pl.col(f"COMPONENT_NUMBER{suffix}"),
                ],
                ignore_nulls=True,
            ).alias(f"RF_COMPLET_WO_TRANCHE{suffix}"),
            pl.concat_str(
                [
                    pl.col(f"SYSTEM_CODE{suffix}"),
                    pl.col(f"EQUIPMENT_NUMBER{suffix}"),
                    pl.col(f"EQUIPMENT_TYPE{suffix}"),
                ],
                ignore_nulls=True,
            ).alias(f"RF{suffix}"),
        ]
    )

    return df

def merge_tot_pmrq_sub_cols(df, col_name):
    """
    Merges same columns, col_name_XXX (XXX stands for 'tot', 'pmrq' or 'sub'), into a new column
    col_final, that contains the value of col_tot if it exists, otherwise value of col_sub if it
    exits, otherwise value of col_pmrq.

    Args:
        df (DataFrame): Input DataFrame.
        col_name (str): Original column name. Columns with same main name and suffix '_tot', '_pmrq'
        and '_sub' must exist.

    Returns
    -------
        DataFrame: Output DataFrame with a new column '{col_name}_final'.

    Operations:
        Create a new column '{col_name}_final'
    """
    df = df.with_columns(
        pl.when(
            (pl.col(f"{col_name}_TOT").is_not_null())
            & (pl.col(f"{col_name}_TOT") != "_//")
            & (pl.col(f"{col_name}_TOT") != "//")
            & (pl.col(f"{col_name}_TOT") != "")
        )
        .then(pl.col(f"{col_name}_TOT"))
        .when(
            (pl.col(f"{col_name}_SUB").is_not_null())
            & (pl.col(f"{col_name}_SUB") != "_//")
            & (pl.col(f"{col_name}_SUB") != "//")
            & (pl.col(f"{col_name}_SUB") != "")
        )
        .then(pl.col(f"{col_name}_SUB"))
        .otherwise(pl.col(f"{col_name}_PMRQ"))
        .alias(f"{col_name}_FINAL")
    )
    return df


def add_rf_complet_and_mi(
    df, df_1, df_2, df_3, df_4, df_5
):
    """
    Add columns rf_complet, rf_complet_sub, rf_complet_pmid, and related modele_industriel
    and SAFETY_CLASS.

    Args:
        df (dataframe) :
            input dataframe
        compute_stats (bool) :
            if true, statitics about filters and joins are computed and
            saved in a csv file stored in S3 at stats_path. WARNING : Increase a lot the
            computation time. In prod, should be False. Default: False.
        stats_path (str) :
            S3 path to save stats. Used only if comute_stats is True. Default: None.

    Returns
    -------
        dataframe: output dataframe
    """
    ###################
    # ADD RF FROM TOT #
    ###################
    df = df.with_columns(pl.col("RF").replace("", None))

    # rename columns to prepare join
    df_1 = rename_col_suffix(df_1, suffix="TOT")

    # JOIN main df and TIDWOTSK data
    df = df.join(
        df_1,
        left_on=["NUMERO_OT", "NUMERO_TOT"],
        right_on=["WORK_ORDER_NBR_TOT", "WORK_ORDER_TASK_TOT"],
        how="left",
    )

    # add rf_complet by concatenation: facility & unit & division & "_" & system_code & equipment_number
    # & equipment_type & "/" & component_type & "/" & component_number
    df = add_rf_complet(df, suffix="_TOT")

    #################
    # ADD PMRQ INFO #
    #################

    # GET TIDECPMS data : PMRQ and e_code relation

    # JOIN main df and TIDECPMS data
    df = df.join(
        df_2,
        left_on=["PMRQ_SITE"],
        right_on=["PMRQ"],
        how="left",
    )

    # rename dataframe before join
    df_3_pmrq = rename_col_suffix(df_3, suffix="PMRQ")

    df_3_pmrq = df_3_pmrq.with_columns(
        pl.col("E_CODE_PMRQ").alias("E_CODE_PMRQ_copy")
    )

    # JOIN main df and TIDECHDR data

    df = df.join(
        df_3_pmrq, left_on=["OWNER_CODE"], right_on=["E_CODE_PMRQ"], how="left"
    )

    df = df.rename({"E_CODE_PMRQ_copy": "E_CODE_PMRQ"})
    # add rf_complet by concatenation: facility & unit & division & "_" & system_code & equipment_number
    # & equipment_type & "/" & component_type & "/" & component_number
    df = add_rf_complet(df, suffix="_PMRQ")

    # rename columns to prepare join

    df_4 = df_4.rename(
        {"E_CODE": "E_CODE_TIDECEQN", "EQUIP_REVISION": "EQUIP_REVISION_TIDECEQN"}
    )
    df_4_pmrq = rename_col_suffix(df_4, suffix="PMRQ")

    # JOIN main df and TIDECEQN data : modele_industriel and classe_surete info
    df = df.join(
        df_4_pmrq,
        left_on=["E_CODE_PMRQ", "EQUIP_REVISION_PMRQ"],
        right_on=["E_CODE_TIDECEQN_PMRQ", "EQUIP_REVISION_TIDECEQN_PMRQ"],
        how="left",
    )

    ################
    # ADD SUB INFO #
    ################

    # rename columns to prepare join
    df_5 = rename_col_suffix(df_5, suffix="TIDPMOVR")

    # JOIN main df and TIDPMOVR data
    df = df.join(
        df_5,
        left_on=["PM_ID_NUMBER", "PM_RQ_NUMBER", "NUMERO_TOT"],
        right_on=[
            "PM_ID_NUMBER_TIDPMOVR",
            "PM_RQ_NUMBER_TIDPMOVR",
            "WORK_ORDER_TASK_TIDPMOVR",
        ],
        how="left",
    )

    # rename column e_code_sub from e_code found in table TIDPMOVR

    df = df.rename({"E_CODE_TIDPMOVR": "E_CODE_SUB"})

    # rename columns of df_3 to prepare join to get substituted values
    df_3_sub = rename_col_suffix(
        df_3, suffix="SUB", col_key="E_CODE", col_key_suffix="TIDECHDR"
    )

    # JOIN main df and TIDECHDR (equipment info - for substituted values this time)
    df = df.join(
        df_3_sub,
        left_on=["E_CODE_SUB"],
        right_on=["E_CODE_SUB_TIDECHDR"],
        how="left",
    )

    # add rf_complet_sub
    df = add_rf_complet(df, suffix="_SUB")

    # rename columns of table TIDECEQN for join with sub values
    df_4_sub = rename_col_suffix(df_4, suffix="SUB")

    # JOIN main df and TIDECEQN (modele_industriel and classe_surete info - for substituted values this time)
    df = df.join(
        df_4_sub,
        left_on=["E_CODE_SUB", "EQUIP_REVISION_SUB"],
        right_on=["E_CODE_TIDECEQN_SUB", "EQUIP_REVISION_TIDECEQN_SUB"],
        how="left",
    )

    ###############################
    # MERGE PMRQ, SUB & PMID INFO #
    ###############################

    # add final columns that merge original and sub columns

    cols_to_merge = [
        "E_CODE",
        "FACILITY",
        "UNIT",
        "RF_COMPLET",
        "RF_COMPLET_WO_TRANCHE",
        "RF",
        "MODELE_INDUSTRIEL",
        "SAFETY_CLASS",
    ]

    for col in cols_to_merge:
        df = merge_tot_pmrq_sub_cols(df, col)

    ###########################
    # DROP NOT NEEDED COLUMNS #
    ###########################

    # drop columns redundant due to joins
    columns_to_drop = [
        "WORK_ORDER_NBR_TOT",
        "WORK_ORDER_TASK_TOT",
        "PM_ID_NUMBER_TIDPMREF",
        "OWNER_CODE",
        "E_CODE_TIDECEQN_PMRQ",
        "OWNER_CODE_TIDPMREF",
        "E_CODE_PMID_TIDECHDR",
        "EQUIP_REVISION_TIDECEQN_PMRQ",
        "PM_ID_NUMBER_TIDPMOVR",
        "PM_RQ_NUMBER_TIDPMOVR",
        "WORK_ORDER_TASK_TIDPMOVR",
        "E_CODE_SUB_TIDECHDR",
        "E_CODE_TIDECEQN_SUB",
        "E_CODE_TIDECEQN_PMID",
        "EQUIP_REVISION_TIDECEQN_SUB",
        "EQUIP_REVISION_TIDECEQN_PMID",
        "EQUIP_REVISION_PMRQ_right",
        "EQUIP_REVISION_SUB_right",
        "E_CODE_SUB_right",
    ]

    for suffix in ["TOT", "PMRQ", "SUB"]:
        # for suffix in ["tot", "pmrq", "pmid", "sub"]:
        for col in [
            "E_CODE",
            "EQUIP_REVISION",
            "FACILITY",
            "UNIT",
            "DIVISION",
            "SYSTEM_CODE",
            "EQUIPMENT_NUMBER",
            "EQUIPMENT_TYPE",
            "COMPONENT_TYPE",
            "COMPONENT_NUMBER",
            "RF_COMPLET_WO_TRANCHE",
            "RF",
            "MODELE_INDUSTRIEL",
            "SAFETY_CLASS",
        ]:
            columns_to_drop.append(f"{col}_{suffix}")
    df = df.drop(columns_to_drop, strict=False)
    ##################
    # RENAME COLUMNS #
    ##################

    # we rename columns with correct values for downstream function,
    # keep previous values with suffix '_old'
    df = df.rename(
        {
            "SITE": "SITE_OLD",
            "TRANCHE": "TRANCHE_OLD",
            "RF": "RF_OLD",
            "FACILITY_FINAL": "SITE",
            "UNIT_FINAL": "TRANCHE",
            "RF_FINAL": "RF",
            "RF_COMPLET_FINAL": "RF_COMPLET",
            "RF_COMPLET_WO_TRANCHE_FINAL": "RF_COMPLET_SANS_TRANCHE",
            "MODELE_INDUSTRIEL_FINAL": "MODELE_INDUSTRIEL",
            "SAFETY_CLASS_FINAL": "SAFETY_CLASS",
        }
    )

    ###################
    # ADD prio_safety #
    ###################

    df = add_prio_safety(df)

    ##############
    # SAVE STATS #
    ##############

    return df


df_data =  {'SYSTEME_ELEMENTAIRE': ['10259518993288137799'], 'PROCEDURE_END': ['3717447046043311022'], 'NUM_EQU_BIG': ['11710319551364327107'], 'ANNEE': [30], 'NB_PDM': [2], 'TYPE_PMRQ_SITE': ['10116380667568548973'], 'NUMERO_OT': ['17840858055815323787'], 'FAMILLE_EQUIPEMENT_TOT': ['4305432402955241627'], 'HR_BTE_STD': [None], 'JOURS_SITE': [2190], 'RF': ['12429358722875805624'], 'DELTARELATIF_JOURS_SITE_JOURS_PALIER': [0.825], 'DELTA_FREQUENCE': [-0.1375], 'LIBELLE_ACTIVITE_CORRIGE': ['5692331372148487298'], 'SYSTEME_ELEME_TOT': ['10259518993288137799'], 'NUMERO_OTM': ['17840858055815323787'], 'ID': ['12995454962688269568'], 'ID_DF': [474773], 'ARRET': ['2840857409195221889'], 'FREQSITE': [0.16666666666666666], 'LIBELLE_TOT': ['13739374131666219563'], 'STATUT_PMRQ': ['10713894897579468908'], 'TYPE_PMRQ_PALIER': ['10116380667568548973'], 'DN_ROBINET': [None], 'MAILLE_GESTION': ['646788285261384707'], 'RAPPORT_FREQUENCE_SITE_PALIER': [0.547945205479452], 'FREQPALIER': [0.30416666666666664], 'VOIE': ['3717447046043311022'], 'PMRQ_SITE': ['13058443681754841415'], 'PROVENANCE': ['8832189020569722148'], 'JOURS_PALIER': [1200], 'TYPE_TOT': ['706592231814243521'], 'EXCLUSION': ['9225005290826826710'], 'LIBELLE_ACTIVITE': ['933838592733991669'], 'TRANCHE': [1], 'NUMERO_EQUIPEMENT': ['13488033529883711274'], 'PMRQ_PALIER': ['3389348587797839655'], 'FREQUENCE_SITE': ['2873171927416060184'], 'TYPE_GESTE_MAINTENANCE': ['5956551079191982252'], 'SITE': ['11203314010027630067'], 'TYPE_ARRET': ['10383623603468822009'], 'CODE_BTE': ['3717447046043311022'], 'NUMERO_TOT': ['12667853719307402080'], 'DISCIPLINE_TOT': ['7103309410951340401'], 'RDU': ['17378103479390767212'], 'FREQUENCE_PALIER': ['4177858838454549957'], 'TYPE_ROBINET': ['3717447046043311022'], 'DISCIPLINE_OT': ['7103309410951340401'], 'NOM_EQUIPEMENT': ['16076728382483653473'], 'HR_BTE_LOCALES': [None], 'PUISSANCE': [1300], 'PALIER': ['13917492298881635150'], 'TECHNIQUE_END': ['3717447046043311022']}
df_1_data =  {'WORK_ORDER_NBR': ['16044451887719335171'], 'WORK_ORDER_TASK': ['4438342574969621960'], 'WORK_ORDER_TYPE': ['1946467547790703037'], 'FACILITY': ['16491980263625684958'], 'UNIT': ['7569191480599717718'], 'DIVISION': ['3717447046043311022'], 'SYSTEM_CODE': ['1844251584742556006'], 'EQUIPMENT_NUMBER': ['16065166508435221904'], 'EQUIPMENT_TYPE': ['1123847502349723702'], 'COMPONENT_TYPE': ['3717447046043311022'], 'COMPONENT_NUMBER': ['3717447046043311022'], 'E_CODE': ['15111364629475924547'], 'EQUIP_REVISION': ['5597443177314721229'], 'MODELE_INDUSTRIEL': ['3717447046043311022'], 'SAFETY_CLASS': ['15621912796529646110']}
df_2_data =  {'PM_IDENTIFIER': ['11675224185897931296'], 'PM_ID_NUMBER': ['231603847244906558'], 'PM_RQ_NUMBER': ['13341616531854961667'], 'OWNER_CODE': ['6942795238955349321'], 'PMRQ': ['13805924151917160826']}
df_3_data =  {'FACILITY': ['507015596643481126'], 'UNIT': ['9225005290826826710'], 'DIVISION': ['3717447046043311022'], 'SYSTEM_CODE': ['17029355182952006822'], 'EQUIPMENT_NUMBER': ['4217269385066705779'], 'EQUIPMENT_TYPE': ['7858962460223325788'], 'COMPONENT_TYPE': ['3717447046043311022'], 'COMPONENT_NUMBER': ['3717447046043311022'], 'REVISION_STATUS': ['1725357529781714295'], 'E_CODE': ['16719398411140702957'], 'EQUIP_REVISION': ['5597443177314721229']}
df_4_data =  {'E_CODE': ['18185221358599604455'], 'EQUIP_REVISION': ['5597443177314721229'], 'MODELE_INDUSTRIEL': ['7403472235573633466'], 'SAFETY_CLASS': ['15621912796529646110']}
df_5_data =  {'E_CODE': ['953248819759467992'], 'PM_ID_NUMBER': ['1236084142282258250'], 'PM_RQ_NUMBER': ['1497819091941839941'], 'WORK_ORDER_TASK': ['3636567706313226194']}
df_6_data =  {'Code arret complet - base source': ['3234782187912179067'], 'Code arret complet - revise': ['9323118806241067325']}
df_7_data =  {'Code PMT': ['9323118806241067325'], 'Code arret - revise': ['9323118806241067325'], 'Annee - revise': ['72259354551516523']}

pl.DataFrame(df_data).write_parquet("df.parquet")
pl.DataFrame(df_1_data).write_parquet("df_1.parquet")
pl.DataFrame(df_2_data).write_parquet("df_2.parquet")
pl.DataFrame(df_3_data).write_parquet("df_3.parquet")
pl.DataFrame(df_4_data).write_parquet("df_4.parquet")
pl.DataFrame(df_5_data).write_parquet("df_5.parquet")
pl.DataFrame(df_6_data).write_parquet("df_6.parquet")
pl.DataFrame(df_7_data).write_parquet("df_7.parquet")


df = pl.scan_parquet("df.parquet")
df_1 = pl.scan_parquet("df_1.parquet")
df_2 = pl.scan_parquet("df_2.parquet")
df_3 = pl.scan_parquet("df_3.parquet")
df_4 = pl.scan_parquet("df_4.parquet")
df_5 = pl.scan_parquet("df_5.parquet")
df_6 = pl.scan_parquet("df_6.parquet")
df_7 = pl.scan_parquet("df_7.parquet")


df = add_rf_complet_and_mi(
    df, df_1, df_2, df_3, df_4, df_5
)
df = df.with_columns(
    pl.concat_str(
        [pl.col("SITE"), pl.col("ARRET")],
        separator="-",
    ).alias("ARRET_SITE")
)


df_6 = df_6.rename(
    {
        "Code arret complet - base source": "ARRET_SITE",
        "Code arret complet - revise": "ARRET_NEW",
    }
)

df_7 = df_7.rename(
    {"Code arret - revise": "ARRET_NEW", "Annee - revise": "ANNEE_NEW"}
)
# The workaround I had to use to get this to work in streaming mode:
# df = df.cast(df.collect_schema())
df = df.join(df_6, how="left", on="ARRET_SITE")
df = df.join(
    df_7.select(["ARRET_NEW", "ANNEE_NEW"]), how="left", on="ARRET_NEW"
)
df = df.drop("ARRET_SITE", strict=False)
print(df.explain())
print(df.collect().head())

Log output

pyo3_runtime.PanicException: called `Result::unwrap()` on an `Err` value: ColumnNotFound(ErrString(""))

Issue description

In lazy mode, Polars fails to resolve columns properly against the schema and panics, especially when combining rename and drop.

The code works in non-lazy (eager) mode.
The code also works when inserting df = df.cast(df.collect_schema()) at some point before the final line.

Everything is in the code above.

Data is written to parquet and scanned back, since that is how the original data is read in our case.

Expected behavior

Lazy mode should be able to resolve columns properly.

Installed versions

--------Version info---------
Polars:               1.0.0
Index type:           UInt32
Platform:             Linux-3.10.0-1160.114.2.el7.x86_64-x86_64-with-glibc2.28
Python:               3.11.8 (main, Apr 12 2024, 16:17:28) [GCC 8.5.0 20210514 (Red Hat 8.5.0-20)]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          <not installed>
connectorx:           0.3.2
deltalake:            0.18.1
fastexcel:            0.10.4
fsspec:               2024.6.1
gevent:               <not installed>
great_tables:         <not installed>
hvplot:               0.10.0
matplotlib:           <not installed>
nest_asyncio:         1.6.0
numpy:                1.26.4
openpyxl:             3.1.5
pandas:               2.2.2
pyarrow:              16.1.0
pydantic:             2.7.4
pyiceberg:            <not installed>
sqlalchemy:           2.0.31
torch:                <not installed>
xlsx2csv:             0.8.2
xlsxwriter:           <not installed>
@atigbadr atigbadr added bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars labels Jul 4, 2024
@cmdlineluser (Contributor) commented:

There should be a way to reduce the example down.

It does appear to be projection_pushdown related:

>>> df.collect(projection_pushdown=False)
shape: (1, 73)
┌──────────────────────┬─────────────────────┬──────────────────────┬───────┬───┬──────────────┬─────────────┬───────────┬───────────┐
│ SYSTEME_ELEMENTAIRE  ┆ PROCEDURE_END       ┆ NUM_EQU_BIG          ┆ ANNEE ┆ … ┆ SAFETY_CLASS ┆ PRIO_SAFETY ┆ ARRET_NEW ┆ ANNEE_NEW │
│ ---                  ┆ ---                 ┆ ---                  ┆ ---   ┆   ┆ ---          ┆ ---         ┆ ---       ┆ ---       │
│ str                  ┆ str                 ┆ str                  ┆ i64   ┆   ┆ str          ┆ str         ┆ str       ┆ str       │
╞══════════════════════╪═════════════════════╪══════════════════════╪═══════╪═══╪══════════════╪═════════════╪═══════════╪═══════════╡
│ 10259518993288137799 ┆ 3717447046043311022 ┆ 11710319551364327107 ┆ 30    ┆ … ┆ null         ┆ 9           ┆ null      ┆ null      │
└──────────────────────┴─────────────────────┴──────────────────────┴───────┴───┴──────────────┴─────────────┴───────────┴───────────┘

@cmdlineluser commented:

Minimal repro:

import polars as pl

(pl.LazyFrame({'A': [1]})
   .with_columns(B = 2)
   .drop([], strict=False)
   .rename({'A': 'C', 'B': 'A'})
   .drop([], strict=False)
   .collect()
)
# pyo3_runtime.PanicException: 
# called `Result::unwrap()` on an `Err` value: ColumnNotFound(ErrString("C"))

@ritchie46 ritchie46 self-assigned this Jul 6, 2024
@c-peters c-peters added the accepted Ready for implementation label Jul 8, 2024
@c-peters c-peters added this to Backlog Jul 8, 2024
@c-peters c-peters moved this to Done in Backlog Jul 8, 2024