From 53c60a39da61fc89c5bfdf135443c24eaa9fe614 Mon Sep 17 00:00:00 2001 From: "Ryan G. Hunter" Date: Mon, 28 Sep 2015 15:59:35 -0400 Subject: [PATCH] single commit upload per file --- gtfsdb/model/base.py | 16 ++++------------ gtfsdb/model/gtfs.py | 9 +-------- tests/performance.py | 4 ++-- 3 files changed, 7 insertions(+), 22 deletions(-) diff --git a/gtfsdb/model/base.py b/gtfsdb/model/base.py index 60c12e8..2538ac4 100644 --- a/gtfsdb/model/base.py +++ b/gtfsdb/model/base.py @@ -121,11 +121,6 @@ def load(cls, db, key_lookup, **kwargs): elif cls.datasource == config.DATASOURCE_LOOKUP: directory = resource_filename('gtfsdb', 'data') - if 'thread_pool' in kwargs.keys(): - thread_pool = kwargs.get('thread_pool') - else: - thread_pool = ThreadPoolExecutor(max_workers=1) - records = [] futures = [] file_path = os.path.join(directory, cls.filename) @@ -142,18 +137,15 @@ def load(cls, db, key_lookup, **kwargs): records.append(record) i += 1 if i >= batch_size: - futures.append(thread_pool.submit(db.execute, table.insert(), records)) + db.session.execute(table.insert(), records) + db.session.flush() records = [] i = 0 if len(records) > 0: - futures.append(thread_pool.submit(db.execute, table.insert(), records)) + db.session.execute(table.insert(), records) + db.session.flush() f.close() - if 'thread_pool' not in kwargs.keys(): - for future in futures: - while future.running(): - time.sleep(0.1) - future.result() process_time = time.time() - start_time log.debug('{0}.load ({1:.0f} seconds)'.format(cls.__name__, process_time)) return futures diff --git a/gtfsdb/model/gtfs.py b/gtfsdb/model/gtfs.py index e4ef42a..9b7d4c7 100644 --- a/gtfsdb/model/gtfs.py +++ b/gtfsdb/model/gtfs.py @@ -59,7 +59,6 @@ def load(self, db, **kwargs): batch_size=kwargs.get('batch_size', config.DEFAULT_BATCH_SIZE), gtfs_directory=gtfs_directory, key_lookup=key_lookup, - thread_pool=ThreadPoolExecutor(max_workers=kwargs.get('db_threads', config.DB_THREADS)), file_id=self.file_id ) futures = [] @@ -68,13 +67,7 @@ def load(self, db, **kwargs): shutil.rmtree(gtfs_directory) log.debug('GTFS.load: Done parsing, finishing upload') - for future in futures: - while future.running(): - time.sleep(0.1) - excp = future.exception() - if excp: - raise excp - future.result() + db.session.commit() process_time = time.time() - start_time log.debug('GTFS.load ({0:.0f} seconds)'.format(process_time)) diff --git a/tests/performance.py b/tests/performance.py index 220a953..91dbe1e 100644 --- a/tests/performance.py +++ b/tests/performance.py @@ -38,8 +38,8 @@ def unzip_file(file_path): batch_size = config #batch_sizes = [1000, 5000, 10000] - batch_sizes = [5000] - db_threads = [1,5,10] + batch_sizes = [10000] + db_threads = [1]#,5,10] result_list = [] for batch_size in batch_sizes: