Remove multiprocessing
bartfeenstra committed Feb 18, 2024
1 parent 8c4d417 commit 2225218
Showing 12 changed files with 71 additions and 269 deletions.
2 changes: 0 additions & 2 deletions betty/_package/pyinstaller/main.py
@@ -1,5 +1,4 @@
 import sys
-from multiprocessing import freeze_support
 
 from betty.app import App
 from betty.asyncio import sync
@@ -12,7 +11,6 @@ async def main() -> None:
     """
     Launch Betty for PyInstaller builds.
     """
-    freeze_support()
     async with App() as app:
        qapp = BettyApplication([sys.argv[0]], app=app)
        window = WelcomeWindow(app)
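Note: with the worker pool no longer process-based, the PyInstaller entry point no longer needs `multiprocessing.freeze_support()`. For reference, this is the standard-library idiom being deleted — a frozen executable that spawns worker processes must call it first thing in the entry point; without multiprocessing it is dead code:

```python
# The idiom removed above: required only when a frozen (e.g. PyInstaller)
# executable spawns worker processes via the multiprocessing module.
from multiprocessing import freeze_support

if __name__ == '__main__':
    freeze_support()
    # ...launch the application...
```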
2 changes: 1 addition & 1 deletion betty/app/__init__.py
@@ -347,7 +347,7 @@ def concurrency(self) -> int:
         with suppress(KeyError):
             return int(stdos.environ['BETTY_CONCURRENCY'])
         # Assume that any machine that runs Betty has at least two CPU cores.
-        return stdos.cpu_count() or 2
+        return (stdos.cpu_count() or 2) * 4
 
     @property
     def async_concurrency(self) -> int:
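Note: with the generation pool now thread-based (see betty/generate.py below), the default concurrency is raised from the CPU count to four times the CPU count — threads tolerate oversubscription, presumably because the generation tasks are largely I/O-bound. A minimal sketch of the resulting heuristic, including the `BETTY_CONCURRENCY` override visible in the context lines:

```python
import os

def default_concurrency() -> int:
    # An explicit override always wins.
    try:
        return int(os.environ['BETTY_CONCURRENCY'])
    except KeyError:
        # Assume at least two cores, then oversubscribe for I/O-bound threads.
        return (os.cpu_count() or 2) * 4
```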
23 changes: 12 additions & 11 deletions betty/extension/nginx/serve.py
@@ -5,29 +5,34 @@
 from pathlib import Path
 from typing import Any
 
-import dill
 import docker
 from aiofiles.tempfile import TemporaryDirectory, AiofilesContextManagerTempDir
 from docker.errors import DockerException
 
 from betty.app import App
 from betty.extension.nginx.artifact import generate_dockerfile_file, generate_configuration_file
 from betty.extension.nginx.docker import Container
+from betty.project import Project
 from betty.serve import NoPublicUrlBecauseServerNotStartedError, AppServer
 
 
 class DockerizedNginxServer(AppServer):
     def __init__(self, app: App) -> None:
-        super().__init__(
-            # Create a new app so we can modify it later.
-            dill.loads(dill.dumps(app))
-        )
+        from betty.extension import Nginx
+
+        project = Project(ancestry=app.project.ancestry)
+        project.configuration.autowrite = False
+        project.configuration.configuration_file_path = app.project.configuration.configuration_file_path
+        project.configuration.update(app.project.configuration)
+        project.configuration.debug = True
+        app = App(app.configuration, project)
+        # Work around https://github.com/bartfeenstra/betty/issues/1056.
+        app.extensions[Nginx].configuration.https = False
+        super().__init__(app)
         self._container: Container | None = None
         self._output_directory: AiofilesContextManagerTempDir[None, Any, Any] | None = None
 
     async def start(self) -> None:
-        from betty.extension import Nginx
-
         await super().start()
         logging.getLogger(__name__).info('Starting a Dockerized nginx web server...')
         self._output_directory = TemporaryDirectory()
@@ -36,10 +41,6 @@ async def start(self) -> None:
         docker_directory_path = Path(output_directory_name)
         dockerfile_file_path = docker_directory_path / 'Dockerfile'
 
-        self._app.project.configuration.debug = True
-        # Work around https://github.com/bartfeenstra/betty/issues/1056.
-        self._app.extensions[Nginx].configuration.https = False
-
         await generate_configuration_file(
             self._app,
             destination_file_path=nginx_configuration_file_path,
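Note: the constructor previously deep-copied the entire `App` by round-tripping it through dill; it now builds a fresh `Project` that shares the existing ancestry and applies the debug and HTTPS tweaks up front. The removed pattern, as a generic sketch (using the standard-library pickle here; the old code used dill, which serializes a wider range of objects):

```python
import pickle
from typing import TypeVar

T = TypeVar('T')

def clone(obj: T) -> T:
    # Deep-copy by serializing and deserializing: every object reachable
    # from obj must be picklable, which is the constraint this commit drops.
    return pickle.loads(pickle.dumps(obj))
```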
100 changes: 49 additions & 51 deletions betty/generate.py
@@ -6,21 +6,18 @@
 import asyncio
 import json
 import logging
-import multiprocessing
 import os
 import queue
 import shutil
 import threading
-from concurrent.futures import ProcessPoolExecutor, Executor, Future, as_completed
+from concurrent.futures import Executor, Future, as_completed, ThreadPoolExecutor
 from contextlib import suppress
-from ctypes import c_char_p
-from multiprocessing.managers import ValueProxy
+from dataclasses import dataclass
 from pathlib import Path
 from types import TracebackType
-from typing import cast, AsyncContextManager, Self, Any, ParamSpec, Callable, Concatenate, MutableSequence
+from typing import cast, AsyncContextManager, Self, Any, ParamSpec, Callable, Concatenate
 
 import aiofiles
-import dill
 from aiofiles.os import makedirs
 from aiofiles.threadpool.text import AsyncTextIOWrapper

@@ -36,7 +33,7 @@
 from betty.string import camel_case_to_kebab_case, camel_case_to_snake_case, upper_camel_case_to_lower_camel_case
 from betty.task import Context
 
-_GenerationProcessPoolTaskP = ParamSpec('_GenerationProcessPoolTaskP')
+_GenerationWorkerPoolTaskP = ParamSpec('_GenerationWorkerPoolTaskP')
 
 
 def getLogger() -> logging.Logger:
@@ -54,40 +51,42 @@ async def generate(self, task_context: GenerationContext) -> None:
 class GenerationContext(Context):
     def __init__(self, app: App):
         super().__init__()
-        self._pickled_app = multiprocessing.Manager().Value(c_char_p, dill.dumps(app))
-        self._unpickle_app_lock: threading.Lock = multiprocessing.Manager().Lock()
-        self._app: App | None = None
-
-    def __getstate__(self) -> tuple[threading.Lock, MutableSequence[str], ValueProxy[bytes]]:
-        return self._claims_lock, self._claimed_task_ids, self._pickled_app
+        self._app = app
 
-    def __setstate__(self, state: tuple[threading.Lock, MutableSequence[str], ValueProxy[bytes]]) -> None:
-        self._claims_lock, self._claimed_task_ids, self._pickled_app = state
-        self._unpickle_app_lock = multiprocessing.Manager().Lock()
-        self._app = None
+    def __reduce__(self) -> tuple[type[GenerationContext], tuple[App]]:
+        return (
+            GenerationContext,
+            (
+                self._app,
+            ),
+        )
 
     @property
     def app(self) -> App:
-        with self._unpickle_app_lock:
-            if self._app is None:
-                self._app = cast(App, dill.loads(self._pickled_app.value))
         return self._app
 
 
-class _GenerationProcessPool:
+@dataclass
+class _GenerationQueueItem:
+    callable: Callable[Concatenate[GenerationContext, _GenerationWorkerPoolTaskP], Any]
+    args: _GenerationWorkerPoolTaskP.args
+    kwargs: _GenerationWorkerPoolTaskP.kwargs
+
+
+class _GenerationWorkerPool:
     def __init__(self, app: App, task_context: GenerationContext):
         self._app = app
         self._task_context = task_context
-        self._queue = multiprocessing.Manager().Queue()
-        self._cancel = multiprocessing.Manager().Event()
-        self._finish = multiprocessing.Manager().Event()
+        self._queue: queue.Queue[_GenerationQueueItem] = queue.Queue()
+        self._cancel = threading.Event()
+        self._finish = threading.Event()
         self._executor: Executor | None = None
         self._workers: list[Future[None]] = []
 
     async def __aenter__(self) -> Self:
-        self._executor = ProcessPoolExecutor(max_workers=self._app.concurrency)
+        self._executor = ThreadPoolExecutor(max_workers=self._app.concurrency)
         for _ in range(0, self._app.concurrency):
-            self._workers.append(self._executor.submit(_GenerationProcessPoolWorker(
+            self._workers.append(self._executor.submit(_GenerationPoolWorker(
                 self._queue,
                 self._cancel,
                 self._finish,
@@ -113,17 +112,17 @@ async def __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseExc
 
     def delegate(
         self,
-        task_callable: Callable[Concatenate[GenerationContext, _GenerationProcessPoolTaskP], Any],
-        *task_args: _GenerationProcessPoolTaskP.args,
-        **task_kwargs: _GenerationProcessPoolTaskP.kwargs,
+        task_callable: Callable[Concatenate[GenerationContext, _GenerationWorkerPoolTaskP], Any],
+        *task_args: _GenerationWorkerPoolTaskP.args,
+        **task_kwargs: _GenerationWorkerPoolTaskP.kwargs,
     ) -> None:
-        self._queue.put((task_callable, task_args, task_kwargs))
+        self._queue.put(_GenerationQueueItem(task_callable, task_args, task_kwargs))
 
 
-class _GenerationProcessPoolWorker:
+class _GenerationPoolWorker:
     def __init__(
         self,
-        task_queue: queue.Queue[tuple[Callable[Concatenate[GenerationContext, _GenerationProcessPoolTaskP], Any], _GenerationProcessPoolTaskP.args, _GenerationProcessPoolTaskP.kwargs]],
+        task_queue: queue.Queue[_GenerationQueueItem],
         cancel: threading.Event,
         finish: threading.Event,
         async_concurrency: int,
@@ -137,24 +136,23 @@ def __init__(
 
     @sync
     async def __call__(self) -> None:
-        async with self._context.app:
-            await gather(*(
-                self._perform_tasks()
-                for _ in range(0, self._async_concurrency)
-            ))
+        await gather(*(
+            self._perform_tasks()
+            for _ in range(0, self._async_concurrency)
+        ))
 
     async def _perform_tasks(self) -> None:
         while not self._cancel.is_set():
             try:
-                task_callable, task_args, task_kwargs = self._task_queue.get_nowait()
+                item = self._task_queue.get_nowait()
             except queue.Empty:
                 if self._finish.is_set():
                     return
             else:
-                await task_callable(
+                await item.callable(
                     self._context,
-                    *task_args,
-                    **task_kwargs,
+                    *item.args,
+                    **item.kwargs,
                 )


@@ -176,30 +174,30 @@ async def generate(app: App) -> None:
 
     locales = app.project.configuration.locales
 
-    async with _GenerationProcessPool(app, task_context) as process_pool:
-        process_pool.delegate(_generate_dispatch)
-        process_pool.delegate(_generate_sitemap)
-        process_pool.delegate(_generate_json_schema)
-        process_pool.delegate(_generate_openapi)
+    async with _GenerationWorkerPool(app, task_context) as thread_pool:
+        thread_pool.delegate(_generate_dispatch)
+        thread_pool.delegate(_generate_sitemap)
+        thread_pool.delegate(_generate_json_schema)
+        thread_pool.delegate(_generate_openapi)
 
         for locale in locales:
-            process_pool.delegate(_generate_public, locale)
+            thread_pool.delegate(_generate_public, locale)
 
         for entity_type in app.entity_types:
             if not issubclass(entity_type, UserFacingEntity):
                 continue
             if app.project.configuration.entity_types[entity_type].generate_html_list:
                 for locale in locales:
-                    process_pool.delegate(_generate_entity_type_list_html, locale, entity_type)
-                process_pool.delegate(_generate_entity_type_list_json, entity_type)
+                    thread_pool.delegate(_generate_entity_type_list_html, locale, entity_type)
+                thread_pool.delegate(_generate_entity_type_list_json, entity_type)
             for entity in app.project.ancestry[entity_type]:
                 if isinstance(entity.id, GeneratedEntityId):
                     continue
 
-                process_pool.delegate(_generate_entity_json, entity_type, entity.id)
+                thread_pool.delegate(_generate_entity_json, entity_type, entity.id)
                 if is_public(entity):
                     for locale in locales:
-                        process_pool.delegate(_generate_entity_html, locale, entity_type, entity.id)
+                        thread_pool.delegate(_generate_entity_html, locale, entity_type, entity.id)
 
     # Log the generated pages.
     for locale in app.project.configuration.locales:
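Note: `GenerationContext` now holds its `App` directly and implements `__reduce__` instead of `__getstate__`/`__setstate__`: if an instance is ever pickled, it is rebuilt simply by calling `GenerationContext(app)` again. A self-contained illustration of the protocol (a hypothetical `Point` class, not from this codebase):

```python
import pickle

class Point:
    def __init__(self, x: int, y: int) -> None:
        self.x = x
        self.y = y

    def __reduce__(self):
        # Tell pickle: rebuild this object by calling Point(self.x, self.y).
        return (Point, (self.x, self.y))

restored = pickle.loads(pickle.dumps(Point(1, 2)))
assert (restored.x, restored.y) == (1, 2)
```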
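Note: the pool swaps every `multiprocessing.Manager()` primitive for its plain in-process equivalent — a `queue.Queue` of work items, `threading.Event`s for cancellation and completion, and a `ThreadPoolExecutor` in place of the `ProcessPoolExecutor`. A stripped-down sketch of the same producer-consumer shape (illustrative names, synchronous tasks for brevity):

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

tasks: queue.Queue[Callable[[], None]] = queue.Queue()
finish = threading.Event()

def worker() -> None:
    while True:
        try:
            task = tasks.get_nowait()
        except queue.Empty:
            # Keep polling until the producer declares the queue complete.
            if finish.is_set():
                return
        else:
            task()

with ThreadPoolExecutor(max_workers=4) as executor:
    workers = [executor.submit(worker) for _ in range(4)]
    for i in range(10):
        tasks.put(lambda i=i: print(f'task {i}'))
    finish.set()
    for future in workers:
        future.result()  # propagate any worker exceptions
```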
40 changes: 0 additions & 40 deletions betty/model/__init__.py
@@ -1061,46 +1061,6 @@ def add_association(
         self._associations[owner_type][owner_attr_name][owner_id].append((associate_type, associate_id))
 
 
-class PickleableEntityGraph(_EntityGraphBuilder):
-    def __init__(self, *entities: Entity) -> None:
-        super().__init__()
-        self._pickled = False
-        for entity in entities:
-            self._entities[entity.type][entity.id] = entity
-
-    def __getstate__(self) -> tuple[_EntityGraphBuilderEntities, _EntityGraphBuilderAssociations]:
-        self._flatten()
-        return self._entities, self._associations
-
-    def __setstate__(self, state: tuple[_EntityGraphBuilderEntities, _EntityGraphBuilderAssociations]) -> None:
-        self._entities, self._associations = state
-        self._built = False
-        self._pickled = False
-
-    def _flatten(self) -> None:
-        if self._pickled:
-            raise RuntimeError('This entity graph has been pickled already.')
-        self._pickled = True
-
-        for owner in self._iter():
-            unaliased_entity = unalias(owner)
-            entity_type = unaliased_entity.type
-
-            for association in EntityTypeAssociationRegistry.get_all_associations(entity_type):
-                associates: Iterable[Entity]
-                if isinstance(association, ToOneEntityTypeAssociation):
-                    associate = association.get(unaliased_entity)
-                    if associate is None:
-                        continue
-                    associates = [associate]
-                else:
-                    associates = association.get(unaliased_entity)
-                for associate in associates:
-                    self._associations[entity_type][association.owner_attr_name][owner.id].append(
-                        (associate.type, associate.id),
-                    )
-
-
 @contextmanager
 def record_added(entities: EntityCollection[EntityT]) -> Iterator[MultipleTypesEntityCollection[EntityT]]:
     """
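Note: `PickleableEntityGraph` existed only to flatten entity associations into a picklable form so the graph could cross process boundaries; with a thread-based pool the graph is shared in memory and the class becomes dead code. For reference, the `__getstate__`/`__setstate__` protocol it relied on, as a generic sketch (not the removed class):

```python
import pickle

class Connection:
    def __init__(self, dsn: str) -> None:
        self.dsn = dsn
        self._socket = object()  # stand-in for an unpicklable live resource

    def __getstate__(self):
        # Persist only the picklable configuration, not the live resource.
        return {'dsn': self.dsn}

    def __setstate__(self, state) -> None:
        self.dsn = state['dsn']
        self._socket = object()  # re-acquire the resource on unpickling

copy = pickle.loads(pickle.dumps(Connection('db://example')))
assert copy.dsn == 'db://example'
```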
9 changes: 1 addition & 8 deletions betty/model/ancestry.py
@@ -21,7 +21,7 @@
 from betty.media_type import MediaType
 from betty.model import many_to_many, Entity, one_to_many, many_to_one, many_to_one_to_many, \
     MultipleTypesEntityCollection, EntityCollection, UserFacingEntity, EntityTypeAssociationRegistry, \
-    PickleableEntityGraph, GeneratedEntityId, get_entity_type_name
+    GeneratedEntityId, get_entity_type_name
 from betty.model.event_type import EventType, UnknownEventType
 from betty.serde.dump import DictDump, Dump, dump_default
 from betty.string import camel_case_to_kebab_case
@@ -1916,13 +1916,6 @@ def __init__(self):
         super().__init__()
         self._check_graph = True
 
-    def __getstate__(self) -> PickleableEntityGraph:
-        return PickleableEntityGraph(*self)
-
-    def __setstate__(self, state: PickleableEntityGraph) -> None:
-        self._collections = {}
-        self.add_unchecked_graph(*state.build())
-
     def add_unchecked_graph(self, *entities: Entity) -> None:
         self._check_graph = False
         try:
3 changes: 2 additions & 1 deletion betty/project.py
@@ -789,11 +789,12 @@ def __init__(
         self,
         *,
         project_id: str | None = None,
+        ancestry: Ancestry | None = None,
     ):
         super().__init__()
         self._id = project_id
         self._configuration = ProjectConfiguration()
-        self._ancestry = Ancestry()
+        self._ancestry = ancestry or Ancestry()
 
     def __getstate__(self) -> tuple[str | None, VoidableDump, Path, Ancestry]:
         return self._id, self._configuration.dump(), self._configuration.configuration_file_path, self._ancestry
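Note: `Project` now accepts an existing `Ancestry`, which is what lets `DockerizedNginxServer` above wrap a second project around the same data without serializing it. A usage sketch:

```python
# Share one ancestry between two in-process projects instead of
# deep-copying it through a serializer:
project = Project(ancestry=app.project.ancestry)
```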
11 changes: 5 additions & 6 deletions betty/task.py
@@ -4,18 +4,17 @@
 
 from __future__ import annotations
 
-import multiprocessing
-from collections.abc import MutableSequence
+import threading
 
 
 class Context:
     def __init__(self):
-        self._claims_lock = multiprocessing.Manager().Lock()
-        self._claimed_task_ids: MutableSequence[str] = multiprocessing.Manager().list()
+        self._claims_lock = threading.Lock()
+        self._claimed_task_ids: set[str] = set()
 
     def claim(self, task_id: str) -> bool:
         with self._claims_lock:
             if task_id in self._claimed_task_ids:
                 return False
-            self._claimed_task_ids.append(task_id)
-            return True
+            self._claimed_task_ids.add(task_id)
+            return True
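Note: `Context.claim()` keeps its contract — atomically record a task ID so the same task is never performed twice — but now uses a plain `threading.Lock` and a `set`, making membership checks O(1) instead of O(n) on the managed list. Expected behaviour, as a sketch:

```python
context = Context()
assert context.claim('generate:/index.html') is True
# A second claim for the same task ID is rejected.
assert context.claim('generate:/index.html') is False
```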
(4 of the 12 changed files are not shown above.)