From dd410dbfa9e9e569459d1702a07b0f52e9d68c89 Mon Sep 17 00:00:00 2001
From: Heet Sankesara
Date: Mon, 8 Jul 2024 11:31:17 +0100
Subject: [PATCH 1/9] temp patch for new questionnaire format

---
 radarpipeline/io/reader.py       | 14 +++++++++++---
 radarpipeline/io/writer.py       |  3 +++
 radarpipeline/project/project.py | 15 +++++++++------
 3 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/radarpipeline/io/reader.py b/radarpipeline/io/reader.py
index 9800c3a..ddf938a 100644
--- a/radarpipeline/io/reader.py
+++ b/radarpipeline/io/reader.py
@@ -127,6 +127,9 @@ def __init__(self, spark_session: ps.SparkSession,
             # RADAR_NEW: uid/variable/yyyymm/yyyymmdd.csv.gz
             "RADAR_NEW": re.compile(r"""^[\w-]+/([\w]+)/
                                     [\d]+/([\d]+.csv.gz$|schema-\1.json$)""", re.X),
+            # RADAR_QUES: uid/questionnaire/QuestionnaireName/yyyymm/yyyymmdd.csv.gz
+            "RADAR_QUES": re.compile(r"""^[\w-]+/([\w]+)/([\w]+)/
+                                     [\d]+/([\d]+.csv.gz$|schema-\1.json$)""", re.X)
         }
         self.required_data = required_data
         self.df_type = df_type
@@ -150,7 +153,7 @@ def _get_source_type(self, source_path):
             file_format = file.replace(source_path, "")
             if re.match(value, file_format):
                 return key
-        raise ValueError("Source type not recognized")
+        raise ValueError("Source path not recognized")
 
     def read_data(self) -> RadarData:
         """
@@ -172,7 +175,7 @@ def read_data(self) -> RadarData:
                 logger.info("Reading data from old RADAR format")
                 radar_data, user_data_dict = self._read_data_from_old_format(
                     source_path_item, user_data_dict)
-            elif source_type == "RADAR_NEW":
+            elif source_type == "RADAR_NEW" or source_type == "RADAR_QUES":
                 logger.info("Reading data from new RADAR format")
                 radar_data, user_data_dict = self._read_data_from_new_format(
                     source_path_item, user_data_dict)
@@ -313,7 +316,7 @@ def _read_data_from_new_format(self, source_path: str, user_data_dict: dict):
             logger.info(f"Reading data for user: {uid}")
             variable_data_dict = {}
             for dirname in self.required_data:
-                if dirname not in os.listdir(os.path.join(source_path, uid)):
+                if not os.path.exists(os.path.join(source_path, uid, dirname)):
                     continue
                 logger.info(f"Reading data for variable: {dirname}")
                 data_files = []
@@ -369,6 +372,8 @@ def is_schema_present(self, schema_dir, schema_dir_base) -> bool:
         bool
             True if schema is present, False otherwise
         """
+        if "/" in schema_dir_base:
+            schema_dir_base = schema_dir_base.split("/")[0]
         schema_file = os.path.join(
             schema_dir, f"schema-{schema_dir_base}.json"
         )
@@ -378,6 +383,9 @@ def is_schema_present(self, schema_dir, schema_dir_base) -> bool:
             return False
 
     def get_schema(self, schema_dir, schema_dir_base) -> StructType:
+
+        if "/" in schema_dir_base:
+            schema_dir_base = schema_dir_base.split("/")[0]
         if schema_dir_base in self.schema_dict:
             return self.schema_dict[schema_dir_base]
         else:
diff --git a/radarpipeline/io/writer.py b/radarpipeline/io/writer.py
index f3cdf4b..ecb1d23 100644
--- a/radarpipeline/io/writer.py
+++ b/radarpipeline/io/writer.py
@@ -9,6 +9,8 @@
 from radarpipeline.datatypes.data_types import DataType
 from radarpipeline.io.abc import DataWriter
 
+from pathlib import Path
+
 logger = logging.getLogger(__name__)
 
 
@@ -110,6 +112,7 @@ def save_dataframe(self, feature_name, feature_df):
                                          self.data_format, self.compression)
         try:
+            Path(file_path).parent.mkdir(parents=True, exist_ok=True)
             if self.data_format == "csv":
                 feature_df.to_csv(
                     file_path,
diff --git a/radarpipeline/project/project.py b/radarpipeline/project/project.py
index 00e79d9..9dc7a2a 100644
--- a/radarpipeline/project/project.py
+++ b/radarpipeline/project/project.py
@@ -269,16 +269,19 @@ def fetch_data(self) -> None:
             root_dir = sftp_data_reader.get_root_dir()
             logger.info("Reading data from sftp")
             sftp_data_reader.read_sftp_data()
-            sftp_local_config = {
-                "config": {
-                    "source_path": root_dir
-                }
+            self.config["input"] = {
+                    "source_type": "local",
+                    "data_format": "csv",
+                    "config": {
+                        "source_path": root_dir
+                },
             }
+
             datareader = Reader(
                 self.spark_session,
-                sftp_local_config,
+                self.config,
                 self.total_required_data,
-                self.config["configurations"]["df_type"],
+                self.config["configurations"]['df_type'],
             )
             self.data = datareader.read_data()
         else:

From 59a8ba389588f7d15f1f9296bceca5ae0b753e62 Mon Sep 17 00:00:00 2001
From: Heet Sankesara
Date: Tue, 16 Jul 2024 16:38:24 +0100
Subject: [PATCH 2/9] parallelise for loop in read data

---
 radarpipeline/io/reader.py       | 15 ++++++++++++---
 radarpipeline/project/project.py |  8 ++++----
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/radarpipeline/io/reader.py b/radarpipeline/io/reader.py
index ddf938a..f3065b5 100644
--- a/radarpipeline/io/reader.py
+++ b/radarpipeline/io/reader.py
@@ -5,6 +5,7 @@
 import gzip
 import re
 from typing import Any, Dict, List, Optional, Union
+import concurrent.futures
 
 import pyspark.sql as ps
 from pyspark.sql import SparkSession, DataFrame
@@ -272,7 +273,8 @@ def _read_data_from_old_format(self, source_path: str, user_data_dict: dict):
         uids = self._remove_hidden_dirs(uids)
         if self.user_sampler is not None:
             uids = self.user_sampler.sample_uids(uids)
-        for uid in uids:
+
+        def process_uid(uid):
             logger.info(f"Reading data for user: {uid}")
             variable_data_dict = {}
@@ -300,6 +302,9 @@ def _read_data_from_old_format(self, source_path: str, user_data_dict: dict):
                 if variable_data.get_data_size() > 0:
                     variable_data_dict[dirname] = variable_data
             user_data_dict[uid] = RadarUserData(variable_data_dict, self.df_type)
+
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            executor.map(process_uid, uids)
         radar_data = RadarData(user_data_dict, self.df_type)
         return radar_data, user_data_dict
 
@@ -309,10 +314,11 @@ def _read_data_from_new_format(self, source_path: str, user_data_dict: dict):
         uids = self._remove_hidden_dirs(uids)
         if self.user_sampler is not None:
             uids = self.user_sampler.sample_uids(uids)
-        for uid in uids:
+
+        def process_uid(uid):
             # Skip hidden files
             if uid[0] == ".":
-                continue
+                return
             logger.info(f"Reading data for user: {uid}")
             variable_data_dict = {}
@@ -344,6 +350,9 @@ def _read_data_from_new_format(self, source_path: str, user_data_dict: dict):
                 if variable_data.get_data_size() > 0:
                     variable_data_dict[dirname] = variable_data
             user_data_dict[uid] = RadarUserData(variable_data_dict, self.df_type)
+
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            executor.map(process_uid, uids)
         radar_data = RadarData(user_data_dict, self.df_type)
         return radar_data, user_data_dict
 
diff --git a/radarpipeline/project/project.py b/radarpipeline/project/project.py
index 9dc7a2a..8500343 100644
--- a/radarpipeline/project/project.py
+++ b/radarpipeline/project/project.py
@@ -270,10 +270,10 @@ def fetch_data(self) -> None:
             logger.info("Reading data from sftp")
             sftp_data_reader.read_sftp_data()
             self.config["input"] = {
-                    "source_type": "local",
-                    "data_format": "csv",
-                    "config": {
-                        "source_path": root_dir
+                "source_type": "local",
+                "data_format": "csv",
+                "config": {
+                    "source_path": root_dir
                 },
             }
 
From 8060aaeaf0add20731e4220457375ad5a61d29fd Mon Sep 17 00:00:00 2001
From: Heet Sankesara
Date: Wed, 17 Jul 2024 11:52:41 +0100
Subject: [PATCH 3/9] updated spark output

---
 radarpipeline/io/writer.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/radarpipeline/io/writer.py b/radarpipeline/io/writer.py
index ecb1d23..1bc6077 100644
--- a/radarpipeline/io/writer.py
+++ b/radarpipeline/io/writer.py
@@ -27,12 +27,10 @@ def __init__(
         self,
         features: Dict[str, DataType],
         output_dir: str,
-        num_files: Optional[int] = 1,
         compress: bool = False,
         data_format: str = "csv",
     ) -> None:
         super().__init__(features, output_dir)
-        self.num_files = num_files
         self.compression = "gzip" if compress is True else "none"
         self.data_format = data_format
 
@@ -59,7 +57,7 @@ def save_dataframe(self, feature_name, feature_df):
                 lineSep=constants.LINESEP,
             )
         elif self.data_format == "parquet":
             feature_df.write.parquet(
-                path=folder_path,
+                path=folder_path + ".parquet",
                 compression=self.compression,
             )
         else:

From 3c9f07876d183f6a30419cbd57d7314f359c378a Mon Sep 17 00:00:00 2001
From: Heet Sankesara
Date: Fri, 19 Jul 2024 12:37:00 +0100
Subject: [PATCH 4/9] updated code to input path format

---
 radarpipeline/io/reader.py | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/radarpipeline/io/reader.py b/radarpipeline/io/reader.py
index f3065b5..c0e73a5 100644
--- a/radarpipeline/io/reader.py
+++ b/radarpipeline/io/reader.py
@@ -145,15 +145,14 @@ def _get_source_type(self, source_path):
         """
         Returns the source type of the data
         """
-        files = [y for x in os.walk(source_path) for y in
-                 glob(os.path.join(x[0], '*.*'))]
         if source_path[-1] != "/":
             source_path = source_path + "/"
-        for key, value in self.source_formats.items():
-            file = files[0]
-            file_format = file.replace(source_path, "")
-            if re.match(value, file_format):
-                return key
+        for x in os.walk(source_path):
+            for file in glob(os.path.join(x[0], '*.*')):
+                for key, value in self.source_formats.items():
+                    file_format = file.replace(source_path, "")
+                    if re.match(value, file_format):
+                        return key
         raise ValueError("Source path not recognized")
 
     def read_data(self) -> RadarData:

From fdfa7761e307c02c86f68163ccacdafa6e217c9c Mon Sep 17 00:00:00 2001
From: Heet Sankesara
Date: Tue, 30 Jul 2024 14:32:26 +0100
Subject: [PATCH 5/9] added Kryo serialization

---
 radarpipeline/io/writer.py           | 9 ++++++++-
 radarpipeline/project/sparkengine.py | 2 ++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/radarpipeline/io/writer.py b/radarpipeline/io/writer.py
index 1bc6077..97a0809 100644
--- a/radarpipeline/io/writer.py
+++ b/radarpipeline/io/writer.py
@@ -31,8 +31,13 @@ def __init__(
         data_format: str = "csv",
     ) -> None:
         super().__init__(features, output_dir)
-        self.compression = "gzip" if compress is True else "none"
         self.data_format = data_format
+        if data_format == "csv" or data_format == "pickle":
+            self.compression = "gzip" if compress is True else "infer"
+        elif data_format == "parquet":
+            self.compression = "gzip" if compress is True else "snappy"
+        else:
+            logger.warning("Invalid data format specified. 
Using default compression") def write_data(self) -> None: for feature_name, feature_df in self.features.items(): @@ -56,6 +61,8 @@ def save_dataframe(self, feature_name, feature_df): lineSep=constants.LINESEP, ) elif self.data_format == "parquet": + logger.info(f"Total Number of Rdd Partitions:{ + feature_df.rdd.getNumPartitions()}") feature_df.write.parquet( path=folder_path + ".parquet", compression=self.compression, diff --git a/radarpipeline/project/sparkengine.py b/radarpipeline/project/sparkengine.py index 616a5fc..56810db 100644 --- a/radarpipeline/project/sparkengine.py +++ b/radarpipeline/project/sparkengine.py @@ -81,6 +81,8 @@ def initialize_spark_session(self) -> ps.SparkSession: self.spark_config['spark.driver.maxResultSize']) .config('spark.log.level', self.spark_config['spark.log.level']) + .config("spark.serializer", + "org.apache.spark.serializer.KryoSerializer") .getOrCreate() ) else: From 1c4a7c95839d0284329ece86680fe8869bb2bf07 Mon Sep 17 00:00:00 2001 From: Heet Sankesara Date: Tue, 30 Jul 2024 15:04:47 +0100 Subject: [PATCH 6/9] Added recursive sftp dowload to get ques data --- radarpipeline/io/connection.py | 183 +++++++++++++++++++++++++++++++++ radarpipeline/io/downloader.py | 12 +-- radarpipeline/io/reader.py | 2 +- 3 files changed, 188 insertions(+), 9 deletions(-) diff --git a/radarpipeline/io/connection.py b/radarpipeline/io/connection.py index c35cd15..f466647 100644 --- a/radarpipeline/io/connection.py +++ b/radarpipeline/io/connection.py @@ -7,6 +7,7 @@ import logging import os from radarpipeline.common.utils import reparent +import posixpath from contextlib import contextmanager from stat import S_IMODE, S_ISDIR, S_ISREG @@ -15,6 +16,81 @@ logger = logging.getLogger(__name__) +class WTCallbacks(object): + '''an object to house the callbacks, used internally''' + def __init__(self): + '''set instance vars''' + self._flist = [] + self._dlist = [] + self._ulist = [] + + def file_cb(self, pathname): + '''called for regular files, appends pathname to .flist + + :param str pathname: file path + ''' + self._flist.append(pathname) + + def dir_cb(self, pathname): + '''called for directories, appends pathname to .dlist + + :param str pathname: directory path + ''' + self._dlist.append(pathname) + + def unk_cb(self, pathname): + '''called for unknown file types, appends pathname to .ulist + + :param str pathname: unknown entity path + ''' + self._ulist.append(pathname) + + @property + def flist(self): + '''return a sorted list of files currently traversed + + :getter: returns the list + :setter: sets the list + :type: list + ''' + return sorted(self._flist) + + @flist.setter + def flist(self, val): + '''setter for _flist ''' + self._flist = val + + @property + def dlist(self): + '''return a sorted list of directories currently traversed + + :getter: returns the list + :setter: sets the list + :type: list + ''' + return sorted(self._dlist) + + @dlist.setter + def dlist(self, val): + '''setter for _dlist ''' + self._dlist = val + + @property + def ulist(self): + '''return a sorted list of unknown entities currently traversed + + :getter: returns the list + :setter: sets the list + :type: list + ''' + return sorted(self._ulist) + + @ulist.setter + def ulist(self, val): + '''setter for _ulist ''' + self._ulist = val + + class ConnectionException(Exception): """Exception raised for connection problems @@ -238,6 +314,87 @@ def get_d(self, remotedir, localdir, preserve_mtime=False): self.get(rname, reparent(localdir, rname), preserve_mtime=preserve_mtime) + def get_r(self, 
remotedir, localdir, preserve_mtime=False): + """recursively copy remotedir structure to localdir + + :param str remotedir: the remote directory to copy from + :param str localdir: the local directory to copy to + :param bool preserve_mtime: *Default: False* - + preserve modification time on files + + :returns: None + + :raises: + + """ + self._sftp_connect() + wtcb = WTCallbacks() + self.walktree(remotedir, wtcb.file_cb, wtcb.dir_cb, wtcb.unk_cb) + # handle directories we recursed through + for dname in wtcb.dlist: + for subdir in path_advance(dname): + try: + os.mkdir(reparent(localdir, subdir)) + # force result to a list for setter, + wtcb.dlist = wtcb.dlist + [subdir, ] + except OSError: # dir exists + pass + + for fname in wtcb.flist: + # they may have told us to start down farther, so we may not have + # recursed through some, ensure local dir structure matches + head, _ = os.path.split(fname) + if head not in wtcb.dlist: + for subdir in path_advance(head): + if subdir not in wtcb.dlist and subdir != '.': + os.mkdir(reparent(localdir, subdir)) + wtcb.dlist = wtcb.dlist + [subdir, ] + + self.get(fname, + reparent(localdir, fname), + preserve_mtime=preserve_mtime) + + def walktree(self, remotepath, fcallback, dcallback, ucallback, + recurse=True): + '''recursively descend, depth first, the directory tree rooted at + remotepath, calling discreet callback functions for each regular file, + directory and unknown file type. + + :param str remotepath: + root of remote directory to descend, use '.' to start at + :attr:`.pwd` + :param callable fcallback: + callback function to invoke for a regular file. + (form: ``func(str)``) + :param callable dcallback: + callback function to invoke for a directory. (form: ``func(str)``) + :param callable ucallback: + callback function to invoke for an unknown file type. + (form: ``func(str)``) + :param bool recurse: *Default: True* - should it recurse + + :returns: None + + :raises: + + ''' + self._sftp_connect() + for entry in self.listdir(remotepath): + pathname = posixpath.join(remotepath, entry) + mode = self._sftp.stat(pathname).st_mode + if S_ISDIR(mode): + # It's a directory, call the dcallback function + dcallback(pathname) + if recurse: + # now, recurse into it + self.walktree(pathname, fcallback, dcallback, ucallback) + elif S_ISREG(mode): + # It's a file, call the fcallback function + fcallback(pathname) + else: + # Unknown file type + ucallback(pathname) + def close(self): """Closes the connection and cleans up.""" # Close SFTP Connection. 
@@ -248,3 +405,29 @@ def close(self): if self._transport: self._transport.close() self._transport = None + + +def path_advance(thepath, sep=os.sep): + '''generator to iterate over a file path forwards + + :param str thepath: the path to navigate forwards + :param str sep: *Default: os.sep* - the path separator to use + + :returns: (iter)able of strings + + ''' + # handle a direct path + pre = '' + if thepath[0] == sep: + pre = sep + curpath = '' + parts = thepath.split(sep) + if pre: + if parts[0]: + parts[0] = pre + parts[0] + else: + parts[1] = pre + parts[1] + for part in parts: + curpath = os.path.join(curpath, part) + if curpath: + yield curpath \ No newline at end of file diff --git a/radarpipeline/io/downloader.py b/radarpipeline/io/downloader.py index 6b8b220..1e3a7e1 100644 --- a/radarpipeline/io/downloader.py +++ b/radarpipeline/io/downloader.py @@ -89,16 +89,10 @@ def _fetch_data(self, root_path, sftp_source_path, included_var_cat, uid): root_path, dir_path, src_file ) ): - os.makedirs( - os.path.join( - root_path, dir_path, - src_file), - exist_ok=True) - sftp.get_d(src_file, + sftp.get_r(src_file, os.path.join( root_path, - dir_path, - src_file), + dir_path), preserve_mtime=True) except FileNotFoundError: logger.warning("Folder not found: " + dir_path @@ -116,6 +110,8 @@ def _is_src_in_category(self, src, categories): if categories == "all": return True for category in categories: + if "/" in category: + category = category.split("/")[0] if src[:len(category)] == category: return True return False diff --git a/radarpipeline/io/reader.py b/radarpipeline/io/reader.py index c0e73a5..91dade9 100644 --- a/radarpipeline/io/reader.py +++ b/radarpipeline/io/reader.py @@ -147,7 +147,7 @@ def _get_source_type(self, source_path): """ if source_path[-1] != "/": source_path = source_path + "/" - for x in os.walk(source_path): + for x in os.walk(source_path, topdown=False): for file in glob(os.path.join(x[0], '*.*')): for key, value in self.source_formats.items(): file_format = file.replace(source_path, "") From 0063c63e2e557f6b0125afedb735fe1d29f4d169 Mon Sep 17 00:00:00 2001 From: Heet Sankesara Date: Tue, 30 Jul 2024 15:32:58 +0100 Subject: [PATCH 7/9] updated spark data write --- radarpipeline/io/writer.py | 4 +--- tests/tests_io/test_writer.py | 9 +++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/radarpipeline/io/writer.py b/radarpipeline/io/writer.py index 97a0809..8ea7734 100644 --- a/radarpipeline/io/writer.py +++ b/radarpipeline/io/writer.py @@ -33,7 +33,7 @@ def __init__( super().__init__(features, output_dir) self.data_format = data_format if data_format == "csv" or data_format == "pickle": - self.compression = "gzip" if compress is True else "infer" + self.compression = "gzip" if compress is True else "uncompressed" elif data_format == "parquet": self.compression = "gzip" if compress is True else "snappy" else: @@ -61,8 +61,6 @@ def save_dataframe(self, feature_name, feature_df): lineSep=constants.LINESEP, ) elif self.data_format == "parquet": - logger.info(f"Total Number of Rdd Partitions:{ - feature_df.rdd.getNumPartitions()}") feature_df.write.parquet( path=folder_path + ".parquet", compression=self.compression, diff --git a/tests/tests_io/test_writer.py b/tests/tests_io/test_writer.py index 56a1de0..d2b9e7b 100644 --- a/tests/tests_io/test_writer.py +++ b/tests/tests_io/test_writer.py @@ -26,7 +26,7 @@ def setUp(self): .getOrCreate() # Create PySpark DataFrame from Pandas self.sparkDF = spark.createDataFrame(self.mock_pandas) - + self.suffix = "" 
         self.features = {"test_feature": self.sparkDF}
 
     def test_write_data_csv(self):
@@ -54,20 +54,21 @@ def test_write_data_parquet(self):
                                               data_format="parquet")
         self.sparkdatawriter.write_data()
         # check if the dir exists
-        self.assertTrue(os.path.exists(f"{self.output_dir}" + "test_feature"))
+        self.assertTrue(os.path.exists(f"{self.output_dir}" + "test_feature.parquet"))
         # check if the file has the correct content
         ## spark read the directory
         spark = SparkSession.builder \
             .master("local") \
             .appName("radarpipeline") \
             .getOrCreate()
-        output_file = spark.read.parquet(f"{self.output_dir}" + "test_feature")
+        self.suffix = ".parquet"
+        output_file = spark.read.parquet(f"{self.output_dir}" + "test_feature.parquet")
         assert_pyspark_df_equal(output_file, self.sparkDF)
         assert_frame_equal(output_file.toPandas(), self.mock_pandas)
 
     def tearDown(self):
         # delete the file
-        path = pl.Path(f"{self.output_dir}" + "test_feature")
+        path = pl.Path(f"{self.output_dir}" + "test_feature" + self.suffix)
         shutil.rmtree(path)
 
 
From 09e5b4d42cdd0a1f94877c393f3f0dd1aad9ac3e Mon Sep 17 00:00:00 2001
From: Heet Sankesara
Date: Tue, 30 Jul 2024 15:35:38 +0100
Subject: [PATCH 8/9] solved flake8 issue

---
 radarpipeline/io/connection.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/radarpipeline/io/connection.py b/radarpipeline/io/connection.py
index f466647..92b20f2 100644
--- a/radarpipeline/io/connection.py
+++ b/radarpipeline/io/connection.py
@@ -430,4 +430,4 @@ def path_advance(thepath, sep=os.sep):
     for part in parts:
         curpath = os.path.join(curpath, part)
         if curpath:
-            yield curpath
\ No newline at end of file
+            yield curpath

From 7ee5f104030bf4872fce8fe24415a0d87d117e00 Mon Sep 17 00:00:00 2001
From: Heet Sankesara
Date: Fri, 2 Aug 2024 16:19:22 +0100
Subject: [PATCH 9/9] solved minor test issues

---
 tests/resources/test_yamls/config_with_multiple_features.yaml | 2 +-
 tests/test_integration/test_integration.py                    | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/resources/test_yamls/config_with_multiple_features.yaml b/tests/resources/test_yamls/config_with_multiple_features.yaml
index 0eb178e..c45f9e2 100644
--- a/tests/resources/test_yamls/config_with_multiple_features.yaml
+++ b/tests/resources/test_yamls/config_with_multiple_features.yaml
@@ -34,6 +34,6 @@ features:
 output:
   output_location: local # can be local, postgres, sftp
   config:
-    target_path: output/multiple_feature_test/
+    target_path: tests/resources/test_output/
   data_format: csv
   compress: false
\ No newline at end of file
diff --git a/tests/test_integration/test_integration.py b/tests/test_integration/test_integration.py
index af1148c..9dc6afb 100644
--- a/tests/test_integration/test_integration.py
+++ b/tests/test_integration/test_integration.py
@@ -94,7 +94,7 @@ def test_multiple_feature_run(self):
             "tabularize_features_android_phone_battery_level.csv"))
         self.assertIsFile(path)
         # read the file and verify that the output is the same
-        expected_output_path = "tests/resources/expected_output/tabular"
+        expected_output_path = "tests/resources/expected_output/tabular/"
         expected_df = pd.read_csv(
             os.path.join(expected_output_path,
                          "tabularize_features_android_phone_battery_level.csv"))
@@ -130,4 +130,4 @@ def rmtree(self, f: pl.Path):
 
     def tearDown(self):
         # Delete the output directory after the test
-        self.rmtree(pl.Path(self.output_dir))
\ No newline at end of file
+        self.rmtree(pl.Path(self.output_dir))