Skip to content

Commit

Permalink
1. add a while True for sample
Browse files Browse the repository at this point in the history
2. make the list of shards in cursor to a dict in internal code
3. test list 3-shard events in multiple times generate same results as
   list all events at once
4. Java is using sequential list, so it could give 1 shard cursor even
   there are 3 shards, the test makes sure python is working with 1 shard
   cursor.
  • Loading branch information
xiafu-msft committed Sep 4, 2020
1 parent 4d0b71d commit 6992395
Show file tree
Hide file tree
Showing 5 changed files with 59,978 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def list_changes(self, **kwargs):
:keyword datetime start_time:
Filters the results to return only events which happened after this time.
If start_time and continuation_token are both set, start_time will be ignored.
:keyword datetime end_time:
Filters the results to return only events which happened before this time.
:keyword int results_per_page:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# pylint: disable=too-few-public-methods, too-many-instance-attributes
# pylint: disable=super-init-not-called, too-many-lines
import collections
import copy
import json
from datetime import datetime

Expand Down Expand Up @@ -50,13 +51,14 @@ def __init__(
start_time=None,
end_time=None,
continuation_token=None):
if start_time and continuation_token:
raise ValueError("start_time and continuation_token shouldn't be specified at the same time")
super(ChangeFeedPaged, self).__init__(
get_next=self._get_next_cf,
extract_data=self._extract_data_cb,
continuation_token=continuation_token or ""
)
# If start_time and continuation_token are both set, start_time will be ignored.
start_time = None if start_time and continuation_token else start_time

continuation_token = eval(continuation_token) if continuation_token else None

if continuation_token and container_client.primary_hostname != continuation_token['UrlHost']:
Expand All @@ -78,8 +80,13 @@ def _get_next_cf(self, continuation_token): # pylint:disable=unused-argument

def _extract_data_cb(self, event_list):
self.current_page = event_list

return str(self._change_feed.cursor), self.current_page
try:
cursor = copy.deepcopy(self._change_feed.cursor)
shard_cursors = cursor['CurrentSegmentCursor']['ShardCursors']
cursor['CurrentSegmentCursor']['ShardCursors'] = [v for v in shard_cursors.values()]
except AttributeError:
pass
return str(cursor), self.current_page


class ChangeFeed(object):
Expand Down Expand Up @@ -215,7 +222,7 @@ def __init__(self, client, segment_path, page_size, segment_cursor=None):
self.segment_path = segment_path
self.page_size = page_size
self.shards = collections.deque()
self.cursor = {'ShardCursors': [], 'SegmentPath': self.segment_path}
self.cursor = {'ShardCursors': {}, 'SegmentPath': self.segment_path}
self._initialize(segment_cursor=segment_cursor)
# cursor is in this format {"segment_path", path, "CurrentShardPath": shard_path, "segment_cursor": ShardCursors dict}

Expand All @@ -234,14 +241,7 @@ def __next__(self):
pass

# update cursor
is_shard_cursor_in_list = False
for shard_cursor in self.cursor['ShardCursors']:
if shard_cursor['CurrentChunkPath'] == shard.cursor['CurrentChunkPath']:
shard_cursor['EventIndex'] = shard.cursor['EventIndex']
shard_cursor['BlockOffset'] = shard.cursor['BlockOffset']
is_shard_cursor_in_list = True
if not is_shard_cursor_in_list:
self.cursor['ShardCursors'].append(shard.cursor)
self.cursor['ShardCursors'][shard.shard_path] = shard.cursor
self.cursor['CurrentShardPath'] = shard.shard_path

if not segment_events:
Expand All @@ -268,11 +268,13 @@ def _initialize(self, segment_cursor=None):
self.shards.append(Shard(self.client, shard_path))
else:
start_shard_path = segment_cursor['CurrentShardPath']
shard_cursors = {shard_cursor['CurrentChunkPath'][:-10]: shard_cursor
for shard_cursor in segment_cursor['ShardCursors']}

if shard_paths:
# Initialize all shards using the shard cursors, skip those finished shards
for shard_cursor in segment_cursor['ShardCursors']:
self.shards.append(Shard(self.client, shard_cursor['CurrentChunkPath'][:-10], shard_cursor))
# Initialize all shards using the shard cursors
for shard_path in shard_paths:
self.shards.append(Shard(self.client, shard_path, shard_cursors.get(shard_path)))

# the move the shard behind start_shard_path one to the left most place, the left most shard is the next
# shard we should read based on continuation token.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,37 +98,28 @@ def list_events_in_live_mode(self):
# Instantiate a ChangeFeedClient
cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
credential=self.ACCOUNT_KEY)
# to get continuation token
start_time = datetime(2020, 8, 19, 10)
change_feed = cf_client.list_changes(start_time=start_time).by_page()

for page in change_feed:
for event in page:
print(event)
token = change_feed.continuation_token
start_time = datetime(2020, 9, 1, 1)
token = None

sleep(120)
print("continue printing events")
# restart using the continuation token
change_feed2 = cf_client.list_changes(results_per_page=56).by_page(continuation_token=token)
change_feed_page2 = next(change_feed2)
for event in change_feed_page2:
print(event)
while True:
# start_time will be ignored if start_time and continuation_token are both non-empty
change_feed = cf_client.list_changes(start_time=start_time).by_page(continuation_token=token)

sleep(120)
print("continue printing events")
for page in change_feed:
for event in page:
print(event)
token = change_feed.continuation_token

token2 = change_feed2.continuation_token
change_feed3 = cf_client.list_changes(results_per_page=56).by_page(continuation_token=token2)
change_feed_page3 = next(change_feed3)
for event in change_feed_page3:
print(event)
sleep(60)
print("continue printing events")


if __name__ == '__main__':
sample = ChangeFeedSamples()
sample.list_events_by_page()
sample.list_all_events()
sample.list_range_of_events()
sample.list_events_in_live_mode_continuously()
sample.list_events_using_continuation_token()
sample.list_events_in_live_mode()

Loading

0 comments on commit 6992395

Please sign in to comment.