Skip to content

Commit

Permalink
feat: 🎸 check if the resources are available at start up
Browse files Browse the repository at this point in the history
  • Loading branch information
severo committed Feb 10, 2023
1 parent 57dab13 commit ce2b651
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 20 deletions.
43 changes: 34 additions & 9 deletions jobs/mongodb_migration/src/mongodb_migration/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 The HuggingFace Authors.

import logging
import sys

from libcommon.log import init_logging
Expand All @@ -11,25 +12,49 @@
from mongodb_migration.plan import Plan
from mongodb_migration.resources import MigrationsMongoResource

if __name__ == "__main__":

def run_job() -> None:
job_config = JobConfig.from_env()

init_logging(log_level=job_config.common.log_level)
# ^ set first to have logs as soon as possible

with (
CacheMongoResource(database=job_config.cache.mongo_database, host=job_config.cache.mongo_url),
QueueMongoResource(database=job_config.queue.mongo_database, host=job_config.queue.mongo_url),
CacheMongoResource(
database=job_config.cache.mongo_database, host=job_config.cache.mongo_url
) as cache_resource,
QueueMongoResource(
database=job_config.queue.mongo_database, host=job_config.queue.mongo_url
) as queue_resource,
MigrationsMongoResource(
database=job_config.database_migrations.mongo_database, host=job_config.database_migrations.mongo_url
),
) as migrations_database_resource,
):
if cache_resource.check() is False:
logging.info(
"The connection to the cache database could not be established. The migration job is skipped."
)
return
if queue_resource.check() is False:
logging.info(
"The connection to the queue database could not be established. The migration job is skipped."
)
return
if migrations_database_resource.check() is False:
logging.info(
"The connection to the migrations database could not be established. The migration job is skipped."
)
return
collected_migrations = MigrationsCollector().get_migrations()
try:
Plan(collected_migrations=collected_migrations).execute()
sys.exit(0)
except Exception:
sys.exit(1)
Plan(collected_migrations=collected_migrations).execute()


if __name__ == "__main__":
try:
run_job()
sys.exit(0)
except Exception:
sys.exit(1)

# See:
# https://blog.appsignal.com/2020/04/14/dissecting-rails-migrationsl.html
Expand Down
15 changes: 10 additions & 5 deletions services/admin/src/admin/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from libcommon.log import init_logging
from libcommon.processing_graph import ProcessingGraph
from libcommon.resources import CacheMongoResource, QueueMongoResource, Resource
from libcommon.storage import init_assets_dir
from libcommon.storage import exists, init_assets_dir
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
Expand Down Expand Up @@ -33,15 +33,20 @@ def create_app() -> Starlette:
init_logging(log_level=app_config.common.log_level)
# ^ set first to have logs as soon as possible
assets_directory = init_assets_dir(directory=app_config.assets.storage_directory)
if not exists(assets_directory):
raise RuntimeError("The assets storage directory could not be accessed. Exiting.")

processing_graph = ProcessingGraph(app_config.processing_graph.specification)
processing_steps = list(processing_graph.steps.values())
init_processing_steps = processing_graph.get_first_steps()

resources: list[Resource] = [
CacheMongoResource(database=app_config.cache.mongo_database, host=app_config.cache.mongo_url),
QueueMongoResource(database=app_config.queue.mongo_database, host=app_config.queue.mongo_url),
]
cache_resource = CacheMongoResource(database=app_config.cache.mongo_database, host=app_config.cache.mongo_url)
queue_resource = QueueMongoResource(database=app_config.queue.mongo_database, host=app_config.queue.mongo_url)
resources: list[Resource] = [cache_resource, queue_resource]
if cache_resource.check() is False:
raise RuntimeError("The connection to the cache database could not be established. Exiting.")
if queue_resource.check() is False:
raise RuntimeError("The connection to the queue database could not be established. Exiting.")

prometheus = Prometheus(processing_steps=processing_steps, assets_directory=assets_directory)

Expand Down
11 changes: 7 additions & 4 deletions services/api/src/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ def create_app() -> Starlette:
Middleware(PrometheusMiddleware, filter_unhandled_paths=True),
]

resources: list[Resource] = [
CacheMongoResource(database=app_config.cache.mongo_database, host=app_config.cache.mongo_url),
QueueMongoResource(database=app_config.queue.mongo_database, host=app_config.queue.mongo_url),
]
cache_resource = CacheMongoResource(database=app_config.cache.mongo_database, host=app_config.cache.mongo_url)
queue_resource = QueueMongoResource(database=app_config.queue.mongo_database, host=app_config.queue.mongo_url)
resources: list[Resource] = [cache_resource, queue_resource]
if cache_resource.check() is False:
raise RuntimeError("The connection to the cache database could not be established. Exiting.")
if queue_resource.check() is False:
raise RuntimeError("The connection to the queue database could not be established. Exiting.")

valid: List[BaseRoute] = [
Route(
Expand Down
15 changes: 13 additions & 2 deletions workers/datasets_based/src/datasets_based/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,20 @@
init_hf_datasets_cache=app_config.datasets_based.hf_datasets_cache,
numba_path=app_config.numba.path,
) as libraries_resource,
CacheMongoResource(database=app_config.cache.mongo_database, host=app_config.cache.mongo_url),
QueueMongoResource(database=app_config.queue.mongo_database, host=app_config.queue.mongo_url),
CacheMongoResource(
database=app_config.cache.mongo_database, host=app_config.cache.mongo_url
) as cache_resource,
QueueMongoResource(
database=app_config.queue.mongo_database, host=app_config.queue.mongo_url
) as queue_resource,
):
if libraries_resource.check() is False:
raise RuntimeError("The datasets and numba libraries could not be configured. Exiting.")
if cache_resource.check() is False:
raise RuntimeError("The connection to the cache database could not be established. Exiting.")
if queue_resource.check() is False:
raise RuntimeError("The connection to the queue database could not be established. Exiting.")

worker_factory = WorkerFactory(
app_config=app_config,
processing_graph=processing_graph,
Expand Down

0 comments on commit ce2b651

Please sign in to comment.