From 86be3282334347d29220d849d72dca5238c02920 Mon Sep 17 00:00:00 2001 From: A Vertex SDK engineer Date: Tue, 17 Sep 2024 14:44:27 -0700 Subject: [PATCH] fix: Tensorboard - Fix error in tensorboard batch upload of nested dirs PiperOrigin-RevId: 675716748 --- .../cloud/aiplatform/tensorboard/uploader.py | 1 + tests/unit/aiplatform/test_uploader.py | 89 ++++++++++++++++--- 2 files changed, 78 insertions(+), 12 deletions(-) diff --git a/google/cloud/aiplatform/tensorboard/uploader.py b/google/cloud/aiplatform/tensorboard/uploader.py index c24455ae5e..b77a176004 100644 --- a/google/cloud/aiplatform/tensorboard/uploader.py +++ b/google/cloud/aiplatform/tensorboard/uploader.py @@ -386,6 +386,7 @@ def _pre_create_runs_and_time_series(self): if (run_name and run_name != ".") else uploader_utils.DEFAULT_RUN_NAME ) + run_name = uploader_utils.reformat_run_name(run_name) run_names.append(run_name) for event in events: _filter_graph_defs(event) diff --git a/tests/unit/aiplatform/test_uploader.py b/tests/unit/aiplatform/test_uploader.py index 46e3141ac6..1b145eab0c 100644 --- a/tests/unit/aiplatform/test_uploader.py +++ b/tests/unit/aiplatform/test_uploader.py @@ -591,6 +591,11 @@ def test_start_uploading_without_create_experiment_fails(self): with self.assertRaisesRegex(RuntimeError, "call create_experiment()"): uploader.start_uploading() + @parameterized.parameters( + {"nested_run_dir": ""}, + {"nested_run_dir": "nested-dir/"}, + {"nested_run_dir": "double/nested-dir/"}, + ) @patch.object( uploader_utils.OnePlatformResourceManager, "get_run_resource_name", @@ -599,7 +604,11 @@ def test_start_uploading_without_create_experiment_fails(self): @patch.object(metadata, "_experiment_tracker", autospec=True) @patch.object(experiment_resources, "Experiment", autospec=True) def test_start_uploading_scalars( - self, experiment_resources_mock, experiment_tracker_mock, run_resource_mock + self, + experiment_resources_mock, + experiment_tracker_mock, + run_resource_mock, + nested_run_dir, ): experiment_resources_mock.get.return_value = _TEST_EXPERIMENT_NAME experiment_tracker_mock.set_experiment.return_value = _TEST_EXPERIMENT_NAME @@ -628,21 +637,21 @@ def test_start_uploading_scalars( mock_logdir_loader = mock.create_autospec(logdir_loader.LogdirLoader) mock_logdir_loader.get_run_events.side_effect = [ { - "run 1": _apply_compat( + f"{nested_run_dir}run 1": _apply_compat( [_scalar_event("1.1", 5.0), _scalar_event("1.2", 5.0)] ), - "run 2": _apply_compat( + f"{nested_run_dir}run 2": _apply_compat( [_scalar_event("2.1", 5.0), _scalar_event("2.2", 5.0)] ), }, { - "run 3": _apply_compat( + f"{nested_run_dir}run 3": _apply_compat( [_scalar_event("3.1", 5.0), _scalar_event("3.2", 5.0)] ), - "run 4": _apply_compat( + f"{nested_run_dir}run 4": _apply_compat( [_scalar_event("4.1", 5.0), _scalar_event("4.2", 5.0)] ), - "run 5": _apply_compat( + f"{nested_run_dir}run 5": _apply_compat( [_scalar_event("5.1", 5.0), _scalar_event("5.2", 5.0)] ), }, @@ -666,11 +675,20 @@ def test_start_uploading_scalars( self.assertEqual(mock_tracker.blob_tracker.call_count, 0) @parameterized.parameters( - {"existing_experiment": None, "one_platform_run_name": None}, - {"existing_experiment": None, "one_platform_run_name": "."}, + { + "existing_experiment": None, + "one_platform_run_name": None, + "nested_run_dir": "", + }, + { + "existing_experiment": None, + "one_platform_run_name": ".", + "nested_run_dir": "nested-dir/", + }, { "existing_experiment": _TEST_EXPERIMENT_NAME, "one_platform_run_name": _TEST_ONE_PLATFORM_RUN_NAME, + "nested_run_dir": "double/nested-dir/", }, ) @patch.object( @@ -693,6 +711,7 @@ def test_start_uploading_scalars_one_shot( run_resource_mock, existing_experiment, one_platform_run_name, + nested_run_dir, ): """Check that one-shot uploading stops without AbortUploadError.""" @@ -760,10 +779,10 @@ def batch_create_time_series(parent, requests): mock_logdir_loader = mock.create_autospec(logdir_loader.LogdirLoader) mock_logdir_loader.get_run_events.side_effect = [ { - "run 1": _apply_compat( + f"{nested_run_dir}run 1": _apply_compat( [_scalar_event("tag_1.1", 5.0), _scalar_event("tag_1.2", 5.0)] ), - "run 2": _apply_compat( + f"{nested_run_dir}run 2": _apply_compat( [_scalar_event("tag_2.1", 5.0), _scalar_event("tag_2.2", 5.0)] ), }, @@ -772,10 +791,10 @@ def batch_create_time_series(parent, requests): mock_logdir_loader_pre_create = mock.create_autospec(logdir_loader.LogdirLoader) mock_logdir_loader_pre_create.get_run_events.side_effect = [ { - "run 1": _apply_compat( + f"{nested_run_dir}run 1": _apply_compat( [_scalar_event("tag_1.1", 5.0), _scalar_event("tag_1.2", 5.0)] ), - "run 2": _apply_compat( + f"{nested_run_dir}run 2": _apply_compat( [_scalar_event("tag_2.1", 5.0), _scalar_event("tag_2.2", 5.0)] ), }, @@ -804,6 +823,52 @@ def batch_create_time_series(parent, requests): self.assertEqual(mock_tracker.blob_tracker.call_count, 0) experiment_tracker_mock.set_experiment.assert_called_once() + @parameterized.parameters( + {"nested_run_dir": ""}, + {"nested_run_dir": "nested-dir/"}, + {"nested_run_dir": "double/nested-dir/"}, + ) + @patch.object(metadata, "_experiment_tracker", autospec=True) + @patch.object(experiment_resources, "Experiment", autospec=True) + def test_upload_nested_scalars_one_shot( + self, + experiment_resources_mock, + experiment_tracker_mock, + nested_run_dir, + ): + """Check that one-shot uploading stops without AbortUploadError.""" + + logdir = self.get_temp_dir() + uploader = _create_uploader( + logdir=logdir, + ) + uploader.create_experiment() + + run_1 = f"{nested_run_dir}run 1" + run_2 = f"{nested_run_dir}run 2" + + mock_dispatcher = mock.create_autospec(uploader_lib._Dispatcher) + uploader._dispatcher = mock_dispatcher + mock_logdir_loader = mock.create_autospec(logdir_loader.LogdirLoader) + mock_logdir_loader.get_run_events.side_effect = [ + { + run_1: _apply_compat( + [_scalar_event("tag_1.1", 5.0), _scalar_event("tag_1.2", 5.0)] + ), + run_2: _apply_compat( + [_scalar_event("tag_2.1", 5.0), _scalar_event("tag_2.2", 5.0)] + ), + }, + ] + with mock.patch.object(uploader, "_logdir_loader", mock_logdir_loader): + uploader._upload_once() + + self.assertEqual(1, mock_logdir_loader.get_run_events.call_count) + self.assertEqual(1, mock_dispatcher.dispatch_requests.call_count) + run_to_events = mock_dispatcher.dispatch_requests.call_args[0][0] + self.assertIn(run_1, run_to_events) + self.assertIn(run_2, run_to_events) + @patch.object(metadata, "_experiment_tracker", autospec=True) @patch.object(experiment_resources, "Experiment", autospec=True) def test_upload_empty_logdir(