Skip to content

Commit aaaf7e4

Browse files
committed
feat(cli): add --follow flag to logs command (reanahub#729)
1 parent 79d0483 commit aaaf7e4

File tree

4 files changed

+324
-92
lines changed

4 files changed

+324
-92
lines changed

AUTHORS.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ The list of contributors in alphabetical order:
1515
- [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553)
1616
- [Harri Hirvonsalo](https://orcid.org/0000-0002-5503-510X)
1717
- [Jan Okraska](https://orcid.org/0000-0002-1416-3244)
18+
- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270)
1819
- [Leticia Wanderley](https://orcid.org/0000-0003-4649-6630)
1920
- [Marco Donadoni](https://orcid.org/0000-0003-2922-5505)
2021
- [Marco Vidal](https://orcid.org/0000-0002-9363-4971)

reana_client/cli/utils.py

+147
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,18 @@
99

1010
import functools
1111
import json
12+
import logging
1213
import os
1314
import shlex
1415
import sys
16+
import time
17+
import traceback
1518
from typing import Callable, NoReturn, Optional, List, Tuple, Union
1619

1720
import click
1821
import tablib
1922

23+
from reana_commons.config import REANA_COMPUTE_BACKENDS
2024
from reana_commons.utils import click_table_printer
2125

2226
from reana_client.config import (
@@ -409,3 +413,146 @@ def output_user_friendly_logs(workflow_logs, steps):
409413
f"Step {job_name_or_id} emitted no logs.",
410414
msg_type="info",
411415
)
416+
417+
418+
def retrieve_workflow_logs(
419+
workflow,
420+
access_token,
421+
json_format,
422+
filters=None,
423+
page=None,
424+
size=None,
425+
): # noqa: D301
426+
"""Retrieve workflow logs."""
427+
from reana_client.api.client import get_workflow_logs
428+
429+
available_filters = {
430+
"step": "job_name",
431+
"compute_backend": "compute_backend",
432+
"docker_img": "docker_img",
433+
"status": "status",
434+
}
435+
steps = []
436+
chosen_filters = dict()
437+
438+
if filters:
439+
try:
440+
for f in filters:
441+
key, value = f.split("=")
442+
if key not in available_filters:
443+
display_message(
444+
"Filter '{}' is not valid.\n"
445+
"Available filters are '{}'.".format(
446+
key,
447+
"' '".join(sorted(available_filters.keys())),
448+
),
449+
msg_type="error",
450+
)
451+
sys.exit(1)
452+
elif key == "step":
453+
steps.append(value)
454+
else:
455+
# Case insensitive for compute backends
456+
if (
457+
key == "compute_backend"
458+
and value.lower() in REANA_COMPUTE_BACKENDS
459+
):
460+
value = REANA_COMPUTE_BACKENDS[value.lower()]
461+
elif key == "status" and value not in RUN_STATUSES:
462+
display_message(
463+
"Input status value {} is not valid. ".format(value),
464+
msg_type="error",
465+
),
466+
sys.exit(1)
467+
chosen_filters[key] = value
468+
except Exception as e:
469+
logging.debug(traceback.format_exc())
470+
logging.debug(str(e))
471+
display_message(
472+
"Please provide complete --filter name=value pairs, "
473+
"for example --filter status=running.\n"
474+
"Available filters are '{}'.".format(
475+
"' '".join(sorted(available_filters.keys()))
476+
),
477+
msg_type="error",
478+
)
479+
sys.exit(1)
480+
481+
response = get_workflow_logs(
482+
workflow,
483+
access_token,
484+
steps=None if not steps else list(set(steps)),
485+
page=page,
486+
size=size,
487+
)
488+
workflow_logs = json.loads(response["logs"])
489+
if filters:
490+
for key, value in chosen_filters.items():
491+
unwanted_steps = [
492+
k
493+
for k, v in workflow_logs["job_logs"].items()
494+
if v[available_filters[key]] != value
495+
]
496+
for job_id in unwanted_steps:
497+
del workflow_logs["job_logs"][job_id]
498+
499+
if json_format:
500+
display_message(json.dumps(workflow_logs, indent=2))
501+
sys.exit(0)
502+
else:
503+
from reana_client.cli.utils import output_user_friendly_logs
504+
505+
output_user_friendly_logs(workflow_logs, None if not steps else list(set(steps)))
506+
507+
508+
def follow_workflow_logs(
509+
workflow,
510+
access_token,
511+
interval,
512+
step=None,
513+
): # noqa: D301
514+
"""Continuously poll for workflow or job logs."""
515+
from reana_client.api.client import get_workflow_logs, get_workflow_status
516+
517+
msg = f"Following logs for workflow: {workflow}"
518+
if step:
519+
msg += f", step: {step}"
520+
display_message(msg, "info")
521+
522+
previous_logs = ""
523+
524+
while True:
525+
response = get_workflow_logs(
526+
workflow,
527+
access_token,
528+
steps=None if not step else [step],
529+
).get("logs")
530+
531+
if step:
532+
jobs = json.loads(response)["job_logs"]
533+
534+
if not jobs:
535+
raise Exception(f"Step data not found: {step}")
536+
537+
job = next(
538+
iter(json.loads(response)["job_logs"].values())
539+
) # get values of the first job
540+
logs = job["logs"]
541+
status = job["status"]
542+
else:
543+
logs = json.loads(response)["workflow_logs"]
544+
status = get_workflow_status(workflow, access_token).get("status")
545+
546+
previous_lines = previous_logs.splitlines()
547+
new_lines = logs.splitlines()
548+
549+
diff = "\n".join([x for x in new_lines if x not in previous_lines])
550+
if diff != "" and diff != "\n":
551+
display_message(diff)
552+
553+
if status in ["finished", "failed", "stopped", "deleted"]:
554+
display_message("")
555+
display_message(f"Finished, status: {status}", "info")
556+
return
557+
previous_logs = logs
558+
time.sleep(interval)

reana_client/cli/workflow.py

+66-92
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
key_value_to_dict,
3232
parse_filter_parameters,
3333
requires_environments,
34+
retrieve_workflow_logs,
35+
follow_workflow_logs,
3436
)
3537
from reana_client.config import ERROR_MESSAGES, RUN_STATUSES, TIMECHECK
3638
from reana_client.printer import display_message
@@ -47,7 +49,7 @@
4749
validate_input_parameters,
4850
validate_workflow_name_parameter,
4951
)
50-
from reana_commons.config import INTERACTIVE_SESSION_TYPES, REANA_COMPUTE_BACKENDS
52+
from reana_commons.config import INTERACTIVE_SESSION_TYPES
5153
from reana_commons.errors import REANAValidationError
5254
from reana_commons.validation.operational_options import validate_operational_options
5355

@@ -886,6 +888,30 @@ def add_verbose_data_from_response(response, verbose_headers, headers, data):
886888
multiple=True,
887889
help="Filter job logs to include only those steps that match certain filtering criteria. Use --filter name=value pairs. Available filters are compute_backend, docker_img, status and step.",
888890
)
891+
@click.option(
892+
"--follow",
893+
"follow",
894+
is_flag=True,
895+
default=False,
896+
help="Follow the logs of the of running workflow or job (similar to `tail -f`). "
897+
"If workflow or job finishes running, the command exits.",
898+
)
899+
@click.option(
900+
"-s",
901+
"--step",
902+
"step",
903+
help="Step name to follow logs for. "
904+
"If flag is supplied, command follows a specified job logs. "
905+
"If it is not supplied, command follows workflow logs. "
906+
"If --follow flag is not supplied, this flag is ignored.",
907+
)
908+
@click.option(
909+
"-i",
910+
"--interval",
911+
"interval",
912+
default=10,
913+
help="Sleep time in seconds between log polling if log following is enabled. [default=10]",
914+
)
889915
@add_pagination_options
890916
@check_connection
891917
@click.pass_context
@@ -894,115 +920,63 @@ def workflow_logs(
894920
workflow,
895921
access_token,
896922
json_format,
897-
steps=None,
923+
follow,
924+
interval,
898925
filters=None,
899926
page=None,
900927
size=None,
928+
step=None,
901929
): # noqa: D301
902930
"""Get workflow logs.
903931
904-
The ``logs`` command allows to retrieve logs of running workflow. Note that
905-
only finished steps of the workflow are returned, the logs of the currently
906-
processed step is not returned until it is finished.
932+
The ``logs`` command allows to retrieve logs of a running workflow.
933+
Either retrive logs and print the result or follow the logs of a running workflow/job.
907934
908935
Examples:\n
909936
\t $ reana-client logs -w myanalysis.42
910-
\t $ reana-client logs -w myanalysis.42 -s 1st_step
937+
\t $ reana-client logs -w myanalysis.42 --json
938+
\t $ reana-client logs -w myanalysis.42 --filter status=running
939+
\t $ reana-client logs -w myanalysis.42 --step 1st_step --follow
911940
"""
912-
from reana_client.api.client import get_workflow_logs
913-
914-
available_filters = {
915-
"step": "job_name",
916-
"compute_backend": "compute_backend",
917-
"docker_img": "docker_img",
918-
"status": "status",
919-
}
920-
steps = []
921-
chosen_filters = dict()
922-
923941
logging.debug("command: {}".format(ctx.command_path.replace(" ", ".")))
924942
for p in ctx.params:
925943
logging.debug("{param}: {value}".format(param=p, value=ctx.params[p]))
926-
if workflow:
927-
if filters:
928-
try:
929-
for f in filters:
930-
key, value = f.split("=")
931-
if key not in available_filters:
932-
display_message(
933-
"Filter '{}' is not valid.\n"
934-
"Available filters are '{}'.".format(
935-
key,
936-
"' '".join(sorted(available_filters.keys())),
937-
),
938-
msg_type="error",
939-
)
940-
sys.exit(1)
941-
elif key == "step":
942-
steps.append(value)
943-
else:
944-
# Case insensitive for compute backends
945-
if (
946-
key == "compute_backend"
947-
and value.lower() in REANA_COMPUTE_BACKENDS
948-
):
949-
value = REANA_COMPUTE_BACKENDS[value.lower()]
950-
elif key == "status" and value not in RUN_STATUSES:
951-
display_message(
952-
"Input status value {} is not valid. ".format(value),
953-
msg_type="error",
954-
),
955-
sys.exit(1)
956-
chosen_filters[key] = value
957-
except Exception as e:
958-
logging.debug(traceback.format_exc())
959-
logging.debug(str(e))
960-
display_message(
961-
"Please provide complete --filter name=value pairs, "
962-
"for example --filter status=running.\n"
963-
"Available filters are '{}'.".format(
964-
"' '".join(sorted(available_filters.keys()))
965-
),
966-
msg_type="error",
967-
)
968-
sys.exit(1)
969-
try:
970-
response = get_workflow_logs(
944+
945+
if step and not follow:
946+
display_message(
947+
"Ignoring --step as it can only be used together with --follow.",
948+
msg_type="warning",
949+
)
950+
if filters and follow:
951+
display_message(
952+
"Ignoring --filters as it cannot be used together with --follow.",
953+
msg_type="warning",
954+
)
955+
if json_format and follow:
956+
display_message(
957+
"Ignoring --json as it cannot be used together with --follow.",
958+
msg_type="warning",
959+
)
960+
try:
961+
if follow:
962+
follow_workflow_logs(workflow, access_token, interval, step)
963+
else:
964+
retrieve_workflow_logs(
971965
workflow,
972966
access_token,
973-
steps=None if not steps else list(set(steps)),
967+
json_format,
968+
filters=filters,
974969
page=page,
975970
size=size,
976971
)
977-
workflow_logs = json.loads(response["logs"])
978-
if filters:
979-
for key, value in chosen_filters.items():
980-
unwanted_steps = [
981-
k
982-
for k, v in workflow_logs["job_logs"].items()
983-
if v[available_filters[key]] != value
984-
]
985-
for job_id in unwanted_steps:
986-
del workflow_logs["job_logs"][job_id]
987-
988-
if json_format:
989-
display_message(json.dumps(workflow_logs, indent=2))
990-
sys.exit(0)
991-
else:
992-
from reana_client.cli.utils import output_user_friendly_logs
993-
994-
output_user_friendly_logs(
995-
workflow_logs, None if not steps else list(set(steps))
996-
)
997-
except Exception as e:
998-
logging.debug(traceback.format_exc())
999-
logging.debug(str(e))
1000-
display_message(
1001-
"Cannot retrieve the logs of a workflow {}: \n"
1002-
"{}".format(workflow, str(e)),
1003-
msg_type="error",
1004-
)
1005-
sys.exit(1)
972+
except Exception as e:
973+
logging.debug(traceback.format_exc())
974+
logging.debug(str(e))
975+
display_message(
976+
"Cannot retrieve logs for workflow {}: \n{}".format(workflow, str(e)),
977+
msg_type="error",
978+
)
979+
sys.exit(1)
1006980

1007981

1008982
@workflow_execution_group.command("validate")

0 commit comments

Comments
 (0)