-
Notifications
You must be signed in to change notification settings - Fork 6
/
uploader_rules.py
executable file
·238 lines (204 loc) · 10.7 KB
/
uploader_rules.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# Copyright (c) 2017 Mozilla Corporation
# Contributors: Guillaume Destuynder <kang@mozilla.com>
import argparse
import glob
import json
import logging
import os
import sys
from authzero import AuthZero, AuthZeroRule
import difflib
MAINTENANCE_RULE_NAME = 'default-deny-for-maintenance'
class NotARulesDirectory(Exception):
pass
class DotDict(dict):
"""return a dict.item notation for dict()'s"""
__getattr__ = dict.__getitem__
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
def __init__(self, dict_):
super().__init__()
for key, value in dict_.items():
if hasattr(value, 'keys'):
value = DotDict(value)
self[key] = value
def empty_directory(directory):
if not os.path.exists(directory):
os.makedirs(directory)
elif os.listdir(directory):
raise argparse.ArgumentTypeError(
"Directory {} is not empty. Please choose either a directory "
"which doesn't exist or an empty directory".format(directory))
return directory
if __name__ == "__main__":
# Logging
logger_format = "[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s"
logging.basicConfig(format=logger_format, datefmt="%H:%M:%S", stream=sys.stdout)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Default credentials loading
try:
with open('credentials.json', 'r') as fd:
credentials = DotDict(json.load(fd))
require_creds = False
except FileNotFoundError:
credentials = DotDict({'client_id': '', 'client_secret': '', 'uri': 'auth-dev.mozilla.auth0.com'})
require_creds = True
# Arguments
parser = argparse.ArgumentParser()
parser.add_argument('-u', '--uri', default=credentials.uri, help='URI to Auth0 management API')
parser.add_argument('-c', '--clientid', default=credentials.client_id, required=require_creds, help='Auth0 client id')
parser.add_argument('-s', '--clientsecret', default=credentials.client_secret, required=require_creds, help='Auth0 client secret')
parser.add_argument('-r', '--rules-dir', default='rules', help='Directory containing rules in Auth0 format')
parser.add_argument('-b', '--backup-rules-to-directory', type=empty_directory, metavar='DIRECTORY', help='Download all rules from the API and save them to this directory.')
parser.add_argument('--delete-all-rules-first-causing-outage', action='store_true', help="Before uploading rules, delete all rules causing an outage")
parser.add_argument('-d', '--dry-run', action='store_true', help="Show what would be done but don't actually make any changes")
args = parser.parse_args()
config = DotDict({'client_id': args.clientid, 'client_secret': args.clientsecret, 'uri': args.uri})
authzero = AuthZero(config)
authzero.get_access_token()
logger.debug("Got access token for client_id:{}".format(args.clientid))
dry_run_message = 'Dry Run : Action not taken : ' if args.dry_run else ''
# on any error, `authzero` will raise an exception and python will exit with non-zero code
# Remote rules loader
remote_rules = authzero.get_rules()
logger.debug("Loaded {} remote rules from current Auth0 deployment".format(len(remote_rules)))
if args.backup_rules_to_directory:
for rule in remote_rules:
js_filename = os.path.join(
args.backup_rules_to_directory,
'{}.js'.format(rule['name']))
metadata_filename = js_filename + 'on'
metadata = {
'enabled': rule['enabled'],
'order': rule['order']
}
logger.debug("{}Writing metadata file {}".format(
dry_run_message, metadata_filename))
if not args.dry_run:
with open(metadata_filename, 'x') as f:
json.dump(metadata, f, sort_keys=True, indent=4, separators=(',', ': '))
logger.debug("{}Writing js file {}".format(
dry_run_message, js_filename))
if not args.dry_run:
with open(js_filename, 'x') as f:
f.write(rule['script'])
print("To restore from this backup run {} --rules-dir {}".format(
os.path.basename(__file__),
args.backup_rules_to_directory
))
sys.exit(0)
# Local rules loader
if not os.path.isdir(args.rules_dir):
raise NotARulesDirectory(args.rules_dir)
# Process all local rules
local_rules_files = glob.glob("{}/*.json".format(args.rules_dir))
local_rules = []
for local_rules_file in local_rules_files:
logger.debug("Reading local rule configuration {}".format(local_rules_file))
local_rule = AuthZeroRule()
# Overload the object with our own statuses
local_rule.is_new = False
local_rule.is_the_same = False
# Rule name comes from the filename with the auth0 format
local_rule.name = local_rules_file.split('/')[-1].split('.')[:-1][0]
with open(local_rules_file, 'r') as fd:
rule_conf = DotDict(json.load(fd))
local_rule.enabled = bool(rule_conf.enabled)
local_rule.order = int(rule_conf.order)
local_rules_file_js = local_rules_file.rstrip('on') # Equivalent to s/blah.json/blah.js/
logger.debug("Reading local rule code {}".format(local_rules_file_js))
with open(local_rules_file_js, 'r') as fd:
local_rule.script = fd.read()
if args.delete_all_rules_first_causing_outage and local_rule.name != MAINTENANCE_RULE_NAME:
# If we're deleting all rules, then we will create all rules anew
# after deletion with the exception of the maintenance rule
remote_rule_indexes = []
else:
# Match with existing remote rule if we need to update.. this uses the rule name!
remote_rule_indexes = [i for i, _ in enumerate(remote_rules) if _.get('name') == local_rule.name]
if remote_rule_indexes:
# If there's multi matches it means we have duplicate rule names and we're screwed.
# To fix that we'd need to change the auth0 local format to use rule ids (which we could eventually)
if len(remote_rule_indexes) > 1:
raise Exception('RuleMatchByNameFailed', (local_rule.name, remote_rule_indexes))
remote_rule_index = remote_rule_indexes[0]
local_rule.id = remote_rules[remote_rule_index].get('id')
local_rule.is_new = False
# Is the rule different?
remote_rule = remote_rules[remote_rule_index]
rules_match = (
(local_rule.script == remote_rule.get('script')) &
(local_rule.enabled == bool(remote_rule.get('enabled'))) &
(local_rule.stage == remote_rule.get('stage')) &
(local_rule.order == remote_rule.get('order')))
if rules_match:
local_rule.is_the_same = True
else:
logger.debug('Difference found in {} :'.format(local_rule.name))
for line in difflib.unified_diff(
remote_rule.get('script').splitlines(),
local_rule.script.splitlines(),
fromfile='auth0-{}'.format(local_rule.name),
tofile='local-{}'.format(local_rule.name)):
logger.debug(line)
else:
# No remote rule match, so it's a new rule
logger.debug('Rule only exists locally, considered new and to be created: {}'.format(local_rule.name))
local_rule.is_new = True
if not local_rule.validate():
logger.error('Rule failed validation: {}'.format(local_rule.name))
sys.exit(127)
else:
local_rules.append(local_rule)
logger.debug("Found {} local rules".format(len(local_rules)))
if args.delete_all_rules_first_causing_outage:
rules_to_remove = [x for x in remote_rules if x.get('name') != MAINTENANCE_RULE_NAME]
logger.debug("Found {} rules that will be deleted remotely".format(len(rules_to_remove)))
else:
# Find dead rules (i.e. to remove/rules that only exist remotely)
rules_to_remove = [x for x in remote_rules if x.get('id') not in [y.id for y in local_rules]]
logger.debug("Found {} rules that not longer exist locally and will be deleted remotely".format(len(rules_to_remove)))
maintenance_rule = next(
x for x in local_rules if x.name == MAINTENANCE_RULE_NAME)
if args.delete_all_rules_first_causing_outage:
maintenance_rule.enabled = True
logger.debug("[+] {}Enabling maintenance rule denying all logins globally {} {}".format(
dry_run_message, maintenance_rule.name, maintenance_rule.id))
if not args.dry_run:
authzero.update_rule(maintenance_rule.id, maintenance_rule)
# Update or create (or delete) rules as needed
## Delete first in case we need to get some order numbers free'd
for rule in rules_to_remove:
logger.debug("[-] {}Removing rule {} ({}) from Auth0".format(
dry_run_message, rule['name'], rule['id']))
if not args.dry_run:
authzero.delete_rule(rule['id'])
## Update & Create (I believe this may be atomic swaps for updates)
for local_rule in local_rules:
if local_rule.is_new:
if args.delete_all_rules_first_causing_outage and local_rule.name == MAINTENANCE_RULE_NAME:
continue
logger.debug("[+] {}Creating new rule {} on Auth0".format(
dry_run_message, local_rule.name))
if not args.dry_run:
result = authzero.create_rule(local_rule)
logger.debug("+ New rule created with id {}".format(result.get('id')))
elif local_rule.is_the_same:
logger.debug("[=] Rule {} is unchanged, will not update".format(local_rule.name))
else:
logger.debug("[~] {}Updating rule {} ({}) on Auth0".format(
dry_run_message, local_rule.name, local_rule.id))
if not args.dry_run:
authzero.update_rule(local_rule.id, local_rule)
if args.delete_all_rules_first_causing_outage:
maintenance_rule.enabled = False
logger.debug("[-] {}Disabling maintenance rule {} {}".format(
dry_run_message, maintenance_rule.name, maintenance_rule.id))
if not args.dry_run:
authzero.update_rule(maintenance_rule.id, maintenance_rule)
sys.exit(0)