Skip to content

Commit

Permalink
♻️ Added check for unprocessed points
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesbrandreth committed Mar 5, 2021
1 parent b96ddd4 commit 92d37d2
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 23 deletions.
4 changes: 2 additions & 2 deletions containers/cleanair/cleanair/databases/tables/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
JamCamDayStats,
JamCamMetaData,
)
from .road_point_map import RoadPointMap
from .point_road_map import PointRoadMap

__all__ = [
"AirQualityDataTable",
Expand All @@ -68,9 +68,9 @@
"JamCamMetaData",
"MetaPoint",
"OSHighway",
"PointRoadMap",
"RectGrid",
"RectGrid100",
"RoadPointMap",
"SatelliteGrid",
"SatelliteForecast",
"SatelliteBox",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
"""
Table for storing mapping of interset points to road segments
"""
from sqlalchemy import Column, ForeignKey, Float, DateTime
from sqlalchemy import Column, ForeignKey, PrimaryKeyConstraint, Index, Float, DateTime

from ..base import Base


class RoadPointMap(Base):
class PointRoadMap(Base):
"""Table of road segments mapped to interest points"""

__tablename__ = "road_point_map"
__tablename__ = "point_road_map"
__table_args__ = {"schema": "interest_points"}

point_id = Column(ForeignKey("interest_points.meta_point.id"))
road_segment_id = Column(ForeignKey("static_data.oshighway_roadlink.toid"))
buffer_radius = Column(Float)
map_datetime = Column(DateTime)

road_point_pk = PrimaryKeyConstraint(point_id, road_segment_id, buffer_radius)
point_index = Index(point_id)


def __repr__(self):
return f"<RoadPointMap( point_id: {self.point_id}," \
Expand Down
2 changes: 2 additions & 0 deletions containers/cleanair/cleanair/features/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
ALL_FEATURES_DYNAMIC,
)
from .feature_extractor import FeatureExtractor, ScootFeatureExtractor
from .point_road_mapper import PointRoadMapper

__all__ = [
"ALL_FEATURES",
"FeatureExtractor",
"PointRoadMapper",
"ScootForecastFeatures",
"ScootReadingFeatures",
"ScootFeatureExtractor",
Expand Down
34 changes: 25 additions & 9 deletions containers/cleanair/cleanair/features/point_road_mapper.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,36 @@
from Typing import Optional, List, Datetime
from datetime import datetime
from typing import Optional, List

from sqlalchemy import func

from ..databases import DBWriter
from ..databases.tables import MetaPoint, RoadPointMap
from ..databases.tables import MetaPoint, PointRoadMap
from ..decorators import db_query
from ..types.enum_types import Source


class PointRoadMapper(DBWriter):

@db_query()
def unprocessed_ids(self, sources: List[Source], datetime: Optional[Datetime]):
def unprocessed_ids(self, sources=None, time=None):
with self.dbcnxn.open_session() as session:
processed_ids_query = session.query(RoadPointMap.point_id)
if datetime:
processed_ids_query = processed_ids_query.filter(RoadPointMap.map_datetime >= datetime)
processed_ids_query = session.query(PointRoadMap.point_id)
if time:
processed_ids_query = processed_ids_query.filter(PointRoadMap.map_datetime >= time)

return (session.query(MetaPoint.id)
.filter(MetaPoint.id.notin_(processed_ids_query))
unprocessed_ids_query = session.query(MetaPoint.id) \
.filter(MetaPoint.id.notin_(processed_ids_query))
if sources:
unprocessed_ids_query = unprocessed_ids_query \
.filter(MetaPoint.source in sources)
)

return unprocessed_ids_query

@db_query()
def unprocessed_counts(self):

ids = self.unprocessed_ids().all()

with self.dbcnxn.open_session() as session:
return session.query(MetaPoint.source, func.count(MetaPoint.id).label("unprocessed")).group_by(MetaPoint.source)

Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Processors parser"""
import typer
from . import scoot_forecast_cli
from . import point_road_map_cli

app = typer.Typer(help="Process urbanair input data into a different format")
app.add_typer(scoot_forecast_cli.app, name="scoot")
app.add_typer(point_road_map_cli.app, name="map")
app.add_typer(point_road_map_cli.app, name="mapping")

if __name__ == "__main__":
app()
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import typer

from typing import List

import typer
from cleanair.features import PointRoadMapper

from ..shared_args import ValidSources, Sources
from ..state import state

app = typer.Typer()





@app.command
@app.command()
def points(source: List[ValidSources] = Sources):
pass


# get unprocessed ids
@app.command()
def check() -> None:
map = PointRoadMapper(secretfile=state["secretfile"])
unprocesed_ids = map.unprocessed_counts(output_type="tabulate")

# Do intercestion thing
typer.echo(unprocesed_ids)

0 comments on commit 92d37d2

Please sign in to comment.