Skip to content

Commit

Permalink
single commit upload per file
Browse files Browse the repository at this point in the history
  • Loading branch information
rghunter committed Sep 28, 2015
1 parent 6285b6a commit 53c60a3
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 22 deletions.
16 changes: 4 additions & 12 deletions gtfsdb/model/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,6 @@ def load(cls, db, key_lookup, **kwargs):
elif cls.datasource == config.DATASOURCE_LOOKUP:
directory = resource_filename('gtfsdb', 'data')

if 'thread_pool' in kwargs.keys():
thread_pool = kwargs.get('thread_pool')
else:
thread_pool = ThreadPoolExecutor(max_workers=1)

records = []
futures = []
file_path = os.path.join(directory, cls.filename)
Expand All @@ -142,18 +137,15 @@ def load(cls, db, key_lookup, **kwargs):
records.append(record)
i += 1
if i >= batch_size:
futures.append(thread_pool.submit(db.execute, table.insert(), records))
db.session.execute(table.insert(), records)
db.session.flush()
records = []
i = 0
if len(records) > 0:
futures.append(thread_pool.submit(db.execute, table.insert(), records))
db.session.execute(table.insert(), records)
db.session.flush()
f.close()

if 'thread_pool' not in kwargs.keys():
for future in futures:
while future.running():
time.sleep(0.1)
future.result()
process_time = time.time() - start_time
log.debug('{0}.load ({1:.0f} seconds)'.format(cls.__name__, process_time))
return futures
Expand Down
9 changes: 1 addition & 8 deletions gtfsdb/model/gtfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ def load(self, db, **kwargs):
batch_size=kwargs.get('batch_size', config.DEFAULT_BATCH_SIZE),
gtfs_directory=gtfs_directory,
key_lookup=key_lookup,
thread_pool=ThreadPoolExecutor(max_workers=kwargs.get('db_threads', config.DB_THREADS)),
file_id=self.file_id
)
futures = []
Expand All @@ -68,13 +67,7 @@ def load(self, db, **kwargs):
shutil.rmtree(gtfs_directory)

log.debug('GTFS.load: Done parsing, finishing upload')
for future in futures:
while future.running():
time.sleep(0.1)
excp = future.exception()
if excp:
raise excp
future.result()
db.session.commit()

process_time = time.time() - start_time
log.debug('GTFS.load ({0:.0f} seconds)'.format(process_time))
Expand Down
4 changes: 2 additions & 2 deletions tests/performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def unzip_file(file_path):
batch_size = config

#batch_sizes = [1000, 5000, 10000]
batch_sizes = [5000]
db_threads = [1,5,10]
batch_sizes = [10000]
db_threads = [1]#,5,10]

result_list = []
for batch_size in batch_sizes:
Expand Down

0 comments on commit 53c60a3

Please sign in to comment.