[FLOC-4478] Consul configuration [DO NOT MERGE] #2878

Open

wants to merge 18 commits into base: master

Commits (18)
c1c9a1d  Get consul running on acceptance test clusters (wallrj, Jul 28, 2016)
0eb6837  A simpler way to bootstrap the consul cluster. This works with the ad… (wallrj, Jul 28, 2016)
b4b5bdb  Factor out the configuration store operations so that I can implement… (wallrj, Jul 29, 2016)
7692dd0  Start writing and testing a Consul config store (wallrj, Jul 29, 2016)
80b76fa  Some tests and a treq implementation copied from the apiclient package. (wallrj, Aug 1, 2016)
277a3be  loop until the consul server is 'ready' (wallrj, Aug 1, 2016)
ee22315  A way to load configuration store plugins (wallrj, Aug 2, 2016)
79f3c40  Move ConfigurationStore implementations to a separate package (wallrj, Aug 3, 2016)
8bec56a  Add retry to the consul store. (wallrj, Aug 3, 2016)
3c3a55b  Let's try this! (wallrj, Aug 3, 2016)
9c2de81  Update our TLS dropin configuration for Docker 1.12.0 (wallrj, Aug 3, 2016)
ec2b0f3  Docker on Centos-7 is no longer socket activated...so hard code the u… (wallrj, Aug 3, 2016)
6f1ca8e  A hack to prevent the acceptance test script cleaning up my consul co… (wallrj, Aug 3, 2016)
b151add  Merge remote-tracking branch 'origin/fix-acceptance-add-id-FLOC-4480-… (wallrj, Aug 3, 2016)
c551708  Merge remote-tracking branch 'origin/master' into consul-configuratio… (wallrj, Aug 4, 2016)
b5572c3  Fix broken apiclient functional tests (wallrj, Aug 4, 2016)
d960d9b  Merge remote-tracking branch 'origin/master' into consul-configuratio… (wallrj, Aug 12, 2016)
d70fad8  Add an SQL configuration store and functional tests against MariaDB. (wallrj, Aug 16, 2016)
2 changes: 1 addition & 1 deletion admin/package-files/systemd/flocker-control.service
@@ -2,7 +2,7 @@
Description=Flocker Control Service

[Service]
ExecStart=/usr/sbin/flocker-control --port tcp:4523 --agent-port tcp:4524 --journald
ExecStart=/usr/sbin/flocker-control --port tcp:4523 --agent-port tcp:4524 --journald --configuration-store-plugin consul
Restart=always

PrivateTmp=true
4 changes: 4 additions & 0 deletions docs/installation/enabling-control-service.rst
@@ -53,6 +53,10 @@ On AWS, an external firewall is used instead, which will need to be configured s
Ubuntu 16.04
============

In a previous step we started a ``consul`` "server" on each of the nodes and introduced them to form a cluster.
With the consul database running, we can now start the control service.
It will create a unique Flocker key in the database and store the Flocker configuration there.

.. task:: enable_flocker_control ubuntu-16.04
:prompt: [root@control-node]#

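The new paragraph above says the control service keeps the whole cluster configuration under a single Flocker key in the consul database. As a rough illustration of that interaction, and not code from this branch, a configuration store could read and write such a key through Consul's HTTP KV API; the key path, agent address, and use of ``treq`` below are assumptions:

.. code-block:: python

   # Hypothetical sketch: store the serialized Flocker configuration in
   # Consul's key/value store via the local agent's HTTP API. The key name
   # and agent address are assumptions, not taken from this pull request.
   import treq
   from twisted.internet.defer import inlineCallbacks, returnValue

   CONSUL_KV_URL = "http://127.0.0.1:8500/v1/kv/flocker/current_configuration"


   @inlineCallbacks
   def set_content(content):
       """Write the serialized configuration to the Consul KV store."""
       response = yield treq.put(CONSUL_KV_URL, data=content)
       if response.code != 200:
           raise Exception("Consul rejected the configuration write")


   @inlineCallbacks
   def get_content():
       """Read the serialized configuration back, or None if it is unset."""
       response = yield treq.get(CONSUL_KV_URL + "?raw")
       if response.code == 404:
           returnValue(None)
       content = yield response.content()
       returnValue(content)
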
23 changes: 23 additions & 0 deletions docs/installation/install-node.rst
@@ -169,6 +169,29 @@ Installing on Ubuntu 16.04

.. XXX FLOC-3454 to create a task directive for installing the plugin

#. **Install** ``consul`` **(or a supported distributed key / value database):**

Launch ``consul`` on each of the nodes by running the official `Hashicorp Consul Docker image <https://www.hashicorp.com/blog/official-consul-docker-image.html>`_.

* Supply the local IP address which ``consul`` will use when connecting to other servers in the cluster.

* On the first ``consul`` server, add the ``-bootstrap`` argument.
This will be the server to which you join all the subsequent servers in the cluster.

.. task:: consul_start True 192.0.2.101
:prompt: [root@ubuntu]#

* On all subsequent nodes, run the ``consul`` server without the ``-bootstrap`` argument:

.. task:: consul_start True 192.0.2.102
:prompt: [root@ubuntu]#

Now run ``consul join`` to introduce this server to the bootstrap server, and through it to all other members of the consul cluster (a rough sketch of these commands follows this list):

.. task:: consul_join 192.0.2.101
:prompt: [root@ubuntu]#

#. **Repeat the previous steps for all other nodes:**

Log into your other nodes as root, and then complete steps 2 and 3 until all the nodes in your cluster have the ``clusterhq-flocker-node`` package and the optional ``clusterhq-flocker-docker-plugin`` package installed.
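
For reference, the two tasks used above amount to starting the official image in server mode, with ``-bootstrap`` passed only on the first node, and then joining every later node to that first server. The sketch below shows what such tasks might run; the image name, flags, and function names are assumptions for illustration and are not taken from this branch.

.. code-block:: python

   # Hypothetical sketch of the commands behind ``consul_start`` and
   # ``consul_join``. Image name and flags are assumptions.
   def consul_start_commands(bootstrap, bind_address):
       """Return the shell command that starts a consul server container."""
       command = [
           "docker", "run", "--detach", "--net=host", "--name=consul",
           "consul", "agent", "-server", "-bind=" + bind_address,
       ]
       if bootstrap:
           # Only the very first server bootstraps the new cluster.
           command.append("-bootstrap")
       return [" ".join(command)]


   def consul_join_commands(bootstrap_address):
       """Return the shell command that joins this node to the first server."""
       return ["docker exec consul consul join " + bootstrap_address]

For example, ``consul_start_commands(True, "192.0.2.101")`` would correspond to the first task invocation above, and ``consul_join_commands("192.0.2.101")`` to the join step.
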
5 changes: 5 additions & 0 deletions flocker/acceptance/endtoend/test_dockerplugin.py
@@ -122,6 +122,11 @@ def run_python_container(self, cluster, address, docker_arguments, script,
for container in client.containers():
client.remove_container(container["Id"], force=True)

# Give the test containers a meaningful name so that we can identify
# which test produced any containers that don't get garbage collected.
if "name" not in docker_arguments:
docker_arguments["name"] = random_name(self)

container = client.create_container(
"python:2.7-slim",
["python2.7", "-c", script.getContent()] + list(script_arguments),
4 changes: 3 additions & 1 deletion flocker/acceptance/testtools.py
@@ -873,8 +873,10 @@ def cleanup_all_containers(_):
# Remove all existing containers on the node, in case
# they're left over from previous test; they might e.g.
# have a volume bind-mounted, preventing its destruction.
# XXX But don't kill the consul containers
for container in client.containers():
client.remove_container(container["Id"], force=True)
if "acceptance" in [name for name in container["Names"]]:
client.remove_container(container["Id"], force=True)

def cleanup_flocker_containers(_):
cleaning_containers = api_clean_state(
3 changes: 2 additions & 1 deletion flocker/apiclient/test/test_client.py
@@ -717,7 +717,8 @@ def create_client(self):
clock = Clock()
_, self.port = find_free_port()
self.persistence_service = ConfigurationPersistenceService(
clock, FilePath(self.mktemp()))
reactor=clock
)
self.persistence_service.startService()
self.cluster_state_service = ClusterStateService(reactor)
self.cluster_state_service.startService()
4 changes: 2 additions & 2 deletions flocker/common/__init__.py
@@ -23,7 +23,7 @@
from ._retry import (
loop_until, timeout, poll_until, retry_failure, retry_effect_with_timeout,
get_default_retry_steps,
retry_if, decorate_methods, with_retry,
retry_if, decorate_methods, with_retry, backoff
)
from .version import parse_version, UnparseableVersion

@@ -36,7 +36,7 @@
'poll_until', 'retry_effect_with_timeout',

'decorate_methods',
'get_default_retry_steps', 'retry_if', 'with_retry',
'get_default_retry_steps', 'retry_if', 'with_retry', 'backoff',
'parse_version', 'UnparseableVersion',

'RACKSPACE_MINIMUM_VOLUME_SIZE',
183 changes: 100 additions & 83 deletions flocker/control/_persistence.py
@@ -13,21 +13,23 @@
from collections import Set, Mapping, Iterable

from eliot import Logger, write_traceback, MessageType, Field, ActionType
from eliot.twisted import DeferredContext

from pyrsistent import PRecord, PVector, PMap, PSet, pmap, PClass

from pytz import UTC

from twisted.python.filepath import FilePath
from twisted.application.service import Service, MultiService
from twisted.internet.defer import succeed
from twisted.internet.defer import succeed, maybeDeferred
from twisted.internet.task import LoopingCall

from weakref import WeakKeyDictionary

from ._model import (
SERIALIZABLE_CLASSES, Deployment, Configuration, GenerationHash
)
from .configuration_storage.directory import DirectoryConfigurationStore

# The class at the root of the configuration tree.
ROOT_CLASS = Deployment
@@ -585,6 +587,20 @@ def update_leases(transform, persistence_service):
return succeed(new_leases)


def load_and_upgrade(config_json):
"""
Decode a serialized configuration, first migrating it to the current
configuration version if it was written by an older one, and return the
deployment it describes.
"""
config_dict = loads(config_json)
config_version = config_dict.get('version', 1)
if config_version < _CONFIG_VERSION:
with _LOG_UPGRADE(configuration=config_json,
source_version=config_version,
target_version=_CONFIG_VERSION):
config_json = migrate_configuration(
config_version, _CONFIG_VERSION,
config_json, ConfigurationMigration)
config = wire_decode(config_json)
return config.deployment


class ConfigurationPersistenceService(MultiService):
"""
Persist configuration to disk, and load it back.
@@ -593,97 +609,84 @@ class ConfigurationPersistenceService(MultiService):
    :ivar bytes _hash: A hash of the serialized configuration.
"""
logger = Logger()
_deployment = None
_hash = None

def __init__(self, reactor, path):
def __init__(self, reactor, configuration_saver=None,
initial_deployment=None):
"""
:param reactor: Reactor to use for thread pool.
:param FilePath path: Directory where desired deployment will be
persisted.
"""
MultiService.__init__(self)
self._path = path
self._config_path = self._path.child(b"current_configuration.json")
if configuration_saver is None:
configuration_saver = lambda deployment_data: None
self._configuration_save = configuration_saver
self._change_callbacks = []
if initial_deployment is None:
initial_deployment = Deployment()
initial_deployment_data = self._encode_deployment(initial_deployment)
self._hash = self._hash_deployment_data(initial_deployment_data)
self._deployment = initial_deployment
LeaseService(reactor, self).setServiceParent(self)

def startService(self):
if not self._path.exists():
self._path.makedirs()
self.load_configuration()
# Register the flocker-control service on this node
# curl -X PUT
# -d '{"Name": "flocker-control",
# "Check": {"tcp": "localhost:4523",
# "interval": "10s", "timeout": "1s"}}'
# http://localhost:8500/v1/agent/service/register
MultiService.startService(self)
_LOG_STARTUP(configuration=self.get()).write(self.logger)

def _process_v1_config(self, file_name, archive_name):
"""
Check if a v1 configuration file exists and upgrade it if necessary.
After upgrade, the v1 configuration file is retained with an archived
file name, which ensures the data is not lost but we do not override
a newer configuration version next time the service starts.

:param bytes file_name: The expected file name of a version 1
configuration.
:param bytes archive_name: The file name to which a version 1
configuration should be moved after it has been processed.
"""
v1_config_path = self._path.child(file_name)
v1_archived_path = self._path.child(archive_name)
# Check for a v1 config and upgrade to latest if found.
if v1_config_path.exists():
v1_json = v1_config_path.getContent()
with _LOG_UPGRADE(self.logger,
configuration=v1_json,
source_version=1,
target_version=_CONFIG_VERSION):
updated_json = migrate_configuration(
1, _CONFIG_VERSION, v1_json,
ConfigurationMigration
)
self._config_path.setContent(updated_json)
v1_config_path.moveTo(v1_archived_path)
_LOG_STARTUP(configuration=self.get()).write()

def configuration_hash(self):
"""
:return bytes: A hash of the configuration.
"""
return self._hash

def load_configuration(self):
@classmethod
def from_json_bytes(cls, reactor, json_bytes, configuration_saver):
if json_bytes:
initial_deployment = load_and_upgrade(json_bytes)
else:
initial_deployment = None

return cls(
reactor=reactor,
configuration_saver=configuration_saver,
initial_deployment=initial_deployment,
)

@classmethod
def from_configuration_store(cls, reactor, configuration_store):
"""
Load the persisted configuration, upgrading the configuration format
if an older version is detected.
"""
# Version 1 configurations are a special case. They do not store
# any version information in the configuration data itself, rather they
# can only be identified by the use of the file name
# current_configuration.v1.json
# Therefore we check for a version 1 configuration file and if it is
# found, the config is upgraded, written to current_configuration.json
# and the old file archived as current_configuration.v1.old.json
self._process_v1_config(
file_name=b"current_configuration.v1.json",
archive_name=b"current_configuration.v1.old.json"
)
d = configuration_store.initialize()
d.addCallback(lambda ignored: configuration_store.get_content())

def load(json_bytes):
return cls.from_json_bytes(
reactor=reactor,
json_bytes=json_bytes,
configuration_saver=configuration_store.set_content,
)
d.addCallback(load)
return d

# We can now safely attempt to detect and process a >v1 configuration
# file as normal.
if self._config_path.exists():
config_json = self._config_path.getContent()
config_dict = loads(config_json)
config_version = config_dict['version']
if config_version < _CONFIG_VERSION:
with _LOG_UPGRADE(self.logger,
configuration=config_json,
source_version=config_version,
target_version=_CONFIG_VERSION):
config_json = migrate_configuration(
config_version, _CONFIG_VERSION,
config_json, ConfigurationMigration)
config = wire_decode(config_json)
self._deployment = config.deployment
self._sync_save(config.deployment)
else:
self._deployment = Deployment()
self._sync_save(self._deployment)
@classmethod
def from_directory(cls, reactor, directory):
configuration_store = DirectoryConfigurationStore(
directory=directory
)
configuration_store.initialize_sync()
return cls.from_json_bytes(
reactor=reactor,
json_bytes=configuration_store.get_content_sync(),
configuration_saver=configuration_store.set_content,
)

def register(self, change_callback):
"""
@@ -694,14 +697,15 @@ def register(self, change_callback):
"""
self._change_callbacks.append(change_callback)

def _sync_save(self, deployment):
"""
Save and flush new configuration to disk synchronously.
"""
config = Configuration(version=_CONFIG_VERSION, deployment=deployment)
data = wire_encode(config)
self._hash = b16encode(mmh3_hash_bytes(data)).lower()
self._config_path.setContent(data)
def _encode_deployment(self, deployment):
config = Configuration(
version=_CONFIG_VERSION,
deployment=deployment
)
return wire_encode(config)

def _hash_deployment_data(self, deployment_data):
return b16encode(mmh3_hash_bytes(deployment_data)).lower()

def save(self, deployment):
"""
@@ -710,12 +714,10 @@ def save(self, deployment):
:return Deferred: Fires when write is finished.
"""
if deployment == self._deployment:
_LOG_UNCHANGED_DEPLOYMENT_NOT_SAVED().write(self.logger)
_LOG_UNCHANGED_DEPLOYMENT_NOT_SAVED().write()
return succeed(None)

with _LOG_SAVE(self.logger, configuration=deployment):
self._sync_save(deployment)
self._deployment = deployment
def finish(ignored):
# At some future point this will likely involve talking to a
# distributed system (e.g. ZooKeeper or etcd), so the API doesn't
# guarantee immediate saving of the data.
@@ -725,9 +727,24 @@ def save(self, deployment):
except:
# Second argument will be ignored in next Eliot release, so
# not bothering with particular value.
write_traceback(self.logger, u"")
write_traceback()
return succeed(None)

with _LOG_SAVE(configuration=deployment) as action:
deployment_data = self._encode_deployment(deployment)
self._hash = self._hash_deployment_data(deployment_data)
self._deployment = deployment
d = maybeDeferred(
self._configuration_save,
deployment_data
)

with action.context():
d = DeferredContext(d)
d.addCallback(finish)
d.addActionFinish()
return d.result

def get(self):
"""
Retrieve current configuration.
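
Taken together, the changes above replace ``ConfigurationPersistenceService``'s direct file I/O with a pluggable configuration store: the service is built from the store's current content and given the store's ``set_content`` method as its saver. The sketch below illustrates that contract with a minimal in-memory store; the class is hypothetical and only mirrors the calls this diff makes (``initialize``, ``get_content`` and ``set_content``), it is not code from the branch.

.. code-block:: python

   # Hypothetical in-memory configuration store, written only to show the
   # interface that from_configuration_store appears to rely on in this
   # diff. Not part of the branch.
   from twisted.internet import reactor
   from twisted.internet.defer import succeed

   from flocker.control._persistence import ConfigurationPersistenceService


   class InMemoryConfigurationStore(object):
       """Keep the serialized configuration in a plain attribute."""

       def __init__(self):
           self._content = b""

       def initialize(self):
           # A real store (Consul, SQL, a directory) would create its key,
           # table or file here; nothing to do in memory.
           return succeed(None)

       def get_content(self):
           # Return the previously saved configuration blob, possibly empty.
           return succeed(self._content)

       def set_content(self, content):
           # Called by the persistence service on every save().
           self._content = content
           return succeed(None)


   # Wiring sketch: from_configuration_store initializes the store, reads
   # its current content, and fires with a ready-to-start service.
   d = ConfigurationPersistenceService.from_configuration_store(
       reactor=reactor,
       configuration_store=InMemoryConfigurationStore(),
   )
   d.addCallback(lambda service: service.startService())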