Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend the network client #1269

Merged
merged 35 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
2927a28
add integration test for client
MehmedGIT Aug 6, 2024
8e7cd3e
fix the test dir path in docker
MehmedGIT Aug 6, 2024
bd16dd7
update network client
MehmedGIT Aug 6, 2024
b2c0675
integration test for client
MehmedGIT Aug 6, 2024
db6e566
Fix flag typo
MehmedGIT Aug 6, 2024
bec81ba
try docker host ip
MehmedGIT Aug 6, 2024
4815896
remove the client server
MehmedGIT Aug 9, 2024
cb3460f
refactor status checks
MehmedGIT Aug 9, 2024
920c1a9
fix test
MehmedGIT Aug 9, 2024
2a843a8
fix: client processing request
MehmedGIT Aug 9, 2024
3a238a7
add: client workflow run
MehmedGIT Aug 9, 2024
50794f9
add timeout and wait to configs
MehmedGIT Aug 9, 2024
cc06fc3
Update src/ocrd_network/client_utils.py
MehmedGIT Aug 12, 2024
4115937
refine status check methods
MehmedGIT Aug 12, 2024
0136db0
add help for new env
MehmedGIT Aug 12, 2024
734bbf0
add cli job status check
MehmedGIT Aug 13, 2024
f86bc23
add: help section to the cli
MehmedGIT Aug 13, 2024
4194f9f
fix: required job id
MehmedGIT Aug 13, 2024
97b3eea
add docstring to cli commands
MehmedGIT Aug 13, 2024
8e7ba26
Fix: rename to block
MehmedGIT Aug 13, 2024
69808b6
Fix: server_utils.py > 404 to 400
MehmedGIT Aug 13, 2024
4de1e83
fix: set ps address if None in constructor
MehmedGIT Aug 13, 2024
d1af85b
fix: check report validation outside try block
MehmedGIT Aug 13, 2024
50f73c5
fix: the annoying string dict
MehmedGIT Aug 13, 2024
8f2861c
add: parameter_override
MehmedGIT Aug 13, 2024
06a371c
add sort to network agents
MehmedGIT Aug 13, 2024
4d85970
add: discovery cli, processors and processor
MehmedGIT Aug 13, 2024
bb3007d
add: check processing job log file
MehmedGIT Aug 13, 2024
ff4243f
fix: exception handling
MehmedGIT Aug 14, 2024
5f746c1
ocrd network client: parse parameters and overrides
kba Aug 14, 2024
8fc8bff
fix parameter parsing again
kba Aug 14, 2024
d73cfaa
Merge pull request #1270 from OCR-D/fix-parsing
MehmedGIT Aug 20, 2024
15cea57
:memo: changelog
kba Aug 22, 2024
18d743a
Merge branch 'master' into extend-network-client
kba Aug 22, 2024
6608539
refactor client cli: process -> run
MehmedGIT Aug 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ network-module-test: assets
INTEGRATION_TEST_IN_DOCKER = docker exec core_test
network-integration-test:
$(DOCKER_COMPOSE) --file tests/network/docker-compose.yml up -d
-$(INTEGRATION_TEST_IN_DOCKER) pytest -k 'test_integration_' -v --ignore-glob="$(TESTDIR)/network/*ocrd_all*.py"
-$(INTEGRATION_TEST_IN_DOCKER) pytest -k 'test_integration_' -v --ignore-glob="tests/network/*ocrd_all*.py"
kba marked this conversation as resolved.
Show resolved Hide resolved
$(DOCKER_COMPOSE) --file tests/network/docker-compose.yml down --remove-orphans

network-integration-test-cicd:
Expand Down
68 changes: 43 additions & 25 deletions src/ocrd_network/cli/client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import click
from json import dumps, loads
from typing import Optional

from ocrd.decorators import parameter_option
from ocrd_network import Client
from ocrd_utils import DEFAULT_METS_BASENAME
from ..client import Client


@click.group('client')
Expand Down Expand Up @@ -42,26 +42,28 @@ def processing_cli():
@click.option('--result-queue-name')
@click.option('--callback-url')
@click.option('--agent-type', default='worker')
def send_processing_request(
address: Optional[str],
processor_name: str,
mets: str,
input_file_grp: str,
output_file_grp: Optional[str],
page_id: Optional[str],
parameter: Optional[dict],
result_queue_name: Optional[str],
callback_url: Optional[str],
# TODO: This is temporally available to toggle
# between the ProcessingWorker/ProcessorServer
agent_type: Optional[str]
@click.option('-b', '--block-till-job-end', default=False)
def send_processing_job_request(
address: Optional[str],
processor_name: str,
mets: str,
input_file_grp: str,
output_file_grp: Optional[str],
page_id: Optional[str],
parameter: Optional[dict],
result_queue_name: Optional[str],
callback_url: Optional[str],
# TODO: This is temporally available to toggle
# between the ProcessingWorker/ProcessorServer
agent_type: Optional[str],
block_till_job_end: Optional[bool]
):
req_params = {
"path_to_mets": mets,
"description": "OCR-D Network client request",
"input_file_grps": input_file_grp.split(','),
"parameters": parameter if parameter else {},
"agent_type": agent_type,
"agent_type": agent_type
}
if output_file_grp:
req_params["output_file_grps"] = output_file_grp.split(',')
Expand All @@ -71,16 +73,13 @@ def send_processing_request(
req_params["result_queue_name"] = result_queue_name
if callback_url:
req_params["callback_url"] = callback_url

client = Client(
server_addr_processing=address
)
response = client.send_processing_request(
processor_name=processor_name,
req_params=req_params
)
processing_job_id = response.get('job_id', None)
client = Client(server_addr_processing=address)
processing_job_id = client.send_processing_job_request(
processor_name=processor_name, req_params=loads(dumps(req_params)))
assert processing_job_id
print(f"Processing job id: {processing_job_id}")
if block_till_job_end:
client.poll_job_status_till_timeout_fail_or_success(job_id=processing_job_id)


@client_cli.group('workflow')
Expand All @@ -91,6 +90,25 @@ def workflow_cli():
pass


@processing_cli.command('run')
@click.option('--address')
@click.option('-m', '--path-to-mets', required=True)
@click.option('-w', '--path-to-workflow', required=True)
@click.option('-b', '--block-till-job-end', default=False)
def send_workflow_job_request(
address: Optional[str],
path_to_mets: str,
path_to_workflow: str,
block_till_job_end: Optional[bool]
):
client = Client(server_addr_processing=address)
workflow_job_id = client.send_workflow_job_request(path_to_wf=path_to_workflow, path_to_mets=path_to_mets)
assert workflow_job_id
print(f"Workflow job id: {workflow_job_id}")
if block_till_job_end:
client.poll_wf_status_till_timeout_fail_or_success(job_id=workflow_job_id)


@client_cli.group('workspace')
def workspace_cli():
"""
Expand Down
50 changes: 26 additions & 24 deletions src/ocrd_network/client.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,39 @@
from json import dumps, loads
from requests import post as requests_post
from ocrd_utils import config, getLogger, LOG_FORMAT
from .client_utils import (
poll_job_status_till_timeout_fail_or_success,
poll_wf_status_till_timeout_fail_or_success,
post_ps_processing_request,
post_ps_workflow_request,
verify_server_protocol
)

from .constants import NETWORK_PROTOCOLS


# TODO: This is just a conceptual implementation and first try to
# trigger further discussions on how this should look like.
class Client:
def __init__(
self,
server_addr_processing: str = config.OCRD_NETWORK_SERVER_ADDR_PROCESSING,
server_addr_workflow: str = config.OCRD_NETWORK_SERVER_ADDR_WORKFLOW,
server_addr_workspace: str = config.OCRD_NETWORK_SERVER_ADDR_WORKSPACE
timeout: int = config.OCRD_NETWORK_CLIENT_POLLING_TIMEOUT,
wait: int = config.OCRD_NETWORK_CLIENT_POLLING_SLEEP
):
self.log = getLogger(f"ocrd_network.client")
self.server_addr_processing = server_addr_processing
self.server_addr_workflow = server_addr_workflow
self.server_addr_workspace = server_addr_workspace

def send_processing_request(self, processor_name: str, req_params: dict):
verify_server_protocol(self.server_addr_processing)
req_url = f"{self.server_addr_processing}/processor/{processor_name}"
req_headers = {"Content-Type": "application/json; charset=utf-8"}
req_json = loads(dumps(req_params))
self.log.info(f"Sending processing request to: {req_url}")
self.log.debug(req_json)
response = requests_post(url=req_url, headers=req_headers, json=req_json)
return response.json()
self.polling_timeout = timeout
self.polling_wait = wait
self.polling_tries = int(timeout/wait)

def poll_job_status_till_timeout_fail_or_success(self, job_id: str) -> str:
return poll_job_status_till_timeout_fail_or_success(
ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait)
MehmedGIT marked this conversation as resolved.
Show resolved Hide resolved

def poll_wf_status_till_timeout_fail_or_success(self, job_id: str) -> str:
return poll_wf_status_till_timeout_fail_or_success(
ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait)

def send_processing_job_request(self, processor_name: str, req_params: dict) -> str:
return post_ps_processing_request(
ps_server_host=self.server_addr_processing, processor=processor_name, job_input=req_params)

def verify_server_protocol(address: str):
for protocol in NETWORK_PROTOCOLS:
if address.startswith(protocol):
return
raise ValueError(f"Wrong/Missing protocol in the server address: {address}, must be one of: {NETWORK_PROTOCOLS}")
def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str):
return post_ps_workflow_request(
ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets)
81 changes: 81 additions & 0 deletions src/ocrd_network/client_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from requests import get as request_get, post as request_post
from time import sleep
from .constants import JobState, NETWORK_PROTOCOLS


def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int):
if job_type not in ["workflow", "processor"]:
raise ValueError("Unknown job type, expected 'workflow' or 'processor'")
MehmedGIT marked this conversation as resolved.
Show resolved Hide resolved
job_state = JobState.unset
while tries > 0:
sleep(wait)
if job_type == "processor":
job_state = get_ps_processing_job_status(ps_server_host, job_id)
if job_type == "workflow":
job_state = get_ps_workflow_job_status(ps_server_host, job_id)
if job_state == JobState.success or job_state == JobState.failed:
break
tries -= 1
return job_state


def poll_job_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState:
return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait)


def poll_wf_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState:
return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait)


def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> str:
request_url = f"{ps_server_host}/processor/job/{processing_job_id}"
response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
job_state = response.json()["state"]
assert job_state
return job_state


def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> str:
request_url = f"{ps_server_host}/workflow/job-simple/{workflow_job_id}"
response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
job_state = response.json()["state"]
assert job_state
return job_state


def post_ps_processing_request(ps_server_host: str, processor: str, job_input: dict) -> str:
request_url = f"{ps_server_host}/processor/run/{processor}"
response = request_post(
url=request_url,
headers={"accept": "application/json; charset=utf-8"},
json=job_input
)
assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
processing_job_id = response.json()["job_id"]
assert processing_job_id
return processing_job_id


# TODO: Can be extended to include other parameters such as page_wise
def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets: str) -> str:
request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise=True"
response = request_post(
url=request_url,
headers={"accept": "application/json; charset=utf-8"},
files={"workflow": open(path_to_wf, "rb")}
)
# print(response.json())
# print(response.__dict__)
assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
wf_job_id = response.json()["job_id"]
assert wf_job_id
return wf_job_id


def verify_server_protocol(address: str):
for protocol in NETWORK_PROTOCOLS:
if address.startswith(protocol):
return
raise ValueError(f"Wrong/Missing protocol in the server address: {address}, must be one of: {NETWORK_PROTOCOLS}")
10 changes: 10 additions & 0 deletions src/ocrd_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ def _ocrd_download_timeout_parser(val):
description="Default address of Processing Server to connect to (for `ocrd network client processing`).",
default=(True, ''))

config.add("OCRD_NETWORK_CLIENT_POLLING_TIMEOUT",
MehmedGIT marked this conversation as resolved.
Show resolved Hide resolved
description="Timeout for a blocking ocrd network client (seconds)",
parser=int,
default=(True, 3600))

config.add("OCRD_NETWORK_CLIENT_POLLING_SLEEP",
description="How many seconds to sleep before trying again (seconds)",
parser=int,
default=(True, 30))

config.add("OCRD_NETWORK_SERVER_ADDR_WORKFLOW",
description="Default address of Workflow Server to connect to (for `ocrd network client workflow`).",
default=(True, ''))
Expand Down
14 changes: 14 additions & 0 deletions tests/network/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@
parser=_ocrd_download_timeout_parser
)

test_config.add(
"OCRD_NETWORK_CLIENT_POLLING_TIMEOUT",
description="Timeout for a blocking ocrd network client",
parser=int,
default=(True, 3600)
)

test_config.add(
"OCRD_NETWORK_CLIENT_POLLING_SLEEP",
description="How many seconds to sleep before trying again (seconds)",
parser=int,
default=(True, 30)
)

test_config.add(
name="OCRD_NETWORK_SERVER_ADDR_PROCESSING",
description="Default address of Processing Server to connect to (for `ocrd network client processing`).",
Expand Down
15 changes: 7 additions & 8 deletions tests/network/test_integration_5_processing_server.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from pathlib import Path
from requests import get as request_get
from src.ocrd_network.client_utils import (
poll_job_status_till_timeout_fail_or_success, poll_wf_status_till_timeout_fail_or_success,
post_ps_processing_request, post_ps_workflow_request)
from src.ocrd_network.constants import AgentType, JobState
from src.ocrd_network.logging_utils import get_processing_job_logging_file_path
from tests.base import assets
from tests.network.config import test_config
from tests.network.utils import poll_till_timeout_fail_or_success, post_ps_processing_request, post_ps_workflow_request


PROCESSING_SERVER_URL = test_config.PROCESSING_SERVER_URL

Expand Down Expand Up @@ -40,10 +43,8 @@ def test_processing_server_processing_request():
"parameters": {}
}
test_processor = "ocrd-dummy"
processing_job_id = post_ps_processing_request(PROCESSING_SERVER_URL, test_processor, test_processing_job_input)
job_state = poll_till_timeout_fail_or_success(
test_url=f"{PROCESSING_SERVER_URL}/processor/job/{processing_job_id}", tries=10, wait=10
)
process_job_id = post_ps_processing_request(PROCESSING_SERVER_URL, test_processor, test_processing_job_input)
job_state = poll_job_status_till_timeout_fail_or_success(PROCESSING_SERVER_URL, process_job_id, tries=10, wait=10)
assert job_state == JobState.success

# Check the existence of the results locally
Expand All @@ -58,9 +59,7 @@ def test_processing_server_workflow_request():
workspace_root = "kant_aufklaerung_1784/data"
path_to_mets = assets.path_to(f"{workspace_root}/mets.xml")
wf_job_id = post_ps_workflow_request(PROCESSING_SERVER_URL, path_to_dummy_wf, path_to_mets)
job_state = poll_till_timeout_fail_or_success(
test_url=f"{PROCESSING_SERVER_URL}/workflow/job-simple/{wf_job_id}", tries=30, wait=10
)
job_state = poll_wf_status_till_timeout_fail_or_success(PROCESSING_SERVER_URL, wf_job_id, tries=10, wait=10)
assert job_state == JobState.success

# Check the existence of the results locally
Expand Down
38 changes: 38 additions & 0 deletions tests/network/test_integration_6_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from pathlib import Path
from src.ocrd_network.constants import AgentType, JobState
from tests.base import assets
from tests.network.config import test_config
from ocrd_network.client import Client

PROCESSING_SERVER_URL = test_config.PROCESSING_SERVER_URL
timeout = test_config.OCRD_NETWORK_CLIENT_POLLING_TIMEOUT
wait = test_config.OCRD_NETWORK_CLIENT_POLLING_SLEEP


def test_client_processing_processor():
workspace_root = "kant_aufklaerung_1784/data"
path_to_mets = assets.path_to(f"{workspace_root}/mets.xml")
client = Client(PROCESSING_SERVER_URL, timeout, wait)
req_params = {
"path_to_mets": path_to_mets,
"description": "OCR-D Network client request",
"input_file_grps": ["OCR-D-IMG"],
"output_file_grps": ["OCR-D-DUMMY-TEST-CLIENT"],
"parameters": {},
"agent_type": AgentType.PROCESSING_WORKER
}
processing_job_id = client.send_processing_job_request(processor_name="ocrd-dummy", req_params=req_params)
assert processing_job_id
print(f"Processing job id: {processing_job_id}")
assert JobState.success == client.poll_job_status_till_timeout_fail_or_success(processing_job_id)


def test_client_processing_workflow():
workspace_root = "kant_aufklaerung_1784/data"
path_to_mets = assets.path_to(f"{workspace_root}/mets.xml")
# TODO: Improve the path resolution
path_to_dummy_wf = f"{Path(__file__).parent.resolve()}/dummy-workflow.txt"
client = Client(PROCESSING_SERVER_URL, timeout, wait)
wf_job_id = client.send_workflow_job_request(path_to_dummy_wf, path_to_mets)
print(f"Workflow job id: {wf_job_id}")
assert JobState.success == client.poll_wf_status_till_timeout_fail_or_success(wf_job_id)
9 changes: 3 additions & 6 deletions tests/network/test_integration_ocrd_all.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from src.ocrd_network.client_utils import poll_wf_status_till_timeout_fail_or_success, post_ps_workflow_request
from src.ocrd_network.constants import JobState
from tests.network.config import test_config
from tests.network.utils import poll_till_timeout_fail_or_success, post_ps_workflow_request


PROCESSING_SERVER_URL = test_config.PROCESSING_SERVER_URL

Expand All @@ -11,9 +12,5 @@ def test_ocrd_all_workflow():
path_to_wf = "/ocrd-data/assets/ocrd_all-test-workflow.txt"
path_to_mets = "/data/mets.xml"
wf_job_id = post_ps_workflow_request(PROCESSING_SERVER_URL, path_to_wf, path_to_mets)
job_state = poll_till_timeout_fail_or_success(
test_url=f"{PROCESSING_SERVER_URL}/workflow/job-simple/{wf_job_id}",
tries=30,
wait=10
)
job_state = poll_wf_status_till_timeout_fail_or_success(PROCESSING_SERVER_URL, wf_job_id, tries=30, wait=10)
assert job_state == JobState.success
Loading
Loading