Skip to content

Commit

Permalink
Merge pull request #17 from cloudblue/LITE-24285
Browse files Browse the repository at this point in the history
LITE-24285: Added async execution capabilities
  • Loading branch information
ffaraone authored Aug 12, 2022
2 parents fcfef0d + a5011ff commit 829e298
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 270 deletions.
54 changes: 41 additions & 13 deletions executor/executor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import inspect
import logging
import sys
from datetime import datetime

import pytz
from connect.client import ClientError, ConnectClient
from connect.client import AsyncConnectClient, ClientError, ConnectClient
from connect.reports.constants import REPORTS_ENV
from connect.reports.datamodels import Account, Report
from connect.reports.renderers import get_renderer
Expand Down Expand Up @@ -72,20 +74,30 @@ def normalize_parameters(connect_parameters):
return parameters


async def execute_report_async(entrypoint, args, renderer, output_file):
if inspect.iscoroutinefunction(entrypoint):
data = await entrypoint(*args)
else:
data = entrypoint(*args)
return await renderer.render_async(
data,
output_file,
start_time=datetime.now(tz=pytz.utc),
)


def _run_render(is_async, entrypoint, args, renderer, output_file):
if is_async:
return asyncio.run(execute_report_async(entrypoint, args, renderer, output_file))
else:
data = entrypoint(*args)
return renderer.render(data, output_file, start_time=datetime.now(tz=pytz.utc))


def execute_report(control_client, report_definition, connect_report): # noqa: CCR001
report_env = get_report_env()
reports_dir = get_default_reports_dir()

report_client = ConnectClient(
endpoint=report_env["api_endpoint"],
use_specs=False,
api_key=report_env["client_token"],
max_retries=5,
default_limit=500,
default_headers=get_user_agent(),
timeout=(360, 360),
resourceset_append=False,
)
connect_parameters = connect_report.get('parameters', [])
parameters = normalize_parameters(connect_parameters)

Expand Down Expand Up @@ -120,6 +132,23 @@ def progress(current_value, max_value):
logger.exception('An error ocurred while importing report entrypoint.')
handle_preparation_exception(e, control_client)

is_async = (
inspect.isasyncgenfunction(report_entry_point)
or inspect.iscoroutinefunction(report_entry_point)
)

client_class = AsyncConnectClient if is_async else ConnectClient
report_client = client_class(
endpoint=report_env["api_endpoint"],
use_specs=False,
api_key=report_env["client_token"],
max_retries=5,
default_limit=500,
default_headers=get_user_agent(),
timeout=(360, 360),
resourceset_append=False,
)

renderer_id = connect_report['renderer']
renderer_definition = next(
filter(
Expand Down Expand Up @@ -151,8 +180,7 @@ def progress(current_value, max_value):
renderer.set_extra_context,
],
)
data = report_entry_point(*args)
return renderer.render(data, '/report', start_time=datetime.now(tz=pytz.utc))
return _run_render(is_async, report_entry_point, args, renderer, '/report')
except Exception as e:
handle_exception(e, control_client, connect_report)

Expand Down
Loading

0 comments on commit 829e298

Please sign in to comment.