From e9295822ac83fbcafb8762ac4281c8107812cb26 Mon Sep 17 00:00:00 2001
From: RhettYing
Date: Thu, 9 Nov 2023 02:30:34 +0000
Subject: [PATCH] cron check job status

---
 .github/workflows/continuous_integration.yml |  23 ++-
 checkJobStatus.py                            | 145 +++++++++++++++++++
 2 files changed, 165 insertions(+), 3 deletions(-)
 create mode 100644 checkJobStatus.py

diff --git a/.github/workflows/continuous_integration.yml b/.github/workflows/continuous_integration.yml
index 53b03ae..e11649c 100644
--- a/.github/workflows/continuous_integration.yml
+++ b/.github/workflows/continuous_integration.yml
@@ -47,12 +47,29 @@ jobs:
         python3 -m pip install pytest
         pip3 install boto3
     - name: Submit Job
-      if: ${{ github.event_name == 'push' }}
+      id: submit-job
       shell: bash
       run: |
         echo "Start submitting job - Check"
         python3 ./submitJob.py --job-type CI-CPU --name hello_DGL-pytest-check-'${{ github.ref }}' \
           --command "${{ env.COMMAND-PYTEST }}" \
           --remote https://github.com/'${{ github.repository }}' \
-          --source-ref '${{ github.ref }}' \
-          --wait
+          --source-ref '${{ github.ref }}'
+    # NOTE(review): submitJob.py must write "job-id=<id>" to $GITHUB_OUTPUT for the
+    # steps.submit-job.outputs.job-id references below to resolve — confirm.
+    - name: Check batch job status
+      id: check-job-status
+      shell: bash
+      run: |
+        echo "Start checking job status - Check"
+        python3 ./checkJobStatus.py --job-id ${{ steps.submit-job.outputs.job-id }} \
+          --job-name hello_DGL-pytest-check-'${{ github.ref }}'
+    # NOTE(review): a cron trigger is not valid inside 'steps'; declare it under the
+    # workflow-level 'on:' key instead, e.g.:
+    #   schedule:
+    #     - cron: '*/2 * * * *'  # execute every 2 minutes
+    - name: Exit if job status finished
+      shell: bash
+      run: |
+        echo "Start exiting job - Check"
+        python3 ./exitJob.py --job-id ${{ steps.submit-job.outputs.job-id }}
diff --git a/checkJobStatus.py b/checkJobStatus.py
new file mode 100644
index 0000000..2458893
--- /dev/null
+++ b/checkJobStatus.py
@@ -0,0 +1,145 @@
+# Script to check the status of an AWS Batch job; queues and definitions already exist.
+import argparse
+import random
+import re
+import sys
+import time
+from datetime import datetime
+
+import boto3
+from botocore.compat import total_seconds
+from botocore.config import Config
+
+
+# Maps a CI job type to its pre-provisioned AWS Batch definition/queue pair.
+job_type_info = {
+    'CI-CPU': {
+        'job_definition': 'hello_dgl',
+        'job_queue': 'hello_dgl',
+    },
+}
+
+parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+
+parser.add_argument('--profile', help='profile name of aws account.', type=str,
+                    default=None)
+parser.add_argument('--region', help='Default region when creating new connections', type=str,
+                    default='us-west-2')
+parser.add_argument('--name', help='name of the job', type=str, default='dummy')
+parser.add_argument('--job-type', help='type of job to submit.', type=str,
+                    choices=job_type_info.keys(), default='CI-CPU')
+parser.add_argument('--command', help='command to run', type=str,
+                    default='git rev-parse HEAD | tee stdout.log')
+parser.add_argument('--wait', help='block wait until the job completes. '
+                    'Non-zero exit code if job fails.', action='store_true')
+parser.add_argument('--timeout', help='job timeout in seconds', default=10800, type=int)
+
+parser.add_argument('--source-ref',
+                    help='ref in hello_DGL main github. e.g. master, refs/pull/500/head',
+                    type=str, default='main')
+parser.add_argument('--remote',
+                    help='git repo address. https://github.com/dglai/hello_dgl.git',
+                    type=str, default="https://github.com/dglai/hello_dgl.git")
+parser.add_argument("--job-id", help="job id", type=str, default=None)
+parser.add_argument("--job-name", help="job name", type=str, default=None)
+
+args = parser.parse_args()
+
+print(args)
+
+session = boto3.Session(profile_name=args.profile, region_name=args.region)
+config = Config(
+    retries = dict(
+        max_attempts = 5
+    )
+)
+
+batch, cloudwatch = [session.client(service_name=sn, config=config) for sn in ['batch', 'logs']]
+
+
+def printLogs(logGroupName, logStreamName, startTime):
+    """Print CloudWatch log events newer than startTime; return the last timestamp seen."""
+    kwargs = {'logGroupName': logGroupName,
+              'logStreamName': logStreamName,
+              'startTime': startTime,
+              'startFromHead': True}
+
+    lastTimestamp = startTime - 1
+    while True:
+        logEvents = cloudwatch.get_log_events(**kwargs)
+
+        for event in logEvents['events']:
+            lastTimestamp = event['timestamp']
+            timestamp = datetime.utcfromtimestamp(lastTimestamp / 1000.0).isoformat()
+            print('[{}] {}'.format((timestamp + '.000')[:23] + 'Z', event['message']))
+
+        # get_log_events pages until the forward token stops changing.
+        nextToken = logEvents['nextForwardToken']
+        if nextToken and kwargs.get('nextToken') != nextToken:
+            kwargs['nextToken'] = nextToken
+        else:
+            break
+    return lastTimestamp
+
+
+def nowInMillis():
+    """Current UTC time as milliseconds since the epoch."""
+    endTime = int(total_seconds(datetime.utcnow() - datetime(1970, 1, 1))) * 1000
+    return endTime
+
+
+def main():
+    spin = ['-', '/', '|', '\\', '-', '/', '|', '\\']
+    logGroupName = '/aws/batch/job'  # Group where AWS Batch logs are stored in CloudWatch
+
+    # Printing actions parameters
+    print("GitHub SourceRef: ", args.source_ref)
+    print("GitHub Remote: ", args.remote)
+
+    jobId = args.job_id
+    jobName = args.job_name
+    print(f"Job ID: {jobId}. Job Name: {jobName}")
+
+    # Fail fast: describe_jobs cannot be called with a missing job id.
+    if not jobId:
+        print("Error: --job-id is required.")
+        sys.exit(1)
+
+    spinner = 0
+    running = False
+    status_set = set()
+    startTime = 0
+    logStreamName = None
+
+    describeJobsResponse = batch.describe_jobs(jobs=[jobId])
+    jobs = describeJobsResponse['jobs']
+    if not jobs:
+        print('Job [{} - {}] not found.'.format(jobName, jobId))
+        sys.exit(1)
+    status = jobs[0]['status']
+    if status == 'SUCCEEDED' or status == 'FAILED':
+        if logStreamName:
+            startTime = printLogs(logGroupName, logStreamName, startTime) + 1
+        print('=' * 80)
+        print('Job [{} - {}] {}'.format(jobName, jobId, status))
+        # Non-zero exit code when the job failed.
+        sys.exit(status == 'FAILED')
+    elif status == 'RUNNING':
+        logStreamName = jobs[0]['container']['logStreamName']
+        if not running:
+            running = True
+            print('\rJob [{}, {}] is RUNNING.'.format(jobName, jobId))
+            if logStreamName:
+                print('Output [{}]:\n {}'.format(logStreamName, '=' * 80))
+        if logStreamName:
+            startTime = printLogs(logGroupName, logStreamName, startTime) + 1
+    elif status not in status_set:
+        status_set.add(status)
+        print('\rJob [%s - %s] is %-9s... %s' % (jobName, jobId, status, spin[spinner % len(spin)]),)
+        sys.stdout.flush()
+        spinner += 1
+    print(f"Job status: {status}")
+
+
+if __name__ == '__main__':
+    main()