From 847e08b4a8c7c0b35c47f491d22e30a3fded0591 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 14 Jul 2023 08:50:01 -0700 Subject: [PATCH 1/5] Mock dask_cuda.LocalCUDACluster instead of dask.distributed.LocalCluster --- tests/test_downloader.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/tests/test_downloader.py b/tests/test_downloader.py index bf600b5938..dc82588848 100644 --- a/tests/test_downloader.py +++ b/tests/test_downloader.py @@ -37,6 +37,14 @@ def dask_distributed(fail_missing: bool): fail_missing=fail_missing) +@pytest.fixture(autouse=True, scope='session') +def dask_cuda(fail_missing: bool): + """ + Mark tests requiring dask_cuda + """ + yield import_or_skip("dask_cuda", reason="Downloader requires dask_cuda", fail_missing=fail_missing) + + @pytest.mark.usefixtures("restore_environ") @pytest.mark.parametrize('use_env', [True, False]) @pytest.mark.parametrize('dl_method', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"]) @@ -83,7 +91,7 @@ def test_constructor_invalid_dltype(use_env: bool): @pytest.mark.usefixtures("restore_environ") @pytest.mark.parametrize('dl_method,use_processes', [("dask", True), ("dask_thread", False)]) @mock.patch('dask.config') -@mock.patch('dask.distributed.LocalCluster') +@mock.patch('dask_cuda.LocalCUDACluster') def test_get_dask_cluster(mock_dask_cluster: mock.MagicMock, mock_dask_config: mock.MagicMock, dl_method: str, @@ -93,11 +101,11 @@ def test_get_dask_cluster(mock_dask_cluster: mock.MagicMock, assert downloader.get_dask_cluster() is mock_dask_cluster mock_dask_config.set.assert_called_once() - mock_dask_cluster.assert_called_once_with(start=True, processes=use_processes) + mock_dask_cluster.assert_called_once_with() @mock.patch('dask.config') -@mock.patch('dask.distributed.LocalCluster') +@mock.patch('dask_cuda.LocalCUDACluster') @pytest.mark.parametrize('dl_method', ["dask", "dask_thread"]) def test_close(mock_dask_cluster: 
mock.MagicMock, mock_dask_config: mock.MagicMock, dl_method: str): mock_dask_cluster.return_value = mock_dask_cluster @@ -111,7 +119,7 @@ def test_close(mock_dask_cluster: mock.MagicMock, mock_dask_config: mock.MagicMo mock_dask_cluster.close.assert_called_once() -@mock.patch('dask.distributed.LocalCluster') +@mock.patch('dask_cuda.LocalCUDACluster') @pytest.mark.parametrize('dl_method', ["single_thread", "multiprocess", "multiprocessing"]) def test_close_noop(mock_dask_cluster: mock.MagicMock, dl_method: str): mock_dask_cluster.return_value = mock_dask_cluster @@ -129,7 +137,7 @@ def test_close_noop(mock_dask_cluster: mock.MagicMock, dl_method: str): @mock.patch('multiprocessing.get_context') @mock.patch('dask.config') @mock.patch('dask.distributed.Client') -@mock.patch('dask.distributed.LocalCluster') +@mock.patch('dask_cuda.LocalCUDACluster') def test_download(mock_dask_cluster: mock.MagicMock, mock_dask_client: mock.MagicMock, mock_dask_config: mock.MagicMock, From 6908ef209e4cee1e9a5bc2decedc13beb3251459 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 14 Jul 2023 08:55:27 -0700 Subject: [PATCH 2/5] Mock dask_cuda.LocalCUDACluster instead of dask.distributed.LocalCluster --- .../examples/digital_fingerprinting/test_dfp_file_to_df.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py b/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py index caa7ed9c14..a28b15164f 100644 --- a/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py +++ b/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py @@ -102,7 +102,7 @@ def test_constructor(config: Config): @mock.patch('multiprocessing.get_context') @mock.patch('dask.config') @mock.patch('dask.distributed.Client') -@mock.patch('dask.distributed.LocalCluster') +@mock.patch('dask_cuda.LocalCUDACluster') @mock.patch('dfp.stages.dfp_file_to_df._single_object_to_dataframe') def 
test_get_or_create_dataframe_from_s3_batch_cache_miss(mock_obf_to_df: mock.MagicMock, mock_dask_cluster: mock.MagicMock, @@ -198,7 +198,7 @@ def test_get_or_create_dataframe_from_s3_batch_cache_miss(mock_obf_to_df: mock.M @mock.patch('multiprocessing.get_context') @mock.patch('dask.config') @mock.patch('dask.distributed.Client') -@mock.patch('dask.distributed.LocalCluster') +@mock.patch('dask_cuda.LocalCUDACluster') @mock.patch('dfp.stages.dfp_file_to_df._single_object_to_dataframe') def test_get_or_create_dataframe_from_s3_batch_cache_hit(mock_obf_to_df: mock.MagicMock, mock_dask_cluster: mock.MagicMock, @@ -266,7 +266,7 @@ def test_get_or_create_dataframe_from_s3_batch_cache_hit(mock_obf_to_df: mock.Ma @mock.patch('multiprocessing.get_context') @mock.patch('dask.config') @mock.patch('dask.distributed.Client') -@mock.patch('dask.distributed.LocalCluster') +@mock.patch('dask_cuda.LocalCUDACluster') @mock.patch('dfp.stages.dfp_file_to_df._single_object_to_dataframe') def test_get_or_create_dataframe_from_s3_batch_none_noop(mock_obf_to_df: mock.MagicMock, mock_dask_cluster: mock.MagicMock, From 393c7f613972abcdbee935702c152a45459798a6 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 14 Jul 2023 08:57:50 -0700 Subject: [PATCH 3/5] Skip DFP tests if dask_cuda is not installed --- tests/examples/digital_fingerprinting/conftest.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/examples/digital_fingerprinting/conftest.py b/tests/examples/digital_fingerprinting/conftest.py index f997e391a8..a6550740e0 100644 --- a/tests/examples/digital_fingerprinting/conftest.py +++ b/tests/examples/digital_fingerprinting/conftest.py @@ -35,6 +35,14 @@ def dask_distributed(fail_missing: bool): yield import_or_skip("dask.distributed", reason=SKIP_REASON, fail_missing=fail_missing) +@pytest.fixture(autouse=True, scope='session') +def dask_cuda(fail_missing: bool): + """ + Mark tests requiring dask_cuda + """ + yield import_or_skip("dask_cuda", 
reason=SKIP_REASON, fail_missing=fail_missing) + + @pytest.fixture(autouse=True, scope='session') def mlflow(fail_missing: bool): """ From c98c99df51f06657160b336b991d7ee0ebb9114c Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 14 Jul 2023 09:03:58 -0700 Subject: [PATCH 4/5] _get_ci_column_selector returns a list for IncrementColumn --- tests/utils/nvt/test_schema_converters.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/utils/nvt/test_schema_converters.py b/tests/utils/nvt/test_schema_converters.py index 70e1a66be2..16f2107bc2 100644 --- a/tests/utils/nvt/test_schema_converters.py +++ b/tests/utils/nvt/test_schema_converters.py @@ -158,7 +158,7 @@ def test_get_ci_column_selector_increment_column(): dtype="datetime64[ns]", groupby_column="groupby_col") result = _get_ci_column_selector(col_info) - assert result == "original_name" + assert result == ["groupby_col", "original_name"] def test_get_ci_column_selector_string_cat_column(): From 4a8f60a87460782da05c3a436526f868e2433d75 Mon Sep 17 00:00:00 2001 From: Devin Robison Date: Fri, 14 Jul 2023 11:35:31 -0600 Subject: [PATCH 5/5] Auto column creation fix --- morpheus/utils/nvt/mutate.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/morpheus/utils/nvt/mutate.py b/morpheus/utils/nvt/mutate.py index f3f0b28f40..9903a2b908 100644 --- a/morpheus/utils/nvt/mutate.py +++ b/morpheus/utils/nvt/mutate.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import numpy as np import typing from inspect import getsourcelines @@ -125,7 +126,19 @@ def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFram Transformed dataframe. """ - return self._func(col_selector, df) + df = self._func(col_selector, df) + + # If our dataframe doesn't contain the expected output columns, even after processing, we add dummy columns. 
+ # This could occur if our JSON data doesn't always contain columns we expect to be expanded. + df_cols_set = set(df.columns) + new_cols = { + col[0]: np.zeros(df.shape[0], dtype=col[1]) + for col in self._output_columns if col[0] not in df_cols_set + } + + df = df.assign(**new_cols) + + return df def column_mapping(self, col_selector: ColumnSelector) -> typing.Dict[str, str]: """