From b5403c150414728a3417659a8a321eb9f72e8508 Mon Sep 17 00:00:00 2001 From: utmhikari <361914599@qq.com> Date: Sun, 6 Dec 2020 18:30:50 +0800 Subject: [PATCH] update fastapi version, add run_in_processpool api --- application/concurrency.py | 32 ++++++++++++++++++++++++++++++++ controller/item.py | 13 ++++++++++++- requirements.txt | 28 +++++++++++++++------------- service/item.py | 21 ++++++++++++++++++++- 4 files changed, 79 insertions(+), 15 deletions(-) create mode 100644 application/concurrency.py diff --git a/application/concurrency.py b/application/concurrency.py new file mode 100644 index 0000000..2b7fdff --- /dev/null +++ b/application/concurrency.py @@ -0,0 +1,32 @@ +""" +Concurrency module +""" +import asyncio +from concurrent.futures import ProcessPoolExecutor +import functools +import typing +from application import logger +import pprint + +LOGGER = logger.get_application_logger() + +T = typing.TypeVar('T') + +PROCESS_POOL_EXECUTOR = ProcessPoolExecutor() + + +async def run_in_processpool(func: typing.Callable[..., T], + *args: typing.Any, + **kwargs: typing.Any) -> T: + """ + run in process pool executor + :param func: function + :param args: args + :param kwargs: keyword args + :return: + """ + LOGGER.debug('Run in processpool with func: %s, args: %s, kwargs: %s' % + (pprint.pformat(func), pprint.pformat(args), pprint.pformat(kwargs))) + loop = asyncio.get_event_loop() + f = functools.partial(func, **kwargs) + return await loop.run_in_executor(None, f, *args) diff --git a/controller/item.py b/controller/item.py index 95eef4f..2efa8d8 100644 --- a/controller/item.py +++ b/controller/item.py @@ -50,7 +50,7 @@ def update_item(item_id: int, item: Item): return success(item) -@router.get('/v1/items/print') +@router.get('/v1/items/print/instant') def print_items(background_tasks: BackgroundTasks): """ print item info in background @@ -58,3 +58,14 @@ def print_items(background_tasks: BackgroundTasks): """ background_tasks.add_task(item_service.print_items_one_by_one, 1) return success(msg='Printing items now~') + + +@router.get('/v1/items/print/delay') +def delay_print_items(background_tasks: BackgroundTasks): + """ + print item info in delayed mode + :param background_tasks: + :return: + """ + background_tasks.add_task(item_service.print_items_one_by_one_in_another_process) + return success(msg='launched print items delayed!') diff --git a/requirements.txt b/requirements.txt index d8f06ab..ae3ef94 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,25 +1,27 @@ atomicwrites==1.4.0 -attrs==19.3.0 -certifi==2020.6.20 +attrs==20.3.0 +certifi==2020.12.5 chardet==3.0.4 click==7.1.2 -colorama==0.4.3 -fastapi==0.61.1 -h11==0.9.0 +colorama==0.4.4 +fastapi==0.62.0 +h11==0.11.0 idna==2.10 -more-itertools==8.4.0 -packaging==20.4 +iniconfig==1.0.1 +more-itertools==8.6.0 +packaging==20.7 pluggy==0.13.1 py==1.9.0 -pydantic==1.6.1 +pydantic==1.7.3 pyparsing==2.4.7 -pytest==6.1.1 -python-dotenv==0.14.0 +pytest==6.1.2 +python-dotenv==0.15.0 python-multipart==0.0.5 -requests==2.24.0 +requests==2.25.0 six==1.15.0 starlette==0.13.6 -urllib3==1.25.10 -uvicorn==0.11.8 +toml==0.10.1 +urllib3==1.26.2 +uvicorn==0.12.3 wcwidth==0.2.5 websockets==8.1 diff --git a/service/item.py b/service/item.py index c2a425f..89897dd 100644 --- a/service/item.py +++ b/service/item.py @@ -1,7 +1,9 @@ from typing import Dict, List from model.item import Item from application.logger import get_service_logger -import time, pprint +import time +import pprint +from application import concurrency LOGGER = get_service_logger('ITEM') @@ -42,7 +44,24 @@ def update_item(item_id: int, item: Item) -> bool: def print_items_one_by_one(interval: int = 3): if interval <= 0: interval = 3 + LOGGER.info('Start printing items!') for item_id, item_info in _ITEMS.items(): LOGGER.info('Item %d: %s' % (item_id, pprint.pformat(item_info))) time.sleep(interval) LOGGER.info('Finish printing items!') + + +def __print_single_item(tag: str = 'UNKNOWN', item: Item = Item(name='', price=0.0)): + LOGGER.info('[%s] %s' % (tag, item.json())) + + +async def print_items_one_by_one_in_another_process(interval: int = 3): + if interval <= 0: + interval = 3 + LOGGER.info('Start printing items!') + for item_id, item_info in _ITEMS.items(): + await concurrency.run_in_processpool(__print_single_item, + str(item_id), + item=item_info) + time.sleep(interval) + LOGGER.info('Finish printing items!')