diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..3916acd --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,141 @@ +from impsy import dataset +from impsy import train +from impsy import utils +import os +import random +import pytest + + +## MDRNN configuration + + +@pytest.fixture(scope="session") +def dimension(): + return 8 + +@pytest.fixture(scope="session") +def mdrnn_size(): + return "xs" + +@pytest.fixture(scope="session") +def units(mdrnn_size): + return utils.mdrnn_config(mdrnn_size)['units'] + +@pytest.fixture(scope="session") +def mixtures(mdrnn_size): + return utils.mdrnn_config(mdrnn_size)['mixes'] + +@pytest.fixture(scope="session") +def layers(mdrnn_size): + return utils.mdrnn_config(mdrnn_size)['layers'] + + +## Training configuration + + +@pytest.fixture(scope="session") +def sequence_length(): + return 3 + + +@pytest.fixture(scope="session") +def batch_size(): + return 3 + + +## Locations + + +@pytest.fixture(scope="session") +def dataset_location(tmp_path_factory): + location = tmp_path_factory.mktemp("datasets") + return location + +@pytest.fixture(scope="session") +def log_location(tmp_path_factory): + location = tmp_path_factory.mktemp("logs") + return location + +@pytest.fixture(scope="session") +def models_location(tmp_path_factory): + location = tmp_path_factory.mktemp("models") + return location + + +## Generate sample log files, dataset files, and models. + + +@pytest.fixture(scope="session") +def log_files(log_location, dimension, number=20, events=49): + """Creates some test log files for dataset testing.""" + assert dimension > 1, "minimum dimension is 2" + print(f"dimension: {dimension}") + test_log_files = [] + for i in range(number): + test_log_file = log_location / f"2024-06-{i:02d}T12-00-00-{dimension}d-mdrnn.log" + num_events = random.randint(events, events + 10) # generate a random number of events in a range of 10. + with open(test_log_file, "w") as file: + for j in range(num_events): + nums = [random.random() for _ in range(dimension - 1)] + num_string = ",".join(map(str, nums)) + test_line = f"2024-06-01T12:00:{j:02d},interface,{num_string}\n" + file.write(test_line) + test_log_files.append(test_log_file) + return test_log_files + + +@pytest.fixture(scope="session") +def dataset_file(log_location, dataset_location, dimension, log_files): + dataset_filename = dataset.generate_dataset( + dimension=dimension, source=log_location, destination=dataset_location + ) + dataset_full_path = dataset_location / dataset_filename + return dataset_full_path + + +@pytest.fixture(scope="session") +def sequence_slices(sequence_length, dimension, batch_size): + x_t_log = utils.generate_data( + samples=((sequence_length + 1) * batch_size), dimension=dimension + ) + slices = train.slice_sequence_examples(x_t_log, sequence_length + 1, step_size=1) + return slices + + +@pytest.fixture(scope="session") +def trained_model(dimension, dataset_file, dataset_location, models_location, mdrnn_size): + assert os.path.isfile(dataset_file) + batch_size = 1 + epochs = 1 + + # Train using that dataset + train_output = train.train_mdrnn( + dimension=dimension, + dataset_location=dataset_location, + model_size=mdrnn_size, + early_stopping=False, + patience=10, + num_epochs=epochs, + batch_size=batch_size, + save_location=models_location, + save_model=True, + save_weights=True, + save_tflite=True, + ) + return train_output + + +@pytest.fixture(scope="session") +def tflite_file(trained_model): + return trained_model['tflite_file'] + + +@pytest.fixture(scope="session") +def weights_file(trained_model): + return trained_model['weights_file'] + + +@pytest.fixture(scope="session") +def keras_file(trained_model): + return trained_model['keras_file'] + diff --git a/tests/test_data.py b/tests/test_data.py index 7b46d4f..8102ccf 100644 --- a/tests/test_data.py +++ b/tests/test_data.py @@ -1,82 +1,33 @@ from impsy import dataset from impsy import train -from impsy import tflite_converter -from impsy import utils -from impsy import mdrnn import numpy as np import os -import random import tensorflow as tf -import pytest -from pathlib import Path -## MDRNN configuration +## Test logs, data and training. -@pytest.fixture(scope="session") -def dimension(): - return 8 -@pytest.fixture(scope="session") -def mdrnn_size(): - return "xs" +def test_data_munging(sequence_length, batch_size, dimension, sequence_slices): + """Test the data munging functions""" -@pytest.fixture(scope="session") -def units(mdrnn_size): - return utils.mdrnn_config(mdrnn_size)['units'] + # overlapping + Xs, ys = train.seq_to_overlapping_format(sequence_slices) + assert len(Xs) == len(ys) + assert len(Xs[0]) == sequence_length + assert len(ys[0]) == sequence_length -@pytest.fixture(scope="session") -def mixtures(mdrnn_size): - return utils.mdrnn_config(mdrnn_size)['mixes'] + print("Xs:", len(Xs[0])) + print("ys:", len(ys[0])) -@pytest.fixture(scope="session") -def layers(mdrnn_size): - return utils.mdrnn_config(mdrnn_size)['layers'] + # singleton + X, y = train.seq_to_singleton_format(sequence_slices) + print("X:", len(X[0])) + print("y:", len(y[0])) -## Locations - -@pytest.fixture(scope="session") -def dataset_location(tmp_path_factory): - location = tmp_path_factory.mktemp("datasets") - return location - -@pytest.fixture(scope="session") -def log_location(tmp_path_factory): - location = tmp_path_factory.mktemp("logs") - return location - -@pytest.fixture(scope="session") -def models_location(tmp_path_factory): - location = tmp_path_factory.mktemp("models") - return location - - -@pytest.fixture(scope="session") -def log_files(log_location, dimension, number=20, events=49): - """Creates some test log files for dataset testing.""" - assert dimension > 1, "minimum dimension is 2" - print(f"dimension: {dimension}") - test_log_files = [] - for i in range(number): - test_log_file = log_location / f"2024-06-{i:02d}T12-00-00-{dimension}d-mdrnn.log" - num_events = random.randint(events, events + 10) # generate a random number of events in a range of 10. - with open(test_log_file, "w") as file: - for j in range(num_events): - nums = [random.random() for _ in range(dimension - 1)] - num_string = ",".join(map(str, nums)) - test_line = f"2024-06-01T12:00:{j:02d},interface,{num_string}\n" - file.write(test_line) - test_log_files.append(test_log_file) - return test_log_files - - -@pytest.fixture(scope="session") -def dataset_file(log_location, dataset_location, dimension, log_files): - dataset_filename = dataset.generate_dataset( - dimension=dimension, source=log_location, destination=dataset_location - ) - dataset_full_path = dataset_location / dataset_filename - return dataset_full_path + assert len(X) == len(y) + assert len(X[0]) == sequence_length + assert len(y[0]) == dimension def test_log_to_examples(dimension, log_files): @@ -94,129 +45,9 @@ def test_dataset_command(dataset_location, dataset_file): print("Num touches:", np.sum([len(l) for l in corpus])) -@pytest.fixture(scope="session") -def trained_model(dimension, dataset_file, dataset_location, models_location, mdrnn_size): - assert os.path.isfile(dataset_file) - batch_size = 1 - epochs = 1 - - # Train using that dataset - train_output = train.train_mdrnn( - dimension=dimension, - dataset_location=dataset_location, - model_size=mdrnn_size, - early_stopping=False, - patience=10, - num_epochs=epochs, - batch_size=batch_size, - save_location=models_location, - save_model=True, - save_weights=True, - save_tflite=True, - ) - return train_output - - def test_train_command(trained_model): """Test that a trained model can be constructed.""" assert os.path.isfile(trained_model["weights_file"]) assert os.path.isfile(trained_model["keras_file"]) assert os.path.isfile(trained_model["tflite_file"]) assert isinstance(trained_model["history"], tf.keras.callbacks.History) - - -### tflite conversion tests - - -def test_config_to_tflite(models_location): - test_config = "configs/default.toml" - tflite_file = tflite_converter.config_to_tflite(test_config, save_path=models_location) - assert os.path.exists(tflite_file) - - -def test_weights_to_model_file(trained_model, dimension, mdrnn_size): - weights_file = trained_model["weights_file"] - print(f"Weights file: {weights_file}") - tflite_file = tflite_converter.weights_file_to_model_file(weights_file, mdrnn_size, dimension) - print(f"File returned: {tflite_file}") - assert os.path.exists(tflite_file) - - -def test_model_file_to_tflite(trained_model): - model_filename = trained_model["keras_file"] - tflite_file = tflite_converter.model_file_to_tflite(model_filename) - assert os.path.exists(tflite_file) - - -### inference model tests. - - -@pytest.fixture(scope="session") -def tflite_file(trained_model): - return trained_model['tflite_file'] - -@pytest.fixture(scope="session") -def weights_file(trained_model): - return trained_model['weights_file'] - -@pytest.fixture(scope="session") -def keras_file(trained_model): - return trained_model['keras_file'] - -@pytest.fixture(scope="session") -def tflite_model(tflite_file, dimension, units, mixtures, layers): - model = mdrnn.TfliteMDRNN(tflite_file, dimension, units, mixtures, layers) - return model - -def test_tflite_predictions(tflite_model: mdrnn.TfliteMDRNN): - """Test inference from a TfliteMDRNN model""" - num_test_steps = 5 - dimension = tflite_model.dimension - value = mdrnn.random_sample(out_dim=dimension) - for i in range(num_test_steps): - value = tflite_model.generate(value) - assert len(value) == dimension - value = mdrnn.proc_generated_touch(value, dimension) - assert len(value) == dimension - -@pytest.fixture(scope="session") -def keras_model(keras_file, dimension, units, mixtures, layers): - model = mdrnn.KerasMDRNN(keras_file, dimension, units, mixtures, layers) - return model - -def test_keras_predictions(keras_model: mdrnn.KerasMDRNN): - """Test inference from a KerasMDRNN model""" - num_test_steps = 5 - dimension = keras_model.dimension - value = mdrnn.random_sample(out_dim=dimension) - for i in range(num_test_steps): - value = keras_model.generate(value) - assert len(value) == dimension - value = mdrnn.proc_generated_touch(value, dimension) - assert len(value) == dimension - -@pytest.fixture(scope="session") -def weights_model(weights_file, dimension, units, mixtures, layers): - assert weights_file.suffix == ".h5", "has to be an .h5 weights" - model = mdrnn.KerasMDRNN(weights_file, dimension, units, mixtures, layers) - return model - -def test_weights_predictions(weights_model: mdrnn.KerasMDRNN): - """Test inference from a KerasMDRNN model""" - num_test_steps = 5 - dimension = weights_model.dimension - value = mdrnn.random_sample(out_dim=dimension) - for i in range(num_test_steps): - value = weights_model.generate(value) - assert len(value) == dimension - value = mdrnn.proc_generated_touch(value, dimension) - assert len(value) == dimension - - -def test_dummy_model(): - model = mdrnn.DummyMDRNN(Path("/"), 4, 64, 5, 2) - dimension = model.dimension - value = mdrnn.random_sample(out_dim=dimension) - value = model.generate(value) - assert len(value) == dimension - diff --git a/tests/test_interaction.py b/tests/test_interaction.py index 9e5c4ea..a9f2d8e 100644 --- a/tests/test_interaction.py +++ b/tests/test_interaction.py @@ -2,10 +2,10 @@ from impsy import utils import pytest from pathlib import Path -import time import numpy as np import logging + @pytest.fixture(scope="session") def default_config(): """get the default config file.""" @@ -13,6 +13,7 @@ def default_config(): config = utils.get_config_data(config_path) return(config) + @pytest.fixture(scope="session") def user_only_untrained_config(): """get a config file without a neural network and in user-only mode.""" @@ -22,17 +23,13 @@ def user_only_untrained_config(): @pytest.fixture(scope="session") -def dimension(default_config): +def default_dimension(default_config): return default_config["model"]["dimension"] -@pytest.fixture(scope="session") -def log_location(tmp_path_factory): - location = tmp_path_factory.mktemp("logs") - return location @pytest.fixture(scope="session") -def logger(dimension, log_location): - logger = interaction.setup_logging(dimension, location=log_location) +def logger(default_dimension, log_location): + logger = interaction.setup_logging(default_dimension, location=log_location) return logger @@ -45,12 +42,12 @@ def test_logging(logger, dimension): @pytest.fixture(scope="session") -def neural_network(default_config): +def default_neural_network(default_config): net = interaction.build_network(default_config) return net -def test_build_network(neural_network): +def test_build_network(default_neural_network): pass @@ -73,19 +70,19 @@ def test_monitor_user_action(interaction_server): interaction_server.monitor_user_action() -def test_make_prediction(interaction_server, neural_network): - interaction_server.make_prediction(neural_network) +def test_make_prediction(interaction_server, default_neural_network): + interaction_server.make_prediction(default_neural_network) def test_input_list(interaction_server): interaction_server.construct_input_list(0,0.0) -def test_dense_callback(interaction_server, dimension): - values = np.random.rand(dimension - 1) +def test_dense_callback(interaction_server, default_dimension): + values = np.random.rand(default_dimension - 1) interaction_server.dense_callback(values) -def test_send_values(interaction_server, dimension): - values = np.random.rand(dimension - 1) +def test_send_values(interaction_server, default_dimension): + values = np.random.rand(default_dimension - 1) interaction_server.send_back_values(values) diff --git a/tests/test_model.py b/tests/test_model.py index 03499bf..9dfff74 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -6,25 +6,8 @@ from pathlib import Path -@pytest.fixture(scope="session") -def dimension(): - return 3 - -@pytest.fixture(scope="session") -def sequence_length(): - return 3 - -@pytest.fixture(scope="session") -def batch_size(): - return 3 +## PredictiveMusicMDRNN testing. -@pytest.fixture(scope="session") -def sequence_slices(sequence_length, dimension, batch_size): - x_t_log = utils.generate_data( - samples=((sequence_length + 1) * batch_size), dimension=dimension - ) - slices = train.slice_sequence_examples(x_t_log, sequence_length + 1, step_size=1) - return slices def test_inference(): """Test inference from a PredictiveMusicMDRNN model""" @@ -55,30 +38,68 @@ def test_training(sequence_length, batch_size, dimension, sequence_slices): assert isinstance(history, tf.keras.callbacks.History) -def test_data_munging(sequence_length, batch_size, dimension, sequence_slices): - """Test the data munging functions""" +def test_model_config(): + """Tests the model config function.""" + conf = utils.mdrnn_config('s') + assert conf["units"] == utils.SIZE_TO_PARAMETERS['s']['units'] - # overlapping - Xs, ys = train.seq_to_overlapping_format(sequence_slices) - assert len(Xs) == len(ys) - assert len(Xs[0]) == sequence_length - assert len(ys[0]) == sequence_length - print("Xs:", len(Xs[0])) - print("ys:", len(ys[0])) +### inference model tests. - # singleton - X, y = train.seq_to_singleton_format(sequence_slices) - print("X:", len(X[0])) - print("y:", len(y[0])) - assert len(X) == len(y) - assert len(X[0]) == sequence_length - assert len(y[0]) == dimension +@pytest.fixture(scope="session") +def tflite_model(tflite_file, dimension, units, mixtures, layers): + model = mdrnn.TfliteMDRNN(tflite_file, dimension, units, mixtures, layers) + return model +def test_tflite_predictions(tflite_model: mdrnn.TfliteMDRNN): + """Test inference from a TfliteMDRNN model""" + num_test_steps = 5 + dimension = tflite_model.dimension + value = mdrnn.random_sample(out_dim=dimension) + for i in range(num_test_steps): + value = tflite_model.generate(value) + assert len(value) == dimension + value = mdrnn.proc_generated_touch(value, dimension) + assert len(value) == dimension -def test_model_config(): - """Tests the model config function.""" - conf = utils.mdrnn_config("s") - assert conf["units"] == 64 +@pytest.fixture(scope="session") +def keras_model(keras_file, dimension, units, mixtures, layers): + model = mdrnn.KerasMDRNN(keras_file, dimension, units, mixtures, layers) + return model + +def test_keras_predictions(keras_model: mdrnn.KerasMDRNN): + """Test inference from a KerasMDRNN model""" + num_test_steps = 5 + dimension = keras_model.dimension + value = mdrnn.random_sample(out_dim=dimension) + for i in range(num_test_steps): + value = keras_model.generate(value) + assert len(value) == dimension + value = mdrnn.proc_generated_touch(value, dimension) + assert len(value) == dimension + +@pytest.fixture(scope="session") +def weights_model(weights_file, dimension, units, mixtures, layers): + assert weights_file.suffix == ".h5", "has to be an .h5 weights" + model = mdrnn.KerasMDRNN(weights_file, dimension, units, mixtures, layers) + return model +def test_weights_predictions(weights_model: mdrnn.KerasMDRNN): + """Test inference from a KerasMDRNN model""" + num_test_steps = 5 + dimension = weights_model.dimension + value = mdrnn.random_sample(out_dim=dimension) + for i in range(num_test_steps): + value = weights_model.generate(value) + assert len(value) == dimension + value = mdrnn.proc_generated_touch(value, dimension) + assert len(value) == dimension + + +def test_dummy_model(): + model = mdrnn.DummyMDRNN(Path("/"), 4, 64, 5, 2) + dimension = model.dimension + value = mdrnn.random_sample(out_dim=dimension) + value = model.generate(value) + assert len(value) == dimension diff --git a/tests/test_tflite_conversion.py b/tests/test_tflite_conversion.py new file mode 100644 index 0000000..bbf9f41 --- /dev/null +++ b/tests/test_tflite_conversion.py @@ -0,0 +1,25 @@ +from impsy import tflite_converter +import os + + +### tflite conversion tests + + +def test_config_to_tflite(models_location): + test_config = "configs/default.toml" + tflite_file = tflite_converter.config_to_tflite(test_config, save_path=models_location) + assert os.path.exists(tflite_file) + + +def test_weights_to_model_file(trained_model, dimension, mdrnn_size): + weights_file = trained_model["weights_file"] + print(f"Weights file: {weights_file}") + tflite_file = tflite_converter.weights_file_to_model_file(weights_file, mdrnn_size, dimension) + print(f"File returned: {tflite_file}") + assert os.path.exists(tflite_file) + + +def test_model_file_to_tflite(trained_model): + model_filename = trained_model["keras_file"] + tflite_file = tflite_converter.model_file_to_tflite(model_filename) + assert os.path.exists(tflite_file) \ No newline at end of file