Skip to content

Commit

Permalink
Adding update_sync_token functionality to Appconfig Clients (Azure#17421
Browse files Browse the repository at this point in the history
)

* breaking off into a second PR for just update_sync_token functionality

* adding newline

* updating based on xiangs comments, updating changelog

* changes to sync token test, upgrade to 1.2.0
  • Loading branch information
seankane-msft authored Mar 30, 2021
1 parent 340e41d commit 7cb210f
Show file tree
Hide file tree
Showing 13 changed files with 2,223 additions and 16 deletions.
5 changes: 4 additions & 1 deletion sdk/appconfiguration/azure-appconfiguration/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

-------------------

## 1.1.2 (Unreleased)
## 1.2.0 (Unreleased)

### Features

- Adds method `update_sync_token` to include sync tokens from EventGrid notifications.

## 1.1.1 (2020-10-05)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,12 @@ def __init__(self, base_url, credential, **kwargs):
base_user_agent=USER_AGENT, **kwargs
)

self._sync_token_policy = None

pipeline = kwargs.get("pipeline")

if pipeline is None:
self._sync_token_policy = SyncTokenPolicy()
aad_mode = not isinstance(credential, AppConfigConnectionStringCredential)
pipeline = self._create_appconfig_pipeline(
credential=credential, aad_mode=aad_mode, base_url=base_url, **kwargs
Expand Down Expand Up @@ -140,7 +143,7 @@ def _create_appconfig_pipeline(
self._config.headers_policy,
self._config.user_agent_policy,
self._config.retry_policy,
SyncTokenPolicy(),
self._sync_token_policy,
credential_policy,
self._config.logging_policy, # HTTP request/response log
DistributedTracingPolicy(**kwargs),
Expand Down Expand Up @@ -578,3 +581,22 @@ def set_read_only(
raise HttpResponseError(message=error.message, response=error.response)
except binascii.Error:
raise binascii.Error("Connection string secret has incorrect padding")

def update_sync_token(self, token):
# type: (str) -> None

"""Add a sync token to the internal list of tokens.
:param token: The sync token to be added to the internal list of tokens
:type token: str
"""
if not self._sync_token_policy:
raise AttributeError(
"Client has no sync token policy, possibly because it was not provided during instantiation."
)
self._sync_token_policy.add_token(token)

def close(self):
# type: (...) -> None

"""Close all connections made by the client"""
self._impl._client.close()
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def from_sync_token_string(cls, sync_token):
class SyncTokenPolicy(SansIOHTTPPolicy):
"""A simple policy that enable the given callback
with the response.
:keyword callback raw_response_hook: Callback function. Will be invoked on response.
"""

Expand All @@ -66,7 +65,6 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument
def on_request(self, request): # type: ignore # pylint: disable=arguments-differ
# type: (PipelineRequest) -> None
"""This is executed before sending the request to the next policy.
:param request: The PipelineRequest object.
:type request: ~azure.core.pipeline.PipelineRequest
"""
Expand All @@ -79,7 +77,6 @@ def on_request(self, request): # type: ignore # pylint: disable=arguments-diffe
def on_response(self, request, response): # type: ignore # pylint: disable=arguments-differ
# type: (PipelineRequest, PipelineResponse) -> None
"""This is executed after the request comes back from the policy.
:param request: The PipelineRequest object.
:type request: ~azure.core.pipeline.PipelineRequest
:param response: The PipelineResponse object.
Expand All @@ -93,11 +90,22 @@ def on_response(self, request, response): # type: ignore # pylint: disable=argu
return
for sync_token_string in sync_token_strings:
sync_token = SyncToken.from_sync_token_string(sync_token_string)
if not sync_token:
continue
existing_token = self._sync_tokens.get(sync_token.token_id, None)
if not existing_token:
self._sync_tokens[sync_token.token_id] = sync_token
continue
if existing_token.sequence_number < sync_token.sequence_number:
self._sync_tokens[sync_token.token_id] = sync_token
self._update_sync_token(sync_token)

def add_token(self, full_raw_tokens):
# type: (str) -> None
raw_tokens = full_raw_tokens.split(",")
for raw_token in raw_tokens:
sync_token = SyncToken.from_sync_token_string(raw_token)
self._update_sync_token(sync_token)

def _update_sync_token(self, sync_token):
# type: (SyncToken) -> None
if not sync_token:
return
existing_token = self._sync_tokens.get(sync_token.token_id, None)
if not existing_token:
self._sync_tokens[sync_token.token_id] = sync_token
return
if existing_token.sequence_number < sync_token.sequence_number:
self._sync_tokens[sync_token.token_id] = sync_token
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
# Licensed under the MIT License.
# ------------------------------------

VERSION = "1.1.2"
VERSION = "1.2.0"
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,12 @@ def __init__(self, base_url, credential, **kwargs):
base_user_agent=USER_AGENT, **kwargs
)

self._sync_token_policy = None

pipeline = kwargs.get("pipeline")

if pipeline is None:
self._sync_token_policy = SyncTokenPolicy()
aad_mode = not isinstance(credential, AppConfigConnectionStringCredential)
pipeline = self._create_appconfig_pipeline(
credential=credential, aad_mode=aad_mode, base_url=base_url, **kwargs
Expand Down Expand Up @@ -148,8 +151,8 @@ def _create_appconfig_pipeline(
self._config.headers_policy,
self._config.user_agent_policy,
self._config.retry_policy,
self._sync_token_policy,
credential_policy,
SyncTokenPolicy(),
self._config.logging_policy, # HTTP request/response log
DistributedTracingPolicy(**kwargs),
HttpLoggingPolicy(**kwargs),
Expand Down Expand Up @@ -597,3 +600,19 @@ async def set_read_only(
raise HttpResponseError(message=error.message, response=error.response)
except binascii.Error:
raise binascii.Error("Connection string secret has incorrect padding")

def update_sync_token(self, token):
# type: (str) -> None

"""Add a sync token to the internal list of tokens.
:param token: The sync token to be added to the internal list of tokens
:type token: str
"""

self._sync_token_policy.add_token(token)

async def close(self):
# type: (...) -> None

"""Close all connections made by the client"""
await self._impl._client.close()
Loading

0 comments on commit 7cb210f

Please sign in to comment.