Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

call on_error if timeout in flush #16485

Merged
merged 4 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/search/azure-search-documents/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 11.1.0 (2021-02-09)
## 11.1.0 (2021-02-10)

**Breaking Changes**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ class SearchIndexingBufferedSender(SearchIndexingBufferedSenderBase, HeadersMixi
:type index_name: str
:param credential: A credential to authorize search client requests
:type credential: ~azure.core.credentials.AzureKeyCredential
:keyword bool auto_flush: if the auto flush mode is on. Default to True.
:keyword int auto_flush_interval: how many max seconds if between 2 flushes. This only takes effect
when auto_flush is on. Default to 60 seconds. If a non-positive number is set, it will be default
to 86400s (1 day)
when auto_flush is on. Default to 60 seconds.
:keyword int initial_batch_action_count: The initial number of actions to group into a batch when
tuning the behavior of the sender. The default value is 512.
:keyword int max_retries_per_action: The number of times to retry a failed document. The default value is 3.
Expand Down Expand Up @@ -107,13 +105,18 @@ def flush(self, timeout=86400, **kwargs): # pylint:disable=unused-argument
:param int timeout: time out setting. Default is 86400s (one day)
:return: True if there are errors. Else False
:rtype: bool
:raises ~azure.core.exceptions.ServiceResponseTimeoutError:
"""
has_error = False
begin_time = int(time.time())
while len(self.actions) > 0:
now = int(time.time())
remaining = timeout - (now - begin_time)
if remaining < 0:
if self._on_error:
actions = self._index_documents_batch.dequeue_actions()
for action in actions:
self._on_error(action)
raise ServiceResponseTimeoutError("Service response time out")
result = self._process(timeout=remaining, raise_error=False)
if result:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self, endpoint, index_name, credential, **kwargs):
self._batch_action_count = kwargs.pop('initial_batch_action_count', self._DEFAULT_INITIAL_BATCH_ACTION_COUNT)
self._auto_flush_interval = kwargs.pop('auto_flush_interval', self._DEFAULT_AUTO_FLUSH_INTERVAL)
if self._auto_flush_interval <= 0:
self._auto_flush_interval = 86400
raise ValueError("auto_flush_interval must be a positive number.")
self._max_retries_per_action = kwargs.pop('max_retries_per_action ', self._DEFAULT_MAX_RETRIES)
self._endpoint = endpoint # type: str
self._index_name = index_name # type: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ class SearchIndexingBufferedSender(SearchIndexingBufferedSenderBase, HeadersMixi
:type index_name: str
:param credential: A credential to authorize search client requests
:type credential: ~azure.core.credentials.AzureKeyCredential
:keyword bool auto_flush: if the auto flush mode is on. Default to True.
:keyword int auto_flush_interval: how many max seconds if between 2 flushes. This only takes effect
when auto_flush is on. Default to 60 seconds. If a non-positive number is set, it will be default
to 86400s (1 day)
when auto_flush is on. Default to 60 seconds.
:keyword int initial_batch_action_count: The initial number of actions to group into a batch when
tuning the behavior of the sender. The default value is 512.
:keyword int max_retries_per_action: The number of times to retry a failed document. The default value is 3.
Expand Down Expand Up @@ -106,13 +104,18 @@ async def flush(self, timeout=86400, **kwargs): # pylint:disable=unused-argum
:param int timeout: time out setting. Default is 86400s (one day)
:return: True if there are errors. Else False
:rtype: bool
:raises ~azure.core.exceptions.ServiceResponseTimeoutError:
"""
has_error = False
begin_time = int(time.time())
while len(self.actions) > 0:
now = int(time.time())
remaining = timeout - (now - begin_time)
if remaining < 0:
if self._on_error:
actions = await self._index_documents_batch.dequeue_actions()
for action in actions:
await self._on_error(action)
raise ServiceResponseTimeoutError("Service response time out")
result = await self._process(timeout=remaining, raise_error=False)
if result:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from unittest import mock
except ImportError:
import mock

import pytest
from azure.search.documents.aio import (
SearchIndexingBufferedSender,
)
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.core.exceptions import HttpResponseError, ServiceResponseTimeoutError
from azure.search.documents.models import IndexingResult

CREDENTIAL = AzureKeyCredential(key="test_api_key")
Expand Down Expand Up @@ -102,6 +102,31 @@ async def mock_fail_index_documents(actions, timeout=86400):
await client.flush()
assert on_error.called

async def test_callback_error_on_timeout(self):
async def mock_fail_index_documents(actions, timeout=86400):
if len(actions) > 0:
print("There is something wrong")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

print statement? :)

result = IndexingResult()
result.key = actions[0].additional_properties.get('id')
result.status_code = 400
result.succeeded = False
self.uploaded = self.uploaded + len(actions) - 1
time.sleep(1)
return [result]

on_error = mock.AsyncMock()
async with SearchIndexingBufferedSender("endpoint",
"index name",
CREDENTIAL,
auto_flush=False,
on_error=on_error) as client:
client._index_documents_actions = mock_fail_index_documents
client._index_key = "id"
await client.upload_documents([{"id": 0},{"id": 1}])
with pytest.raises(ServiceResponseTimeoutError):
await client.flush(timeout=-1)
assert on_error.call_count == 2

async def test_callback_progress(self):
async def mock_successful_index_documents(actions, timeout=86400):
if len(actions) > 0:
Expand Down
29 changes: 28 additions & 1 deletion sdk/search/azure-search-documents/tests/test_buffered_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
except ImportError:
import mock

import pytest
from azure.search.documents import (
SearchIndexingBufferedSender,
)
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.core.exceptions import HttpResponseError, ServiceResponseTimeoutError
from azure.search.documents.models import IndexingResult

CREDENTIAL = AzureKeyCredential(key="test_api_key")
Expand Down Expand Up @@ -106,6 +107,32 @@ def mock_fail_index_documents(actions, timeout=86400):
client.flush()
assert on_error.called

def test_callback_error_on_timeout(self):
def mock_fail_index_documents(actions, timeout=86400):
import time
if len(actions) > 0:
print("There is something wrong")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

print statement here too

result = IndexingResult()
result.key = actions[0].additional_properties.get('id')
result.status_code = 400
result.succeeded = False
self.uploaded = self.uploaded + len(actions) - 1
time.sleep(1)
return [result]

on_error = mock.Mock()
with SearchIndexingBufferedSender("endpoint",
"index name",
CREDENTIAL,
auto_flush=False,
on_error=on_error) as client:
client._index_documents_actions = mock_fail_index_documents
client._index_key = "id"
client.upload_documents([{"id": 0},{"id": 1}])
with pytest.raises(ServiceResponseTimeoutError):
client.flush(timeout=-1)
assert on_error.call_count == 2

def test_callback_progress(self):
def mock_successful_index_documents(actions, timeout=86400):
if len(actions) > 0:
Expand Down