Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to support MDS 0.4.x #87

Merged
merged 43 commits into from
May 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
385b7f6
Update JSON schema URLs (#86)
cmdoptesc Jan 27, 2020
051789e
Removing support for MDS 0.2.x (#90)
thekaveman Mar 3, 2020
9d11a35
correct comma typo
thekaveman Mar 4, 2020
729ef1e
new optional fields for status_changes and trips
thekaveman Mar 4, 2020
a7467bc
adjusting time queries for 0.4.0
thekaveman Mar 4, 2020
cdb517c
adjusting paging for 0.4.0
thekaveman Mar 4, 2020
07f4290
removing unsupported query params for 0.4.0
thekaveman Mar 4, 2020
6d66fab
missed member assignment
thekaveman Apr 22, 2020
cc0f2d4
raise earlier for bad params
thekaveman Apr 22, 2020
5bdae8f
fix comparison operator for int
thekaveman Apr 22, 2020
5da635c
add support for the events schema
thekaveman Mar 4, 2020
c317e61
handle events using the status_changes payload key
thekaveman Mar 5, 2020
f7815a5
api client support for events
thekaveman Mar 5, 2020
418fb76
support for loading events data
thekaveman Mar 5, 2020
6d36e59
process time params for events
thekaveman Apr 22, 2020
10d68fe
events time range check, centralized method
thekaveman Apr 22, 2020
c1950c7
flake8 linting
thekaveman Apr 28, 2020
8f9dd9d
schema item key may be different from type
thekaveman Apr 28, 2020
95d2f13
reuse schema key logic
thekaveman Apr 28, 2020
e2ef4eb
simplify request loop
thekaveman Apr 28, 2020
e809a9a
refactor raising UnsupportedVersionError
thekaveman Apr 29, 2020
0cd5340
add support for the vehicles schema
thekaveman Apr 28, 2020
aba3465
api client support for vehicles
thekaveman Apr 28, 2020
9d8a814
support for vehicles data files
thekaveman Apr 29, 2020
06028d8
support for loading vehicles data
thekaveman Apr 29, 2020
c6fa829
fix typo in list concat
thekaveman May 7, 2020
13718e0
fallback to str comparison for non-Version arg
thekaveman May 7, 2020
ea570c4
paging support was removed in 0.4.1
thekaveman May 12, 2020
7598ea5
expose schema_key on DataValidator
thekaveman May 12, 2020
e7d2e75
no need for fstring
thekaveman May 18, 2020
1a5e5db
delay acquiring schema documents from GitHub
thekaveman May 12, 2020
db4722d
standardize on Schema.data_key
thekaveman May 13, 2020
c5a326b
return event types and reasons for vehicles
thekaveman May 20, 2020
40ea262
fix random time generator
thekaveman May 18, 2020
1a7dead
elevate to module constant
thekaveman May 18, 2020
2c12e40
add currency code to trips
thekaveman May 18, 2020
4030a5b
support fake event and vehicle status payloads
thekaveman May 18, 2020
eeea0b6
formatting cleanup
thekaveman May 18, 2020
465cd85
missing self reference
thekaveman May 20, 2020
64a34f7
dict keys are not subscriptable, convert to list
thekaveman May 20, 2020
97bad3b
convert generated locations to feature
thekaveman May 20, 2020
91df0bd
version fixes
thekaveman May 21, 2020
fc43c41
fixing sql generation for pre-0.4.0
thekaveman May 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[flake8]
max-line-length = 120
ignore =
# whitespace after/before square brackets
E201,
E202,
# whitespace after comma
E231,
# bare except
E722,
# do not assign lambda expression
E731
2 changes: 1 addition & 1 deletion mds/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@
from .encoding import JsonEncoder, TimestampDecoder, TimestampEncoder
from .files import ConfigFile, DataFile
from .providers import Provider, Registry
from .schemas import STATUS_CHANGES, TRIPS, DataValidator, Schema
from .schemas import STATUS_CHANGES, TRIPS, EVENTS, VEHICLES, DataValidator, Schema
from .versions import UnsupportedVersionError, Version
345 changes: 236 additions & 109 deletions mds/api/client.py

Large diffs are not rendered by default.

99 changes: 67 additions & 32 deletions mds/db/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
import sqlalchemy

from ..db import loaders
from ..providers import Provider
from ..schemas import STATUS_CHANGES, TRIPS
from ..versions import UnsupportedVersionError, Version
from ..schemas import STATUS_CHANGES, TRIPS, VEHICLES
from ..versions import Version


def data_engine(uri=None, **kwargs):
Expand Down Expand Up @@ -47,7 +46,9 @@ def data_engine(uri=None, **kwargs):
"""
if uri is None and all(k in kwargs for k in ["user", "password", "host", "db"]):
backend = kwargs.pop("backend", "postgresql")
user, password, host, port, db = kwargs["user"], kwargs["password"], kwargs["host"], kwargs.get("port", 5432), kwargs["db"]
user, password, host, port, db = (
kwargs["user"], kwargs["password"], kwargs["host"], kwargs.get("port", 5432), kwargs["db"]
)
uri = f"{backend}://{user}:{password}@{host}:{port}/{db}"
elif uri is None:
raise KeyError("Provide either uri or ([backend], user, password, host, [port], db).")
Expand All @@ -62,7 +63,7 @@ class Database():

def __init__(self, uri=None, **kwargs):
"""
Initialize a new ProviderDataLoader using a number of connection methods.
Initialize a new Database using a number of connection methods.

Parameters:
uri: str, optional
Expand Down Expand Up @@ -109,8 +110,7 @@ def __init__(self, uri=None, **kwargs):
When an unsupported MDS version is specified.
"""
self.version = Version(kwargs.pop("version", Version.mds_lower()))
if self.version.unsupported:
raise UnsupportedVersionError(self.version)
self.version.raise_if_unsupported()

self.stage_first = kwargs.pop("stage_first", True)
self.engine = kwargs.pop("engine", data_engine(uri=uri, **kwargs))
Expand Down Expand Up @@ -140,10 +140,7 @@ def load(self, source, record_type, table, **kwargs):
* a pandas.DataFrame containing MDS data records

record_type: str
The type of MDS data, e.g. status_changes or trips

record_type: str
The type of MDS data ("status_changes" or "trips").
The type of MDS data.

table: str
The name of the database table to insert this data into.
Expand Down Expand Up @@ -182,8 +179,7 @@ def load(self, source, record_type, table, **kwargs):
self
"""
version = Version(kwargs.pop("version", self.version))
if version.unsupported:
raise UnsupportedVersionError(version)
version.raise_if_unsupported()

if "stage_first" not in kwargs:
kwargs["stage_first"] = self.stage_first
Expand Down Expand Up @@ -240,23 +236,12 @@ def _before_load(df, version):

self._json_cols_tostring(df, ["event_location"])

null_cols = ["battery_pct"]

# version-dependent association column
association_col = "associated_trips" if version < Version("0.3.0") else "associated_trip"
null_cols.append(association_col)

if version >= Version("0.3.0"):
null_cols.append("publication_time")

# inject any missing optional columns
null_cols = ["battery_pct", "associated_trip", "publication_time", "associated_ticket"]
df = self._add_missing_cols(df, null_cols)

# coerce to object column
df[[association_col]] = df[[association_col]].astype("object")

if version < Version("0.3.0"):
# empty list by default
df[association_col] = df[association_col].apply(lambda d: d if isinstance(d, list) else [])
df[["associated_trip"]] = df[["associated_trip"]].astype("object")

return before_load(df, version)

Expand All @@ -279,7 +264,8 @@ def load_trips(self, source, **kwargs):

drop_duplicates: list, optional
List of column names used to drop duplicate records before load.
By default, ["provider_id", "trip_id"]
By default, ["provider_id", "trip_id"].
If None, don't drop any records.

Additional keyword arguments are passed-through to load().

Expand All @@ -300,17 +286,66 @@ def _before_load(df, version):

self._json_cols_tostring(df, ["route"])

null_cols = ["parking_verification_url", "standard_cost", "actual_cost"]

if version >= Version("0.3.0"):
null_cols.append("publication_time")
null_cols = ["parking_verification_url", "standard_cost", "actual_cost", "publication_time", "currency"]

df = self._add_missing_cols(df, null_cols)

return before_load(df, version)

return self.load(source, TRIPS, table, before_load=_before_load, **kwargs)

def load_events(self, source, **kwargs):
    """
    Load MDS events data.

    Events share the record structure of status_changes, so no event-specific
    processing happens here: the call is handed straight to load_status_changes().

    Parameters:
        source:
            Forwarded unchanged to load_status_changes().

        Additional keyword arguments are forwarded unchanged to load_status_changes().

    Return:
        Whatever load_status_changes() returns.
    """
    delegate = self.load_status_changes
    return delegate(source, **kwargs)

def load_vehicles(self, source, **kwargs):
    """
    Load MDS vehicles data.

    Parameters:
        source: dict, list, str, Path, pandas.DataFrame
            See load() for supported source types.

        table: str, optional
            Name of the destination table. Defaults to the VEHICLES constant.

        before_load: callable(df=DataFrame, version=Version): DataFrame, optional
            Callback run as the final preparation step, after this method's own
            clean-up. Its return value is the DataFrame that gets loaded.
            Defaults to returning the DataFrame unchanged.

        drop_duplicates: list, optional
            Column names used to identify duplicate records; the last occurrence
            of each duplicate is kept. When omitted (or empty), nothing is dropped.

        Additional keyword arguments are passed-through to load().

    Return:
        The result of load().
    """
    # Pull out the options handled here so only pass-through options remain in kwargs.
    drop_duplicates = kwargs.pop("drop_duplicates", None)
    before_load = kwargs.pop("before_load", lambda df,v: df)
    table = kwargs.pop("table", VEHICLES)

    # Columns that may be absent from incoming records, and columns holding JSON values.
    optional_cols = ["current_location", "battery_pct", "last_updated", "ttl"]
    json_cols = ["last_event_location", "current_location"]

    def _before_load(df, version):
        """
        De-duplicate (if requested), ensure optional columns exist, stringify
        JSON columns, then defer to the caller-supplied before_load callback.
        """
        if drop_duplicates:
            # in-place: the incoming DataFrame itself is modified
            df.drop_duplicates(subset=drop_duplicates, keep="last", inplace=True)

        # optional columns must exist before the JSON conversion touches current_location
        df = self._add_missing_cols(df, optional_cols)
        self._json_cols_tostring(df, json_cols)

        return before_load(df, version)

    return self.load(source, VEHICLES, table, before_load=_before_load, **kwargs)

@staticmethod
def _json_cols_tostring(df, cols):
"""
Expand Down
55 changes: 34 additions & 21 deletions mds/db/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from ..db import sql
from ..fake import util
from ..files import DataFile
from ..schemas import STATUS_CHANGES, TRIPS
from ..versions import UnexpectedVersionError, UnsupportedVersionError, Version
from ..schemas import STATUS_CHANGES, TRIPS, EVENTS, VEHICLES, Schema
from ..versions import UnexpectedVersionError, Version


class DataFrame():
Expand Down Expand Up @@ -39,7 +39,7 @@ def load(self, source, **kwargs):
DataFrame of type record_type to insert.

record_type: str
The type of MDS data ("status_changes" or "trips")
The type of MDS data.

table: str
The name of the database table to insert this data into.
Expand Down Expand Up @@ -78,8 +78,7 @@ def load(self, source, **kwargs):
engine = kwargs.pop("engine")

version = Version(kwargs.get("version", Version.mds_lower()))
if version.unsupported:
raise UnsupportedVersionError(version)
version.raise_if_unsupported()

before_load = kwargs.get("before_load")
stage_first = kwargs.get("stage_first")
Expand All @@ -102,10 +101,21 @@ def load(self, source, **kwargs):

# now insert from the temp table to the actual table
with engine.begin() as conn:
if record_type == STATUS_CHANGES:
query = sql.insert_status_changes_from(temp, table, on_conflict_update=on_conflict_update, version=version)
if record_type in [STATUS_CHANGES, EVENTS]:
query = sql.insert_status_changes_from(temp,
table,
version=version,
on_conflict_update=on_conflict_update)
elif record_type == TRIPS:
query = sql.insert_trips_from(temp, table, on_conflict_update=on_conflict_update, version=version)
query = sql.insert_trips_from(temp,
table,
version=version,
on_conflict_update=on_conflict_update)
elif record_type == VEHICLES:
query = sql.insert_vehicles_from(temp,
table,
version=version,
on_conflict_update=on_conflict_update)
if query is not None:
# move data using query and delete temp table
conn.execute(query)
Expand Down Expand Up @@ -133,7 +143,7 @@ def load(self, source, **kwargs):
An mds.files.DataFile compatible JSON file path.

record_type: str
The type of MDS data ("status_changes" or "trips")
The type of MDS data.

table: str
The name of the database table to insert this data into.
Expand All @@ -147,7 +157,7 @@ def load(self, source, **kwargs):
UnexpectedVersionError
When data is parsed with a version different from what was expected.
"""
record_type = kwargs.pop("record_type")
record_type = kwargs.get("record_type")
version = Version(kwargs.get("version"))

# read the data file
Expand All @@ -156,8 +166,6 @@ def load(self, source, **kwargs):
if version and _version != version:
raise UnexpectedVersionError(_version, version)

kwargs["record_type"] = record_type

return super().load(df, **kwargs)

@classmethod
Expand Down Expand Up @@ -191,7 +199,7 @@ def load(self, source, **kwargs):
One or more dicts of type record_type.

record_type: str
The type of MDS data ("status_changes" or "trips").
The type of MDS data.

table: str
The name of the database table to insert this data into.
Expand Down Expand Up @@ -244,7 +252,7 @@ def load(self, source, **kwargs):
One or more payload dicts.

record_type: str
The type of MDS data ("status_changes" or "trips").
The type of MDS data.

table: str
The name of the database table to insert this data into.
Expand All @@ -254,28 +262,33 @@ def load(self, source, **kwargs):

Additional keyword arguments are passed-through to DataFrameLoader.load().
"""
record_type = kwargs.pop("record_type")
record_type = kwargs.get("record_type")
version = kwargs.get("version")

if isinstance(source, dict):
source = [source]

kwargs["record_type"] = record_type
for payload in [p for p in source if record_type in p["data"]]:
data_key = Schema(record_type).data_key
for payload in [p for p in source if data_key in p["data"]]:
if version and version != Version(payload["version"]):
raise UnexpectedVersionError(payload["version"], version)

records = payload["data"][record_type]
records = payload["data"][data_key]

# insert last_updated and ttl data from outer payload into each vehicle record
if record_type == VEHICLES:
last_updated, ttl = payload["last_updated"], payload["ttl"]
for item in records:
item["last_updated"] = last_updated
item["ttl"] = ttl

super().load(records, **kwargs)

@classmethod
def can_load(cls, source):
"""
True if source is one or more MDS Provider payload dicts.
"""
"""
True if source is one or more MDS Provider record dicts.
"""
if isinstance(source, dict):
source = [source]
return isinstance(source, list) and all([
Expand Down
Loading