Skip to content

Commit

Permalink
Merge pull request #6 from shirte/main
Browse files Browse the repository at this point in the history
Use nerdd-link for communication
  • Loading branch information
shirte authored Nov 28, 2024
2 parents a1897fb + 374ecaa commit 50bc4b8
Show file tree
Hide file tree
Showing 32 changed files with 265 additions and 317 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ dmypy.json
# pytype static type analyzer
.pytype/

# ruff
.ruff_cache/

# Cython debug symbols
cython_debug/

Expand Down
4 changes: 4 additions & 0 deletions nerdd_backend/actions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .save_job_to_db import *
from .save_module_to_db import *
from .save_result_to_db import *
from .update_job_size import *
21 changes: 21 additions & 0 deletions nerdd_backend/actions/save_job_to_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import logging

from nerdd_link import Action, Channel, JobMessage

from ..data import RethinkDbRepository

__all__ = ["SaveJobToDb"]

logger = logging.getLogger(__name__)


class SaveJobToDb(Action[JobMessage]):
def __init__(self, channel: Channel, repository: RethinkDbRepository) -> None:
super().__init__(channel.jobs_topic())
self.repository = repository

async def _process_message(self, message: JobMessage) -> None:
await self.repository.upsert_job(message)

def _get_group_name(self):
return "save-jobs-to-db"
22 changes: 22 additions & 0 deletions nerdd_backend/actions/save_module_to_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import logging

from nerdd_link import Action, Channel, ModuleMessage

from ..data import RethinkDbRepository

__all__ = ["SaveModuleToDb"]

logger = logging.getLogger(__name__)


class SaveModuleToDb(Action[ModuleMessage]):
def __init__(self, channel: Channel, repository: RethinkDbRepository) -> None:
super().__init__(channel.modules_topic())
self.repository = repository

async def _process_message(self, message: ModuleMessage) -> None:
logger.info(f"Creating a new module called {message.name}")
await self.repository.upsert_module(message)

def _get_group_name(self):
return "save-module-to-db"
24 changes: 24 additions & 0 deletions nerdd_backend/actions/save_result_to_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import logging

from nerdd_link import Action, Channel, ResultMessage

from ..data import RethinkDbRepository

__all__ = ["SaveResultToDb"]

logger = logging.getLogger(__name__)


class SaveResultToDb(Action[ResultMessage]):
def __init__(self, channel: Channel, repository: RethinkDbRepository) -> None:
super().__init__(channel.results_topic())
self.repository = repository

async def _process_message(self, message: ResultMessage) -> None:
try:
await self.repository.upsert_result(message)
except Exception as e:
logger.error(f"Error consuming message: {e}")

def _get_group_name(self):
return "save-result-to-db"
30 changes: 30 additions & 0 deletions nerdd_backend/actions/update_job_size.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import logging

from nerdd_link import Action, Channel, LogMessage

from ..data import RethinkDbRepository

__all__ = ["UpdateJobSize"]

logger = logging.getLogger(__name__)


class UpdateJobSize(Action[LogMessage]):
def __init__(self, channel: Channel, repository: RethinkDbRepository) -> None:
super().__init__(channel.logs_topic())
self.repository = repository

async def _process_message(self, message: LogMessage) -> None:
if message["message_type"] == "report_job_size":
logger.info(f"Update job {message}")

# get job
job = await self.repository.get_job_by_id(message.job_id)

# update job size
job["num_entries_total"] = message["size"]

await self.repository.upsert_job(job)

def _get_group_name(self):
return "update-job-size"
31 changes: 6 additions & 25 deletions nerdd_backend/data/rethinkdb_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ async def get_all_modules(self):
return [item async for item in cursor]

async def get_module_by_id(self, id):
return (
await self.r.db(RETHINKDB_DB).table("modules").get(id).run(self.connection)
)
return await self.r.db(RETHINKDB_DB).table("modules").get(id).run(self.connection)

async def create_module_table(self):
try:
Expand All @@ -66,12 +64,7 @@ async def create_module_table(self):

async def get_most_recent_version(self, module_id):
# TODO: incorporate versioning
return (
await self.r.db(RETHINKDB_DB)
.table("modules")
.get(module_id)
.run(self.connection)
)
return await self.r.db(RETHINKDB_DB).table("modules").get(module_id).run(self.connection)

async def upsert_module(self, module):
# TODO: incorporate versioning
Expand All @@ -97,9 +90,7 @@ async def upsert_module(self, module):
async def create_jobs_table(self):
try:
await (
self.r.db(RETHINKDB_DB)
.table_create("jobs", primary_key="id")
.run(self.connection)
self.r.db(RETHINKDB_DB).table_create("jobs", primary_key="id").run(self.connection)
)
except ReqlOpFailedError:
pass
Expand All @@ -116,9 +107,7 @@ async def get_job_by_id(self, id):
return await self.r.db(RETHINKDB_DB).table("jobs").get(id).run(self.connection)

async def delete_job_by_id(self, id):
await (
self.r.db(RETHINKDB_DB).table("jobs").get(id).delete().run(self.connection)
)
await self.r.db(RETHINKDB_DB).table("jobs").get(id).delete().run(self.connection)

#
# SOURCES
Expand All @@ -142,18 +131,10 @@ async def upsert_source(self, source):
)

async def get_source_by_id(self, id):
return (
await self.r.db(RETHINKDB_DB).table("sources").get(id).run(self.connection)
)
return await self.r.db(RETHINKDB_DB).table("sources").get(id).run(self.connection)

async def delete_source_by_id(self, id):
await (
self.r.db(RETHINKDB_DB)
.table("sources")
.get(id)
.delete()
.run(self.connection)
)
await self.r.db(RETHINKDB_DB).table("sources").get(id).delete().run(self.connection)

#
# RESULTS
Expand Down
5 changes: 0 additions & 5 deletions nerdd_backend/kafka/__init__.py

This file was deleted.

21 changes: 0 additions & 21 deletions nerdd_backend/kafka/kafka_job_consumer.py

This file was deleted.

27 changes: 0 additions & 27 deletions nerdd_backend/kafka/kafka_log_consumer.py

This file was deleted.

20 changes: 0 additions & 20 deletions nerdd_backend/kafka/kafka_module_consumer.py

This file was deleted.

20 changes: 0 additions & 20 deletions nerdd_backend/kafka/kafka_producer.py

This file was deleted.

5 changes: 2 additions & 3 deletions nerdd_backend/lifespan/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from .consume_kafka_topic_lifespan import *
from .action_lifespan import *
from .create_module_lifespan import *
from .initialize_database_lifespan import *
from .kafka_producer_lifespan import *
from .initialize_app_lifespan import *
3 changes: 0 additions & 3 deletions nerdd_backend/lifespan/abstract_lifespan.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import asyncio
from threading import Thread

__all__ = ["AbstractLifespan"]


Expand Down
19 changes: 19 additions & 0 deletions nerdd_backend/lifespan/action_lifespan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import logging

from .abstract_lifespan import AbstractLifespan

__all__ = ["ActionLifespan"]

logger = logging.getLogger(__name__)


class ActionLifespan(AbstractLifespan):
def __init__(self, action_creator):
super().__init__()
self.action_creator = action_creator

async def start(self, app):
self.action = self.action_creator(app)

async def run(self):
await self.action.run()
66 changes: 0 additions & 66 deletions nerdd_backend/lifespan/consume_kafka_topic_lifespan.py

This file was deleted.

Loading

0 comments on commit 50bc4b8

Please sign in to comment.