make all '' to "" in cursor
xiafu-msft committed Sep 8, 2020
1 parent dd573c5 commit 807b69b
Showing 4 changed files with 27,559 additions and 367 deletions.
@@ -27,7 +27,7 @@ class ChangeFeedPaged(PageIterator):
:ivar int results_per_page:
The maximum number of results retrieved per API call.
- :ivar dict continuation_token:
+ :ivar str continuation_token:
The continuation token to retrieve the next page of results.
:ivar current_page:
The current page of listed results.
@@ -42,7 +42,7 @@ class ChangeFeedPaged(PageIterator):
Filters the results to return only events which happened after this time.
:param datetime end_time:
Filters the results to return only events which happened before this time.
- :param dict continuation_token:
+ :param str continuation_token:
A continuation token with which to start listing events from the previous position.
"""
def __init__(
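For context on how this token flows through the public API: a caller pages through the change feed, persists the pager's continuation_token, and passes it back later to resume. A hedged sketch using the released azure-storage-blob-changefeed client (the connection string is a placeholder and the event field name is illustrative):

```python
from azure.storage.blob.changefeed import ChangeFeedClient

cf_client = ChangeFeedClient.from_connection_string("<connection-string>")  # placeholder

# First session: read one page, then persist the continuation token.
pages = cf_client.list_changes(results_per_page=100).by_page()
for event in next(pages):
    print(event["eventType"])          # field name illustrative
token = pages.continuation_token       # after this commit: a JSON string, not str(dict)

# Later session: resume exactly where the previous page left off.
resumed = cf_client.list_changes(results_per_page=100).by_page(continuation_token=token)
```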
@@ -58,12 +58,12 @@ def __init__(
extract_data=self._extract_data_cb,
continuation_token=continuation_token or ""
)
- continuation_token = eval(continuation_token) if continuation_token else None
+ continuation_token = json.loads(continuation_token) if continuation_token else None

- if continuation_token and container_client.primary_hostname != continuation_token['UrlHost']:
- raise ValueError('The token is not for the current storage account.')
- if continuation_token and continuation_token['CursorVersion'] != 1:
- raise ValueError('The CursorVersion is not supported by the current SDK.')
+ if continuation_token and (container_client.primary_hostname != continuation_token["UrlHost"]):
+ raise ValueError("The token is not for the current storage account.")
+ if continuation_token and (continuation_token["CursorVersion"] != 1):
+ raise ValueError("The CursorVersion is not supported by the current SDK.")
self.results_per_page = results_per_page or 5000
self.current_page = None
self._change_feed = ChangeFeed(container_client, self.results_per_page, start_time=start_time,
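The switch from eval to json.loads is the substantive fix in this hunk: eval executes whatever Python is embedded in a user-supplied token, and it only worked because the token used to be a single-quoted str(dict) repr. A minimal, standalone sketch of the difference:

```python
import json

cursor = {"CursorVersion": 1, "UrlHost": "myaccount.blob.core.windows.net"}

# str(dict) produces a single-quoted Python repr, which is not valid JSON:
repr_token = str(cursor)   # "{'CursorVersion': 1, 'UrlHost': ...}"
# so the old code had to eval() it -- and eval() runs arbitrary code:
malicious = "__import__('os').getcwd()"     # could just as easily be destructive
print(eval(malicious))                       # executes!

# json.dumps/json.loads round-trip the same data safely, using double quotes:
json_token = json.dumps(cursor)
assert json.loads(json_token) == cursor
try:
    json.loads(repr_token)                   # the old-style token is rejected
except json.JSONDecodeError:
    print("old str(dict) tokens are not JSON")
```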
@@ -81,11 +81,11 @@ def _extract_data_cb(self, event_list):
self.current_page = event_list
try:
cursor = copy.deepcopy(self._change_feed.cursor)
- shard_cursors = cursor['CurrentSegmentCursor']['ShardCursors']
- cursor['CurrentSegmentCursor']['ShardCursors'] = [v for v in shard_cursors.values()]
+ shard_cursors = cursor["CurrentSegmentCursor"]["ShardCursors"]
+ cursor["CurrentSegmentCursor"]["ShardCursors"] = list(shard_cursors.values())
except AttributeError:
pass
- return str(cursor), self.current_page
+ return json.dumps(cursor), self.current_page


class ChangeFeed(object):
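A companion change: the callback now serializes with json.dumps, after flattening ShardCursors from a dict into a list (the shard path is already recorded inside each shard cursor, so the keys are redundant). Illustrated with hypothetical cursor values:

```python
import copy
import json

# Hypothetical in-memory cursor; internally ShardCursors is keyed by shard path.
cursor = {"CursorVersion": 1,
          "CurrentSegmentCursor": {"ShardCursors": {
              "log/00/2020/": {"CurrentChunkPath": "log/00/2020/00000.avro", "EventIndex": 3},
              "log/01/2020/": {"CurrentChunkPath": "log/01/2020/00000.avro", "EventIndex": 7}}}}

serializable = copy.deepcopy(cursor)
shards = serializable["CurrentSegmentCursor"]["ShardCursors"]
serializable["CurrentSegmentCursor"]["ShardCursors"] = list(shards.values())
token = json.dumps(serializable)   # double-quoted JSON; json.loads can parse it back
```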
@@ -97,17 +97,17 @@ def __init__(self, client, page_size, start_time=None, end_time=None, cf_cursor=
self.start_time = start_time

# the end time is in str format
- end_time_in_cursor = cf_cursor['EndTime'] if cf_cursor else None
+ end_time_in_cursor = cf_cursor["EndTime"] if cf_cursor else None
# convert the end time in str format to a datetime object
end_time_in_cursor_obj = \
- datetime.strptime(end_time_in_cursor, '%Y-%m-%dT%H:%M:%S+00:00') if end_time_in_cursor else None
+ datetime.strptime(end_time_in_cursor, "%Y-%m-%dT%H:%M:%S+00:00") if end_time_in_cursor else None
# self.end_time is in datetime format
self.end_time = end_time or end_time_in_cursor_obj

- cur_segment_cursor = cf_cursor['CurrentSegmentCursor'] if cf_cursor else None
+ cur_segment_cursor = cf_cursor["CurrentSegmentCursor"] if cf_cursor else None

self.cursor = {"CursorVersion": 1,
- "EndTime": self.end_time.strftime('%Y-%m-%dT%H:%M:%S+00:00') if self.end_time else "",
+ "EndTime": self.end_time.strftime("%Y-%m-%dT%H:%M:%S+00:00") if self.end_time else "",
"UrlHost": self.client.primary_hostname}
self._initialize(cur_segment_cursor=cur_segment_cursor)
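The EndTime round-trip works only because strftime and strptime share the exact same format string, with the +00:00 offset written as literal text (no %z directive), so the parsed value comes back as a naive UTC datetime. A quick stdlib check:

```python
from datetime import datetime

FMT = "%Y-%m-%dT%H:%M:%S+00:00"   # "+00:00" is literal text, not a %z directive

end_time = datetime(2020, 9, 8, 12, 30, 0)
as_string = end_time.strftime(FMT)                     # "2020-09-08T12:30:00+00:00"
assert datetime.strptime(as_string, FMT) == end_time   # naive datetime round-trips
```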

@@ -147,7 +147,7 @@ def _initialize(self, cur_segment_cursor=None):
start_year = self.start_time.year
except AttributeError:
try:
- start_date = self._parse_datetime_from_segment_path(cur_segment_cursor.get('SegmentPath'))
+ start_date = self._parse_datetime_from_segment_path(cur_segment_cursor.get("SegmentPath"))
start_year = start_date.year
except AttributeError:
start_year = ""
@@ -163,7 +163,7 @@ def _initialize(self, cur_segment_cursor=None):

# if change_feed_cursor is specified, start from the specified segment
if cur_segment_cursor:
- while next_segment_path and next_segment_path != cur_segment_cursor['SegmentPath']:
+ while next_segment_path and next_segment_path != cur_segment_cursor["SegmentPath"]:
next_segment_path = next(self._segment_paths_generator)

self.current_segment = self._get_next_segment(
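Resuming mid-feed is just fast-forwarding a generator with next() until the recorded segment path matches. A generic sketch with hypothetical segment paths:

```python
def segment_paths():
    # stand-in for self._segment_paths_generator
    yield from ("idx/segments/2019/01/01/0000/meta.json",
                "idx/segments/2020/09/08/1000/meta.json",
                "idx/segments/2020/09/08/1600/meta.json")

gen = segment_paths()
target = "idx/segments/2020/09/08/1000/meta.json"  # cur_segment_cursor["SegmentPath"]

path = next(gen)
while path and path != target:
    path = next(gen)      # StopIteration propagates if the target never appears
# `gen` now resumes from the segment recorded in the cursor
```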
@@ -185,7 +185,7 @@ def _get_segment_paths(self, start_year=""):
for path in paths:
yield path.name

- # if not searching by prefix, all paths would have been iterated already, so it's time to yield None
+ # if not searching by prefix, all paths would have been iterated already, so it"s time to yield None
if not start_year:
break
# search the segment prefixed with next year.
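The listing pattern around this comment: with a start year, segments are listed one year-prefix at a time and the prefix advances; without one, a single unprefixed listing covers everything. A sketch against the real ContainerClient.list_blobs(name_starts_with=...) API, with the current-year guard assumed rather than taken from this diff:

```python
from datetime import datetime

def segment_paths(container_client, start_year=""):
    # A sketch of the pattern, not the SDK's exact code.
    while True:
        # "idx/segments/2020" restricts the listing to one year's segments.
        for blob in container_client.list_blobs(
                name_starts_with="idx/segments/" + str(start_year)):
            yield blob.name
        if not start_year:
            break  # no prefix: a single listing already covered everything
        start_year += 1
        if start_year > datetime.utcnow().year:
            break  # assumed guard: stop once past the current year
```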
@@ -221,9 +221,10 @@ def __init__(self, client, segment_path, page_size, segment_cursor=None):
self.segment_path = segment_path
self.page_size = page_size
self.shards = collections.deque()
- self.cursor = {'ShardCursors': {}, 'SegmentPath': self.segment_path}
+ self.cursor = {"ShardCursors": {}, "SegmentPath": self.segment_path}
self._initialize(segment_cursor=segment_cursor)
- # cursor is in this format {"segment_path", path, "CurrentShardPath": shard_path, "segment_cursor": ShardCursors dict}
+ # cursor is in this format:
+ # {"segment_path", path, "CurrentShardPath": shard_path, "segment_cursor": ShardCursors dict}

def __iter__(self):
return self
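Spelled out, a segment cursor plausibly serializes to the shape below; the values are hypothetical, but the keys match the ones this file reads and writes (note the in-code comment above still uses older key names):

```python
segment_cursor = {
    "SegmentPath": "idx/segments/2020/09/08/1000/meta.json",   # hypothetical path
    "CurrentShardPath": "log/00/2020/09/08/1000/",             # shard being drained
    "ShardCursors": [                                          # one entry per shard
        {"CurrentChunkPath": "log/00/2020/09/08/1000/00000.avro",
         "EventIndex": 12,
         "BlockOffset": 5242880},
    ],
}
```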
@@ -240,8 +241,8 @@ def __next__(self):
pass

# update cursor
- self.cursor['ShardCursors'][shard.shard_path] = shard.cursor
- self.cursor['CurrentShardPath'] = shard.shard_path
+ self.cursor["ShardCursors"][shard.shard_path] = shard.cursor
+ self.cursor["CurrentShardPath"] = shard.shard_path

if not segment_events:
raise StopIteration
@@ -255,20 +256,20 @@ def _initialize(self, segment_cursor=None):
segment_content = segment_content.decode()
segment_dict = json.loads(segment_content)

- raw_shard_paths = segment_dict['chunkFilePaths']
+ raw_shard_paths = segment_dict["chunkFilePaths"]
shard_paths = []
# to strip the overhead of all raw shard paths
for raw_shard_path in raw_shard_paths:
- shard_paths.append(raw_shard_path.replace('$blobchangefeed/', '', 1))
+ shard_paths.append(raw_shard_path.replace("$blobchangefeed/", "", 1))

# TODO: we can optimize to initiate shards in parallel
if not segment_cursor:
for shard_path in shard_paths:
self.shards.append(Shard(self.client, shard_path))
else:
- start_shard_path = segment_cursor['CurrentShardPath']
- shard_cursors = {shard_cursor['CurrentChunkPath'][:-10]: shard_cursor
- for shard_cursor in segment_cursor['ShardCursors']}
+ start_shard_path = segment_cursor["CurrentShardPath"]
+ shard_cursors = {shard_cursor["CurrentChunkPath"][:-10]: shard_cursor
+ for shard_cursor in segment_cursor["ShardCursors"]}

if shard_paths:
# Initialize all shards using the shard cursors
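Two string details here are easy to miss: replace("$blobchangefeed/", "", 1) strips only the leading container prefix, and CurrentChunkPath[:-10] drops the 10-character chunk file name to recover the shard path used as the lookup key. With hypothetical paths:

```python
raw = "$blobchangefeed/log/00/2020/09/08/1000/"
shard_path = raw.replace("$blobchangefeed/", "", 1)
assert shard_path == "log/00/2020/09/08/1000/"

chunk_path = "log/00/2020/09/08/1000/00000.avro"
assert chunk_path[:-10] == "log/00/2020/09/08/1000/"   # "00000.avro" is 10 chars
```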
@@ -318,7 +319,7 @@ def _initialize(self, shard_cursor=None):
# move cursor to the expected chunk
if shard_cursor:
while self.unprocessed_chunk_path_props and \
- self.unprocessed_chunk_path_props[0].name != shard_cursor.get('CurrentChunkPath'):
+ self.unprocessed_chunk_path_props[0].name != shard_cursor.get("CurrentChunkPath"):
self.unprocessed_chunk_path_props.popleft()
self.current_chunk = self._get_next_chunk(chunk_cursor=shard_cursor)
else:
@@ -336,7 +337,7 @@ def __init__(self, client, chunk_path, chunk_cursor=None):
self.client = client
self.chunk_path = chunk_path
self.file_reader = None
- self.cursor = {'CurrentChunkPath': chunk_path} # to track the current position in avro file
+ self.cursor = {"CurrentChunkPath": chunk_path} # to track the current position in avro file
self._data_stream = None
self._initialize(chunk_cursor=chunk_cursor)
@@ -346,12 +347,12 @@ def __iter__(self):
def __next__(self):
try:
event = next(self.file_reader)
- self.cursor['EventIndex'] = self._data_stream.event_index
- self.cursor['BlockOffset'] = self._data_stream.object_position
+ self.cursor["EventIndex"] = self._data_stream.event_index
+ self.cursor["BlockOffset"] = self._data_stream.object_position
return event
except StopIteration:
- self.cursor['EventIndex'] = self._data_stream.event_index
- self.cursor['BlockOffset'] = self._data_stream.object_position
+ self.cursor["EventIndex"] = self._data_stream.event_index
+ self.cursor["BlockOffset"] = self._data_stream.object_position
raise StopIteration

next = __next__ # Python 2 compatibility.
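EventIndex and BlockOffset are what make a chunk resumable: the offset locates the Avro block, and the index counts events already consumed inside it. A hypothetical resume using this file's Chunk class (`client` and `handle` are assumed to exist):

```python
chunk_cursor = {"CurrentChunkPath": "log/00/2020/09/08/1000/00000.avro",
                "BlockOffset": 5242880,    # Avro block to seek to
                "EventIndex": 3}           # events already consumed in that block

chunk = Chunk(client, chunk_cursor["CurrentChunkPath"], chunk_cursor=chunk_cursor)
for event in chunk:    # continues from the 4th event of that block
    handle(event)
```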
@@ -360,15 +361,15 @@ def _initialize(self, chunk_cursor=None):
# To get all events in a chunk
blob_client = self.client.get_blob_client(self.chunk_path)

- file_offset = chunk_cursor.get('BlockOffset') if chunk_cursor else 0
+ file_offset = chunk_cursor.get("BlockOffset") if chunk_cursor else 0

# An offset means the avro data doesn't have an avro header,
# so only when the data stream has an offset do we need the header stream to help
header_stream = ChangeFeedStreamer(blob_client) if file_offset else None
self._data_stream = ChangeFeedStreamer(blob_client, chunk_file_start=file_offset)
self.file_reader = DataFileReader(self._data_stream, DatumReader(), header_reader=header_stream)

- event_index = chunk_cursor.get('EventIndex') if chunk_cursor else 0
+ event_index = chunk_cursor.get("EventIndex") if chunk_cursor else 0
for _ in range(0, event_index):
next(self.file_reader)
@@ -385,7 +386,7 @@ def __init__(self, blob_client, chunk_file_start=0):
self.event_index = 0
self._point = self._chunk_file_start # file cursor position relative to the whole chunk file, not the buffered
self._chunk_size = 4 * 1024 * 1024
- self._buf = b''
+ self._buf = b""
self._buf_start = self._chunk_file_start # the start position of the chunk file to buffer
self._chunk_size_snapshot = blob_client.get_blob_properties().size
length = self._chunk_size_snapshot - self._chunk_file_start
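ChangeFeedStreamer wraps the blob in a file-like object that downloads on demand rather than all at once. A minimal sketch of that idea (not the SDK class) using the real BlobClient.download_blob(offset=..., length=...) range API:

```python
class RangeReader:
    """File-like range reads over a blob; a sketch, not the SDK's streamer."""

    def __init__(self, blob_client, start=0):
        self._blob = blob_client
        self._pos = start
        self._size = blob_client.get_blob_properties().size

    def read(self, n):
        n = min(n, self._size - self._pos)
        if n <= 0:
            return b""
        # Download only the byte range requested instead of the whole blob.
        data = self._blob.download_blob(offset=self._pos, length=n).readall()
        self._pos += len(data)
        return data
```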
@@ -456,4 +457,3 @@ def track_object_position(self):

def set_object_index(self, event_index):
self.event_index = event_index
-