From 50dd38be4fbf122c8e93c86d39895443dee722b9 Mon Sep 17 00:00:00 2001
From: Madison Swain-Bowden
Date: Mon, 20 Dec 2021 13:57:42 -0800
Subject: [PATCH 1/7] Add slack alerts throughout data refresh

---
 ingestion_server/ingestion_server/api.py     |  2 ++
 ingestion_server/ingestion_server/indexer.py |  8 +++++
 ingestion_server/ingestion_server/ingest.py  | 13 ++++++-
 ingestion_server/ingestion_server/slack.py   | 36 ++++++++++++++++++++
 ingestion_server/test/env.integration        |  1 +
 5 files changed, 59 insertions(+), 1 deletion(-)
 create mode 100644 ingestion_server/ingestion_server/slack.py

diff --git a/ingestion_server/ingestion_server/api.py b/ingestion_server/ingestion_server/api.py
index f0fbe8de1..6aa337c53 100644
--- a/ingestion_server/ingestion_server/api.py
+++ b/ingestion_server/ingestion_server/api.py
@@ -13,6 +13,7 @@

 import falcon

+from ingestion_server import slack
 import ingestion_server.indexer as indexer
 from ingestion_server.constants.media_types import MEDIA_TYPES
 from ingestion_server.state import clear_state, worker_finished
@@ -151,6 +152,7 @@ def on_post(self, req, resp):
         index_type = target_index.split("-")[0]
         if index_type not in MEDIA_TYPES:
             index_type = "image"
+        slack.message(f"Elasticsearch reindex complete for `{index_type}` | Next: promote index as primary")
         f = indexer.TableIndexer.go_live
         p = Process(target=f, args=(target_index, index_type))
         p.start()
diff --git a/ingestion_server/ingestion_server/indexer.py b/ingestion_server/ingestion_server/indexer.py
index b8063819f..94ddf32aa 100644
--- a/ingestion_server/ingestion_server/indexer.py
+++ b/ingestion_server/ingestion_server/indexer.py
@@ -37,6 +37,7 @@
 from ingestion_server.es_mapping import index_settings
 from ingestion_server.qa import create_search_qa_index
 from ingestion_server.queries import get_existence_queries
+from ingestion_server import slack


 # For AWS IAM access to Elasticsearch
@@ -376,6 +377,10 @@ def go_live(write_index, live_alias):
         else:
             es.indices.put_alias(index=write_index, name=live_alias)
             log.info(f"Created '{live_alias}' index alias pointing to {write_index}")
+            slack.message(
+                f"Elasticsearch promotion complete for `{write_index}` "
+                f"- data refresh complete!"
+            )

     def listen(self, poll_interval=10):
         """
@@ -410,6 +415,9 @@ def reindex(self, model_name: str, distributed=None):
             schedule_distributed_index(database_connect(), destination_index)
         else:
             self._index_table(model_name, dest_idx=destination_index)
+        slack.message(
+            f"Elasticsearch reindex complete for `{model_name}` | Next: promote index as primary"
+        )
         self.go_live(destination_index, model_name)

     def update(self, model_name: str, since_date):
diff --git a/ingestion_server/ingestion_server/ingest.py b/ingestion_server/ingestion_server/ingest.py
index 963f7bc80..bb1fb2b65 100644
--- a/ingestion_server/ingestion_server/ingest.py
+++ b/ingestion_server/ingestion_server/ingest.py
@@ -30,7 +30,7 @@
     get_fdw_query,
     get_go_live_query,
 )
-
+from ingestion_server import slack

 UPSTREAM_DB_HOST = config("UPSTREAM_DB_HOST", default="localhost")
 UPSTREAM_DB_PORT = config("UPSTREAM_DB_PORT", default=5433, cast=int)
@@ -266,6 +266,9 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
     """
     # Step 1: Get the list of overlapping columns
+    slack.message(
+        f"Starting data refresh on `{table}` | Next: copying data from upstream"
+    )
     downstream_db = database_connect()
     upstream_db = psycopg2.connect(
         dbname=UPSTREAM_DB_NAME,
@@ -296,6 +299,9 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
         copy_data = get_copy_data_query(table, shared_cols, approach=approach)
         log.info(f"Running copy-data query: \n{copy_data.as_string(downstream_cur)}")
         downstream_cur.execute(copy_data)
+
+        next_step = "starting data cleaning" if table == "image" else "re-applying indices & constraints"
+        slack.message(f"Data copy complete for `{table}` | Next: {next_step}")
     downstream_db.commit()
     downstream_db.close()

@@ -303,6 +309,9 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")

     # Step 4: Clean the data
     log.info("Cleaning data...")
     clean_image_data(table)
+    slack.message(
+        f"Data cleaning complete for `{table}` | Next: re-applying indices & constraints"
+    )
     downstream_db = database_connect()
     with downstream_db.cursor() as downstream_cur:
@@ -320,6 +329,7 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
         if len(remap_constraints.seq) != 0:
             downstream_cur.execute(remap_constraints)
         _update_progress(progress, 99.0)
+        slack.message(f"Indices & constraints applied for `{table}` | Next: go-live")

         # Step 7: Promote the temporary table and delete the original
         log.info("Done remapping constraints! Going live with new table...")
@@ -330,6 +340,7 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
     downstream_db.close()
     log.info(f"Finished refreshing table '{table}'.")
     _update_progress(progress, 100.0)
+    slack.message(f"Finished refresh of table `{table}` | Next: Elasticsearch reindex")
     if finish_time:
         finish_time.value = datetime.datetime.utcnow().timestamp()
diff --git a/ingestion_server/ingestion_server/slack.py b/ingestion_server/ingestion_server/slack.py
new file mode 100644
index 000000000..a960a4ae0
--- /dev/null
+++ b/ingestion_server/ingestion_server/slack.py
@@ -0,0 +1,36 @@
+import logging
+import os
+
+import requests
+
+
+log = logging.getLogger(__name__)
+SLACK_WEBHOOK = "SLACK_WEBHOOK"
+
+
+def message(text: str, summary: str = None) -> None:
+    """
+    Send a Slack message to a channel specified by a Slack webhook variable.
+
+    A message is only sent if the SLACK_WEBHOOK environment variable is defined.
+ """ + if not (webhook := os.getenv(SLACK_WEBHOOK)): + log.debug( + f"{SLACK_WEBHOOK} variable not defined, skipping slack message: {text}" + ) + return + if not summary: + if "\n" in text: + summary = "Ingestion server message" + else: + summary = text + + data = { + "blocks": [{"text": {"text": text, "type": "mrkdwn"}, "type": "section"}], + "text": summary, + } + try: + requests.post(webhook, json=data) + except Exception as err: + log.exception(f"Unable to issue slack message: {err}") + pass diff --git a/ingestion_server/test/env.integration b/ingestion_server/test/env.integration index 18d3ee73a..1b2b1dc32 100644 --- a/ingestion_server/test/env.integration +++ b/ingestion_server/test/env.integration @@ -9,3 +9,4 @@ UPSTREAM_DB_PORT="5432" LOCK_PATH="/worker_state/lock" SHELF_PATH="/worker_state/db" +SLACK_WEBHOOK="" From 6c3b57a4a25fdaf84aec1360efbdd7059c532f0a Mon Sep 17 00:00:00 2001 From: Madison Swain-Bowden Date: Mon, 20 Dec 2021 15:21:22 -0800 Subject: [PATCH 2/7] Improve message formatting --- ingestion_server/ingestion_server/api.py | 7 +++++-- ingestion_server/ingestion_server/indexer.py | 8 ++++---- ingestion_server/ingestion_server/ingest.py | 20 +++++++++++++------- ingestion_server/ingestion_server/slack.py | 2 ++ 4 files changed, 24 insertions(+), 13 deletions(-) diff --git a/ingestion_server/ingestion_server/api.py b/ingestion_server/ingestion_server/api.py index 6aa337c53..2e348bd39 100644 --- a/ingestion_server/ingestion_server/api.py +++ b/ingestion_server/ingestion_server/api.py @@ -13,8 +13,8 @@ import falcon -from ingestion_server import slack import ingestion_server.indexer as indexer +from ingestion_server import slack from ingestion_server.constants.media_types import MEDIA_TYPES from ingestion_server.state import clear_state, worker_finished from ingestion_server.tasks import Task, TaskTracker, TaskTypes @@ -152,7 +152,10 @@ def on_post(self, req, resp): index_type = target_index.split("-")[0] if index_type not in MEDIA_TYPES: index_type = "image" - slack.message(f"Elasticsearch reindex complete for `{index_type}` | Next: promote index as primary") + slack.message( + f"`{index_type}`: Elasticsearch reindex complete | " + f"_Next: promote index as primary_" + ) f = indexer.TableIndexer.go_live p = Process(target=f, args=(target_index, index_type)) p.start() diff --git a/ingestion_server/ingestion_server/indexer.py b/ingestion_server/ingestion_server/indexer.py index 94ddf32aa..abf8e881c 100644 --- a/ingestion_server/ingestion_server/indexer.py +++ b/ingestion_server/ingestion_server/indexer.py @@ -32,12 +32,12 @@ from elasticsearch_dsl import Search, connections from psycopg2.sql import SQL, Identifier, Literal +from ingestion_server import slack from ingestion_server.distributed_reindex_scheduler import schedule_distributed_index from ingestion_server.elasticsearch_models import database_table_to_elasticsearch_model from ingestion_server.es_mapping import index_settings from ingestion_server.qa import create_search_qa_index from ingestion_server.queries import get_existence_queries -from ingestion_server import slack # For AWS IAM access to Elasticsearch @@ -378,8 +378,7 @@ def go_live(write_index, live_alias): es.indices.put_alias(index=write_index, name=live_alias) log.info(f"Created '{live_alias}' index alias pointing to {write_index}") slack.message( - f"Elasticsearch promotion complete for `{write_index}` " - f"- data refresh complete!" + f"`{write_index}`: ES index promoted - data refresh complete! 
:tada:" ) def listen(self, poll_interval=10): @@ -416,7 +415,8 @@ def reindex(self, model_name: str, distributed=None): else: self._index_table(model_name, dest_idx=destination_index) slack.message( - f"Elasticsearch reindex complete for `{model_name}` | Next: promote index as primary" + f"`{model_name}`: Elasticsearch reindex complete | " + f"_Next: promote index as primary_" ) self.go_live(destination_index, model_name) diff --git a/ingestion_server/ingestion_server/ingest.py b/ingestion_server/ingestion_server/ingest.py index bb1fb2b65..298df9903 100644 --- a/ingestion_server/ingestion_server/ingest.py +++ b/ingestion_server/ingestion_server/ingest.py @@ -23,6 +23,7 @@ from psycopg2.extras import DictCursor from psycopg2.sql import SQL, Identifier, Literal +from ingestion_server import slack from ingestion_server.cleanup import clean_image_data from ingestion_server.indexer import database_connect from ingestion_server.queries import ( @@ -30,7 +31,7 @@ get_fdw_query, get_go_live_query, ) -from ingestion_server import slack + UPSTREAM_DB_HOST = config("UPSTREAM_DB_HOST", default="localhost") UPSTREAM_DB_PORT = config("UPSTREAM_DB_PORT", default=5433, cast=int) @@ -267,7 +268,7 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced") # Step 1: Get the list of overlapping columns slack.message( - f"Starting data refresh on `{table}` | Next: copying data from upstream" + f"`{table}`: Starting data refresh | _Next: copying data from upstream_" ) downstream_db = database_connect() upstream_db = psycopg2.connect( @@ -300,8 +301,12 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced") log.info(f"Running copy-data query: \n{copy_data.as_string(downstream_cur)}") downstream_cur.execute(copy_data) - next_step = "starting data cleaning" if table == "image" else "re-applying indices & constraints" - slack.message(f"Data copy complete for `{table}` | Next: {next_step}") + next_step = ( + "starting data cleaning" + if table == "image" + else "re-applying indices & constraints" + ) + slack.message(f"`{table}`: Data copy complete | _Next: {next_step}_") downstream_db.commit() downstream_db.close() @@ -310,7 +315,8 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced") log.info("Cleaning data...") clean_image_data(table) slack.message( - f"Data cleaning complete for `{table}` | Next: re-applying indices & constraints" + f"`{table}`: Data cleaning complete | " + f"_Next: re-applying indices & constraints_" ) downstream_db = database_connect() @@ -329,7 +335,7 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced") if len(remap_constraints.seq) != 0: downstream_cur.execute(remap_constraints) _update_progress(progress, 99.0) - slack.message(f"Indices & constraints applied for `{table}` | Next: go-live") + slack.message(f"`{table}`: Indices & constraints applied | _Next: go-live_") # Step 7: Promote the temporary table and delete the original log.info("Done remapping constraints! 
@@ -340,7 +346,7 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
     downstream_db.close()
     log.info(f"Finished refreshing table '{table}'.")
     _update_progress(progress, 100.0)
-    slack.message(f"Finished refresh of table `{table}` | Next: Elasticsearch reindex")
+    slack.message(f"`{table}`: Finished table refresh | _Next: Elasticsearch reindex_")
     if finish_time:
         finish_time.value = datetime.datetime.utcnow().timestamp()
diff --git a/ingestion_server/ingestion_server/slack.py b/ingestion_server/ingestion_server/slack.py
index a960a4ae0..d03470f72 100644
--- a/ingestion_server/ingestion_server/slack.py
+++ b/ingestion_server/ingestion_server/slack.py
@@ -28,6 +28,8 @@ def message(text: str, summary: str = None) -> None:
     data = {
         "blocks": [{"text": {"text": text, "type": "mrkdwn"}, "type": "section"}],
         "text": summary,
+        "username": "Data Refresh Notification",
+        "icon_emoji": ":arrows_counterclockwise:",
     }
     try:
         requests.post(webhook, json=data)

From 578a16028678b89bf4e291fe764d6e20473afbec Mon Sep 17 00:00:00 2001
From: Madison Swain-Bowden
Date: Mon, 20 Dec 2021 15:35:21 -0800
Subject: [PATCH 3/7] Add general exception handling for tasks

---
 ingestion_server/ingestion_server/tasks.py | 59 +++++++++++++---------
 1 file changed, 34 insertions(+), 25 deletions(-)

diff --git a/ingestion_server/ingestion_server/tasks.py b/ingestion_server/ingestion_server/tasks.py
index 5e6cb13bb..750cd0523 100644
--- a/ingestion_server/ingestion_server/tasks.py
+++ b/ingestion_server/ingestion_server/tasks.py
@@ -9,6 +9,7 @@

 import requests

+from ingestion_server import slack
 from ingestion_server.indexer import TableIndexer, elasticsearch_connect
 from ingestion_server.ingest import reload_upstream

@@ -97,28 +98,36 @@ def __init__(
         self.callback_url = callback_url

     def run(self):
-        # Map task types to actions.
-        elasticsearch = elasticsearch_connect()
-        indexer = TableIndexer(
-            elasticsearch, self.model, self.progress, self.finish_time
-        )
-        if self.task_type == TaskTypes.REINDEX:
-            indexer.reindex(self.model)
-        elif self.task_type == TaskTypes.UPDATE_INDEX:
-            indexer.update(self.model, self.since_date)
-        elif self.task_type == TaskTypes.INGEST_UPSTREAM:
-            reload_upstream(self.model)
-            if self.model == "audio":
-                reload_upstream("audioset", approach="basic")
-            indexer.reindex(self.model)
-        elif self.task_type == TaskTypes.LOAD_TEST_DATA:
-            indexer.load_test_data(self.model)
-        logging.info(f"Task {self.task_id} exited.")
-        if self.callback_url:
-            try:
-                logging.info("Sending callback request")
-                res = requests.post(self.callback_url)
-                logging.info(f"Response: {res.text}")
-            except requests.exceptions.RequestException as e:
-                logging.error("Failed to send callback!")
-                logging.error(e)
+        try:
+            # Map task types to actions.
+            elasticsearch = elasticsearch_connect()
+            indexer = TableIndexer(
+                elasticsearch, self.model, self.progress, self.finish_time
+            )
+            if self.task_type == TaskTypes.REINDEX:
+                indexer.reindex(self.model)
+            elif self.task_type == TaskTypes.UPDATE_INDEX:
+                indexer.update(self.model, self.since_date)
+            elif self.task_type == TaskTypes.INGEST_UPSTREAM:
+                reload_upstream(self.model)
+                if self.model == "audio":
+                    reload_upstream("audioset", approach="basic")
+                    indexer.reindex(self.model)
+            elif self.task_type == TaskTypes.LOAD_TEST_DATA:
+                indexer.load_test_data(self.model)
+            logging.info(f"Task {self.task_id} exited.")
+            if self.callback_url:
+                try:
+                    logging.info("Sending callback request")
+                    res = requests.post(self.callback_url)
+                    logging.info(f"Response: {res.text}")
+                except requests.exceptions.RequestException as e:
+                    logging.error("Failed to send callback!")
+                    logging.error(e)
+        except Exception as err:
+            exception_type = f"{err.__class__.__module__}.{err.__class__.__name__}"
+            slack.message(
+                f":x_red: Error processing task `{self.task_type}` for `{self.model}`: "
+                f'"{err}" (`{exception_type}`)'
+            )
+            raise

From 656f5cc07bf27ed672dc8fb1528bb9d1362b674e Mon Sep 17 00:00:00 2001
From: Madison Swain-Bowden
Date: Mon, 20 Dec 2021 16:07:24 -0800
Subject: [PATCH 4/7] Add notification info to README

---
 ingestion_server/README.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/ingestion_server/README.md b/ingestion_server/README.md
index 9cb56d940..86969e488 100644
--- a/ingestion_server/README.md
+++ b/ingestion_server/README.md
@@ -19,6 +19,10 @@ The server has been designed to fail gracefully in the event of network interrup

 The server is designed to be run in a private network only. You must not expose the private Ingestion Server API to the public internet.

+## Notifications
+
+If a `SLACK_WEBHOOK` variable is provided, the ingestion server will post periodic updates on the progress of a data refresh and relay any errors that occur during the process.
+
 ## Running on the host

 1. Create environment variables from the template file.

From 7e2fd85ff2f2a92fd96fe66c2636e7f0354bd48d Mon Sep 17 00:00:00 2001
From: Madison Swain-Bowden
Date: Tue, 21 Dec 2021 10:42:45 -0800
Subject: [PATCH 5/7] Fix indentation issue

---
 ingestion_server/ingestion_server/tasks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ingestion_server/ingestion_server/tasks.py b/ingestion_server/ingestion_server/tasks.py
index 750cd0523..902c4256a 100644
--- a/ingestion_server/ingestion_server/tasks.py
+++ b/ingestion_server/ingestion_server/tasks.py
@@ -112,7 +112,7 @@ def run(self):
                 reload_upstream(self.model)
                 if self.model == "audio":
                     reload_upstream("audioset", approach="basic")
-                    indexer.reindex(self.model)
+                indexer.reindex(self.model)
             elif self.task_type == TaskTypes.LOAD_TEST_DATA:
                 indexer.load_test_data(self.model)
             logging.info(f"Task {self.task_id} exited.")

From bdb11f0e8ebe8a71f21d44de561e3ae3f2208ba0 Mon Sep 17 00:00:00 2001
From: Madison Swain-Bowden
Date: Tue, 21 Dec 2021 10:47:20 -0800
Subject: [PATCH 6/7] Add debug log message

---
 ingestion_server/ingestion_server/ingest.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/ingestion_server/ingestion_server/ingest.py b/ingestion_server/ingestion_server/ingest.py
index 298df9903..217379ecc 100644
--- a/ingestion_server/ingestion_server/ingest.py
+++ b/ingestion_server/ingestion_server/ingest.py
@@ -319,6 +319,9 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
         f"`{table}`: Data cleaning complete | "
         f"_Next: re-applying indices & constraints_"
     )
+    # The server sometimes hangs on or before this next step. Adding this debug
+    # message here because apparently that unblocks it? May have to do with GC.
+    log.debug("Getting downstream cursor again")
     downstream_db = database_connect()
     with downstream_db.cursor() as downstream_cur:
         # Step 5: Recreate indices from the original table

From 1c8f2db4d2ec6ca67fc1bbc7c84d37aa40f40856 Mon Sep 17 00:00:00 2001
From: Madison Swain-Bowden
Date: Tue, 21 Dec 2021 13:00:10 -0800
Subject: [PATCH 7/7] Use gc.collect instead

---
 ingestion_server/ingestion_server/ingest.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/ingestion_server/ingestion_server/ingest.py b/ingestion_server/ingestion_server/ingest.py
index 217379ecc..d6584debd 100644
--- a/ingestion_server/ingestion_server/ingest.py
+++ b/ingestion_server/ingestion_server/ingest.py
@@ -16,6 +16,7 @@
 """

 import datetime
+import gc
 import logging as log

 import psycopg2
@@ -319,9 +320,9 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
         f"`{table}`: Data cleaning complete | "
         f"_Next: re-applying indices & constraints_"
     )

-    # The server sometimes hangs on or before this next step. Adding this debug
-    # message here because apparently that unblocks it? May have to do with GC.
-    log.debug("Getting downstream cursor again")
+    # The server sometimes hangs on or before this next step. This is a pre-emptive
+    # garbage collection to try and assist with that.
+    gc.collect()
     downstream_db = database_connect()
     with downstream_db.cursor() as downstream_cur:
         # Step 5: Recreate indices from the original table