Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Nov 29, 2024
1 parent 0ef1317 commit f91b086
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 21 deletions.
2 changes: 1 addition & 1 deletion pioreactor/background_jobs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ def sneak_in(ads_interval, post_delay, pre_delay) -> None:
with JobManager() as jm:
ads_interval = jm.get_setting_from_running_job("od_reading", "interval")
ads_start_time = jm.get_setting_from_running_job(
"od_reading", "first_od_obs_time", block=True
"od_reading", "first_od_obs_time", timeout=5
) # this is populated later in the job...

# get interval, and confirm that the requirements are possible: post_delay + pre_delay <= ADS interval - (od reading duration)
Expand Down
41 changes: 21 additions & 20 deletions pioreactor/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from pioreactor.pubsub import subscribe_and_callback
from pioreactor.utils.networking import resolve_to_address
from pioreactor.utils.timing import current_utc_timestamp
from pioreactor.utils.timing import catchtime

if TYPE_CHECKING:
from pioreactor.pubsub import Client
Expand Down Expand Up @@ -516,6 +517,7 @@ def __init__(self) -> None:
self._create_tables()

def _create_tables(self) -> None:
# TODO: add a created_at, updated_at to pio_job_published_settings
create_table_query = """
CREATE TABLE IF NOT EXISTS pio_job_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
Expand All @@ -532,10 +534,10 @@ def _create_tables(self) -> None:
);
CREATE TABLE IF NOT EXISTS pio_job_published_settings (
setting TEXT NOT NULL,
setting TEXT NOT NULL,
value BLOB,
proposed_value BLOB,
job_id INTEGER NOT NULL,
job_id INTEGER NOT NULL,
FOREIGN KEY(job_id) REFERENCES pio_job_metadata(id),
UNIQUE(setting, job_id)
);
Expand Down Expand Up @@ -603,28 +605,27 @@ def upsert_setting(self, job_id: JobMetadataKey, setting: str, value: Any) -> No
self.conn.commit()
return

def get_setting_from_running_job(self, job_name: str, setting: str, block=False) -> Any:
if not block and not self.is_job_running(job_name):
def get_setting_from_running_job(self, job_name: str, setting: str, timeout=None) -> Any:
if timeout is not None and not self.is_job_running(job_name):
raise JobNotRunningError(f"Job {job_name} is not running.")

result = None
while result is None:
select_query = """
SELECT value
FROM pio_job_published_settings s
JOIN pio_job_metadata m ON s.job_id = m.id
WHERE job_name=(?) and setting=(?) and is_running=1"""
self.cursor.execute(select_query, (job_name, setting))
result = self.cursor.fetchone() # returns None if not found

if result is not None:
return result[0]
else:
if block:
continue
else:
with catchtime() as timer:
while True:
select_query = """
SELECT value
FROM pio_job_published_settings s
JOIN pio_job_metadata m ON s.job_id = m.id
WHERE job_name=(?) and setting=(?) and is_running=1"""
self.cursor.execute(select_query, (job_name, setting))
result = self.cursor.fetchone() # returns None if not found

if result:
return result[0]

if (timeout and timer() > timeout) or (timeout is None):
raise NameError(f"Setting {setting} was not found.")


def set_not_running(self, job_id: JobMetadataKey) -> None:
update_query = "UPDATE pio_job_metadata SET is_running=0, ended_at=STRFTIME('%Y-%m-%dT%H:%M:%f000Z', 'NOW') WHERE id=(?)"
self.cursor.execute(update_query, (job_id,))
Expand Down

0 comments on commit f91b086

Please sign in to comment.