Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add code to condense events in a DB #7

Merged
merged 6 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,5 @@ certs/
*.cer
.vscode
uv.lock
!example-certs/*
!example-certs/*
*.db
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,29 @@ Get the LFDI for a certificate. It will also do some validation checks.
```sh
python -m sep2tools cert-lfdi certs/dev-ABC-cert.pem
```


## Helper Functions


### Generating IDs

```python
from sep2tools.ids import generate_mrid

EXAMPLE_PEN = 1234
mrid = generate_mrid(EXAMPLE_PEN)
print(mrid) # 2726-D70C-C6C2-40DB-B78E-9B38-0000-1234
```

### Bitmap Hex Mappings

Some helper functions are provided for calculating the hex representation of SEP2 bitmap fields.

```python
from sep2tools.hexmaps import get_role_flag

binval, hexval = get_role_flag(is_mirror=1, is_der=1, is_submeter=1)
print(binval) # 0000000001001001
print(hexval) # 0049
```
9 changes: 9 additions & 0 deletions export_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from sep2tools.eventsdb import add_events, clear_old_events
from sep2tools.examples import example_controls, example_default_control

example_events = [*example_controls(), example_default_control()]
for evt in example_events:
print(evt)

add_events(example_events)
clear_old_events(days_to_keep=0)
9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ classifiers = [
]
requires-python = ">=3.10"
dynamic = ["version", "description"]
dependencies = ["asn1", "cryptography", "pydantic", "python-dateutil", "typer"]
dependencies = [
"asn1",
"cryptography",
"pydantic",
"python-dateutil",
"typer",
'sqlite_utils',
]

[project.optional-dependencies]
test = ["ruff", "pytest>=2.7.3", "pytest-cov", "mypy"]
Expand Down
2 changes: 1 addition & 1 deletion sep2tools/cert_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def generate_key(
name = random_id()
key_file = output_dir / f"{name}.key"

key = ec.generate_private_key(ec.SECP256R1)
key = ec.generate_private_key(ec.SECP256R1())
key_pem = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
Expand Down
56 changes: 4 additions & 52 deletions sep2tools/events.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,4 @@
from enum import IntEnum

from pydantic import BaseModel


class CurrentStatus(IntEnum):
Scheduled = 0
Active = 1
Cancelled = 2
CancelledWithRadomization = 3
Superseded = 4


class Status(IntEnum):
EventReceived = 1
EventStarted = 2
EventCompleted = 3
Superseded = 4
EventCancelled = 6
EventSuperseded = 7


class DERControlBase(BaseModel):
mode: str
value: int
multiplier: int = 0


class DERControl(BaseModel):
mRID: str
creationTime: int
currentStatus: CurrentStatus
start: int
duration: int
randomizeStart: int = 0
randomizeDuration: int = 0
controls: list[DERControlBase]
primacy: int


class ModeEvent(BaseModel):
mrid: str
primacy: int
creation_time: int
start: int
end: int
value: int
rand_start: int
rand_dur: int
from .models import DERControl, ModeEvent


def non_overlapping_periods(events: list[tuple[int, int]]) -> list[tuple[int, int]]:
Expand Down Expand Up @@ -129,10 +81,10 @@ def condense_events(events: list[DERControl]) -> dict[str, list[ModeEvent]]:
schedule = {}
for evt in events:
mrid = evt.mRID
primacy = evt.primacy
primacy = evt.ProgramInfo.primacy
creation_time = evt.creationTime
start = evt.start
end = evt.start + evt.duration
start = evt.interval.start
end = evt.interval.start + evt.interval.duration
rand_start = evt.randomizeStart
rand_dur = evt.randomizeDuration
for cntrl in evt.controls:
Expand Down
242 changes: 242 additions & 0 deletions sep2tools/eventsdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
import json
from datetime import datetime, timezone
from pathlib import Path

from sqlite_utils import Database

from .events import condense_events
from .models import (
DateTimeInterval,
DERControl,
DERControlBase,
EventStatus,
ModeEvent,
ProgramInfo,
)

DEFAULT_OUTPUT_DIR = Path("")

EVENT_COLS = {
"mRID": str,
"creationTime": int,
"currentStatus": int,
"start": int,
"duration": int,
"randomizeStart": int,
"randomizeDuration": int,
"controls": dict,
"program": str,
"primacy": int,
}

ENROLMENT_COLS = {
"der": str,
"program": str,
}

MODE_EVENT_COLS = {
"der": str,
"mode": str,
"start": int,
"end": int,
"value": int,
"creation_time": int,
"rand_start": int,
"rand_duration": int,
"mrid": str,
"primacy": int,
}


def create_db(output_dir: Path = DEFAULT_OUTPUT_DIR) -> Path:
output_path = output_dir / "events.db"
if output_path.exists():
return output_path
db = Database(output_path, strict=True)
events = db["events"]
events.create(
EVENT_COLS,
pk="mRID",
not_null=["creationTime", "start", "duration", "controls"],
if_not_exists=True,
)
enrolments = db["enrolments"]
enrolments.create(
ENROLMENT_COLS,
pk=("der", "program"),
not_null=("der", "program"),
if_not_exists=True,
)
mode_events = db["mode_events"]
mode_events.create(
MODE_EVENT_COLS,
pk=("der", "mode", "start", "end"),
not_null=("der", "mode", "start", "end", "value"),
if_not_exists=True,
)

return output_path


def add_enrolment(der: str, program: str, output_dir: Path = DEFAULT_OUTPUT_DIR):
output_path = output_dir / "events.db"
create_db()
item = {"der": der, "program": program}
db = Database(output_path)
db["enrolments"].insert(item, replace=True)


def get_enrolments(output_dir: Path = DEFAULT_OUTPUT_DIR) -> dict[str, list[str]]:
output_path = output_dir / "events.db"
create_db()
db = Database(output_path)
sql = "SELECT der, program FROM enrolments ORDER BY der, program"
ders = {}
with db.conn:
res = db.query(sql)
for x in res:
der = x["der"]
program = x["program"]
if der not in ders:
ders[der] = []
ders[der].append(program)
return ders


def add_events(events: list[DERControl], output_dir: Path = DEFAULT_OUTPUT_DIR):
output_path = output_dir / "events.db"
create_db()
db = Database(output_path)
records = [
{
"mRID": evt.mRID,
"creationTime": evt.creationTime,
"currentStatus": evt.EventStatus.currentStatus,
"start": evt.interval.start,
"duration": evt.interval.duration,
"randomizeStart": evt.randomizeStart,
"randomizeDuration": evt.randomizeDuration,
"controls": [x.model_dump() for x in evt.controls],
"program": evt.ProgramInfo.program,
"primacy": evt.ProgramInfo.primacy,
}
for evt in events
]
db["events"].insert_all(records, replace=True)

# Trigger update of mode events calculation
update_mode_events(output_dir=output_dir)


def update_mode_events(output_dir: Path = DEFAULT_OUTPUT_DIR):
clear_mode_events(output_dir=output_dir)
enrolments = get_enrolments(output_dir=output_dir)
for der, programs in enrolments.items():
raw_events = []
for prg in programs:
prg_events = get_events(prg, output_dir=output_dir)
raw_events.extend(prg_events)

clean_events = condense_events(raw_events)
for mode, events in clean_events.items():
add_mode_events(der, mode, events, output_dir=output_dir)


def add_mode_events(
der: str, mode: str, events: list[ModeEvent], output_dir: Path = DEFAULT_OUTPUT_DIR
):
output_path = output_dir / "events.db"
create_db()
db = Database(output_path)
records = [
{
"der": der,
"mode": mode,
"start": evt.start,
"end": evt.end,
"value": evt.value,
"creation_time": evt.creation_time,
"rand_start": evt.rand_start,
"rand_duration": evt.rand_dur,
"mrid": evt.mrid,
"primacy": evt.primacy,
}
for evt in events
]
db["mode_events"].insert_all(records, replace=True)


def flattened_event_to_object(evt: dict) -> DERControl:
status = EventStatus(currentStatus=evt["currentStatus"])
interval = DateTimeInterval(start=evt["start"], duration=evt["duration"])
program = ProgramInfo(program=evt["program"], primacy=evt["primacy"])
controls_list = json.loads(evt["controls"])
controls = [DERControlBase(**x) for x in controls_list]
return DERControl(
mRID=evt["mRID"],
EventStatus=status,
creationTime=evt["creationTime"],
interval=interval,
randomizeStart=evt["randomizeStart"],
randomizeDuration=evt["randomizeDuration"],
controls=controls,
ProgramInfo=program,
)


def get_event(mrid: str, output_dir: Path = DEFAULT_OUTPUT_DIR) -> DERControl | None:
output_path = output_dir / "events.db"
create_db()

sql = "SELECT * FROM events WHERE mRID = :mrid"
db = Database(output_path)
with db.conn:
res = db.query(sql, {"mrid": mrid})
for x in res:
item = flattened_event_to_object(x)
return item
return None


def delete_event(mrid: str, output_dir: Path = DEFAULT_OUTPUT_DIR):
output_path = output_dir / "events.db"
sql = "DELETE FROM events WHERE mRID = :mrid"
db = Database(output_path)
with db.conn:
db.execute(sql, {"mrid": mrid})
db.vacuum()


def get_events(program: str, output_dir: Path = DEFAULT_OUTPUT_DIR) -> list[DERControl]:
output_path = output_dir / "events.db"
create_db()

sql = "SELECT * FROM events WHERE program = :prg ORDER BY start, creationTime"
db = Database(output_path)
events = []
with db.conn:
res = db.query(sql, {"prg": program})
for x in res:
item = flattened_event_to_object(x)
events.append(item)
return events


def clear_mode_events(output_dir: Path = DEFAULT_OUTPUT_DIR):
output_path = output_dir / "events.db"
sql = "DELETE FROM mode_events"
db = Database(output_path)
with db.conn:
db.execute(sql)


def clear_old_events(output_dir: Path = DEFAULT_OUTPUT_DIR, days_to_keep: float = 3.0):
output_path = output_dir / "events.db"
create_db()

sql = "DELETE FROM events WHERE (start + duration) < :expire"
now_utc = int(datetime.now(timezone.utc).timestamp())
db = Database(output_path)
with db.conn:
db.execute(sql, {"expire": now_utc})
db.vacuum()
Loading
Loading