Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement security option in VM cluster managers #222

Merged
merged 8 commits into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dask_cloudprovider/aws/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ class EC2Cluster(VMCluster):
security : Security or bool, optional
Configures communication security in this cluster. Can be a security
object, or True. If True, temporary self-signed credentials will
be created automatically.
be created automatically. Default is ``True``.

Notes
-----
Expand Down
2 changes: 1 addition & 1 deletion dask_cloudprovider/azure/azurevm.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class AzureVMCluster(VMCluster):
security : Security or bool, optional
Configures communication security in this cluster. Can be a security
object, or True. If True, temporary self-signed credentials will
be created automatically.
be created automatically. Default is ``True``.

Examples
--------
Expand Down
2 changes: 1 addition & 1 deletion dask_cloudprovider/digitalocean/droplet.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class DropletCluster(VMCluster):
security : Security or bool, optional
Configures communication security in this cluster. Can be a security
object, or True. If True, temporary self-signed credentials will
be created automatically.
be created automatically. Default is ``True``.

Examples
--------
Expand Down
10 changes: 6 additions & 4 deletions dask_cloudprovider/gcp/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,12 @@ async def start_scheduler(self):
# scheduler must be publicly available, and firewall
# needs to be in place to allow access to 8786 on
# the external IP
self.address = f"tcp://{self.external_ip}:8786"
self.address = f"{self.cluster.protocol}://{self.external_ip}:8786"
else:
# if the client is running inside GCE environment
# it's better to use internal IP, which doesn't
# require firewall setup
self.address = f"tcp://{self.internal_ip}:8786"
self.address = f"{self.cluster.protocol}://{self.internal_ip}:8786"
await self.wait_for_scheduler()

# need to reserve internal IP for workers
Expand All @@ -320,7 +320,9 @@ def __init__(
self.scheduler = scheduler
self.worker_class = worker_class
self.name = f"dask-{self.cluster.uuid}-worker-{str(uuid.uuid4())[:8]}"
internal_scheduler = f"{self.cluster.scheduler_internal_ip}:8786"
internal_scheduler = (
f"{self.cluster.protocol}://{self.cluster.scheduler_internal_ip}:8786"
)
self.command = " ".join(
[
self.set_env,
Expand Down Expand Up @@ -444,7 +446,7 @@ class GCPCluster(VMCluster):
security : Security or bool (optional)
Configures communication security in this cluster. Can be a security
object, or True. If True, temporary self-signed credentials will
be created automatically.
be created automatically. Default is ``True``.

Examples
--------
Expand Down
7 changes: 4 additions & 3 deletions dask_cloudprovider/gcp/tests/test_gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ async def test_init():
@pytest.mark.asyncio
async def test_get_cloud_init():
skip_without_credentials()

cloud_init = GCPCluster.get_cloud_init(docker_args="--privileged")
cloud_init = GCPCluster.get_cloud_init(security=True, docker_args="--privileged")
assert "dask-scheduler" in cloud_init
assert "# Bootstrap" in cloud_init
assert " --privileged " in cloud_init
Expand All @@ -68,7 +67,9 @@ async def test_get_cloud_init():
async def test_create_cluster():
skip_without_credentials()

async with GCPCluster(asynchronous=True, env_vars={"FOO": "bar"}) as cluster:
async with GCPCluster(
asynchronous=True, env_vars={"FOO": "bar"}, security=True
) as cluster:

assert cluster.status == Status.running

Expand Down
47 changes: 43 additions & 4 deletions dask_cloudprovider/generic/vmcluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from distributed.core import Status
from distributed.worker import Worker as _Worker
from distributed.scheduler import Scheduler as _Scheduler
from distributed.security import Security
from distributed.deploy.spec import SpecCluster, ProcessInterface
from distributed.utils import warn_on_duration, serialize_for_cli, cli_keywords

Expand Down Expand Up @@ -83,7 +84,7 @@ def __init__(
async def start(self):
self.cluster._log("Creating scheduler instance")
ip = await self.create_vm()
self.address = f"tcp://{ip}:8786"
self.address = f"{self.protocol}://{ip}:8786"
await self.wait_for_scheduler()
await super().start()

Expand Down Expand Up @@ -197,7 +198,7 @@ class VMCluster(SpecCluster):
security : Security or bool, optional
Configures communication security in this cluster. Can be a security
object, or True. If True, temporary self-signed credentials will
be created automatically.
be created automatically. Default is ``True``.

"""

Expand All @@ -221,28 +222,66 @@ def __init__(
docker_image="daskdev/dask:latest",
docker_args: str = "",
env_vars: dict = {},
security: bool = True,
protocol: str = None,
**kwargs,
):
if self.scheduler_class is None or self.worker_class is None:
raise RuntimeError(
"VMCluster is not intended to be used directly. See docstring for more info."
)
self._n_workers = n_workers

if not security:
self.security = None
elif security is True:
# True indicates self-signed temporary credentials should be used
self.security = Security.temporary()
elif not isinstance(security, Security):
raise TypeError("security must be a Security object")
else:
self.security = security

if protocol is None:
if self.security and self.security.require_encryption:
self.protocol = "tls"
else:
self.protocol = "tcp"
else:
self.protocol = protocol

if self.security and self.security.require_encryption:
dask.config.set(
{
"distributed.comm.default-scheme": self.protocol,
"distributed.comm.require-encryption": True,
"distributed.comm.tls.ca-file": self.security.tls_ca_file,
"distributed.comm.tls.scheduler.key": self.security.tls_scheduler_key,
"distributed.comm.tls.scheduler.cert": self.security.tls_scheduler_cert,
"distributed.comm.tls.worker.key": self.security.tls_worker_key,
"distributed.comm.tls.worker.cert": self.security.tls_worker_cert,
"distributed.comm.tls.client.key": self.security.tls_client_key,
"distributed.comm.tls.client.cert": self.security.tls_client_cert,
}
)

image = self.scheduler_options.get("docker_image", False) or docker_image
self.options["docker_image"] = image
self.scheduler_options["docker_image"] = image
self.scheduler_options["env_vars"] = env_vars
self.scheduler_options["protocol"] = protocol
self.scheduler_options["scheduler_options"] = scheduler_options
self.worker_options["env_vars"] = env_vars
self.options["docker_args"] = docker_args
self.scheduler_options["docker_args"] = docker_args
self.worker_options["docker_args"] = docker_args
self.worker_options["docker_image"] = image
self.worker_options["worker_class"] = worker_class
self.worker_options["protocol"] = protocol
self.worker_options["worker_options"] = worker_options
self.scheduler_options["scheduler_options"] = scheduler_options
self.uuid = str(uuid.uuid4())[:8]

super().__init__(**kwargs)
super().__init__(**kwargs, security=self.security)

async def call_async(self, f, *args, **kwargs):
"""Run a blocking function in a thread as a coroutine.
Expand Down
7 changes: 5 additions & 2 deletions doc/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,5 +191,8 @@
]


# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {"https://docs.python.org/": None}
intersphinx_mapping = {
"python": ("https://docs.python.org/3", None),
"dask": ("https://docs.dask.org/en/latest/", None),
"distributed": ("https://distributed.dask.org/en/latest/", None),
}
1 change: 1 addition & 0 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ this code.
:caption: Advanced

troubleshooting.rst
security.rst
gpus.rst
packer.rst

Expand Down
47 changes: 47 additions & 0 deletions doc/source/security.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
Security
========

Dask Cloudprovider aims to balance ease of use with security best practices. The two are not always compatible so this document aims to outline the compromises and decisions made in this library.

Public Schedulers
-----------------

For each cluster manager to work correctly it must be able to make a connection to the Dask scheduler on port ``8786``.
In many cluster managers the default option is to expose the Dask scheduler and dashboard to the internet via a public IP address.
This makes things quick and easy for new users to get up and running, but may pose a security risk long term.

Many organisations have policies which do not allow users to assign public IP addresses or open ports. Our best practices
advice is to use Dask Cloudprovider from within a cloud platform, either from a VM or a managed environment. Then disable public
networking.

See each cluster manager for configuration options.

Authentication and encryption
-----------------------------

Cluster managers such as :class:`dask_cloudprovider.aws.EC2Cluster`, :class:`dask_cloudprovider.azure.AzureVMCluster`,
:class:`dask_cloudprovider.gcp.GCPCluster` and :class:`dask_cloudprovider.digitalocean.DropletCluster` enable certificate based authentication
and encryption by default.

When a cluster is launched with any of these cluster managers a set of temporary keys will be generated and distributed to the cluster nodes
via their startup script. All communication between the client, scheduler and workers will then be encrypted and only clients and workers with
valid certificates will be able to connect to the scheduler.

You can also specify your own certificates using the :class:`distributed.security.Security` object.

.. code-block:: python

>>> from dask_cloudprovider.gcp import GCPCluster
>>> from dask.distributed import Client
>>> from distributed.security import Security
>>> sec = Security(tls_ca_file='cluster_ca.pem',
... tls_client_cert='cli_cert.pem',
... tls_client_key='cli_key.pem',
... require_encryption=True)
>>> cluster = GCPCluster(n_workers=1, security=sec)
>>> client = Client(cluster)
>>> client
<Client: 'tls://10.142.0.29:8786' processes=0 threads=0, memory=0 B>

You can disable secure connections by setting the ``security`` keyword argument to ``False``. This may be desirable when troubleshooting or
when running on a trusted network (entirely inside a VPC for example).