Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: App Profile multi cluster routing support with specified cluster ids #549

Merged
merged 3 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions google/cloud/bigtable/app_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ class AppProfile(object):
when routing_policy_type is
ROUTING_POLICY_TYPE_SINGLE.

:type: multi_cluster_ids: list
:param: multi_cluster_ids: (Optional) The set of clusters to route to.
The order is ignored; clusters will be tried in order of distance.
If left empty, all clusters are eligible.

:type: allow_transactional_writes: bool
:param: allow_transactional_writes: (Optional) If true, allow
transactional writes for
Expand All @@ -72,13 +77,15 @@ def __init__(
routing_policy_type=None,
description=None,
cluster_id=None,
multi_cluster_ids=None,
allow_transactional_writes=None,
):
self.app_profile_id = app_profile_id
self._instance = instance
self.routing_policy_type = routing_policy_type
self.description = description
self.cluster_id = cluster_id
self.multi_cluster_ids = multi_cluster_ids
self.allow_transactional_writes = allow_transactional_writes

@property
Expand Down Expand Up @@ -184,13 +191,17 @@ def _update_from_pb(self, app_profile_pb):
self.routing_policy_type = None
self.allow_transactional_writes = None
self.cluster_id = None

self.multi_cluster_ids = None
self.description = app_profile_pb.description

routing_policy_type = None
if app_profile_pb._pb.HasField("multi_cluster_routing_use_any"):
routing_policy_type = RoutingPolicyType.ANY
self.allow_transactional_writes = False
if app_profile_pb.multi_cluster_routing_use_any.cluster_ids:
self.multi_cluster_ids = (
app_profile_pb.multi_cluster_routing_use_any.cluster_ids
)
else:
routing_policy_type = RoutingPolicyType.SINGLE
self.cluster_id = app_profile_pb.single_cluster_routing.cluster_id
Expand All @@ -215,7 +226,9 @@ def _to_pb(self):

if self.routing_policy_type == RoutingPolicyType.ANY:
multi_cluster_routing_use_any = (
instance.AppProfile.MultiClusterRoutingUseAny()
instance.AppProfile.MultiClusterRoutingUseAny(
cluster_ids=self.multi_cluster_ids
)
)
else:
single_cluster_routing = instance.AppProfile.SingleClusterRouting(
Expand Down Expand Up @@ -312,6 +325,7 @@ def update(self, ignore_warnings=None):
``routing_policy_type``
``description``
``cluster_id``
``multi_cluster_ids``
``allow_transactional_writes``

For example:
Expand Down
7 changes: 7 additions & 0 deletions google/cloud/bigtable/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ def app_profile(
routing_policy_type=None,
description=None,
cluster_id=None,
multi_cluster_ids=None,
allow_transactional_writes=None,
):
"""Factory to create AppProfile associated with this instance.
Expand Down Expand Up @@ -742,6 +743,11 @@ def app_profile(
when routing_policy_type is
ROUTING_POLICY_TYPE_SINGLE.

:type: multi_cluster_ids: list
:param: multi_cluster_ids: (Optional) The set of clusters to route to.
The order is ignored; clusters will be tried in order of distance.
If left empty, all clusters are eligible.

:type: allow_transactional_writes: bool
:param: allow_transactional_writes: (Optional) If true, allow
transactional writes for
Expand All @@ -756,6 +762,7 @@ def app_profile(
routing_policy_type=routing_policy_type,
description=description,
cluster_id=cluster_id,
multi_cluster_ids=multi_cluster_ids,
allow_transactional_writes=allow_transactional_writes,
)

Expand Down
7 changes: 6 additions & 1 deletion tests/system/test_instance_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def _create_app_profile_helper(
routing_policy_type,
description=None,
cluster_id=None,
multi_cluster_ids=None,
allow_transactional_writes=None,
ignore_warnings=None,
):
Expand All @@ -33,14 +34,15 @@ def _create_app_profile_helper(
routing_policy_type=routing_policy_type,
description=description,
cluster_id=cluster_id,
multi_cluster_ids=multi_cluster_ids,
allow_transactional_writes=allow_transactional_writes,
)
assert app_profile.allow_transactional_writes == allow_transactional_writes

app_profile.create(ignore_warnings=ignore_warnings)

# Load a different app_profile objec form the server and
# verrify that it is the same
# verify that it is the same
alt_app_profile = instance.app_profile(app_profile_id)
alt_app_profile.reload()

Expand All @@ -67,6 +69,7 @@ def _modify_app_profile_helper(
routing_policy_type,
description=None,
cluster_id=None,
multi_cluster_ids=None,
allow_transactional_writes=None,
ignore_warnings=None,
):
Expand All @@ -75,6 +78,7 @@ def _modify_app_profile_helper(
routing_policy_type=routing_policy_type,
description=description,
cluster_id=cluster_id,
multi_cluster_ids=multi_cluster_ids,
allow_transactional_writes=allow_transactional_writes,
)

Expand All @@ -87,6 +91,7 @@ def _modify_app_profile_helper(
assert alt_profile.description == description
assert alt_profile.routing_policy_type == routing_policy_type
assert alt_profile.cluster_id == cluster_id
assert alt_profile.multi_cluster_ids == multi_cluster_ids
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a few more system tests that create an app profile with multi cluster routing and ids specified, and also an update test adding or removing a cluster id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added the test below: test_instance_create_app_profile_create_with_multi_cluster_ids

assert alt_profile.allow_transactional_writes == allow_transactional_writes


Expand Down
150 changes: 144 additions & 6 deletions tests/unit/test_app_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
PROJECT, INSTANCE_ID, APP_PROFILE_ID
)
CLUSTER_ID = "cluster-id"
CLUSTER_ID_2 = "cluster-id-2"
OP_ID = 8765
OP_NAME = "operations/projects/{}/instances/{}/appProfiles/{}/operations/{}".format(
PROJECT, INSTANCE_ID, APP_PROFILE_ID, OP_ID
Expand Down Expand Up @@ -54,6 +55,7 @@ def test_app_profile_constructor_defaults():
assert app_profile.routing_policy_type is None
assert app_profile.description is None
assert app_profile.cluster_id is None
assert app_profile.multi_cluster_ids is None
assert app_profile.allow_transactional_writes is None


Expand Down Expand Up @@ -92,9 +94,32 @@ def test_app_profile_constructor_explicit():
assert app_profile2.routing_policy_type == SINGLE
assert app_profile2.description == DESCRIPTION_2
assert app_profile2.cluster_id == CLUSTER_ID
assert app_profile2.multi_cluster_ids is None
assert app_profile2.allow_transactional_writes == ALLOW_WRITES


def test_app_profile_constructor_multi_cluster_ids():
from google.cloud.bigtable.enums import RoutingPolicyType

ANY = RoutingPolicyType.ANY
DESCRIPTION_1 = "routing policy any"
client = _Client(PROJECT)
instance = _Instance(INSTANCE_ID, client)

app_profile1 = _make_app_profile(
APP_PROFILE_ID,
instance,
routing_policy_type=ANY,
description=DESCRIPTION_1,
multi_cluster_ids=[CLUSTER_ID, CLUSTER_ID_2],
)
assert app_profile1.app_profile_id == APP_PROFILE_ID
assert app_profile1._instance is instance
assert app_profile1.routing_policy_type == ANY
assert app_profile1.description == DESCRIPTION_1
assert app_profile1.multi_cluster_ids == [CLUSTER_ID, CLUSTER_ID_2]


def test_app_profile_name():
credentials = _make_credentials()
client = _make_client(project=PROJECT, credentials=credentials, admin=True)
Expand Down Expand Up @@ -147,24 +172,56 @@ def test_app_profile_from_pb_success_w_routing_any():
client = _Client(PROJECT)
instance = _Instance(INSTANCE_ID, client)

desctiption = "routing any"
description = "routing any"
routing = RoutingPolicyType.ANY
multi_cluster_routing_use_any = data_v2_pb2.AppProfile.MultiClusterRoutingUseAny()

app_profile_pb = data_v2_pb2.AppProfile(
name=APP_PROFILE_NAME,
description=desctiption,
description=description,
multi_cluster_routing_use_any=multi_cluster_routing_use_any,
)

app_profile = AppProfile.from_pb(app_profile_pb, instance)
assert isinstance(app_profile, AppProfile)
assert app_profile._instance is instance
assert app_profile.app_profile_id == APP_PROFILE_ID
assert app_profile.description == description
assert app_profile.routing_policy_type == routing
assert app_profile.cluster_id is None
assert app_profile.multi_cluster_ids is None
assert app_profile.allow_transactional_writes is False


def test_app_profile_from_pb_success_w_routing_any_multi_cluster_ids():
from google.cloud.bigtable_admin_v2.types import instance as data_v2_pb2
from google.cloud.bigtable.app_profile import AppProfile
from google.cloud.bigtable.enums import RoutingPolicyType

client = _Client(PROJECT)
instance = _Instance(INSTANCE_ID, client)

description = "routing any"
routing = RoutingPolicyType.ANY
multi_cluster_routing_use_any = data_v2_pb2.AppProfile.MultiClusterRoutingUseAny(
cluster_ids=[CLUSTER_ID, CLUSTER_ID_2]
)

app_profile_pb = data_v2_pb2.AppProfile(
name=APP_PROFILE_NAME,
description=description,
multi_cluster_routing_use_any=multi_cluster_routing_use_any,
)

app_profile = AppProfile.from_pb(app_profile_pb, instance)
assert isinstance(app_profile, AppProfile)
assert app_profile._instance is instance
assert app_profile.app_profile_id == APP_PROFILE_ID
assert app_profile.description == desctiption
assert app_profile.description == description
assert app_profile.routing_policy_type == routing
assert app_profile.cluster_id is None
assert app_profile.allow_transactional_writes is False
assert app_profile.multi_cluster_ids == [CLUSTER_ID, CLUSTER_ID_2]


def test_app_profile_from_pb_success_w_routing_single():
Expand All @@ -175,7 +232,7 @@ def test_app_profile_from_pb_success_w_routing_single():
client = _Client(PROJECT)
instance = _Instance(INSTANCE_ID, client)

desctiption = "routing single"
description = "routing single"
allow_transactional_writes = True
routing = RoutingPolicyType.SINGLE
single_cluster_routing = data_v2_pb2.AppProfile.SingleClusterRouting(
Expand All @@ -185,17 +242,18 @@ def test_app_profile_from_pb_success_w_routing_single():

app_profile_pb = data_v2_pb2.AppProfile(
name=APP_PROFILE_NAME,
description=desctiption,
description=description,
single_cluster_routing=single_cluster_routing,
)

app_profile = AppProfile.from_pb(app_profile_pb, instance)
assert isinstance(app_profile, AppProfile)
assert app_profile._instance is instance
assert app_profile.app_profile_id == APP_PROFILE_ID
assert app_profile.description == desctiption
assert app_profile.description == description
assert app_profile.routing_policy_type == routing
assert app_profile.cluster_id == CLUSTER_ID
assert app_profile.multi_cluster_ids is None
assert app_profile.allow_transactional_writes == allow_transactional_writes


Expand Down Expand Up @@ -290,6 +348,7 @@ def test_app_profile_reload_w_routing_any():
assert app_profile.routing_policy_type == routing
assert app_profile.description == description
assert app_profile.cluster_id is None
assert app_profile.multi_cluster_ids is None
assert app_profile.allow_transactional_writes is None

# Perform the method and check the result.
Expand All @@ -298,6 +357,7 @@ def test_app_profile_reload_w_routing_any():
assert app_profile.routing_policy_type == RoutingPolicyType.SINGLE
assert app_profile.description == description_from_server
assert app_profile.cluster_id == cluster_id_from_server
assert app_profile.multi_cluster_ids is None
assert app_profile.allow_transactional_writes == allow_transactional_writes


Expand Down Expand Up @@ -394,6 +454,7 @@ def test_app_profile_create_w_routing_any():
assert result.description == description
assert result.allow_transactional_writes is False
assert result.cluster_id is None
assert result.multi_cluster_ids is None


def test_app_profile_create_w_routing_single():
Expand Down Expand Up @@ -454,6 +515,7 @@ def test_app_profile_create_w_routing_single():
assert result.description == description
assert result.allow_transactional_writes == allow_writes
assert result.cluster_id == CLUSTER_ID
assert result.multi_cluster_ids is None


def test_app_profile_create_w_wrong_routing_policy():
Expand Down Expand Up @@ -540,6 +602,82 @@ def test_app_profile_update_w_routing_any():
)


def test_app_profile_update_w_routing_any_multi_cluster_ids():
from google.longrunning import operations_pb2
from google.protobuf.any_pb2 import Any
from google.cloud.bigtable_admin_v2.types import (
bigtable_instance_admin as messages_v2_pb2,
)
from google.cloud.bigtable.enums import RoutingPolicyType
from google.cloud.bigtable_admin_v2.services.bigtable_instance_admin import (
BigtableInstanceAdminClient,
)
from google.protobuf import field_mask_pb2

credentials = _make_credentials()
client = _make_client(project=PROJECT, credentials=credentials, admin=True)
instance = client.instance(INSTANCE_ID)

routing = RoutingPolicyType.SINGLE
description = "to routing policy single"
allow_writes = True
app_profile = _make_app_profile(
APP_PROFILE_ID,
instance,
routing_policy_type=routing,
description=description,
cluster_id=CLUSTER_ID,
allow_transactional_writes=allow_writes,
multi_cluster_ids=[CLUSTER_ID, CLUSTER_ID_2],
)

# Create response_pb
metadata = messages_v2_pb2.UpdateAppProfileMetadata()
type_url = "type.googleapis.com/{}".format(
messages_v2_pb2.UpdateAppProfileMetadata._meta._pb.DESCRIPTOR.full_name
)
response_pb = operations_pb2.Operation(
name=OP_NAME,
metadata=Any(type_url=type_url, value=metadata._pb.SerializeToString()),
)

# Patch the stub used by the API method.
instance_api = mock.create_autospec(BigtableInstanceAdminClient)
# Mock api calls
instance_api.app_profile_path.return_value = (
"projects/project/instances/instance-id/appProfiles/app-profile-id"
)

client._instance_admin_client = instance_api

# Perform the method and check the result.
ignore_warnings = True
expected_request_update_mask = field_mask_pb2.FieldMask(
paths=["description", "single_cluster_routing"]
)

expected_request = {
"request": {
"app_profile": app_profile._to_pb(),
"update_mask": expected_request_update_mask,
"ignore_warnings": ignore_warnings,
}
}

instance_api.update_app_profile.return_value = response_pb
app_profile._instance._client._instance_admin_client = instance_api
result = app_profile.update(ignore_warnings=ignore_warnings)
actual_request = client._instance_admin_client.update_app_profile.call_args_list[
0
].kwargs

assert actual_request == expected_request
assert (
result.metadata.type_url
== "type.googleapis.com/google.bigtable.admin.v2.UpdateAppProfileMetadata"
)


def test_app_profile_update_w_routing_single():
from google.longrunning import operations_pb2
from google.protobuf.any_pb2 import Any
Expand Down