diff --git a/tests/pyensign/test_connection.py b/tests/pyensign/test_connection.py index bcc6e48..25c2b86 100644 --- a/tests/pyensign/test_connection.py +++ b/tests/pyensign/test_connection.py @@ -635,6 +635,30 @@ async def source_events(): mock_write.side_effect = None await client.close() + @pytest.mark.asyncio + async def test_publish_close(self, client): + """ + Closing the client should flush the publish queue. + """ + + events = [ + Event("event {}".format(i).encode(), mimetype="text/plain") + for i in range(1000) + ] + + async def source_events(): + for event in events: + yield event + + await client.publish(OTTERS_TOPIC, source_events()) + for publisher in client.publishers.values(): + assert not publisher.queue._request_queue.empty() + + # Close the client, events should be flushed. + await client.close() + for publisher in client.publishers.values(): + assert publisher.queue._request_queue.empty() + def test_publish_sync(self, client): """ Test executing publish from synchronous code as a coroutine. diff --git a/tests/pyensign/test_ensign.py b/tests/pyensign/test_ensign.py index 2aafd8f..61b3df7 100644 --- a/tests/pyensign/test_ensign.py +++ b/tests/pyensign/test_ensign.py @@ -1198,6 +1198,35 @@ async def test_set_topic_sharding_strategy(self, mock_set_policy, strategy, ensi state = await ensign.set_topic_sharding_strategy(123, strategy) assert state == TopicState.READY + @pytest.mark.asyncio + async def test_live_multi_publish(self, live, creds, authserver, ensignserver): + if not live: + pytest.skip("Skipping live test") + if not authserver: + pytest.skip("Skipping live test") + if not ensignserver: + pytest.skip("Skipping live test") + + ensign = Ensign(endpoint=ensignserver, auth_url=authserver, cred_path=creds) + events = [ + Event("event {}".format(i).encode(), mimetype="text/plain") + for i in range(10) + ] + topic = "python-events" + + # Ensure the topic exists + await ensign.ensure_topic_exists(topic) + + # Publish the event + await ensign.publish(topic, events) + + # Close the client to flush the events + await ensign.close() + + # All events should be published + for event in events: + assert event.published() + @pytest.mark.asyncio async def test_live_pubsub(self, live, creds, authserver, ensignserver): if not live: