Skip to content

Commit

Permalink
Add test for deleting stream with consumer group (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavius authored Jul 30, 2020
1 parent 64cbce7 commit 91346c2
Showing 1 changed file with 39 additions and 0 deletions.
39 changes: 39 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,39 @@ def setUp(self):
path=self._path,
raise_for_status=[200, 204, 404])

def test_delete_stream_with_cg(self):
num_shards = 8

# check that the stream doesn't exist
self.assertFalse(self._stream_exists())

# create a stream
self._client.create_stream(container=self._container,
path=self._path,
shard_count=num_shards)

# write data to all shards so there are files
for shard_id in range(num_shards):
self._client.put_records(container=self._container,
path=self._path,
records=[
{'shard_id': shard_id, 'data': f'data for shard {shard_id}'}
])

# write several "consumer group state" files
for cg_id in range(3):
self._client.put_object(container=self._container,
path=os.path.join(self._path, f'cg{cg_id}-state.json'))

# check that the stream doesn't exist
self.assertTrue(self._stream_exists())

# delete the stream
self._client.delete_stream(container=self._container, path=self._path)

# check that the stream doesn't exist
self.assertFalse(self._stream_exists())

def test_stream(self):

# create a stream w/8 shards
Expand Down Expand Up @@ -162,6 +195,12 @@ def test_stream(self):
self._client.delete_stream(container=self._container,
path=self._path)

def _stream_exists(self):
response = self._client.describe_stream(container=self._container,
path=self._path,
raise_for_status=v3io.dataplane.RaiseForStatus.never)
return response.status_code == 200


class TestObject(Test):

Expand Down

0 comments on commit 91346c2

Please sign in to comment.