-
Notifications
You must be signed in to change notification settings - Fork 8
/
deploy_flow.py
executable file
·76 lines (60 loc) · 1.82 KB
/
deploy_flow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/usr/bin/env python
import argparse
# Parse input arguments
def parse_args():
parser = argparse.ArgumentParser(
description="""
Deploy a flow for use with trigger examples."""
)
parser.add_argument(
"--flowdef",
required=True,
help="Name of file containing the flow definition and input schema definitions.",
)
parser.add_argument(
"--schema",
required=True,
help="Name of file containing the input schema definition.",
)
parser.add_argument(
"--title",
default="Flow from Trigger Examples",
help='Flow title. [default: "Flow from Trigger Examples"]',
)
parser.add_argument(
"--flowid", help="Flow ID; used only when updating a flow definition"
)
parser.set_defaults(verbose=True)
return parser.parse_args()
import json
from flows_service import create_flows_client
def deploy_flow():
args = parse_args()
fc = create_flows_client()
# Get flow and input schema definitions
with open(args.flowdef, "r") as f:
flow_def = f.read()
with open(args.schema, "r") as f:
schema = f.read()
if args.flowid:
# Assume we're updating an existing flow
flow_id = args.flowid
flow = fc.update_flow(
flow_id=flow_id,
title=args.title,
definition=json.loads(flow_def),
input_schema=json.loads(schema),
)
print(f"Updated flow {flow_id}")
else:
# Deploy a new flow
flow = fc.create_flow(
title=args.title,
definition=json.loads(flow_def),
input_schema=json.loads(schema),
)
flow_id = flow["id"]
print(f"Deployed flow {flow_id}")
return flow_id, flow["globus_auth_scope"]
if __name__ == "__main__":
deploy_flow()