From deedb45377d610a9df6ea67207413a4f1ea57d31 Mon Sep 17 00:00:00 2001 From: Cheng Xing Date: Fri, 8 Sep 2023 22:18:32 +0000 Subject: [PATCH] Changed FakeQuantumRunStream to support arbitrary response and exception timing (#6253) * Changed FakeQuantumRunStream to support arbitrary response and exception timing * Fixed type errors and task got bad yield errors * Fixed more type errors and replaced anext() calls * Addressed maffoo's comments --- .../cirq_google/engine/stream_manager_test.py | 534 +++++++++--------- 1 file changed, 282 insertions(+), 252 deletions(-) diff --git a/cirq-google/cirq_google/engine/stream_manager_test.py b/cirq-google/cirq_google/engine/stream_manager_test.py index 7b56dcb8bb3..3732547cdca 100644 --- a/cirq-google/cirq_google/engine/stream_manager_test.py +++ b/cirq-google/cirq_google/engine/stream_manager_test.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import AsyncIterable, AsyncIterator, Awaitable, List, Union +from typing import AsyncIterable, AsyncIterator, Awaitable, List, Sequence, Union import asyncio import concurrent from unittest import mock @@ -21,6 +21,7 @@ import pytest import google.api_core.exceptions as google_exceptions +from cirq_google.engine.asyncio_executor import AsyncioExecutor from cirq_google.engine.stream_manager import ( _get_retry_request_or_raise, ProgramAlreadyExistsError, @@ -49,63 +50,114 @@ # StreamManager test suite constants REQUEST_PROJECT_NAME = 'projects/proj' REQUEST_PROGRAM = quantum.QuantumProgram(name='projects/proj/programs/prog') -REQUEST_JOB = quantum.QuantumJob(name='projects/proj/programs/prog/jobs/job0') +REQUEST_JOB0 = quantum.QuantumJob(name='projects/proj/programs/prog/jobs/job0') +REQUEST_JOB1 = quantum.QuantumJob(name='projects/proj/programs/prog/jobs/job1') -def setup_fake_quantum_run_stream_client(client_constructor, responses_and_exceptions): - grpc_client = FakeQuantumRunStream(responses_and_exceptions) - client_constructor.return_value = grpc_client - return grpc_client +def setup_client(client_constructor): + fake_client = FakeQuantumRunStream() + client_constructor.return_value = fake_client + return fake_client + + +def setup(client_constructor): + fake_client = setup_client(client_constructor) + return fake_client, StreamManager(fake_client) class FakeQuantumRunStream: """A fake Quantum Engine client which supports QuantumRunStream and CancelQuantumJob.""" - def __init__( - self, responses_and_exceptions: List[Union[quantum.QuantumRunStreamResponse, BaseException]] - ): - self.stream_requests: List[quantum.QuantumRunStreamRequest] = [] - self.cancel_requests: List[quantum.CancelQuantumJobRequest] = [] - self.responses_and_exceptions = responses_and_exceptions + def __init__(self): + self.all_stream_requests: List[quantum.QuantumRunStreamRequest] = [] + self.all_cancel_requests: List[quantum.CancelQuantumJobRequest] = [] + self._executor = AsyncioExecutor.instance() + self._request_buffer = duet.AsyncCollector[quantum.QuantumRunStreamRequest]() + # asyncio.Queue needs to be initialized inside the asyncio thread because all callers need + # to use the same event loop. + self._responses_and_exceptions_future = duet.AwaitableFuture[asyncio.Queue]() async def quantum_run_stream( self, requests: AsyncIterator[quantum.QuantumRunStreamRequest], **kwargs ) -> Awaitable[AsyncIterable[quantum.QuantumRunStreamResponse]]: """Fakes the QuantumRunStream RPC. - Expects the number of requests to be the same as len(self.responses_and_exceptions). - - For every request, a response or exception is popped from `self.responses_and_exceptions`. - Before the next request: - * If it is a response, it is sent back through the stream. - * If it is an exception, the exception is raised. + Once a request is received, it is appended to `stream_requests`, and the test calling + `wait_for_requests()` is notified. - This fake does not support out-of-order responses. + The response is sent when a test calls `reply()` with a `QuantumRunStreamResponse`. If a + test calls `reply()` with an exception, it is raised here to the `quantum_run_stream()` + caller. - No responses are ever made if `self.responses_and_exceptions` is empty. + This is called from the asyncio thread. """ + responses_and_exceptions: asyncio.Queue = asyncio.Queue() + self._responses_and_exceptions_future.try_set_result(responses_and_exceptions) - async def run_async_iterator(): + async def read_requests(): async for request in requests: - self.stream_requests.append(request) - - if not self.responses_and_exceptions: - while True: - await asyncio.sleep(1) - - response_or_exception = self.responses_and_exceptions.pop(0) - if isinstance(response_or_exception, BaseException): + self.all_stream_requests.append(request) + self._request_buffer.add(request) + + async def response_iterator(): + asyncio.create_task(read_requests()) + while True: + response_or_exception = await responses_and_exceptions.get() + if isinstance(response_or_exception, quantum.QuantumRunStreamResponse): + yield response_or_exception + else: # isinstance(response_or_exception, BaseException) + self._responses_and_exceptions_future = duet.AwaitableFuture[asyncio.Queue]() raise response_or_exception - response_or_exception.message_id = request.message_id - yield response_or_exception await asyncio.sleep(0) - return run_async_iterator() + return response_iterator() async def cancel_quantum_job(self, request: quantum.CancelQuantumJobRequest) -> None: - self.cancel_requests.append(request) + """Records the cancellation in `cancel_requests`. + + This is called from the asyncio thread. + """ + self.all_cancel_requests.append(request) await asyncio.sleep(0) + async def wait_for_requests(self, num_requests=1) -> Sequence[quantum.QuantumRunStreamRequest]: + """Wait til `num_requests` number of requests are received via `quantum_run_stream()`. + + This must be called from the duet thread. + + Returns: + The received requests. + """ + requests = [] + for _ in range(num_requests): + requests.append(await self._request_buffer.__anext__()) + return requests + + async def reply( + self, response_or_exception: Union[quantum.QuantumRunStreamResponse, BaseException] + ): + """Sends a response or raises an exception to the `quantum_run_stream()` caller. + + If input response is missing `message_id`, it is defaulted to the `message_id` of the most + recent request. This is to support the most common use case of responding immediately after + a request. + + Assumes that at least one request must have been submitted to the StreamManager. + + This must be called from the duet thread. + """ + responses_and_exceptions = await self._responses_and_exceptions_future + if ( + isinstance(response_or_exception, quantum.QuantumRunStreamResponse) + and not response_or_exception.message_id + ): + response_or_exception.message_id = self.all_stream_requests[-1].message_id + + async def send(): + await responses_and_exceptions.put(response_or_exception) + + await self._executor.submit(send) + class TestResponseDemux: @pytest.fixture @@ -207,49 +259,38 @@ async def test_publish_exception_after_publishing_response_does_not_change_futur class TestStreamManager: @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_submit_expects_result_response(self, client_constructor): + # Arrange + expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0') + fake_client, manager = setup(client_constructor) + async def test(): async with duet.timeout_scope(5): - # Arrange - expected_result = quantum.QuantumResult( - parent='projects/proj/programs/prog/jobs/job0' - ) - mock_responses = [quantum.QuantumRunStreamResponse(result=expected_result)] - fake_client = setup_fake_quantum_run_stream_client( - client_constructor, responses_and_exceptions=mock_responses - ) - manager = StreamManager(fake_client) - # Act - actual_result = await manager.submit( - REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB + actual_result_future = manager.submit( + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0 ) + await fake_client.wait_for_requests() + await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result)) + actual_result = await actual_result_future manager.stop() # Assert assert actual_result == expected_result - assert len(fake_client.stream_requests) == 1 + assert len(fake_client.all_stream_requests) == 1 # assert that the first request is a CreateQuantumProgramAndJobRequest. - assert 'create_quantum_program_and_job' in fake_client.stream_requests[0] + assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0] duet.run(test) @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_submit_program_without_name_raises(self, client_constructor): + _, manager = setup(client_constructor) + async def test(): async with duet.timeout_scope(5): - # Arrange - expected_result = quantum.QuantumResult( - parent='projects/proj/programs/prog/jobs/job0' - ) - mock_responses = [quantum.QuantumRunStreamResponse(result=expected_result)] - fake_client = setup_fake_quantum_run_stream_client( - client_constructor, responses_and_exceptions=mock_responses - ) - manager = StreamManager(fake_client) - with pytest.raises(ValueError, match='Program name must be set'): await manager.submit( - REQUEST_PROJECT_NAME, quantum.QuantumProgram(), REQUEST_JOB + REQUEST_PROJECT_NAME, quantum.QuantumProgram(), REQUEST_JOB0 ) manager.stop() @@ -257,20 +298,17 @@ async def test(): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_submit_cancel_future_expects_engine_cancellation_rpc_call(self, client_constructor): + fake_client, manager = setup(client_constructor) + async def test(): async with duet.timeout_scope(5): - fake_client = setup_fake_quantum_run_stream_client( - client_constructor, responses_and_exceptions=[] - ) - manager = StreamManager(fake_client) - - result_future = manager.submit(REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB) + result_future = manager.submit(REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0) result_future.cancel() await duet.sleep(1) # Let cancellation complete asynchronously manager.stop() - assert len(fake_client.cancel_requests) == 1 - assert fake_client.cancel_requests[0] == quantum.CancelQuantumJobRequest( + assert len(fake_client.all_cancel_requests) == 1 + assert fake_client.all_cancel_requests[0] == quantum.CancelQuantumJobRequest( name='projects/proj/programs/prog/jobs/job0' ) @@ -280,31 +318,28 @@ async def test(): def test_submit_stream_broken_twice_expects_retry_with_get_quantum_result_twice( self, client_constructor ): + expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0') + fake_client, manager = setup(client_constructor) + async def test(): async with duet.timeout_scope(5): - expected_result = quantum.QuantumResult( - parent='projects/proj/programs/prog/jobs/job0' - ) - mock_responses_and_exceptions = [ - google_exceptions.ServiceUnavailable('unavailable'), - google_exceptions.ServiceUnavailable('unavailable'), - quantum.QuantumRunStreamResponse(result=expected_result), - ] - fake_client = setup_fake_quantum_run_stream_client( - client_constructor, responses_and_exceptions=mock_responses_and_exceptions - ) - manager = StreamManager(fake_client) - - actual_result = await manager.submit( - REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB + actual_result_future = manager.submit( + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0 ) + await fake_client.wait_for_requests() + await fake_client.reply(google_exceptions.ServiceUnavailable('unavailable')) + await fake_client.wait_for_requests() + await fake_client.reply(google_exceptions.ServiceUnavailable('unavailable')) + await fake_client.wait_for_requests() + await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result)) + actual_result = await actual_result_future manager.stop() assert actual_result == expected_result - assert len(fake_client.stream_requests) == 3 - assert 'create_quantum_program_and_job' in fake_client.stream_requests[0] - assert 'get_quantum_result' in fake_client.stream_requests[1] - assert 'get_quantum_result' in fake_client.stream_requests[2] + assert len(fake_client.all_stream_requests) == 3 + assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0] + assert 'get_quantum_result' in fake_client.all_stream_requests[1] + assert 'get_quantum_result' in fake_client.all_stream_requests[2] duet.run(test) @@ -319,25 +354,24 @@ async def test(): def test_submit_with_retryable_stream_breakage_expects_get_result_request( self, client_constructor, error ): + expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0') + fake_client, manager = setup(client_constructor) + async def test(): async with duet.timeout_scope(5): - mock_responses = [ - error, - quantum.QuantumRunStreamResponse( - result=quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0') - ), - ] - fake_client = setup_fake_quantum_run_stream_client( - client_constructor, responses_and_exceptions=mock_responses + actual_result_future = manager.submit( + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0 ) - manager = StreamManager(fake_client) - - await manager.submit(REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB) + await fake_client.wait_for_requests() + await fake_client.reply(error) + await fake_client.wait_for_requests() + await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result)) + await actual_result_future manager.stop() - assert len(fake_client.stream_requests) == 2 - assert 'create_quantum_program_and_job' in fake_client.stream_requests[0] - assert 'get_quantum_result' in fake_client.stream_requests[1] + assert len(fake_client.all_stream_requests) == 2 + assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0] + assert 'get_quantum_result' in fake_client.all_stream_requests[1] duet.run(test) @@ -360,80 +394,73 @@ async def test(): def test_submit_with_non_retryable_stream_breakage_raises_error( self, client_constructor, error ): + fake_client, manager = setup(client_constructor) + async def test(): async with duet.timeout_scope(5): - mock_responses = [ - error, - quantum.QuantumRunStreamResponse( - result=quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0') - ), - ] - fake_client = setup_fake_quantum_run_stream_client( - client_constructor, responses_and_exceptions=mock_responses + actual_result_future = manager.submit( + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0 ) - manager = StreamManager(fake_client) - + await fake_client.wait_for_requests() + await fake_client.reply(error) with pytest.raises(type(error)): - await manager.submit(REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB) + await actual_result_future manager.stop() - assert len(fake_client.stream_requests) == 1 - assert 'create_quantum_program_and_job' in fake_client.stream_requests[0] + assert len(fake_client.all_stream_requests) == 1 + assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0] duet.run(test) @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_submit_expects_job_response(self, client_constructor): + expected_job = quantum.QuantumJob(name='projects/proj/programs/prog/jobs/job0') + fake_client, manager = setup(client_constructor) + async def test(): async with duet.timeout_scope(5): - expected_job = quantum.QuantumJob(name='projects/proj/programs/prog/jobs/job0') - mock_responses = [quantum.QuantumRunStreamResponse(job=expected_job)] - fake_client = setup_fake_quantum_run_stream_client( - client_constructor, responses_and_exceptions=mock_responses - ) - manager = StreamManager(fake_client) - - actual_job = await manager.submit( - REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB + actual_job_future = manager.submit( + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0 ) + await fake_client.wait_for_requests() + await fake_client.reply(quantum.QuantumRunStreamResponse(job=expected_job)) + actual_job = await actual_job_future manager.stop() assert actual_job == expected_job - assert len(fake_client.stream_requests) == 1 - # assert that the first request is a CreateQuantumProgramAndJobRequest. - assert 'create_quantum_program_and_job' in fake_client.stream_requests[0] + assert len(fake_client.all_stream_requests) == 1 + assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0] duet.run(test) @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_submit_job_does_not_exist_expects_create_quantum_job_request(self, client_constructor): + expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0') + fake_client, manager = setup(client_constructor) + async def test(): async with duet.timeout_scope(5): - expected_result = quantum.QuantumResult( - parent='projects/proj/programs/prog/jobs/job0' + actual_result_future = manager.submit( + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0 ) - mock_responses_and_exceptions = [ - google_exceptions.ServiceUnavailable('unavailable'), + await fake_client.wait_for_requests() + await fake_client.reply(google_exceptions.ServiceUnavailable('unavailable')) + await fake_client.wait_for_requests() + await fake_client.reply( quantum.QuantumRunStreamResponse( error=quantum.StreamError(code=quantum.StreamError.Code.JOB_DOES_NOT_EXIST) - ), - quantum.QuantumRunStreamResponse(result=expected_result), - ] - fake_client = setup_fake_quantum_run_stream_client( - client_constructor, responses_and_exceptions=mock_responses_and_exceptions - ) - manager = StreamManager(fake_client) - - actual_result = await manager.submit( - REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB + ) ) + await fake_client.wait_for_requests() + await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result)) + actual_result = await actual_result_future manager.stop() assert actual_result == expected_result - assert len(fake_client.stream_requests) == 3 - assert 'create_quantum_program_and_job' in fake_client.stream_requests[0] - assert 'get_quantum_result' in fake_client.stream_requests[1] - assert 'create_quantum_job' in fake_client.stream_requests[2] + assert len(fake_client.all_stream_requests) == 3 + assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0] + assert 'get_quantum_result' in fake_client.all_stream_requests[1] + assert 'create_quantum_job' in fake_client.all_stream_requests[2] duet.run(test) @@ -441,39 +468,41 @@ async def test(): def test_submit_program_does_not_exist_expects_create_quantum_program_and_job_request( self, client_constructor ): + expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0') + fake_client, manager = setup(client_constructor) + async def test(): async with duet.timeout_scope(5): - expected_result = quantum.QuantumResult( - parent='projects/proj/programs/prog/jobs/job0' + actual_result_future = manager.submit( + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0 ) - mock_responses_and_exceptions = [ - google_exceptions.ServiceUnavailable('unavailable'), + await fake_client.wait_for_requests() + await fake_client.reply(google_exceptions.ServiceUnavailable('unavailable')) + await fake_client.wait_for_requests() + await fake_client.reply( quantum.QuantumRunStreamResponse( error=quantum.StreamError(code=quantum.StreamError.Code.JOB_DOES_NOT_EXIST) - ), + ) + ) + await fake_client.wait_for_requests() + await fake_client.reply( quantum.QuantumRunStreamResponse( error=quantum.StreamError( code=quantum.StreamError.Code.PROGRAM_DOES_NOT_EXIST ) - ), - quantum.QuantumRunStreamResponse(result=expected_result), - ] - fake_client = setup_fake_quantum_run_stream_client( - client_constructor, responses_and_exceptions=mock_responses_and_exceptions - ) - manager = StreamManager(fake_client) - - actual_result = await manager.submit( - REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB + ) ) + await fake_client.wait_for_requests() + await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result)) + actual_result = await actual_result_future manager.stop() assert actual_result == expected_result - assert len(fake_client.stream_requests) == 4 - assert 'create_quantum_program_and_job' in fake_client.stream_requests[0] - assert 'get_quantum_result' in fake_client.stream_requests[1] - assert 'create_quantum_job' in fake_client.stream_requests[2] - assert 'create_quantum_program_and_job' in fake_client.stream_requests[3] + assert len(fake_client.all_stream_requests) == 4 + assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0] + assert 'get_quantum_result' in fake_client.all_stream_requests[1] + assert 'create_quantum_job' in fake_client.all_stream_requests[2] + assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[3] duet.run(test) @@ -481,124 +510,129 @@ async def test(): def test_submit_program_already_exists_expects_program_already_exists_error( self, client_constructor ): + fake_client, manager = setup(client_constructor) + async def test(): async with duet.timeout_scope(5): - mock_responses_and_exceptions = [ + actual_result_future = manager.submit( + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0 + ) + await fake_client.wait_for_requests() + await fake_client.reply( quantum.QuantumRunStreamResponse( error=quantum.StreamError( code=quantum.StreamError.Code.PROGRAM_ALREADY_EXISTS ) ) - ] - fake_client = setup_fake_quantum_run_stream_client( - client_constructor, responses_and_exceptions=mock_responses_and_exceptions ) - manager = StreamManager(fake_client) - with pytest.raises(ProgramAlreadyExistsError): - await manager.submit(REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB) + await actual_result_future manager.stop() duet.run(test) @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_submit_twice_in_parallel_expect_result_responses(self, client_constructor): + expected_result0 = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0') + expected_result1 = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job1') + fake_client, manager = setup(client_constructor) + async def test(): async with duet.timeout_scope(5): - request_job1 = quantum.QuantumJob(name='projects/proj/programs/prog/jobs/job1') - expected_result0 = quantum.QuantumResult( - parent='projects/proj/programs/prog/jobs/job0' + actual_result0_future = manager.submit( + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0 ) - expected_result1 = quantum.QuantumResult( - parent='projects/proj/programs/prog/jobs/job1' + actual_result1_future = manager.submit( + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB1 ) - mock_responses = [ - quantum.QuantumRunStreamResponse(result=expected_result0), - quantum.QuantumRunStreamResponse(result=expected_result1), - ] - fake_client = setup_fake_quantum_run_stream_client( - client_constructor, responses_and_exceptions=mock_responses + await fake_client.wait_for_requests(num_requests=2) + await fake_client.reply( + quantum.QuantumRunStreamResponse( + message_id=fake_client.all_stream_requests[0].message_id, + result=expected_result0, + ) ) - manager = StreamManager(fake_client) + await fake_client.reply( + quantum.QuantumRunStreamResponse( + message_id=fake_client.all_stream_requests[1].message_id, + result=expected_result1, + ) + ) + actual_result1 = await actual_result1_future + actual_result0 = await actual_result0_future + manager.stop() + assert actual_result0 == expected_result0 + assert actual_result1 == expected_result1 + assert len(fake_client.all_stream_requests) == 2 + assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0] + assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[1] + + duet.run(test) + + @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) + def test_submit_twice_and_break_stream_expect_result_responses(self, client_constructor): + expected_result0 = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0') + expected_result1 = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job1') + fake_client, manager = setup(client_constructor) + + async def test(): + async with duet.timeout_scope(5): actual_result0_future = manager.submit( - REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0 ) actual_result1_future = manager.submit( - REQUEST_PROJECT_NAME, REQUEST_PROGRAM, request_job1 + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB1 + ) + await fake_client.wait_for_requests(num_requests=2) + await fake_client.reply(google_exceptions.ServiceUnavailable('unavailable')) + await fake_client.wait_for_requests(num_requests=2) + await fake_client.reply( + quantum.QuantumRunStreamResponse( + message_id=next( + req.message_id + for req in fake_client.all_stream_requests[2:] + if req.get_quantum_result.parent == expected_result0.parent + ), + result=expected_result0, + ) + ) + await fake_client.reply( + quantum.QuantumRunStreamResponse( + message_id=next( + req.message_id + for req in fake_client.all_stream_requests[2:] + if req.get_quantum_result.parent == expected_result1.parent + ), + result=expected_result1, + ) ) - actual_result1 = await actual_result1_future actual_result0 = await actual_result0_future + actual_result1 = await actual_result1_future manager.stop() assert actual_result0 == expected_result0 assert actual_result1 == expected_result1 - assert len(fake_client.stream_requests) == 2 - assert 'create_quantum_program_and_job' in fake_client.stream_requests[0] - assert 'create_quantum_program_and_job' in fake_client.stream_requests[1] + assert len(fake_client.all_stream_requests) == 4 + assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0] + assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[1] + assert 'get_quantum_result' in fake_client.all_stream_requests[2] + assert 'get_quantum_result' in fake_client.all_stream_requests[3] duet.run(test) - # TODO(#5996) Update fake client implementation to support this test case. - # @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) - # def test_submit_twice_and_break_stream_expect_result_responses(self, client_constructor): - # async def test(): - # async with duet.timeout_scope(5): - # request_job1 = quantum.QuantumJob(name='projects/proj/programs/prog/jobs/job1') - # expected_result0 = quantum.QuantumResult( - # parent='projects/proj/programs/prog/jobs/job0' - # ) - # expected_result1 = quantum.QuantumResult( - # parent='projects/proj/programs/prog/jobs/job1' - # ) - # # TODO the current fake client doesn't have the response timing flexibility - # # required by this test. - # # Ideally, the client raises ServiceUnavailable after both initial requests are - # # sent. - # mock_responses = [ - # google_exceptions.ServiceUnavailable('unavailable'), - # google_exceptions.ServiceUnavailable('unavailable'), - # quantum.QuantumRunStreamResponse(result=expected_result0), - # quantum.QuantumRunStreamResponse(result=expected_result1), - # ] - # fake_client = setup_fake_quantum_run_stream_client( - # client_constructor, responses_and_exceptions=mock_responses - # ) - # manager = StreamManager(fake_client) - - # actual_result0_future = manager.submit( - # REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB - # ) - # actual_result1_future = manager.submit( - # REQUEST_PROJECT_NAME, REQUEST_PROGRAM, request_job1 - # ) - # actual_result1 = await actual_result1_future - # actual_result0 = await actual_result0_future - # manager.stop() - - # assert actual_result0 == expected_result0 - # assert actual_result1 == expected_result1 - # assert len(fake_client.stream_requests) == 2 - # assert 'create_quantum_program_and_job' in fake_client.stream_requests[0] - # assert 'create_quantum_program_and_job' in fake_client.stream_requests[1] - - # duet.run(test) - @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_stop_cancels_existing_sends(self, client_constructor): + fake_client, manager = setup(client_constructor) + async def test(): async with duet.timeout_scope(5): - fake_client = setup_fake_quantum_run_stream_client( - client_constructor, responses_and_exceptions=[] - ) - manager = StreamManager(fake_client) - actual_result_future = manager.submit( - REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0 ) # Wait for the manager to submit a request. If request submission runs after stop(), # it will start the manager again and the test will block waiting for a response. - await duet.sleep(1) + await fake_client.wait_for_requests() manager.stop() with pytest.raises(concurrent.futures.CancelledError): @@ -609,28 +643,24 @@ async def test(): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_stop_then_send_expects_result_response(self, client_constructor): """New requests should work after stopping the manager.""" + expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0') + fake_client, manager = setup(client_constructor) async def test(): async with duet.timeout_scope(5): - expected_result = quantum.QuantumResult( - parent='projects/proj/programs/prog/jobs/job0' - ) - mock_responses = [quantum.QuantumRunStreamResponse(result=expected_result)] - fake_client = setup_fake_quantum_run_stream_client( - client_constructor, responses_and_exceptions=mock_responses - ) - manager = StreamManager(fake_client) - manager.stop() - actual_result = await manager.submit( - REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB + actual_result_future = manager.submit( + REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0 ) + await fake_client.wait_for_requests() + await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result)) + actual_result = await actual_result_future manager.stop() assert actual_result == expected_result - assert len(fake_client.stream_requests) == 1 + assert len(fake_client.all_stream_requests) == 1 # assert that the first request is a CreateQuantumProgramAndJobRequest. - assert 'create_quantum_program_and_job' in fake_client.stream_requests[0] + assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0] duet.run(test)