From 50c53c152cb124ef4eea796490223adbfdf989e7 Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Mon, 29 Jun 2015 14:45:50 -0700 Subject: [PATCH 01/13] Move Python interpreter and feedvalidator path to config --- config/sample.application.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/config/sample.application.yml b/config/sample.application.yml index 14d49e67c..f4670c80f 100644 --- a/config/sample.application.yml +++ b/config/sample.application.yml @@ -11,3 +11,5 @@ ARTIFACT_UPLOAD_S3_REGION: 'us-east-1' ARTIFACT_UPLOAD_S3_BUCKET: 'name of an S3 bucket' AWS_ACCESS_KEY_ID: 'xxxxxxxxx' AWS_SECRET_ACCESS_KEY: 'xxxxxx' +PYTHON_PATH: './virtualenv/bin/python' +FEEDVALIDATOR_PATH: './virtualenv/bin/feedvalidator.py' From 7555e31afd6453bc785e67debf193ba5c2567ce5 Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Mon, 29 Jun 2015 14:45:59 -0700 Subject: [PATCH 02/13] Remove tyr from feedeater --- lib/feedeater/artifact.py | 32 ++--------------------- lib/feedeater/tyr.py | 54 --------------------------------------- 2 files changed, 2 insertions(+), 84 deletions(-) delete mode 100644 lib/feedeater/tyr.py diff --git a/lib/feedeater/artifact.py b/lib/feedeater/artifact.py index 3585af872..9a4d9b335 100644 --- a/lib/feedeater/artifact.py +++ b/lib/feedeater/artifact.py @@ -7,29 +7,8 @@ import util import task -# Temporary: -import tyr class FeedEaterArtifact(task.FeedEaterTask): - def __init__(self, *args, **kwargs): - super(FeedEaterArtifact, self).__init__(*args, **kwargs) - self.tyrhost = kwargs.get('tyrhost') - self.tyrtoken = kwargs.get('tyrtoken') - - def parser(self): - parser = super(FeedEaterArtifact, self).parser() - parser.add_argument( - '--tyrtoken', - help='TYR api token', - default=os.getenv('TYR_AUTH_TOKEN') - ) - parser.add_argument( - '--tyrhost', - help='TYR Host', - default=os.getenv('TYR_HOST') or 'https://valhalla.mapzen.com' - ) - return parser - def run(self): # Create GTFS Artifacts self.log("===== Feed: %s ====="%self.feedid) @@ -50,13 +29,6 @@ def run(self): match = sorted(found, key=lambda x:x.data['updated_at'])[0] onestop_id = match.onestop() osm_way_id = match.data.get('osm_way_id') - if not osm_way_id and tyr: - osm_way_id = tyr.tyr_osm( - stop, - endpoint=self.tyrhost, - apitoken=self.tyrtoken - ) - self.log(" ... 
got tyr osm_way_id: %s"%osm_way_id) self.log(" onestop_id: %s, osm_way_id: %s"%(onestop_id, osm_way_id)) stop.set('onestop_id', onestop_id) stop.set('osm_way_id', osm_way_id) @@ -67,7 +39,7 @@ def run(self): if os.path.exists(stopstxt): os.unlink(stopstxt) if os.path.exists(artifact): - os.unlink(artifact) + os.unlink(artifact) # self.log("Creating output artifact: %s"%artifact) gtfsfeed.write(stopstxt, gtfsfeed.stops(), sortkey='stop_id') @@ -76,7 +48,7 @@ def run(self): if os.path.exists(stopstxt): os.unlink(stopstxt) self.log("Finished!") - + if __name__ == "__main__": task = FeedEaterArtifact.from_args() task.run() diff --git a/lib/feedeater/tyr.py b/lib/feedeater/tyr.py deleted file mode 100644 index 3f7b1bfa0..000000000 --- a/lib/feedeater/tyr.py +++ /dev/null @@ -1,54 +0,0 @@ -"""TYR interface.""" -import json -import urllib -import urllib2 -import collections - -import util - -def tyr_osm(stop, endpoint, apitoken=None, debug=False): - t = TYR(endpoint, apitoken=apitoken, debug=debug) - response = t.locate([stop.point()]) - try: - assert response - assert response[0]['ways'] - except: - return None - ways = collections.defaultdict(list) - for way in response[0]['ways']: - d = util.haversine(stop.point(), (way['correlated_lon'], way['correlated_lat'])) - ways[d].append(way['way_id']) - # get the lowest way_id in the closest way. - way_id = sorted(ways[sorted(ways.keys())[0]])[0] - return way_id - -class TYR(object): - def __init__(self, endpoint, apitoken=None, debug=False): - self.endpoint = endpoint - self.apitoken = apitoken - - def locate(self, locations, costing='pedestrian'): - data = { - 'locations': [ - {'lon':i[0], 'lat':i[1]} for i in locations - ], - 'costing': costing - } - return self.getjson('%s/locate'%self.endpoint, json=json.dumps(data)) - - def getjson(self, endpoint, **qs): - if self.apitoken: - qs['api_key'] = self.apitoken - if qs: - endpoint = '%s?%s'%( - endpoint, - urllib.urlencode(qs) - ) - req = urllib2.Request(endpoint) - response = urllib2.urlopen(req) - ret = response.read() - try: - ret = json.loads(ret) - except ValueError, e: - return None - return ret From af50fb339b4aa2147838ba367f2c8c2a16c82684 Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Mon, 29 Jun 2015 15:07:09 -0700 Subject: [PATCH 03/13] Subclass from FeedEaterWorker Move run_python, artifact_file_path, etc., to FeedEaterWorker Move PYTHON_PATH and VALIDATOR_PATH to config Use #system with array arguments, not interpolated string --- app/workers/feed_eater_feed_worker.rb | 50 ++++++++----------- app/workers/feed_eater_worker.rb | 19 +++++++ ...pload_feed_eater_artifacts_to_s3_worker.rb | 8 +-- 3 files changed, 42 insertions(+), 35 deletions(-) diff --git a/app/workers/feed_eater_feed_worker.rb b/app/workers/feed_eater_feed_worker.rb index 395cbb7ad..c76f76c09 100644 --- a/app/workers/feed_eater_feed_worker.rb +++ b/app/workers/feed_eater_feed_worker.rb @@ -1,19 +1,14 @@ -class FeedEaterFeedWorker - include Sidekiq::Worker - - PYTHON = './virtualenv/bin/python' - FEEDVALIDATOR = './virtualenv/bin/feedvalidator.py' - +class FeedEaterFeedWorker < FeedEaterWorker def perform(feed_onestop_id) # Download the feed feed = Feed.find_by(onestop_id: feed_onestop_id) logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Downloading #{feed.url}" updated = feed.fetch_and_check_for_updated_version - return unless updated + # return unless updated # Clear out old log files - log_file_path = FeedEaterFeedWorker.artifact_file_path("#{feed_onestop_id}.log") - validation_report_path = 
FeedEaterFeedWorker.artifact_file_path("#{feed_onestop_id}.html") + log_file_path = artifact_file_path("#{feed_onestop_id}.log") + validation_report_path = artifact_file_path("#{feed_onestop_id}.html") FileUtils.rm(log_file_path) if File.exist?(log_file_path) FileUtils.rm(validation_report_path) if File.exist?(validation_report_path) @@ -23,16 +18,24 @@ def perform(feed_onestop_id) # Validate and import feed begin logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Validating feed" - run_python('./lib/feedeater/validate.py', "--feedvalidator #{FEEDVALIDATOR} --log #{log_file_path} #{feed_onestop_id}") + feedvalidator = Figaro.env.feedvalidator_path || './virtualenv/bin/python' + run_python( + './lib/feedeater/validate.py', + '--feedvalidator', + feedvalidator, + '--log', + log_file_path, + feed_onestop_id + ) logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Uploading feed" - run_python('./lib/feedeater/post.py', "--log #{log_file_path} #{feed_onestop_id}") - logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Creating GTFS artifact" - run_python('./lib/feedeater/artifact.py', "--log #{log_file_path} #{feed_onestop_id}") - if Figaro.env.upload_feed_eater_artifacts_to_s3.present? && - Figaro.env.upload_feed_eater_artifacts_to_s3 == 'true' - logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Enqueuing a job to upload artifacts to S3" - UploadFeedEaterArtifactsToS3Worker.perform_async(feed_onestop_id) - end + # run_python( + # './lib/feedeater/post.py', + # '--log', + # log_file_path, + # feed_onestop_id + # ) + logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Enqueue artifact job" + GtfsFeedArtifactWorker.perform_async(feed_onestop_id) rescue Exception => e # NOTE: we're catching all exceptions, including Interrupt, # SignalException, and SyntaxError @@ -64,15 +67,4 @@ def perform(feed_onestop_id) ) end end - - private - - def run_python(file, args) - success = system("#{PYTHON} #{file} #{args}") - raise "Error running Python #{file} #{args}" if !success - end - - def self.artifact_file_path(name) - File.join(Figaro.env.transitland_feed_data_path, name) - end end diff --git a/app/workers/feed_eater_worker.rb b/app/workers/feed_eater_worker.rb index b472baf52..ea441d08d 100644 --- a/app/workers/feed_eater_worker.rb +++ b/app/workers/feed_eater_worker.rb @@ -9,4 +9,23 @@ def perform(feed_onestop_ids = []) FeedEaterFeedWorker.perform_async(feed.onestop_id) end end + + private + + def run_python(file, *args) + python = Figaro.env.python_path || './virtualenv/bin/python' + success = system( + python, + file, + *args + ) + raise "Error running Python: #{file} #{args}" if !success + end + + def artifact_file_path(name) + path = Figaro.env.transitland_feed_data_path + raise "Must specify TRANSITLAND_FEED_DATA_PATH" if !path + File.join(path, name) + end + end diff --git a/app/workers/upload_feed_eater_artifacts_to_s3_worker.rb b/app/workers/upload_feed_eater_artifacts_to_s3_worker.rb index 37fc6aebf..1dd63ca88 100644 --- a/app/workers/upload_feed_eater_artifacts_to_s3_worker.rb +++ b/app/workers/upload_feed_eater_artifacts_to_s3_worker.rb @@ -1,10 +1,6 @@ require 'aws-sdk' -class UploadFeedEaterArtifactsToS3Worker - include Sidekiq::Worker - - PYTHON = './virtualenv/bin/python' - FEEDVALIDATOR = './virtualenv/bin/feedvalidator.py' +class UploadFeedEaterArtifactsToS3Worker < FeedEaterWorker ARTIFACT_UPLOAD_S3_DIRECTORY = 'feedeater-artifacts/' @@ -25,7 +21,7 @@ def perform(feed_onestop_id) logger.info "S3 bucket: #{Figaro.env.artifact_upload_s3_bucket}" ['.html', '.zip', '.artifact.zip', 
'.log'].each do |file_extension| - local_file_path = FeedEaterFeedWorker.artifact_file_path(feed_onestop_id + file_extension) + local_file_path = artifact_file_path(feed_onestop_id + file_extension) remote_file_path = ARTIFACT_UPLOAD_S3_DIRECTORY + feed_onestop_id + file_extension logger.info "Uploading #{local_file_path} to S3" From 4234b0bb6356a0c0f220f1cdd62b1c00ece39bd2 Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Mon, 29 Jun 2015 15:07:51 -0700 Subject: [PATCH 04/13] Move artifact creation to separate job Try 5 attempts --- app/workers/gtfs_feed_artifact_worker.rb | 32 ++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 app/workers/gtfs_feed_artifact_worker.rb diff --git a/app/workers/gtfs_feed_artifact_worker.rb b/app/workers/gtfs_feed_artifact_worker.rb new file mode 100644 index 000000000..c18c2ebed --- /dev/null +++ b/app/workers/gtfs_feed_artifact_worker.rb @@ -0,0 +1,32 @@ +class GtfsFeedArtifactWorker < FeedEaterWorker + + MAX_ATTEMPTS = 5 + WAIT_TIME = 10.minutes + + def perform(feed_onestop_id, attempts=1) + logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Verifying osm_way_ids" + prefix = "gtfs://#{feed_onestop_id}/" + missing = Stop.with_identifer_starting_with(prefix).select { |x| x.tags['osm_way_id'].nil? } + + if attempts > MAX_ATTEMPTS + logger.info "Missing #{missing.length} osm_way_ids. #{attempts} attempts: aborting" + return + elsif missing.any? + logger.info "Missing #{missing.length} osm_way_ids. #{attempts} attempts: trying again" + GtfsFeedArtifactWorker.perform_in(WAIT_TIME, feed_onestop_id, attempts+1) + return + end + + logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Creating GTFS artifacts" + run_python( + './lib/feedeater/artifact.py', + feed_onestop_id + ) + + if Figaro.env.upload_feed_eater_artifacts_to_s3 == 'true' + logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Enqueuing a job to upload artifacts to S3" + UploadFeedEaterArtifactsToS3Worker.perform_async(feed_onestop_id) + end + + end +end From 3b533cc71bf2f1a2db54d3c8d5ac11998dadc106 Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Mon, 29 Jun 2015 15:08:51 -0700 Subject: [PATCH 05/13] Remove debugging... 
--- app/workers/feed_eater_feed_worker.rb | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/app/workers/feed_eater_feed_worker.rb b/app/workers/feed_eater_feed_worker.rb index c76f76c09..366c550e2 100644 --- a/app/workers/feed_eater_feed_worker.rb +++ b/app/workers/feed_eater_feed_worker.rb @@ -4,7 +4,7 @@ def perform(feed_onestop_id) feed = Feed.find_by(onestop_id: feed_onestop_id) logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Downloading #{feed.url}" updated = feed.fetch_and_check_for_updated_version - # return unless updated + return unless updated # Clear out old log files log_file_path = artifact_file_path("#{feed_onestop_id}.log") @@ -28,12 +28,12 @@ def perform(feed_onestop_id) feed_onestop_id ) logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Uploading feed" - # run_python( - # './lib/feedeater/post.py', - # '--log', - # log_file_path, - # feed_onestop_id - # ) + run_python( + './lib/feedeater/post.py', + '--log', + log_file_path, + feed_onestop_id + ) logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Enqueue artifact job" GtfsFeedArtifactWorker.perform_async(feed_onestop_id) rescue Exception => e From 24471d24c321efae6b552c958fa84fc382895f05 Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Mon, 29 Jun 2015 15:45:21 -0700 Subject: [PATCH 06/13] osm_way_id moved to tags --- lib/feedeater/artifact.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/feedeater/artifact.py b/lib/feedeater/artifact.py index 9a4d9b335..11c5dc984 100644 --- a/lib/feedeater/artifact.py +++ b/lib/feedeater/artifact.py @@ -28,7 +28,7 @@ def run(self): continue match = sorted(found, key=lambda x:x.data['updated_at'])[0] onestop_id = match.onestop() - osm_way_id = match.data.get('osm_way_id') + osm_way_id = match.tag('osm_way_id') self.log(" onestop_id: %s, osm_way_id: %s"%(onestop_id, osm_way_id)) stop.set('onestop_id', onestop_id) stop.set('osm_way_id', osm_way_id) From 9f383c8c8fce3b223213a347163e42d73fb30f70 Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Mon, 29 Jun 2015 15:45:33 -0700 Subject: [PATCH 07/13] Move artifact enqueue to end of successful import --- app/workers/feed_eater_feed_worker.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/app/workers/feed_eater_feed_worker.rb b/app/workers/feed_eater_feed_worker.rb index 366c550e2..1c7cacba3 100644 --- a/app/workers/feed_eater_feed_worker.rb +++ b/app/workers/feed_eater_feed_worker.rb @@ -34,8 +34,6 @@ def perform(feed_onestop_id) log_file_path, feed_onestop_id ) - logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Enqueue artifact job" - GtfsFeedArtifactWorker.perform_async(feed_onestop_id) rescue Exception => e # NOTE: we're catching all exceptions, including Interrupt, # SignalException, and SyntaxError @@ -46,6 +44,10 @@ def perform(feed_onestop_id) else logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Saving successful import" feed.has_been_fetched_and_imported!(on_feed_import: feed_import) + if Figaro.env.auto_conflate_stops_with_osm == 'true' + logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Enqueue artifact job" + GtfsFeedArtifactWorker.perform_async(feed_onestop_id) + end ensure # Cleanup import_log = '' From 0f0536e2fd47f49e2a852fb9a3b30b35bb2cd0fa Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Mon, 29 Jun 2015 15:57:26 -0700 Subject: [PATCH 08/13] Add --quiet option --- lib/feedeater/task.py | 21 +++++++++++++++------ lib/feedeater/validate.py | 15 ++++++++------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git 
a/lib/feedeater/task.py b/lib/feedeater/task.py index 4f25c44e8..44ec3f68e 100644 --- a/lib/feedeater/task.py +++ b/lib/feedeater/task.py @@ -19,6 +19,7 @@ def __init__( apitoken=None, debug=None, log=None, + quiet=None, **kwargs ): self.filename = filename @@ -30,13 +31,15 @@ def __init__( apitoken=apitoken, debug=debug ) - self.logger = self._log_init(logfile=log, debug=debug) + self.logger = self._log_init(logfile=log, debug=debug, quiet=quiet) - def _log_init(self, logfile=None, debug=False): + def _log_init(self, logfile=None, debug=False, quiet=False): fmt = '[%(asctime)s] %(message)s' datefmt = '%Y-%m-%d %H:%M:%S' logger = logging.getLogger(str(id(self))) - if debug: + if quiet: + logger.setLevel(100) + elif debug: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) @@ -52,12 +55,13 @@ def _log_init(self, logfile=None, debug=False): @classmethod def from_args(cls): - parser = cls().parser() + parser = cls.parser() args = parser.parse_args() return cls(**vars(args)) - def parser(self): - parser = argparse.ArgumentParser(description=self.__doc__) + @classmethod + def parser(cls): + parser = argparse.ArgumentParser(description=cls.__doc__) parser.add_argument( 'feedid', help='Feed IDs' @@ -95,6 +99,11 @@ def parser(self): '--log', help='Log file' ) + parser.add_argument( + '--quiet', + action='store_true', + help='Quiet; no log output' + ) return parser def debug(self, msg): diff --git a/lib/feedeater/validate.py b/lib/feedeater/validate.py index ff80f72a0..0a6ce7504 100644 --- a/lib/feedeater/validate.py +++ b/lib/feedeater/validate.py @@ -11,14 +11,15 @@ def __init__(self, *args, **kwargs): super(FeedEaterValidate, self).__init__(*args, **kwargs) self.feedvalidator = kwargs.get('feedvalidator') - def parser(self): - parser = super(FeedEaterValidate, self).parser() + @classmethod + def parser(cls): + parser = super(FeedEaterValidate, cls).parser() parser.add_argument( '--feedvalidator', help='Path to feedvalidator.py' - ) + ) return parser - + def run(self): # Validate feeds self.log("===== Feed: %s ====="%self.feedid) @@ -26,11 +27,11 @@ def run(self): filename = self.filename or os.path.join(self.workdir, '%s.zip'%feed.onestop()) report = os.path.join(self.workdir, '%s.html'%feed.onestop()) self.log("Validating: %s"%filename) - gtfsfeed = mzgtfs.feed.Feed(filename) + gtfsfeed = mzgtfs.feed.Feed(filename) validator = mzgtfs.validation.ValidationReport() # gtfsfeed.validate(validator) gtfsfeed.validate_feedvalidator( - validator, + validator, feedvalidator=self.feedvalidator, report=report, ) @@ -41,7 +42,7 @@ def run(self): for e in validator.exceptions: self.log("%s: %s"%(e.source, e.message)) self.log("Finished!") - + if __name__ == "__main__": task = FeedEaterValidate.from_args() task.run() From 25ead95e1763f3da1fc422231a1d9807f95170b4 Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Mon, 29 Jun 2015 15:58:03 -0700 Subject: [PATCH 09/13] Run artifact.py with --quiet --- app/workers/gtfs_feed_artifact_worker.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/app/workers/gtfs_feed_artifact_worker.rb b/app/workers/gtfs_feed_artifact_worker.rb index c18c2ebed..a1af281c8 100644 --- a/app/workers/gtfs_feed_artifact_worker.rb +++ b/app/workers/gtfs_feed_artifact_worker.rb @@ -9,10 +9,10 @@ def perform(feed_onestop_id, attempts=1) missing = Stop.with_identifer_starting_with(prefix).select { |x| x.tags['osm_way_id'].nil? } if attempts > MAX_ATTEMPTS - logger.info "Missing #{missing.length} osm_way_ids. 
#{attempts} attempts: aborting" + logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Missing #{missing.length} osm_way_ids. #{attempts} attempts, aborting" return elsif missing.any? - logger.info "Missing #{missing.length} osm_way_ids. #{attempts} attempts: trying again" + logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Missing #{missing.length} osm_way_ids. #{attempts} attempts, trying again" GtfsFeedArtifactWorker.perform_in(WAIT_TIME, feed_onestop_id, attempts+1) return end @@ -20,6 +20,7 @@ def perform(feed_onestop_id, attempts=1) logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Creating GTFS artifacts" run_python( './lib/feedeater/artifact.py', + '--quiet', feed_onestop_id ) From bee77b400c569e3743fc4781b7e1819d1a5ffa44 Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Mon, 29 Jun 2015 16:04:41 -0700 Subject: [PATCH 10/13] Log job finished --- app/workers/feed_eater_feed_worker.rb | 3 +++ app/workers/feed_eater_worker.rb | 4 ++++ app/workers/gtfs_feed_artifact_worker.rb | 2 ++ 3 files changed, 9 insertions(+) diff --git a/app/workers/feed_eater_feed_worker.rb b/app/workers/feed_eater_feed_worker.rb index 1c7cacba3..a5f5d8bfe 100644 --- a/app/workers/feed_eater_feed_worker.rb +++ b/app/workers/feed_eater_feed_worker.rb @@ -68,5 +68,8 @@ def perform(feed_onestop_id) validation_report: validation_report ) end + + logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Done." + end end diff --git a/app/workers/feed_eater_worker.rb b/app/workers/feed_eater_worker.rb index ea441d08d..04cc590d6 100644 --- a/app/workers/feed_eater_worker.rb +++ b/app/workers/feed_eater_worker.rb @@ -6,8 +6,12 @@ def perform(feed_onestop_ids = []) Feed.update_feeds_from_feed_registry feeds = feed_onestop_ids.length > 0 ? Feed.where(onestop_id: feed_onestop_ids) : Feed.where('') feeds.each do |feed| + logger.info "FeedEaterWorker: Enqueue #{feed.onestop_id}" FeedEaterFeedWorker.perform_async(feed.onestop_id) end + + logger.info 'FeedEaterWorker: Done.' + end private diff --git a/app/workers/gtfs_feed_artifact_worker.rb b/app/workers/gtfs_feed_artifact_worker.rb index a1af281c8..706a2e8cc 100644 --- a/app/workers/gtfs_feed_artifact_worker.rb +++ b/app/workers/gtfs_feed_artifact_worker.rb @@ -29,5 +29,7 @@ def perform(feed_onestop_id, attempts=1) UploadFeedEaterArtifactsToS3Worker.perform_async(feed_onestop_id) end + logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Done." 
+ end end From c05e1ecb99a827c04c5229637c7f01bca9f16649 Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Mon, 29 Jun 2015 16:06:32 -0700 Subject: [PATCH 11/13] Test stub --- spec/workers/gtfs_feed_artifact_worker_spec.rb | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 spec/workers/gtfs_feed_artifact_worker_spec.rb diff --git a/spec/workers/gtfs_feed_artifact_worker_spec.rb b/spec/workers/gtfs_feed_artifact_worker_spec.rb new file mode 100644 index 000000000..a994bd931 --- /dev/null +++ b/spec/workers/gtfs_feed_artifact_worker_spec.rb @@ -0,0 +1,3 @@ +describe GtfsFeedArtifactWorker do + pending 'write some specs' +end From 5bd76025a2613f2c8c4d121e597d58613a557ca3 Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Mon, 29 Jun 2015 16:09:44 -0700 Subject: [PATCH 12/13] cosmetic --- app/workers/feed_eater_worker.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/app/workers/feed_eater_worker.rb b/app/workers/feed_eater_worker.rb index 04cc590d6..78adf70b2 100644 --- a/app/workers/feed_eater_worker.rb +++ b/app/workers/feed_eater_worker.rb @@ -9,9 +9,7 @@ def perform(feed_onestop_ids = []) logger.info "FeedEaterWorker: Enqueue #{feed.onestop_id}" FeedEaterFeedWorker.perform_async(feed.onestop_id) end - logger.info 'FeedEaterWorker: Done.' - end private From c089d104cb60efdc006ac73ec62c6e81474a73ca Mon Sep 17 00:00:00 2001 From: Ian Rees Date: Mon, 29 Jun 2015 16:55:57 -0700 Subject: [PATCH 13/13] Implementation note Also bump MAX_ATTEMPTS to 10 for now. --- app/workers/gtfs_feed_artifact_worker.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/app/workers/gtfs_feed_artifact_worker.rb b/app/workers/gtfs_feed_artifact_worker.rb index 706a2e8cc..f142c77d9 100644 --- a/app/workers/gtfs_feed_artifact_worker.rb +++ b/app/workers/gtfs_feed_artifact_worker.rb @@ -1,10 +1,13 @@ class GtfsFeedArtifactWorker < FeedEaterWorker - MAX_ATTEMPTS = 5 + MAX_ATTEMPTS = 10 WAIT_TIME = 10.minutes def perform(feed_onestop_id, attempts=1) logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Verifying osm_way_ids" + # TODO: Ian: Note, this fetches all prefix'd Stops, then filters locally. + # This is because we cannot easily query a NULL in the hstore tags. + # This will be revisited when osm_way_id is part of the model, not tags. prefix = "gtfs://#{feed_onestop_id}/" missing = Stop.with_identifer_starting_with(prefix).select { |x| x.tags['osm_way_id'].nil? }
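
---

Note for readers skimming the series: the core mechanism introduced in patches 04 and 13 is a Sidekiq job that re-enqueues itself until OSM conflation has filled in every stop's `osm_way_id`, giving up after `MAX_ATTEMPTS`. Below is a minimal sketch of that retry pattern; it is illustrative only and not part of the patch series, and the `stops_missing_osm_way_ids` helper is a hypothetical stand-in for the `Stop.with_identifer_starting_with(...)` query used in the real worker.

```ruby
require 'sidekiq'

# A minimal sketch of the "check precondition, re-enqueue with a delay,
# abort after N attempts" pattern used by GtfsFeedArtifactWorker above.
class RetryUntilReadyWorker
  include Sidekiq::Worker

  MAX_ATTEMPTS = 10        # patch 13 bumps this from 5 to 10
  WAIT_TIME    = 10 * 60   # seconds between attempts (10.minutes in Rails)

  def perform(feed_onestop_id, attempts = 1)
    missing = stops_missing_osm_way_ids(feed_onestop_id)

    if attempts > MAX_ATTEMPTS
      # Too many attempts without the precondition being met: give up.
      logger.info "#{feed_onestop_id}: still missing #{missing.length} osm_way_ids, aborting"
      return
    elsif missing.any?
      # Precondition not met yet: schedule this same job again with attempts+1.
      logger.info "#{feed_onestop_id}: missing #{missing.length} osm_way_ids, retrying later"
      self.class.perform_in(WAIT_TIME, feed_onestop_id, attempts + 1)
      return
    end

    # Precondition met: do the real work (artifact creation in the patches).
    logger.info "#{feed_onestop_id}: all osm_way_ids present, building artifact"
  end

  private

  # Hypothetical stand-in for the hstore-tag query in the patches; returns
  # the stops for this feed that still lack an osm_way_id.
  def stops_missing_osm_way_ids(feed_onestop_id)
    []
  end
end
```

Re-enqueuing with `perform_in` rather than sleeping inside the job keeps Sidekiq threads free while the conflation workers catch up, which is presumably why the series splits artifact creation into its own job instead of blocking `FeedEaterFeedWorker`.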