diff --git a/ingestion_server/README.md b/ingestion_server/README.md
index 9cb56d940..86969e488 100644
--- a/ingestion_server/README.md
+++ b/ingestion_server/README.md
@@ -19,6 +19,10 @@ The server has been designed to fail gracefully in the event of network interrup
 
 The server is designed to be run in a private network only. You must not expose the private Ingestion Server API to the public internet.
 
+## Notifications
+
+If a `SLACK_WEBHOOK` variable is provided, the ingestion server will post periodic updates on the progress of a data refresh and relay any errors that occur during the process.
+
 ## Running on the host
 
 1. Create environment variables from the template file.
diff --git a/ingestion_server/ingestion_server/api.py b/ingestion_server/ingestion_server/api.py
index f0fbe8de1..2e348bd39 100644
--- a/ingestion_server/ingestion_server/api.py
+++ b/ingestion_server/ingestion_server/api.py
@@ -14,6 +14,7 @@
 import falcon
 
 import ingestion_server.indexer as indexer
+from ingestion_server import slack
 from ingestion_server.constants.media_types import MEDIA_TYPES
 from ingestion_server.state import clear_state, worker_finished
 from ingestion_server.tasks import Task, TaskTracker, TaskTypes
@@ -151,6 +152,10 @@ def on_post(self, req, resp):
         index_type = target_index.split("-")[0]
         if index_type not in MEDIA_TYPES:
             index_type = "image"
+        slack.message(
+            f"`{index_type}`: Elasticsearch reindex complete | "
+            f"_Next: promote index as primary_"
+        )
         f = indexer.TableIndexer.go_live
         p = Process(target=f, args=(target_index, index_type))
         p.start()
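As an aside for reviewers: a minimal sketch of how to exercise the behavior the new README section describes, assuming the `ingestion_server` package and its dependencies are importable from a local checkout. The webhook URL is a placeholder, not a real hook.

```python
# Minimal local smoke test for the notification behavior described in the
# README section above. The URL is a placeholder, not a real Slack webhook.
import os

os.environ["SLACK_WEBHOOK"] = "https://hooks.slack.com/services/T000/B000/XXXXXXXX"

from ingestion_server import slack

# With SLACK_WEBHOOK unset, this call is logged and skipped; with it set, the
# message is POSTed to the webhook as a Slack mrkdwn block (see slack.py below).
slack.message("`image`: test notification from a local run")
```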
diff --git a/ingestion_server/ingestion_server/indexer.py b/ingestion_server/ingestion_server/indexer.py
index b8063819f..abf8e881c 100644
--- a/ingestion_server/ingestion_server/indexer.py
+++ b/ingestion_server/ingestion_server/indexer.py
@@ -32,6 +32,7 @@
 from elasticsearch_dsl import Search, connections
 from psycopg2.sql import SQL, Identifier, Literal
 
+from ingestion_server import slack
 from ingestion_server.distributed_reindex_scheduler import schedule_distributed_index
 from ingestion_server.elasticsearch_models import database_table_to_elasticsearch_model
 from ingestion_server.es_mapping import index_settings
@@ -376,6 +377,9 @@ def go_live(write_index, live_alias):
         else:
             es.indices.put_alias(index=write_index, name=live_alias)
             log.info(f"Created '{live_alias}' index alias pointing to {write_index}")
+        slack.message(
+            f"`{write_index}`: ES index promoted - data refresh complete! :tada:"
+        )
 
     def listen(self, poll_interval=10):
         """
@@ -410,6 +414,10 @@ def reindex(self, model_name: str, distributed=None):
             schedule_distributed_index(database_connect(), destination_index)
         else:
             self._index_table(model_name, dest_idx=destination_index)
+            slack.message(
+                f"`{model_name}`: Elasticsearch reindex complete | "
+                f"_Next: promote index as primary_"
+            )
             self.go_live(destination_index, model_name)
 
     def update(self, model_name: str, since_date):
diff --git a/ingestion_server/ingestion_server/ingest.py b/ingestion_server/ingestion_server/ingest.py
index 963f7bc80..d6584debd 100644
--- a/ingestion_server/ingestion_server/ingest.py
+++ b/ingestion_server/ingestion_server/ingest.py
@@ -16,6 +16,7 @@
 """
 
 import datetime
+import gc
 import logging as log
 
 import psycopg2
@@ -23,6 +24,7 @@
 from psycopg2.extras import DictCursor
 from psycopg2.sql import SQL, Identifier, Literal
 
+from ingestion_server import slack
 from ingestion_server.cleanup import clean_image_data
 from ingestion_server.indexer import database_connect
 from ingestion_server.queries import (
@@ -266,6 +268,9 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
     """
 
     # Step 1: Get the list of overlapping columns
+    slack.message(
+        f"`{table}`: Starting data refresh | _Next: copying data from upstream_"
+    )
     downstream_db = database_connect()
     upstream_db = psycopg2.connect(
         dbname=UPSTREAM_DB_NAME,
@@ -296,6 +301,13 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
         copy_data = get_copy_data_query(table, shared_cols, approach=approach)
         log.info(f"Running copy-data query: \n{copy_data.as_string(downstream_cur)}")
         downstream_cur.execute(copy_data)
+
+        next_step = (
+            "starting data cleaning"
+            if table == "image"
+            else "re-applying indices & constraints"
+        )
+        slack.message(f"`{table}`: Data copy complete | _Next: {next_step}_")
     downstream_db.commit()
     downstream_db.close()
 
@@ -303,7 +315,14 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
     # Step 4: Clean the data
     log.info("Cleaning data...")
     clean_image_data(table)
+    slack.message(
+        f"`{table}`: Data cleaning complete | "
+        f"_Next: re-applying indices & constraints_"
+    )
 
+    # The server sometimes hangs on or before this next step. This is a pre-emptive
+    # garbage collection to try and assist with that.
+    gc.collect()
     downstream_db = database_connect()
     with downstream_db.cursor() as downstream_cur:
         # Step 5: Recreate indices from the original table
@@ -320,6 +339,7 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
         if len(remap_constraints.seq) != 0:
             downstream_cur.execute(remap_constraints)
         _update_progress(progress, 99.0)
+        slack.message(f"`{table}`: Indices & constraints applied | _Next: go-live_")
 
     # Step 7: Promote the temporary table and delete the original
     log.info("Done remapping constraints! Going live with new table...")
@@ -330,6 +350,7 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
     downstream_db.close()
     log.info(f"Finished refreshing table '{table}'.")
     _update_progress(progress, 100.0)
+    slack.message(f"`{table}`: Finished table refresh | _Next: Elasticsearch reindex_")
 
     if finish_time:
         finish_time.value = datetime.datetime.utcnow().timestamp()
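Taken together, the calls added above to `ingest.py`, `indexer.py`, and `api.py` instrument the whole refresh pipeline. As a reading aid, here is roughly the sequence of updates one `image` refresh should emit, assembled from the message strings in this diff (illustrative only; ordering assumed from the step numbering):

```python
# Rough order of Slack updates for one `image` data refresh, assembled from
# the message strings added above (illustrative, not an exhaustive test).
EXPECTED_UPDATES = [
    "`image`: Starting data refresh | _Next: copying data from upstream_",
    "`image`: Data copy complete | _Next: starting data cleaning_",
    "`image`: Data cleaning complete | _Next: re-applying indices & constraints_",
    "`image`: Indices & constraints applied | _Next: go-live_",
    "`image`: Finished table refresh | _Next: Elasticsearch reindex_",
    "`image`: Elasticsearch reindex complete | _Next: promote index as primary_",
    "`image`: ES index promoted - data refresh complete! :tada:",
]
```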
diff --git a/ingestion_server/ingestion_server/slack.py b/ingestion_server/ingestion_server/slack.py
new file mode 100644
index 000000000..d03470f72
--- /dev/null
+++ b/ingestion_server/ingestion_server/slack.py
@@ -0,0 +1,38 @@
+import logging
+import os
+
+import requests
+
+
+log = logging.getLogger(__name__)
+SLACK_WEBHOOK = "SLACK_WEBHOOK"
+
+
+def message(text: str, summary: str = None) -> None:
+    """
+    Send a Slack message to a channel specified by a Slack webhook variable.
+
+    A message is only sent if the SLACK_WEBHOOK environment variable is defined.
+    """
+    if not (webhook := os.getenv(SLACK_WEBHOOK)):
+        log.debug(
+            f"{SLACK_WEBHOOK} variable not defined, skipping slack message: {text}"
+        )
+        return
+    if not summary:
+        if "\n" in text:
+            summary = "Ingestion server message"
+        else:
+            summary = text
+
+    data = {
+        "blocks": [{"text": {"text": text, "type": "mrkdwn"}, "type": "section"}],
+        "text": summary,
+        "username": "Data Refresh Notification",
+        "icon_emoji": "arrows_counterclockwise",
+    }
+    try:
+        requests.post(webhook, json=data)
+    except Exception as err:
+        log.exception(f"Unable to issue slack message: {err}")
+        pass
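For reference, a sketch of the payload `message()` builds for a single-line text, mirroring the code above: `summary` falls back to the text itself because it contains no newline, while a multi-line text would get the generic "Ingestion server message" summary.

```python
# Payload built by message() above for a single-line text. Because the text
# contains no newline, the notification summary defaults to the text itself.
text = "`image`: Indices & constraints applied | _Next: go-live_"
payload = {
    "blocks": [{"text": {"text": text, "type": "mrkdwn"}, "type": "section"}],
    "text": text,  # the fallback summary shown in notification previews
    "username": "Data Refresh Notification",
    "icon_emoji": "arrows_counterclockwise",
}
```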
diff --git a/ingestion_server/ingestion_server/tasks.py b/ingestion_server/ingestion_server/tasks.py
index 5e6cb13bb..902c4256a 100644
--- a/ingestion_server/ingestion_server/tasks.py
+++ b/ingestion_server/ingestion_server/tasks.py
@@ -9,6 +9,7 @@
 
 import requests
 
+from ingestion_server import slack
 from ingestion_server.indexer import TableIndexer, elasticsearch_connect
 from ingestion_server.ingest import reload_upstream
 
@@ -97,28 +98,36 @@ def __init__(
         self.callback_url = callback_url
 
     def run(self):
-        # Map task types to actions.
-        elasticsearch = elasticsearch_connect()
-        indexer = TableIndexer(
-            elasticsearch, self.model, self.progress, self.finish_time
-        )
-        if self.task_type == TaskTypes.REINDEX:
-            indexer.reindex(self.model)
-        elif self.task_type == TaskTypes.UPDATE_INDEX:
-            indexer.update(self.model, self.since_date)
-        elif self.task_type == TaskTypes.INGEST_UPSTREAM:
-            reload_upstream(self.model)
-            if self.model == "audio":
-                reload_upstream("audioset", approach="basic")
-            indexer.reindex(self.model)
-        elif self.task_type == TaskTypes.LOAD_TEST_DATA:
-            indexer.load_test_data(self.model)
-        logging.info(f"Task {self.task_id} exited.")
-        if self.callback_url:
-            try:
-                logging.info("Sending callback request")
-                res = requests.post(self.callback_url)
-                logging.info(f"Response: {res.text}")
-            except requests.exceptions.RequestException as e:
-                logging.error("Failed to send callback!")
-                logging.error(e)
+        try:
+            # Map task types to actions.
+            elasticsearch = elasticsearch_connect()
+            indexer = TableIndexer(
+                elasticsearch, self.model, self.progress, self.finish_time
+            )
+            if self.task_type == TaskTypes.REINDEX:
+                indexer.reindex(self.model)
+            elif self.task_type == TaskTypes.UPDATE_INDEX:
+                indexer.update(self.model, self.since_date)
+            elif self.task_type == TaskTypes.INGEST_UPSTREAM:
+                reload_upstream(self.model)
+                if self.model == "audio":
+                    reload_upstream("audioset", approach="basic")
+                indexer.reindex(self.model)
+            elif self.task_type == TaskTypes.LOAD_TEST_DATA:
+                indexer.load_test_data(self.model)
+            logging.info(f"Task {self.task_id} exited.")
+            if self.callback_url:
+                try:
+                    logging.info("Sending callback request")
+                    res = requests.post(self.callback_url)
+                    logging.info(f"Response: {res.text}")
+                except requests.exceptions.RequestException as e:
+                    logging.error("Failed to send callback!")
+                    logging.error(e)
+        except Exception as err:
+            exception_type = f"{err.__class__.__module__}.{err.__class__.__name__}"
+            slack.message(
+                f":x_red: Error processing task `{self.task_type}` for `{self.model}`: "
+                f'"{err}" (`{exception_type}`)'
+            )
+            raise
diff --git a/ingestion_server/test/env.integration
index 18d3ee73a..1b2b1dc32 100644
--- a/ingestion_server/test/env.integration
+++ b/ingestion_server/test/env.integration
@@ -9,3 +9,4 @@
 UPSTREAM_DB_PORT="5432"
 LOCK_PATH="/worker_state/lock"
 SHELF_PATH="/worker_state/db"
+SLACK_WEBHOOK=""
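A small illustration of how the new `except` block formats exceptions for the error notification, with stand-in values (the `ValueError`, table name, and task name below are hypothetical). Note also that `env.integration` sets `SLACK_WEBHOOK` to an empty string, which the truthiness check in `slack.message` treats as unset, so integration tests stay silent.

```python
# How the except block in tasks.py formats an exception for the Slack error
# notification; ValueError stands in for whatever actually gets raised.
err = ValueError("upstream connection refused")
exception_type = f"{err.__class__.__module__}.{err.__class__.__name__}"
print(exception_type)  # builtins.ValueError

# The resulting notification text, with hypothetical task and model values:
notification = (
    ":x_red: Error processing task `TaskTypes.INGEST_UPSTREAM` for `image`: "
    f'"{err}" (`{exception_type}`)'
)
```

Because the handler re-raises after messaging, the task still fails loudly in the server logs rather than being swallowed by the notification path.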