Workflow endpoint logging 2023 10 10 #1109

Merged
1 change: 1 addition & 0 deletions ocrd/ocrd/decorators/__init__.py
@@ -39,6 +39,7 @@ def ocrd_cli_wrap_processor(
subcommand=None,
address=None,
queue=None,
log_filename=None,
database=None,
# ocrd_network params end #
**kwargs
1 change: 1 addition & 0 deletions ocrd/ocrd/decorators/ocrd_cli_options.py
@@ -48,6 +48,7 @@ def cli(mets_url):
option('-D', '--dump-module-dir', is_flag=True, default=False),
option('-h', '--help', is_flag=True, default=False),
option('-V', '--version', is_flag=True, default=False),
option('--log-filename', default=None),
# Subcommand, only used for 'worker'/'server'. Cannot be handled in
# click because processors use the @command decorator and even if they
# were using `group`, you cannot combine have a command with
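For readers not familiar with the click wiring: the new `option('--log-filename', default=None)` entry simply surfaces as a `log_filename` keyword argument in `ocrd_cli_wrap_processor` (see the decorator change above). A minimal sketch of that mechanism, using a hypothetical dummy command that is not part of this PR:

```python
# Minimal sketch (hypothetical command, not part of this PR): a click option
# declared as '--log-filename' with default=None arrives in the wrapped
# callback as the keyword argument log_filename.
import click

@click.command()
@click.option('--log-filename', default=None)
def dummy_processor(log_filename):
    # ocrd_cli_wrap_processor receives the value the same way and forwards it
    # to the processing-worker machinery.
    click.echo(f'log_filename={log_filename}')

if __name__ == '__main__':
    dummy_processor()
```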
3 changes: 2 additions & 1 deletion ocrd/ocrd/lib.bash
@@ -145,6 +145,7 @@ ocrd__parse_argv () {
-I|--input-file-grp) ocrd__argv[input_file_grp]=$2 ; shift ;;
-w|--working-dir) ocrd__argv[working_dir]=$(realpath "$2") ; shift ;;
-m|--mets) ocrd__argv[mets_file]=$(realpath "$2") ; shift ;;
--log-filename) ocrd__argv[log_filename]="$2" ; shift ;;
--mets-server-url) ocrd_argv[mets_server_url]="$2" ; shift ;;
--overwrite) ocrd__argv[overwrite]=true ;;
--profile) ocrd__argv[profile]=true ;;
@@ -168,7 +169,7 @@ ocrd__parse_argv () {
if ! [ -v ocrd__worker_queue ]; then
ocrd__raise "For the Processing Worker --queue is required"
fi
ocrd network processing-worker $OCRD_TOOL_NAME --queue "${ocrd__worker_queue}" --database "${ocrd__worker_database}"
ocrd network processing-worker $OCRD_TOOL_NAME --queue "${ocrd__worker_queue}" --database "${ocrd__worker_database}" --log-filename "${ocrd__argv[log_filename]}"
elif [ ${ocrd__subcommand} = "server" ]; then
if ! [ -v ocrd__worker_address ]; then
ocrd__raise "For the Processor Server --address is required"
2 changes: 2 additions & 0 deletions ocrd/ocrd/processor/helpers.py
@@ -251,6 +251,8 @@ def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None)
--database The MongoDB server address in format
"mongodb://{host}:{port}"
[mongodb://localhost:27018]
--log-filename Filename to redirect STDOUT/STDERR to,
if specified.
'''

processing_server_options = '''\
9 changes: 6 additions & 3 deletions ocrd_network/ocrd_network/deployer.py
@@ -474,9 +474,12 @@ def start_native_processor(
# printed with `echo $!` but it is printed inbetween other output. Because of that I added
# `xyz` before and after the code to easily be able to filter out the pid via regex when
# returning from the function
log_path = f'/tmp/deployed_{processor_name}.log'
stdin.write(f"echo starting processing worker with '{cmd}' >> '{log_path}'\n")
stdin.write(f'{cmd} >> {log_path} 2>&1 &\n')

# TODO: Check here again
# log_path = f'/tmp/deployed_{processor_name}.log'
# stdin.write(f"echo starting processing worker with '{cmd}' >> '{log_path}'\n")
# stdin.write(f'{cmd} >> {log_path} 2>&1 &\n')
stdin.write(f'{cmd} &\n')
stdin.write('echo xyz$!xyz \n exit \n')
output = stdout.read().decode('utf-8')
stdout.close()
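The `xyz` markers mentioned in the comment above exist so the PID echoed by `echo $!` can be picked out of the mixed remote-shell output. A hedged sketch of that extraction; the regex shown here is illustrative, and the deployer's actual parsing code is not part of this hunk:

```python
# Sketch only: pull the PID wrapped in 'xyz...xyz' markers out of the
# remote shell output, as described in the comment above. The deployer's
# actual parsing code may differ.
import re

def extract_pid(ssh_output: str) -> int:
    match = re.search(r'xyz(\d+)xyz', ssh_output)
    if match is None:
        raise ValueError('no PID marker found in output')
    return int(match.group(1))

assert extract_pid('starting worker\nxyz12345xyz\n') == 12345
```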
62 changes: 34 additions & 28 deletions ocrd_network/ocrd_network/process_helpers.py
@@ -1,9 +1,11 @@
import json
from typing import List, Optional
import logging
from contextlib import nullcontext

from ocrd.processor.helpers import run_cli, run_processor
from .utils import get_ocrd_workspace_instance
from ocrd_utils import redirect_stderr_and_stdout_to_file, initLogging


# A wrapper for run_processor() and run_cli()
@@ -15,44 +17,48 @@ def invoke_processor(
output_file_grps: List[str],
page_id: str,
parameters: dict,
mets_server_url: Optional[str] = None
mets_server_url: Optional[str] = None,
log_filename : str = None,
) -> None:
if not (processor_class or executable):
raise ValueError(f'Missing processor class and executable')
raise ValueError('Missing processor class and executable')
input_file_grps_str = ','.join(input_file_grps)
output_file_grps_str = ','.join(output_file_grps)

workspace = get_ocrd_workspace_instance(
mets_path=abs_path_to_mets,
mets_server_url=mets_server_url
)
ctx_mgr = redirect_stderr_and_stdout_to_file(log_filename) if log_filename else nullcontext()
with ctx_mgr:
initLogging(force_reinit=True)
workspace = get_ocrd_workspace_instance(
mets_path=abs_path_to_mets,
mets_server_url=mets_server_url
)

if processor_class:
try:
run_processor(
processorClass=processor_class,
if processor_class:
try:
run_processor(
processorClass=processor_class,
workspace=workspace,
input_file_grp=input_file_grps_str,
output_file_grp=output_file_grps_str,
page_id=page_id,
parameter=parameters,
instance_caching=True,
mets_server_url=mets_server_url,
log_level=logging.DEBUG
)
except Exception as e:
raise RuntimeError(f"Python executable '{processor_class.__dict__}' exited with: {e}")
else:
return_code = run_cli(
executable=executable,
workspace=workspace,
mets_url=abs_path_to_mets,
input_file_grp=input_file_grps_str,
output_file_grp=output_file_grps_str,
page_id=page_id,
parameter=parameters,
instance_caching=True,
parameter=json.dumps(parameters),
mets_server_url=mets_server_url,
log_level=logging.DEBUG
)
except Exception as e:
raise RuntimeError(f"Python executable '{processor_class.__dict__}' exited with: {e}")
else:
return_code = run_cli(
executable=executable,
workspace=workspace,
mets_url=abs_path_to_mets,
input_file_grp=input_file_grps_str,
output_file_grp=output_file_grps_str,
page_id=page_id,
parameter=json.dumps(parameters),
mets_server_url=mets_server_url,
log_level=logging.DEBUG
)
if return_code != 0:
raise RuntimeError(f"CLI executable '{executable}' exited with: {return_code}")
if return_code != 0:
raise RuntimeError(f"CLI executable '{executable}' exited with: {return_code}")
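The conditional redirection above relies on `contextlib.nullcontext`, so the `with` block stays uniform whether or not a log file was requested. A standalone illustration of that pattern (names here are illustrative, not the PR's API):

```python
# Illustration only: nullcontext() makes the with-block a no-op, so the same
# code path works with and without redirection.
from contextlib import nullcontext, redirect_stdout

def emit(message, log_file=None):
    ctx = redirect_stdout(log_file) if log_file else nullcontext()
    with ctx:
        print(message)

emit('goes to the terminal')
with open('/tmp/demo.log', 'a', encoding='utf-8') as f:
    emit('appended to /tmp/demo.log instead', log_file=f)
```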
22 changes: 14 additions & 8 deletions ocrd_network/ocrd_network/processing_worker.py
@@ -10,13 +10,13 @@

from datetime import datetime
import logging
from os import getpid
from os import getpid, makedirs

import pika.spec
import pika.adapters.blocking_connection
from pika.exceptions import AMQPConnectionError

from ocrd_utils import initLogging, getLogger
from ocrd_utils import getLogger

from time import sleep

@@ -43,13 +43,15 @@


class ProcessingWorker:
def __init__(self, rabbitmq_addr, mongodb_addr, processor_name, ocrd_tool: dict, processor_class=None) -> None:
initLogging()
logging_suffix = f'{processor_name}.{getpid()}'
def __init__(self, rabbitmq_addr, mongodb_addr, processor_name, ocrd_tool: dict, processor_class=None, log_filename:str=None) -> None:
self.log = getLogger(f'ocrd_network.processing_worker')
file_handler = logging.FileHandler(f'/tmp/ocrd_worker_{logging_suffix}.log', mode='a')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
self.log.addHandler(file_handler)
if not log_filename:
log_filename = f'/tmp/ocrd_worker_{processor_name}.{getpid()}.log'
self.log_filename = log_filename
# TODO: Use that handler once the separate job logs is resolved
# file_handler = logging.FileHandler(log_filename, mode='a')
# file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
# self.log.addHandler(file_handler)

try:
verify_database_uri(mongodb_addr)
@@ -212,13 +214,17 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None:
start_time=start_time
)
try:
# TODO: Refactor the root logging dir for jobs
# makedirs(name='/tmp/ocrd_processing_jobs_logs', exist_ok=True)
# log_filename = f'/tmp/ocrd_processing_jobs_logs/{job_id}.log'
invoke_processor(
processor_class=self.processor_class,
executable=self.processor_name,
abs_path_to_mets=path_to_mets,
input_file_grps=input_file_grps,
output_file_grps=output_file_grps,
page_id=page_id,
log_filename=self.log_filename,
parameters=processing_message.parameters,
mets_server_url=mets_server_url
)
1 change: 1 addition & 0 deletions ocrd_utils/ocrd_utils/__init__.py
@@ -182,6 +182,7 @@
atomic_write,
pushd_popd,
unzip_file_to_dir,
redirect_stderr_and_stdout_to_file,
)

from .str import (
13 changes: 10 additions & 3 deletions ocrd_utils/ocrd_utils/os.py
@@ -12,11 +12,12 @@
'pushd_popd',
'unzip_file_to_dir',
'atomic_write',
'redirect_stderr_and_stdout_to_file',
]

from tempfile import TemporaryDirectory, gettempdir
from functools import lru_cache
import contextlib
from contextlib import contextmanager, redirect_stderr, redirect_stdout
from distutils.spawn import find_executable as which
from json import loads
from json.decoder import JSONDecodeError
@@ -44,7 +45,7 @@ def abspath(url):
url = url[len('file://'):]
return abspath_(url)

@contextlib.contextmanager
@contextmanager
def pushd_popd(newcwd=None, tempdir=False):
if newcwd and tempdir:
raise Exception("pushd_popd can accept either newcwd or tempdir, not both")
@@ -201,7 +202,7 @@ def get_fileobject(self, **kwargs):
chmod(fd, mode)
return f

@contextlib.contextmanager
@contextmanager
def atomic_write(fpath):
with atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True) as f:
yield f
@@ -249,3 +250,9 @@ def guess_media_type(input_file : str, fallback : str = None, application_xml :
if mimetype == 'application/xml':
mimetype = application_xml
return mimetype

@contextmanager
def redirect_stderr_and_stdout_to_file(filename):
with open(filename, 'at', encoding='utf-8') as f:
with redirect_stderr(f), redirect_stdout(f):
yield
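A quick usage sketch of the new helper (the filename is just an example); the export added to `ocrd_utils/__init__.py` above is what makes the top-level import possible:

```python
# Usage sketch for the new context manager; the filename is an example.
import sys
from ocrd_utils import redirect_stderr_and_stdout_to_file

with redirect_stderr_and_stdout_to_file('/tmp/example-redirect.log'):
    print('captured stdout')
    sys.stderr.write('captured stderr\n')
print('back on the terminal')  # outside the block, output is normal again
```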
13 changes: 13 additions & 0 deletions tests/utils/test_os.py
@@ -4,9 +4,11 @@
from pathlib import Path
from os import environ as ENV, getcwd
from os.path import expanduser, join
import sys

from ocrd_utils.os import (
list_resource_candidates,
redirect_stderr_and_stdout_to_file,
guess_media_type,
)

@@ -45,6 +47,17 @@ def test_guess_media_type(self):
assert guess_media_type(testdata / 'mets-with-metsDocumentID.xml') == 'application/xml'
assert guess_media_type(testdata / 'mets-with-metsDocumentID.xml', application_xml='text/x-mets') == 'text/x-mets'

def test_redirect_stderr_and_stdout_to_file(self):
# TODO test logging is redirected properly without running into
# pytest's capturing intricacies
fname = '/tmp/test-redirect.txt'
Path(fname).write_bytes(b'')
with redirect_stderr_and_stdout_to_file(fname):
print('one')
sys.stdout.write('two\n')
sys.stderr.write('three\n')
print('four', file=sys.stderr)
assert Path(fname).read_text(encoding='utf-8') == 'one\ntwo\nthree\nfour\n'

if __name__ == '__main__':
main(__file__)