""" autopilotpattern/mysql ContainerPilot handlers """
from __future__ import print_function
from datetime import datetime
import os
import socket
import subprocess
import sys
# pylint: disable=invalid-name,no-self-use,dangerous-default-value
from manager.client import MySQL, MySQLError
from manager.config import ContainerPilot
from manager.discovery import Consul
from manager.env import PRIMARY_KEY, BACKUP_NAME
from manager.network import get_ip
from manager.storage.manta_stor import Manta
from manager.storage.minio_stor import Minio
from manager.storage.local import Local
from manager.utils import log, debug, \
PRIMARY, REPLICA, UNASSIGNED, \
UnknownPrimary, WaitTimeoutError

class Node(object):
    """
    Node represents the state of our running container and carries
    around the MySQL config, and clients for Consul and snapshots.
    """
    def __init__(self, mysql=None, cp=None, consul=None, snaps=None):
        self.mysql = mysql
        self.consul = consul
        self.snaps = snaps
        self.cp = cp

        self.hostname = socket.gethostname()
        self.name = 'mysql-{}'.format(self.hostname)
        self.ip = get_ip()

    @debug(log_output=True)
    def is_primary(self):
        """
        Check if this node is the primary by checking the in-memory cache,
        then Consul, then MySQL replication status. Caches its result, so
        the node's `state` field needs to be reset to UNASSIGNED if you
        want to force a fresh check of Consul, etc.
        """
        log.debug('state: %s', self.cp.state)
        if self.cp.state != UNASSIGNED:
            return self.cp.state == PRIMARY

        try:
            # am I already replicating from somewhere else?
            _, primary_ip = self.mysql.get_primary()
            if not primary_ip:
                pass
            elif primary_ip == self.ip:
                self.cp.state = PRIMARY
                return True
            else:
                self.cp.state = REPLICA
                return False
        except (MySQLError, WaitTimeoutError, UnknownPrimary) as ex:
            log.debug('could not determine primary via mysqld status: %s', ex)

        try:
            # am I already reporting I'm a healthy primary to Consul?
            _, primary_ip = self.consul.get_primary()
            if not primary_ip:
                pass
            elif primary_ip == self.ip:
                self.cp.state = PRIMARY
                return True
            else:
                self.cp.state = REPLICA
                return False
        except (UnknownPrimary, ValueError) as ex:
            log.debug('could not determine primary via Consul: %s', ex)

        # am I listed in the Consul PRIMARY_KEY?
        _, primary_name = self.consul.read_lock(PRIMARY_KEY)
        log.debug('primary_name: %s', primary_name)
        if primary_name == self.name:
            self.cp.state = PRIMARY
            return True

        self.cp.state = UNASSIGNED
        return False

    def is_replica(self):
        """ check if we're a replica """
        return not self.is_primary() and self.cp.state != UNASSIGNED

    def is_snapshot_node(self):
        """ check if we're the node that's going to execute the snapshot """
        # TODO: we want to have the replicas all take a lock on the snapshot task
        return self.is_primary()
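
# Illustrative sketch (not part of the original file): is_primary() caches
# its verdict in cp.state, so a caller that needs a fresh answer derived
# from mysqld/Consul must reset the state first, as on_change() does below:
#
#   node.cp.state = UNASSIGNED
#   if node.is_primary():   # re-checks mysqld, then Consul, then PRIMARY_KEY
#       ...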

# ---------------------------------------------------------
# Top-level functions called by ContainerPilot

@debug
def pre_start(node):
    """
    The top-level ContainerPilot `preStart` handler.
    MySQL must be running in order to execute most of our setup behavior,
    so we're just going to make sure the directory structures are in
    place and then let the first health check handler take it from there.
    """
    # make sure that if we've pulled in an external data volume
    # the mysql user can read it
    my = node.mysql
    my.take_ownership()
    my.render()
    if not os.path.isdir(os.path.join(my.datadir, 'mysql')):
        last_backup = node.consul.has_snapshot()
        if last_backup:
            node.snaps.get_backup(last_backup)
            my.restore_from_snapshot(last_backup)
        else:
            if not my.initialize_db():
                log.info('Skipping database setup.')
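
# A note on the check above (an assumption about standard MySQL datadir
# layout, not anything specific to this repo): the `mysql` directory under
# the datadir holds the system schema, so its absence marks the data volume
# as uninitialized. A sketch of the decision:
#
#   if <datadir>/mysql exists  -> volume already initialized, do nothing
#   elif Consul has a snapshot -> pull it from the object store and restore
#   else                       -> initialize a brand-new database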

@debug
def health(node):
    """
    The top-level ContainerPilot `health` handler. Runs a simple health
    check. Also acts as a check for whether the ContainerPilot configuration
    needs to be reloaded (if it's been changed externally).
    """
    # Because we need MySQL up to finish initialization, we need to check
    # on each pass through the health check that we've done so. The happy
    # path is to check a lock file against the node state (which has been
    # set above) and immediately return when we discover the lock exists.
    # Otherwise, we bootstrap the instance for its *current* state.
    assert_initialized_for_state(node)

    if node.is_primary():
        # If this lock is allowed to expire and the health check for the
        # primary fails, the `onChange` handlers for the replicas will try
        # to failover and then the primary will obtain a new lock.
        # If this node can update the lock but the DB fails its health
        # check, then the operator will need to manually intervene if they
        # want to force a failover. This architecture is a result of Consul
        # not permitting us to acquire a new lock on a health-checked
        # session if the health check is *currently* failing, but it has
        # the happy side-effect of reducing the risk of flapping on a
        # transient health check failure.
        node.consul.renew_session()

        # Simple health check; exceptions result in a non-zero exit code
        node.mysql.query('select 1')
    elif node.is_replica():
        # TODO: we should make this check actual replication health
        # and not simply that replication has been established
        if not node.mysql.query('show slave status'):
            log.error('Replica is not replicating.')
            sys.exit(1)
    else:
        # If we're still somehow marked UNASSIGNED we exit now. This is a
        # byzantine failure mode where the end-user needs to intervene.
        log.error('Cannot determine MySQL state; failing health check.')
        sys.exit(1)

    node.consul.unlock_failover()
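
# Illustrative note (not part of the original file): ContainerPilot reads
# this handler's process exit code as the health check result, which is why
# the branches above report by exiting rather than returning a value:
#
#   exit 0 -> passing (we reach the end, i.e. `select 1` or the replica
#             status query succeeded)
#   exit 1 -> failing (any sys.exit(1) above, or an uncaught MySQLError)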

@debug
def on_change(node):
    """ The top-level ContainerPilot `onChange` handler """

    # first check if this node has already been set primary by a completed
    # call to failover and update the ContainerPilot config as needed.
    if node.is_primary():
        log.debug('[on_change] this node is primary, no failover required.')
        if node.cp.update():
            # we're ignoring the lock here intentionally
            node.consul.put(PRIMARY_KEY, node.name)
            node.cp.reload()
        return

    # check if another node has been set primary already and is reporting
    # as healthy, in which case there's no failover required. Note that
    # we can't simply check if we're a replica via .is_replica() b/c that
    # trusts mysqld's view of the world.
    try:
        node.consul.get_primary(timeout=1)
        log.debug('[on_change] primary is already healthy, no failover required')
        return
    except (UnknownPrimary, WaitTimeoutError) as ex:
        log.debug('[on_change] no primary from consul: %s', ex)

    if node.consul.lock_failover(node.name):
        try:
            nodes = node.consul.client.health.service(REPLICA, passing=True)[1]
            ips = [instance['Service']['Address'] for instance in nodes]
            log.info('[on_change] Executing failover with candidates: %s', ips)
            node.mysql.failover(ips)
        except Exception:
            # On failure we bubble up the exception and fail the onChange.
            # Either another instance that didn't overlap in time will
            # complete the failover, or we'll be left without a primary and
            # require manual intervention via `mysqlrpladmin failover`.
            node.consul.unlock_failover()
            raise
    else:
        log.info('[on_change] Failover in progress on another node, '
                 'waiting for it to complete.')
        node.consul.wait_for_failover_lock()

    # we need to determine replication status at this point, so make
    # sure we refresh .state from mysqld/Consul
    node.cp.state = UNASSIGNED

    if node.is_primary():
        log.info('[on_change] node %s is primary after failover', node.name)
        if node.cp.update():
            # we're intentionally ignoring the advisory lock here
            ok = node.consul.put(PRIMARY_KEY, node.name)
            log.debug('[on_change] %s obtained lock: %s', node.name, ok)
            node.cp.reload()
        return
    elif node.is_replica():
        log.info('[on_change] node %s is replica after failover', node.name)

    if node.cp.state == UNASSIGNED:
        log.error('[on_change] this node is neither primary nor replica '
                  'after failover; check replication status on the cluster.')
        sys.exit(1)
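
# For orientation, a sketch of the three paths through on_change() above
# (not part of the original file):
#
#   1. this node already knows it's primary -> refresh config and return
#   2. Consul reports a healthy primary     -> nothing to do, return
#   3. otherwise -> race for the failover advisory lock; the winner runs
#      node.mysql.failover(ips) against the passing replicas while the
#      losers wait on the lock, then every node resets cp.state to
#      UNASSIGNED and re-derives its role via is_primary()/is_replica()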

@debug
def snapshot_task(node):
    """
    Create a snapshot and send it to the object store, if this is the
    node responsible for snapshots and one is due.
    """
    # bail out early if we can avoid making a DB connection
    if not node.is_snapshot_node() or not node.consul.lock_snapshot(node.name):
        return

    binlog_file = node.mysql.get_binlog()
    if node.consul.is_snapshot_stale(binlog_file):
        # we'll let exceptions bubble up here. The task will fail
        # and be logged, and when the BACKUP_LOCK_KEY expires we can
        # alert on that externally.
        try:
            write_snapshot(node)
        finally:
            node.consul.unlock_snapshot()

@debug
def write_snapshot(node):
    """
    Calls out to innobackupex to snapshot the DB, pushes the file to
    snapshot storage, and records in Consul that the work is completed.
    """
    now = datetime.utcnow()
    # we don't want .isoformat() here because of URL encoding
    backup_id = now.strftime(BACKUP_NAME)
    backup_time = now.isoformat()

    with open('/tmp/backup.tar', 'wb') as f:
        subprocess.check_call(['/usr/bin/innobackupex',
                               '--user={}'.format(node.mysql.repl_user),
                               '--password={}'.format(node.mysql.repl_password),
                               '--no-timestamp',
                               # '--compress',
                               '--stream=tar',
                               '/tmp/backup'], stdout=f)
    log.info('snapshot completed, uploading to object store')
    out = node.snaps.put_backup(backup_id, '/tmp/backup.tar')
    log.info('snapshot uploaded to %s', out)

    # write the filename of the binlog to Consul so that we know if
    # we've rotated since the last backup.
    # query() lets KeyError bubble up -- something's broken if it does
    results = node.mysql.query('show master status')
    binlog_file = results[0]['File']
    node.consul.record_backup(backup_id, backup_time, binlog_file)
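
# Worked example (BACKUP_NAME lives in manager.env and its real value is an
# assumption here): if BACKUP_NAME were 'mysql-backup-%Y-%m-%dT%H%M%SZ',
# a run at 2017-05-04 03:02:01 UTC would produce:
#
#   backup_id   == 'mysql-backup-2017-05-04T030201Z'   # safe in a URL path
#   backup_time == '2017-05-04T03:02:01'               # .isoformat()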

# ---------------------------------------------------------
# run_as_* functions determine the top-level behavior of a node

@debug(log_output=True)
def assert_initialized_for_state(node):
    """
    If the node has not yet been set up, find the correct state and
    initialize for that state. After the first health check we'll have
    created a lock directory and will never hit this path again.
    """
    LOCK_PATH = '/var/run/init.lock'
    try:
        os.mkdir(LOCK_PATH, 0o700)
    except OSError:
        # the lock directory exists, so we've already initialized
        return True

    # the check for primary will set the state if it's known. If another
    # instance is the primary then we'll be marked as REPLICA, so if we
    # can't determine after the check which we are then we're likely the
    # first instance (this will get safely verified later).
    if node.is_primary() or node.cp.state == UNASSIGNED:
        try:
            if not run_as_primary(node):
                log.error('Tried to mark node %s primary but a primary '
                          'exists; exiting for retry on next check.',
                          node.name)
                os.rmdir(LOCK_PATH)
                sys.exit(1)
        except MySQLError as ex:
            # We've made it only partly through setup. Setup isn't
            # idempotent but should be safe to retry if we can make more
            # progress. At worst we end up with a bunch of failure logs.
            log.error('Failed to set up %s as primary (%s). Exiting but '
                      'will retry setup. Check the logs following this line '
                      'to see if setup needs reconfiguration or manual '
                      'intervention to continue.', node.name, ex)
            os.rmdir(LOCK_PATH)
            sys.exit(1)

        if node.cp.update():
            os.rmdir(LOCK_PATH)
            node.cp.reload()
            # this is racy with the SIGHUP that ContainerPilot just got
            # sent, but if the Consul agent shuts down quickly enough we
            # end up sending extra API calls to it and get a bunch of log
            # spam. This forces us to exit early.
            sys.exit(0)
    else:
        try:
            run_as_replica(node)
        except (UnknownPrimary, MySQLError) as ex:
            log.error('Failed to set up %s for replication (%s). Exiting '
                      'for retry on next check.', node.name, ex)
            os.rmdir(LOCK_PATH)
            sys.exit(1)

    return False
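
# Illustrative note (not part of the original file): os.mkdir() is atomic,
# so the lock directory is a once-only latch even when health checks
# overlap; exactly one caller creates it and proceeds with setup, any other
# gets OSError and returns early. The failure paths above os.rmdir() it so
# the next health check can retry:
#
#   try:
#       os.mkdir(LOCK_PATH, 0o700)   # winner: fall through to initialize
#   except OSError:
#       return True                  # already initialized (or in progress)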

@debug
def run_as_primary(node):
    """
    The overall workflow here is ported and reworked from the
    Oracle-provided Docker image:
    https://github.com/mysql/mysql-docker/blob/mysql-server/5.7/docker-entrypoint.sh
    """
    if not node.consul.mark_as_primary(node.name):
        return False
    node.cp.state = PRIMARY

    conn = node.mysql.wait_for_connection()
    my = node.mysql
    if conn:
        # if we can make a connection w/o a password then this is the
        # first pass. *Note: the conn is not the same as `node.conn`!*
        my.set_timezone_info()
        my.setup_root_user(conn)
        my.create_db(conn)
        my.create_default_user(conn)
        my.create_repl_user(conn)
        my.expire_root_password(conn)
    else:
        # in case this is a newly-promoted primary
        my.execute('STOP SLAVE')

    # although backups will be run from any instance, we need to first
    # snapshot the primary so that we can bootstrap replicas.
    write_snapshot(node)
    return True

@debug
def run_as_replica(node):
    """
    Set up GTID-based replication to the primary; once this is set, the
    replica will automatically try to catch up with the primary's last
    transactions. UnknownPrimary or mysqlconn.Errors are allowed to
    bubble up to the caller.
    """
    log.info('Setting up replication.')
    node.cp.state = REPLICA
    _, primary_ip = node.consul.get_primary(timeout=30)
    node.mysql.setup_replication(primary_ip)

# ---------------------------------------------------------

def main():
    """
    Parse the first argument as a command and execute it with a parameter
    carrying the state of MySQL, ContainerPilot, etc. The default behavior
    is to run the `pre_start` DB initialization.
    """
    if len(sys.argv) == 1:
        consul = Consul(envs={'CONSUL': os.environ.get('CONSUL', 'consul')})
        cmd = pre_start
    else:
        consul = Consul()
        try:
            cmd = globals()[sys.argv[1]]
        except KeyError:
            log.error('Invalid command: %s', sys.argv[1])
            sys.exit(1)

    my = MySQL()

    snapshot_backend = os.environ.get('SNAPSHOT_BACKEND', 'manta')
    if snapshot_backend == 'local':
        snaps = Local()
    elif snapshot_backend == 'minio':
        snaps = Minio()
    else:
        snaps = Manta()

    cp = ContainerPilot()
    cp.load()
    node = Node(mysql=my, consul=consul, snaps=snaps, cp=cp)

    cmd(node)


if __name__ == '__main__':
    main()
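
# Illustrative usage (not part of the original file): main() dispatches the
# first CLI argument to the matching top-level handler via globals(), so the
# ContainerPilot hooks would invoke this module along these lines:
#
#   $ python manage.py                 # no argument: runs pre_start
#   $ python manage.py health
#   $ python manage.py on_change
#   $ python manage.py snapshot_task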