Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre-compute road-interest point mappings #667

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions containers/cleanair/cleanair/databases/tables/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
JamCamDayStats,
JamCamMetaData,
)
from .point_road_map import PointRoadMap

__all__ = [
"AirQualityDataTable",
Expand All @@ -67,6 +68,7 @@
"JamCamMetaData",
"MetaPoint",
"OSHighway",
"PointRoadMap",
"RectGrid",
"RectGrid100",
"SatelliteGrid",
Expand Down
29 changes: 29 additions & 0 deletions containers/cleanair/cleanair/databases/tables/point_road_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""
Table for storing the mapping of interest points to road segments
"""
from sqlalchemy import Column, ForeignKey, PrimaryKeyConstraint, Index, Float, DateTime

from ..base import Base


class PointRoadMap(Base):
    """Table of road segments mapped to interest points.

    A row links one interest point to one road segment for a given buffer
    radius, together with the time at which the mapping was written.
    """

    __tablename__ = "point_road_map"
    # Constraints and indexes are declared here so that declarative attaches
    # them to the table. The previous `Index(point_id)` passed the column as
    # the index *name* (the first positional argument of Index), so no column
    # was ever indexed.
    __table_args__ = (
        PrimaryKeyConstraint("point_id", "road_segment_id", "buffer_radius"),
        Index("point_road_map_point_id_idx", "point_id"),
        {"schema": "interest_points"},
    )

    # Column types are taken from the referenced foreign-key columns.
    point_id = Column(ForeignKey("interest_points.meta_point.id"))
    road_segment_id = Column(ForeignKey("static_data.oshighway_roadlink.toid"))
    buffer_radius = Column(Float)
    map_datetime = Column(DateTime)

    def __repr__(self):
        """Return a debug representation listing every column value."""
        return (
            f"<PointRoadMap( point_id: {self.point_id},"
            f" road_segment_id: {self.road_segment_id},"
            f" buffer_radius: {self.buffer_radius},"
            f" map_datetime: {self.map_datetime}"
            f")>"
        )
2 changes: 2 additions & 0 deletions containers/cleanair/cleanair/features/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
ALL_FEATURES_DYNAMIC,
)
from .feature_extractor import FeatureExtractor, ScootFeatureExtractor
from .point_road_mapper import PointRoadMapper

__all__ = [
"ALL_FEATURES",
"FeatureExtractor",
"PointRoadMapper",
"ScootForecastFeatures",
"ScootReadingFeatures",
"ScootFeatureExtractor",
Expand Down
67 changes: 67 additions & 0 deletions containers/cleanair/cleanair/features/point_road_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import List

from sqlalchemy import func
from sqlalchemy.dialects import postgresql
from sqlalchemy.sql.expression import literal
from sqlalchemy.types import Float

from ..databases import DBWriter
from ..databases.tables import MetaPoint, PointRoadMap, OSHighway
from ..decorators import db_query


class PointRoadMapper(DBWriter):
    """Pre-compute which road segments intersect buffers around interest points.

    Results are written to the PointRoadMap table, one row per
    (point, road segment, buffer radius) combination.
    """

    # Buffer radii used by map_points when the caller does not supply any.
    # NOTE(review): buffering is done on a Geography cast, so these are
    # presumably metres - confirm.
    DEFAULT_RADII = (10, 100, 200, 500, 1000)

    @db_query()
    def unprocessed_ids(self, sources=None, time=None):
        """Query the ids of interest points that have no road mapping yet.

        Args:
            sources: Optional collection of MetaPoint sources to restrict to.
            time: Optional cut-off; when given, only mappings with
                map_datetime >= time count as already processed.

        Returns:
            A query yielding MetaPoint.id for every unprocessed point.
        """
        with self.dbcnxn.open_session() as session:
            processed_ids_query = session.query(PointRoadMap.point_id)
            if time:
                processed_ids_query = processed_ids_query.filter(
                    PointRoadMap.map_datetime >= time
                )

            unprocessed_ids_query = session.query(MetaPoint.id).filter(
                MetaPoint.id.notin_(processed_ids_query)
            )
            if sources:
                unprocessed_ids_query = unprocessed_ids_query.filter(
                    MetaPoint.source.in_(sources)
                )

            return unprocessed_ids_query

    @db_query()
    def unprocessed_counts(self, sources=None):
        """Query the number of unprocessed interest points per source.

        Args:
            sources: Optional collection of MetaPoint sources to restrict to.

        Returns:
            A query yielding (source, unprocessed) rows.
        """
        # Kept as a query and embedded as a subquery so the ids are never
        # pulled into Python. Previously the ids were fetched and then
        # ignored, so every point was counted regardless of mapping state.
        unprocessed_ids_query = self.unprocessed_ids(sources=sources)

        with self.dbcnxn.open_session() as session:
            return (
                session.query(
                    MetaPoint.source,
                    func.count(MetaPoint.id).label("unprocessed"),
                )
                .filter(MetaPoint.id.in_(unprocessed_ids_query))
                .group_by(MetaPoint.source)
            )

    @db_query()
    def buffers(self, point_ids: List[str], radius: float):
        """Query a buffer geometry of the given radius around each point.

        Args:
            point_ids: Ids of the MetaPoint rows to buffer.
            radius: Distance passed to ST_Buffer on the Geography cast of the
                point location.

        Returns:
            A query yielding (id, geom) rows.
        """
        with self.dbcnxn.open_session() as session:
            return session.query(
                MetaPoint.id.label("id"),
                func.Geometry(
                    func.ST_Buffer(func.Geography(MetaPoint.location), radius)
                ).label("geom"),
            ).filter(MetaPoint.id.in_(point_ids))

    @db_query()
    def map_points(self, point_ids: List[str], radii=None):
        """Map points to intersecting road segments and store the result.

        For every radius, each point's buffer is intersected with the
        OSHighway geometries and the matches are committed to PointRoadMap,
        overwriting rows that already exist.

        Args:
            point_ids: Ids of the MetaPoint rows to map.
            radii: Optional iterable of buffer radii; defaults to
                DEFAULT_RADII.
        """
        if not point_ids:
            # Nothing to map; avoid issuing empty IN () insert-selects.
            return

        if radii is None:
            radii = self.DEFAULT_RADII

        with self.dbcnxn.open_session() as session:
            for radius in radii:
                buffers = self.buffers(point_ids=point_ids, radius=radius).cte(
                    "buffer"
                )

                maps = (
                    session.query(
                        buffers.c.id.label("point_id"),
                        OSHighway.toid.label("road_segment_id"),
                        literal(radius, Float).label("buffer_radius"),
                        func.now().label("map_datetime"),
                    )
                    .filter(func.ST_Intersects(buffers.c.geom, OSHighway.geom))
                    .subquery()
                )

                self.commit_records(
                    maps,
                    on_conflict="overwrite",
                    table=PointRoadMap,
                )
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Processors parser"""
import typer
from . import scoot_forecast_cli
from . import point_road_map_cli

# Root command group; each processor sub-application is mounted beneath it.
app = typer.Typer(help="Process urbanair input data into a different format")

# (sub-command name, module exposing a Typer ``app``) pairs.
for _sub_name, _sub_cli in (
    ("scoot", scoot_forecast_cli),
    ("mapping", point_road_map_cli),
):
    app.add_typer(_sub_cli.app, name=_sub_name)

if __name__ == "__main__":
    app()
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import List

import typer
from cleanair.features import PointRoadMapper

from ..shared_args import ValidSources
from ..state import state

# Typer application that the commands defined in this module register on.
app = typer.Typer()


@app.command()
def check() -> None:
    """Show the per-source counts reported by PointRoadMapper.unprocessed_counts."""
    # Named `mapper` rather than `map` so neither the builtin nor the sibling
    # `map` command is shadowed.
    mapper = PointRoadMapper(secretfile=state["secretfile"])

    unprocessed_counts = mapper.unprocessed_counts(output_type="tabulate")

    typer.echo(unprocessed_counts)


@app.command()
def map(source: List[ValidSources] = typer.Option(None)) -> None:
    """Map unprocessed interest points to road segments.

    When one or more sources are given, only points from those sources are
    considered.
    """
    mapper = PointRoadMapper(secretfile=state["secretfile"])

    # `.all()` yields one single-column row per point; unpack each row so that
    # map_points receives bare ids rather than row tuples.
    unprocessed_ids = [
        row[0] for row in mapper.unprocessed_ids(sources=source).all()
    ]

    if not unprocessed_ids:
        typer.echo("No unprocessed interest points found")
        return

    mapper.map_points(point_ids=unprocessed_ids)