This repository has been archived by the owner on Feb 22, 2023. It is now read-only.

Data refresh Slack notifications #421

Merged 7 commits on Jan 4, 2022
Changes from 1 commit
Improve message formatting
AetherUnbound committed Jan 4, 2022
commit 6c3b57a4a25fdaf84aec1360efbdd7059c532f0a
7 changes: 5 additions & 2 deletions ingestion_server/ingestion_server/api.py
@@ -13,8 +13,8 @@

 import falcon
 
-from ingestion_server import slack
 import ingestion_server.indexer as indexer
+from ingestion_server import slack
 from ingestion_server.constants.media_types import MEDIA_TYPES
 from ingestion_server.state import clear_state, worker_finished
 from ingestion_server.tasks import Task, TaskTracker, TaskTypes
@@ -152,7 +152,10 @@ def on_post(self, req, resp):
 index_type = target_index.split("-")[0]
 if index_type not in MEDIA_TYPES:
     index_type = "image"
-slack.message(f"Elasticsearch reindex complete for `{index_type}` | Next: promote index as primary")
+slack.message(
+    f"`{index_type}`: Elasticsearch reindex complete | "
+    f"_Next: promote index as primary_"
+)
 f = indexer.TableIndexer.go_live
 p = Process(target=f, args=(target_index, index_type))
 p.start()
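
Since Slack renders these messages as mrkdwn, the reworked string leads with the media type as inline code and italicizes the upcoming step. As a quick illustration (not part of the diff), the adjacent f-strings concatenate into a single line:

```python
index_type = "image"
# Python joins adjacent string literals, so the two f-strings above
# produce one message line:
text = (
    f"`{index_type}`: Elasticsearch reindex complete | "
    f"_Next: promote index as primary_"
)
assert text == "`image`: Elasticsearch reindex complete | _Next: promote index as primary_"
```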
8 changes: 4 additions & 4 deletions ingestion_server/ingestion_server/indexer.py
@@ -32,12 +32,12 @@
 from elasticsearch_dsl import Search, connections
 from psycopg2.sql import SQL, Identifier, Literal
 
+from ingestion_server import slack
 from ingestion_server.distributed_reindex_scheduler import schedule_distributed_index
 from ingestion_server.elasticsearch_models import database_table_to_elasticsearch_model
 from ingestion_server.es_mapping import index_settings
 from ingestion_server.qa import create_search_qa_index
 from ingestion_server.queries import get_existence_queries
-from ingestion_server import slack
 
 
 # For AWS IAM access to Elasticsearch
@@ -378,8 +378,7 @@ def go_live(write_index, live_alias):
 es.indices.put_alias(index=write_index, name=live_alias)
 log.info(f"Created '{live_alias}' index alias pointing to {write_index}")
 slack.message(
-    f"Elasticsearch promotion complete for `{write_index}` "
-    f"- data refresh complete!"
+    f"`{write_index}`: ES index promoted - data refresh complete! :tada:"
 )
 
 def listen(self, poll_interval=10):
@@ -416,7 +415,8 @@ def reindex(self, model_name: str, distributed=None):
 else:
     self._index_table(model_name, dest_idx=destination_index)
 slack.message(
-    f"Elasticsearch reindex complete for `{model_name}` | Next: promote index as primary"
+    f"`{model_name}`: Elasticsearch reindex complete | "
+    f"_Next: promote index as primary_"
 )
 self.go_live(destination_index, model_name)

20 changes: 13 additions & 7 deletions ingestion_server/ingestion_server/ingest.py
@@ -23,14 +23,15 @@
 from psycopg2.extras import DictCursor
 from psycopg2.sql import SQL, Identifier, Literal
 
+from ingestion_server import slack
 from ingestion_server.cleanup import clean_image_data
 from ingestion_server.indexer import database_connect
 from ingestion_server.queries import (
     get_copy_data_query,
     get_fdw_query,
     get_go_live_query,
 )
-from ingestion_server import slack
 
 
 UPSTREAM_DB_HOST = config("UPSTREAM_DB_HOST", default="localhost")
 UPSTREAM_DB_PORT = config("UPSTREAM_DB_PORT", default=5433, cast=int)
@@ -267,7 +268,7 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")

 # Step 1: Get the list of overlapping columns
 slack.message(
-    f"Starting data refresh on `{table}` | Next: copying data from upstream"
+    f"`{table}`: Starting data refresh | _Next: copying data from upstream_"
 )
 downstream_db = database_connect()
 upstream_db = psycopg2.connect(
@@ -300,8 +301,12 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
log.info(f"Running copy-data query: \n{copy_data.as_string(downstream_cur)}")
downstream_cur.execute(copy_data)

next_step = "starting data cleaning" if table == "image" else "re-applying indices & constraints"
slack.message(f"Data copy complete for `{table}` | Next: {next_step}")
next_step = (
"starting data cleaning"
if table == "image"
else "re-applying indices & constraints"
)
slack.message(f"`{table}`: Data copy complete | _Next: {next_step}_")
downstream_db.commit()
downstream_db.close()

@@ -310,7 +315,8 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
log.info("Cleaning data...")
clean_image_data(table)
slack.message(
f"Data cleaning complete for `{table}` | Next: re-applying indices & constraints"
f"`{table}`: Data cleaning complete | "
f"_Next: re-applying indices & constraints_"
)

downstream_db = database_connect()
@@ -329,7 +335,7 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
 if len(remap_constraints.seq) != 0:
     downstream_cur.execute(remap_constraints)
 _update_progress(progress, 99.0)
-slack.message(f"Indices & constraints applied for `{table}` | Next: go-live")
+slack.message(f"`{table}`: Indices & constraints applied | _Next: go-live_")
 
 # Step 7: Promote the temporary table and delete the original
 log.info("Done remapping constraints! Going live with new table...")
@@ -340,7 +346,7 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
 downstream_db.close()
 log.info(f"Finished refreshing table '{table}'.")
 _update_progress(progress, 100.0)
-slack.message(f"Finished refresh of table `{table}` | Next: Elasticsearch reindex")
+slack.message(f"`{table}`: Finished table refresh | _Next: Elasticsearch reindex_")
 
 if finish_time:
     finish_time.value = datetime.datetime.utcnow().timestamp()
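
Taken together, `reload_upstream` now emits a uniform `` `{table}`: <status> | _Next: <step>_ `` message at each stage. A sketch (not part of the diff) of the sequence posted for the `image` table; the reindex and promotion messages that follow come from indexer.py and api.py:

```python
# Slack notifications emitted by reload_upstream for an "image" refresh, in order:
refresh_notifications = [
    "`image`: Starting data refresh | _Next: copying data from upstream_",
    "`image`: Data copy complete | _Next: starting data cleaning_",
    "`image`: Data cleaning complete | _Next: re-applying indices & constraints_",
    "`image`: Indices & constraints applied | _Next: go-live_",
    "`image`: Finished table refresh | _Next: Elasticsearch reindex_",
]
```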
2 changes: 2 additions & 0 deletions ingestion_server/ingestion_server/slack.py
@@ -28,6 +28,8 @@ def message(text: str, summary: str = None) -> None:
 data = {
     "blocks": [{"text": {"text": text, "type": "mrkdwn"}, "type": "section"}],
     "text": summary,
+    "username": "Data Refresh Notification",
+    "icon_emoji": "arrows_counterclockwise",
 }
 try:
     requests.post(webhook, json=data)
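
The two new payload fields give every webhook post a consistent sender identity: `username` overrides the bot name shown in the channel, and `icon_emoji` sets the `arrows_counterclockwise` (🔄) emoji as its avatar. A minimal, self-contained sketch of what `message` now sends; the `SLACK_WEBHOOK` environment variable is an assumption here, as the real module loads its webhook URL from its own configuration:

```python
import os

import requests


def message(text: str, summary: str = None) -> None:
    # Hypothetical webhook lookup; the actual module reads its own config.
    webhook = os.environ.get("SLACK_WEBHOOK")
    if not webhook:
        return
    data = {
        # One mrkdwn section block, so `code` and _italics_ render in Slack.
        "blocks": [{"text": {"text": text, "type": "mrkdwn"}, "type": "section"}],
        # Plain-text fallback shown in notifications and non-block clients.
        "text": summary,
        # New in this commit: a fixed bot name and emoji avatar per message.
        "username": "Data Refresh Notification",
        "icon_emoji": "arrows_counterclockwise",
    }
    requests.post(webhook, json=data)


message("`image`: Data copy complete | _Next: starting data cleaning_")
```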