Skip to content

Commit

Permalink
Merge pull request #95 from openzim/fix_failures_handling
Browse files Browse the repository at this point in the history
Fix failures handling
  • Loading branch information
benoit74 authored Feb 29, 2024
2 parents a5b0165 + 18c7034 commit 393bcf2
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 17 deletions.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ requires-python = ">=3.12,<3.13"
description = "Make ZIM file from Kolibri Channels"
readme = "README.md"
dependencies = [
"zimscraperlib==3.3.0",
"zimscraperlib==3.3.1",
"kiwixstorage==0.8.3",
"Jinja2==3.1.3",
"pif==0.8.2",
Expand All @@ -22,7 +22,7 @@ kind = "scraper"
additional-keywords = ["kolibri"]

[tool.hatch.build.hooks.openzim-build]
dependencies = [ "zimscraperlib==3.3.0" ] # required for fix_ogv_dist
dependencies = [ "zimscraperlib==3.3.1" ] # required for fix_ogv_dist

[project.optional-dependencies]
scripts = [
Expand Down
39 changes: 24 additions & 15 deletions src/kolibri2zim/scraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ def read_from_zip(ark, member):
return ark.open(member).read()


def wrap_failure_details(func):
def wrapper(self, item):
try:
node_id, kind = item
return func(self, item)
except Exception as exc:
raise RuntimeError(f"Failed to process {kind} node {node_id}") from exc

return wrapper


class Kolibri2Zim:
def __init__(self, **kwargs):
for option in options:
Expand Down Expand Up @@ -179,13 +190,9 @@ def add_local_files(self, root_path, folder):
def populate_nodes_executor(self):
"""Loop on content nodes to create zim entries from kolibri DB"""

def remove_future(future):
self.nodes_futures.remove(future)

def schedule_node(item):
future = self.nodes_executor.submit(self.add_node, item=item)
self.nodes_futures.add(future)
future.add_done_callback(remove_future)

# schedule root-id
schedule_node((self.db.root["id"], self.db.root["kind"]))
Expand All @@ -195,6 +202,7 @@ def schedule_node(item):
if self.node_ids is None or node["id"] in self.node_ids:
schedule_node((node["id"], node["kind"]))

@wrap_failure_details
def add_node(self, item):
"""process a content node from the tuple in queue"""
node_id, kind = item
Expand Down Expand Up @@ -873,7 +881,7 @@ def run(self):
self.populate_nodes_executor()

# await completion of all futures (nodes and videos)
result = cf.wait(
futures = cf.wait(
self.videos_futures | self.nodes_futures,
return_when=cf.FIRST_EXCEPTION,
)
Expand All @@ -883,19 +891,20 @@ def run(self):
# only awaits future completion and doesn't include callbacks
self.videos_executor.shutdown()

succeeded = (
not result.not_done
and sum([1 if fs.exception() else 0 for fs in result.done]) == 0
nb_done_with_failure = sum(
1 if future.exception() else 0 for future in futures.done
)
succeeded = not futures.not_done and nb_done_with_failure == 0

# DEBUG: raise first exception
if not succeeded and result.done:
logger.info(
f"FAILURE not_done={len(result.not_done)} done={len(result.done)}"
if not succeeded:
logger.warning(
f"FAILURE: not_done={len(futures.not_done)}, "
f"done successfully={len(futures.done) - nb_done_with_failure}, "
f"done with failure={nb_done_with_failure}"
)
for future in result.done:
if future.exception():
raise future.exception() # pyright:ignore
for future in [fut for fut in futures.done if fut.exception()]:
logger.warning("", exc_info=future.exception())
raise Exception("Some nodes have not been processed successfully")
except KeyboardInterrupt:
self.creator.can_finish = False
logger.error("KeyboardInterrupt, exiting.")
Expand Down

0 comments on commit 393bcf2

Please sign in to comment.