Skip to content

Commit

Permalink
Merge pull request #506 from cagov/hang_high_flow
Browse files Browse the repository at this point in the history
High Flow and Occupancy Detection and Correction
  • Loading branch information
thehanggit authored Jan 15, 2025
2 parents 6a351a5 + d93e2f3 commit 1655e15
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 4 deletions.
22 changes: 22 additions & 0 deletions transform/models/intermediate/clearinghouse/_clearinghouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,25 @@ models:
description: |
An imputed version of g-factor based speed, adjusted using upper and lower bounds. Boundaries vary
based on different scenarios for specific lanes.
- name: int_clearinghouse_detector_outlier_agg_five_minutes
description: |
This model takes the `int_clearinghouse__detector_agg_five_minutes` model and detects extreme values for
flow and occupancy using z-score statistics in last month. All extreme values are filled with 95th percentile
of data and labeled as 'observed_outlier'.
columns:
- name: volume_mean
description: Calculate the mean volume value from last month's volume data.
- name: volume_stddev
description: Calculate the standard deviation from last month's volume data.
- name: volume_95th
description: Calculate 95th percentile of volume for last month.
- name: occupancy_95th
description: Calculate 95th percentile of occupancy for last month.
- name: updated_volume_sum
description: Replace volume with 'volume_95th' if it is diagnosed as outlier.
- name: volume_label
description: Generate a new label indicating whether volume value is an outlier or normal data.
- name: updated_occupancy_avg
description: Replace occupancy with 'occupancy_95th' if it is diagnosed as outlier.
- name: occupancy_label
description: Generate a new label indicating whether occupancy value is an outlier or normal data.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ detector_agg as (
station_id,
lane,
station_type,
volume_sum,
occupancy_avg,
speed_weighted,
updated_volume_sum as volume_sum,
updated_occupancy_avg as occupancy_avg,
volume_observed
from {{ ref('int_clearinghouse__detector_agg_five_minutes') }}
from {{ ref('int_clearinghouse__detector_outlier_agg_five_minutes') }}
where {{ make_model_incremental('sample_date') }}
),

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
{{ config(
materialized="incremental",
cluster_by=["sample_date"],
unique_key=["detector_id", "sample_timestamp"],
on_schema_change="append_new_columns",
snowflake_warehouse = get_snowflake_refresh_warehouse(small="XS", big="XL")
) }}

/*We dynamically select dataset for last month and calculate the statistics (mean, std)
for outlier detection*/
with
five_minute_agg_lastmonth as (
select
detector_id,
sample_date,
volume_sum,
occupancy_avg
from {{ ref('int_clearinghouse__detector_agg_five_minutes') }}
where
sample_date >= dateadd(month, -1, date_trunc('month', current_date))
and sample_date < date_trunc('month', current_date)
and station_type in ('ML', 'HV')
),

-- get all good detectors
good_detectors as (
select
detector_id,
sample_date
from {{ ref('int_diagnostics__detector_status') }}
where
status = 'Good'
and sample_date >= dateadd(month, -1, date_trunc('month', current_date))
and sample_date < date_trunc('month', current_date)
and station_type in ('ML', 'HV')
),

-- filter last month's data for good detectors only
filtered_five_minute_agg_lastmonth as (
select
f.detector_id,
f.sample_date,
f.volume_sum,
f.occupancy_avg
from five_minute_agg_lastmonth as f
inner join good_detectors as g
on
f.detector_id = g.detector_id
and f.sample_date = g.sample_date

),

-- calculate the statistics
monthly_stats as (
select
detector_id,
avg(volume_sum) as volume_mean,
stddev(volume_sum) as volume_stddev,
-- consider using max_capacity
percentile_cont(0.95) within group (order by volume_sum) as volume_95th,
percentile_cont(0.95) within group (order by occupancy_avg) as occupancy_95th
from filtered_five_minute_agg_lastmonth
group by detector_id
),

-- retrieve recent five-minute data
five_minute_agg as (
select *
from {{ ref('int_clearinghouse__detector_agg_five_minutes') }}
where {{ make_model_incremental('sample_date') }}
),

-- impute detected outliers
outlier_removed_data as (
select
fa.*,
-- update volume_sum if it's an outlier
case
when
(fa.volume_sum - ms.volume_mean) / nullifzero(ms.volume_stddev) > 3
then ms.volume_95th
else fa.volume_sum
end as updated_volume_sum,
-- add a volume_label for imputed volume
case
when
(fa.volume_sum - ms.volume_mean) / nullifzero(ms.volume_stddev) > 3
then 'observed outlier'
else 'observed data'
end as volume_label,
-- update occupancy if it's an outlier
case
when
fa.occupancy_avg > ms.occupancy_95th
then ms.occupancy_95th
else fa.occupancy_avg
end as updated_occupancy_avg,
-- add a column for imputed occupancy
case
when
fa.occupancy_avg > ms.occupancy_95th
then 'observed outlier'
else 'observed data'
end as occupancy_label
from five_minute_agg as fa
left join
monthly_stats as ms
on
fa.detector_id = ms.detector_id

)

select * from outlier_removed_data

0 comments on commit 1655e15

Please sign in to comment.