Dagster University: parquet file after being materialized errors out repeatedly #25729

Open
cobrienbeam opened this issue Nov 4, 2024 · 2 comments
Assignees
Labels
area: dagster-university Related to Dagster University type: bug Something isn't working

Comments


cobrienbeam commented Nov 4, 2024

What's the issue?

dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "taxi_trips":
  File "/Users/charlieobrien/.pyenv/versions/3.11.10/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_plan.py", line 282, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/charlieobrien/.pyenv/versions/3.11.10/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 494, in core_dagster_event_sequence_for_step
    for user_event in _step_output_error_checked_user_event_sequence(
  File "/Users/charlieobrien/.pyenv/versions/3.11.10/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 183, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/Users/charlieobrien/.pyenv/versions/3.11.10/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 88, in _process_asset_results_to_events
    for user_event in user_event_sequence:
  File "/Users/charlieobrien/.pyenv/versions/3.11.10/lib/python3.11/site-packages/dagster/_core/execution/plan/compute.py", line 198, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn, compute_context):
  File "/Users/charlieobrien/.pyenv/versions/3.11.10/lib/python3.11/site-packages/dagster/_core/execution/plan/compute.py", line 167, in _yield_compute_results
    for event in iterate_with_context(
  File "/Users/charlieobrien/.pyenv/versions/3.11.10/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 471, in iterate_with_context
    with context_fn():
  File "/Users/charlieobrien/.pyenv/versions/3.11.10/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/charlieobrien/.pyenv/versions/3.11.10/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
duckdb.duckdb.InvalidInputException: Invalid Input Error: No magic bytes found at end of file 'data/raw/taxi_trips_2023-03.parquet'
  File "/Users/charlieobrien/.pyenv/versions/3.11.10/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/Users/charlieobrien/.pyenv/versions/3.11.10/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 473, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/Users/charlieobrien/.pyenv/versions/3.11.10/lib/python3.11/site-packages/dagster/_core/execution/plan/compute_generator.py", line 141, in _coerce_op_compute_fn_to_iterator
    result = invoke_compute_fn(
             ^^^^^^^^^^^^^^^^^^
  File "/Users/charlieobrien/.pyenv/versions/3.11.10/lib/python3.11/site-packages/dagster/_core/execution/plan/compute_generator.py", line 129, in invoke_compute_fn
    return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
                                                                    ^^^^^^^^^^^^^^^^^^
  File "/Users/charlieobrien/Documents/Workspace/dagster-course/dagster_university/dagster_university/assets/trips.py", line 57, in taxi_trips
    conn.execute(sql_query)
<img width="1728" alt="image" src="https://github.com/user-attachments/assets/998a2e09-7ee3-4695-a765-430f92d1b768">

What did you expect to happen?

For the file to process correctly.

How to reproduce?

Follow the steps in the Dagster University instructions.

Dagster version

dagster~=1.7

Deployment type

Local

Deployment details

No response

Additional information

import duckdb
import requests
from . import constants
from dagster import asset
from dagster_duckdb import DuckDBResource


@asset
def taxi_trips_file() -> None:
    """
    The raw parquet files for the taxi trips dataset. Sourced from the NYC Open Data portal.
    """
    month_to_fetch = "2023-03"
    raw_trips = requests.get(
        f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{month_to_fetch}.parquet"
    )
    with open(
        constants.TAXI_TRIPS_TEMPLATE_FILE_PATH.format(month_to_fetch), "wb"
    ) as output_file:
        output_file.write(raw_trips.content)


@asset
def taxi_zones_file() -> None:
    """
    The raw CSV file for the taxi zones dataset. Sourced from the NYC Open Data portal.
    """
    taxi_zones = requests.get(
        "https://data.cityofnewyork.us/api/views/755u-8jsi/rows.csv?accessType=DOWNLOAD"
    )
    with open(constants.TAXI_ZONES_FILE_PATH, "wb") as output_file:
        output_file.write(taxi_zones.content)


@asset(deps=["taxi_trips_file"])
def taxi_trips(database: DuckDBResource) -> None:
    """
    The raw taxi trips data, loaded into a DuckDB database.
    """
    sql_query = """
        create or replace table trips as (
          select
            VendorID as vendor_id,
            PULocationID as pickup_zone_id,
            DOLocationID as dropoff_zone_id,
            RatecodeID as rate_code_id,
            payment_type as payment_type,
            tpep_dropoff_datetime as dropoff_datetime,
            tpep_pickup_datetime as pickup_datetime,
            trip_distance as trip_distance,
            passenger_count as passenger_count,
            total_amount as total_amount
          from 'data/raw/taxi_trips_2023-03.parquet'
        );
    """
    with database.get_connection() as conn:
        conn.execute(sql_query)


@asset(deps=["taxi_zones_file"])
def taxi_zones(database: DuckDBResource) -> None:
    """
    The raw taxi zones data, loaded into a DuckDB database.
    """
    sql_query = f"""
        create or replace table zones as (
          select
            LocationID as zone_id,
            Zone as zone,
            Borough as borough,
            the_geom as geometry
          from '{constants.TAXI_ZONES_FILE_PATH}'
        );
    """
    with database.get_connection() as conn:
        conn.execute(sql_query)
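
A Parquet file begins and ends with the four magic bytes `PAR1`, so DuckDB's "No magic bytes found at end of file" message suggests that the file on disk is truncated or is not Parquet at all (for example, an error page saved under a `.parquet` name). The following is a minimal diagnostic sketch, not part of the course code; the helper name `looks_like_parquet` is hypothetical, and it only inspects the first and last four bytes rather than validating the whole file.

```python
from pathlib import Path

# Magic bytes that open and close an unencrypted Parquet file.
PARQUET_MAGIC = b"PAR1"


def looks_like_parquet(path):
    """Return True if the file starts and ends with the Parquet magic bytes.

    Hypothetical helper for diagnosis only; it does not parse the footer.
    """
    data_path = Path(path)
    # A file shorter than two magic markers cannot hold both of them.
    if data_path.stat().st_size < 2 * len(PARQUET_MAGIC):
        return False
    with data_path.open("rb") as f:
        head = f.read(len(PARQUET_MAGIC))
        f.seek(-len(PARQUET_MAGIC), 2)  # 2 == os.SEEK_END
        tail = f.read(len(PARQUET_MAGIC))
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC
```

Running `looks_like_parquet("data/raw/taxi_trips_2023-03.parquet")` before materializing `taxi_trips` would show whether the download step produced a usable file; if it returns `False`, opening the file in a text editor may reveal what was actually saved.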


@cobrienbeam cobrienbeam added the type: bug Something isn't working label Nov 4, 2024
@garethbrickman garethbrickman added the area: dagster-university Related to Dagster University label Nov 4, 2024
@garethbrickman garethbrickman changed the title Parquet file after being materialized errors out repeatedly Dagster University: parquet file after being materialized errors out repeatedly Nov 4, 2024
@cmpadden cmpadden self-assigned this Nov 5, 2024

exprms commented Jan 23, 2025

For me, it turned out that the download link for the taxi zones no longer exists: there is no dataset with id 755u-8jsi on https://data.cityofnewyork.us/api/views/.
So after (successfully) materializing the taxi_zones_file asset, the CSV file contains only:

>>> head data/raw/taxi_zones.csv 
{
  "code" : "not_found",
  "error" : true,
  "message" : "Cannot find view with id 755u-8jsi"
}

Therefore, I get an error when materializing the downstream asset taxi_zones.
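
The asset materializes "successfully" here because `requests.get` does not raise on an HTTP error status by itself, so the error body is written to disk as if it were data. A minimal sketch of a guard follows, assuming the server reports the failure with a 4xx/5xx status; the helper name `write_response_body` is hypothetical and is not part of the course code. It accepts any object that behaves like `requests.Response`.

```python
from pathlib import Path


def write_response_body(response, path):
    """Write response.content to path only if the request succeeded.

    `response` is expected to behave like requests.Response: it must offer
    raise_for_status() and a bytes attribute named `content`.
    Hypothetical helper, shown for illustration.
    """
    # Raises on 4xx/5xx instead of silently saving the error payload.
    response.raise_for_status()
    body = response.content
    if not body:
        raise ValueError("empty response body; refusing to write " + str(path))
    Path(path).write_bytes(body)
    return len(body)
```

In the `taxi_zones_file` asset this could be called as `write_response_body(requests.get(url), constants.TAXI_ZONES_FILE_PATH)`, so a missing dataset fails the upstream asset rather than the downstream `taxi_zones` load.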

garethbrickman (Contributor) commented Jan 23, 2025

We are working to get that fixed. In the meantime, as a workaround, the needed data is attached as a CSV: taxi_zones.csv

In the taxi_zones_file asset, replace the commented-out code below with the uncommented code, and set the correct file path to the CSV file:

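# raw_zones = requests.get(
#     "https://data.cityofnewyork.us/api/views/755u-8jsi/rows.csv?accessType=DOWNLOAD"
# )

# Read the attached CSV from disk instead of downloading it.
# raw_zones is already bytes here, so write raw_zones itself (it has no .content attribute).
with open("path/to/file/taxi_zones.csv", "rb") as file:
    raw_zones = file.read()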
