Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate worker for GTFS artifacts #112

Merged
merged 13 commits into from
Jun 30, 2015
49 changes: 23 additions & 26 deletions app/workers/feed_eater_feed_worker.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
class FeedEaterFeedWorker
include Sidekiq::Worker

PYTHON = './virtualenv/bin/python'
FEEDVALIDATOR = './virtualenv/bin/feedvalidator.py'

class FeedEaterFeedWorker < FeedEaterWorker
def perform(feed_onestop_id)
# Download the feed
feed = Feed.find_by(onestop_id: feed_onestop_id)
Expand All @@ -12,8 +7,8 @@ def perform(feed_onestop_id)
return unless updated

# Clear out old log files
log_file_path = FeedEaterFeedWorker.artifact_file_path("#{feed_onestop_id}.log")
validation_report_path = FeedEaterFeedWorker.artifact_file_path("#{feed_onestop_id}.html")
log_file_path = artifact_file_path("#{feed_onestop_id}.log")
validation_report_path = artifact_file_path("#{feed_onestop_id}.html")
FileUtils.rm(log_file_path) if File.exist?(log_file_path)
FileUtils.rm(validation_report_path) if File.exist?(validation_report_path)

Expand All @@ -23,16 +18,22 @@ def perform(feed_onestop_id)
# Validate and import feed
begin
logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Validating feed"
run_python('./lib/feedeater/validate.py', "--feedvalidator #{FEEDVALIDATOR} --log #{log_file_path} #{feed_onestop_id}")
feedvalidator = Figaro.env.feedvalidator_path || './virtualenv/bin/python'
run_python(
'./lib/feedeater/validate.py',
'--feedvalidator',
feedvalidator,
'--log',
log_file_path,
feed_onestop_id
)
logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Uploading feed"
run_python('./lib/feedeater/post.py', "--log #{log_file_path} #{feed_onestop_id}")
logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Creating GTFS artifact"
run_python('./lib/feedeater/artifact.py', "--log #{log_file_path} #{feed_onestop_id}")
if Figaro.env.upload_feed_eater_artifacts_to_s3.present? &&
Figaro.env.upload_feed_eater_artifacts_to_s3 == 'true'
logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Enqueuing a job to upload artifacts to S3"
UploadFeedEaterArtifactsToS3Worker.perform_async(feed_onestop_id)
end
run_python(
'./lib/feedeater/post.py',
'--log',
log_file_path,
feed_onestop_id
)
rescue Exception => e
# NOTE: we're catching all exceptions, including Interrupt,
# SignalException, and SyntaxError
Expand All @@ -43,6 +44,10 @@ def perform(feed_onestop_id)
else
logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Saving successful import"
feed.has_been_fetched_and_imported!(on_feed_import: feed_import)
if Figaro.env.auto_conflate_stops_with_osm == 'true'
logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Enqueue artifact job"
GtfsFeedArtifactWorker.perform_async(feed_onestop_id)
end
ensure
# Cleanup
import_log = ''
Expand All @@ -63,16 +68,8 @@ def perform(feed_onestop_id)
validation_report: validation_report
)
end
end

private

def run_python(file, args)
success = system("#{PYTHON} #{file} #{args}")
raise "Error running Python #{file} #{args}" if !success
end
logger.info "FeedEaterFeedWorker #{feed_onestop_id}: Done."

def self.artifact_file_path(name)
File.join(Figaro.env.transitland_feed_data_path, name)
end
end
21 changes: 21 additions & 0 deletions app/workers/feed_eater_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,28 @@ def perform(feed_onestop_ids = [])
Feed.update_feeds_from_feed_registry
feeds = feed_onestop_ids.length > 0 ? Feed.where(onestop_id: feed_onestop_ids) : Feed.where('')
feeds.each do |feed|
logger.info "FeedEaterWorker: Enqueue #{feed.onestop_id}"
FeedEaterFeedWorker.perform_async(feed.onestop_id)
end
logger.info 'FeedEaterWorker: Done.'
end

private

# Runs a feedeater Python script through the configured interpreter.
# Uses the multi-argument form of Kernel#system, so arguments are passed
# directly to the process without shell interpolation.
# Raises if the script exits non-zero or cannot be launched.
def run_python(file, *args)
  interpreter = Figaro.env.python_path || './virtualenv/bin/python'
  ok = system(interpreter, file, *args)
  raise "Error running Python: #{file} #{args}" unless ok
end

# Returns the absolute path for a named FeedEater artifact inside the
# configured feed data directory.
# Raises when TRANSITLAND_FEED_DATA_PATH is not configured, since every
# artifact read/write depends on it.
def artifact_file_path(name)
  data_path = Figaro.env.transitland_feed_data_path
  raise "Must specify TRANSITLAND_FEED_DATA_PATH" unless data_path
  File.join(data_path, name)
end

end
38 changes: 38 additions & 0 deletions app/workers/gtfs_feed_artifact_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
class GtfsFeedArtifactWorker < FeedEaterWorker

  # Maximum number of polling attempts before giving up on missing osm_way_ids.
  MAX_ATTEMPTS = 10
  # Delay between polling attempts.
  WAIT_TIME = 10.minutes

  # Creates GTFS artifacts for a feed once every imported stop carries an
  # osm_way_id tag. While osm_way_ids are still missing, the job re-enqueues
  # itself up to MAX_ATTEMPTS times, waiting WAIT_TIME between attempts,
  # then gives up. Optionally enqueues an S3 upload job afterwards.
  def perform(feed_onestop_id, attempts=1)
    logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Verifying osm_way_ids"
    # TODO: Ian: Note, this fetches all prefix'd Stops, then filters locally.
    # This is because we cannot easily query a NULL in the hstore tags.
    # This will be revisited when osm_way_id is part of the model, not tags.
    prefix = "gtfs://#{feed_onestop_id}/"
    missing = Stop.with_identifer_starting_with(prefix).select { |x| x.tags['osm_way_id'].nil? }

    # Only abort or retry when osm_way_ids are actually missing. Checking
    # attempts first (as before) would abort a final attempt even when all
    # osm_way_ids had arrived in the meantime, never producing artifacts.
    if missing.any?
      if attempts > MAX_ATTEMPTS
        logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Missing #{missing.length} osm_way_ids. #{attempts} attempts, aborting"
      else
        logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Missing #{missing.length} osm_way_ids. #{attempts} attempts, trying again"
        GtfsFeedArtifactWorker.perform_in(WAIT_TIME, feed_onestop_id, attempts+1)
      end
      return
    end

    logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Creating GTFS artifacts"
    run_python(
      './lib/feedeater/artifact.py',
      '--quiet',
      feed_onestop_id
    )

    if Figaro.env.upload_feed_eater_artifacts_to_s3 == 'true'
      logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Enqueuing a job to upload artifacts to S3"
      UploadFeedEaterArtifactsToS3Worker.perform_async(feed_onestop_id)
    end

    logger.info "GtfsFeedArtifactWorker #{feed_onestop_id}: Done."

  end
end
8 changes: 2 additions & 6 deletions app/workers/upload_feed_eater_artifacts_to_s3_worker.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
require 'aws-sdk'

class UploadFeedEaterArtifactsToS3Worker
include Sidekiq::Worker

PYTHON = './virtualenv/bin/python'
FEEDVALIDATOR = './virtualenv/bin/feedvalidator.py'
class UploadFeedEaterArtifactsToS3Worker < FeedEaterWorker

ARTIFACT_UPLOAD_S3_DIRECTORY = 'feedeater-artifacts/'

Expand All @@ -25,7 +21,7 @@ def perform(feed_onestop_id)
logger.info "S3 bucket: #{Figaro.env.artifact_upload_s3_bucket}"

['.html', '.zip', '.artifact.zip', '.log'].each do |file_extension|
local_file_path = FeedEaterFeedWorker.artifact_file_path(feed_onestop_id + file_extension)
local_file_path = artifact_file_path(feed_onestop_id + file_extension)
remote_file_path = ARTIFACT_UPLOAD_S3_DIRECTORY + feed_onestop_id + file_extension

logger.info "Uploading #{local_file_path} to S3"
Expand Down
2 changes: 2 additions & 0 deletions config/sample.application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ ARTIFACT_UPLOAD_S3_REGION: 'us-east-1'
ARTIFACT_UPLOAD_S3_BUCKET: 'name of an S3 bucket'
AWS_ACCESS_KEY_ID: 'xxxxxxxxx'
AWS_SECRET_ACCESS_KEY: 'xxxxxx'
PYTHON_PATH: './virtualenv/bin/python'
FEEDVALIDATOR_PATH: './virtualenv/bin/feedvalidator.py'
34 changes: 3 additions & 31 deletions lib/feedeater/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,8 @@

import util
import task
# Temporary:
import tyr

class FeedEaterArtifact(task.FeedEaterTask):
def __init__(self, *args, **kwargs):
super(FeedEaterArtifact, self).__init__(*args, **kwargs)
self.tyrhost = kwargs.get('tyrhost')
self.tyrtoken = kwargs.get('tyrtoken')

def parser(self):
parser = super(FeedEaterArtifact, self).parser()
parser.add_argument(
'--tyrtoken',
help='TYR api token',
default=os.getenv('TYR_AUTH_TOKEN')
)
parser.add_argument(
'--tyrhost',
help='TYR Host',
default=os.getenv('TYR_HOST') or 'https://valhalla.mapzen.com'
)
return parser

def run(self):
# Create GTFS Artifacts
self.log("===== Feed: %s ====="%self.feedid)
Expand All @@ -49,14 +28,7 @@ def run(self):
continue
match = sorted(found, key=lambda x:x.data['updated_at'])[0]
onestop_id = match.onestop()
osm_way_id = match.data.get('osm_way_id')
if not osm_way_id and tyr:
osm_way_id = tyr.tyr_osm(
stop,
endpoint=self.tyrhost,
apitoken=self.tyrtoken
)
self.log(" ... got tyr osm_way_id: %s"%osm_way_id)
osm_way_id = match.tag('osm_way_id')
self.log(" onestop_id: %s, osm_way_id: %s"%(onestop_id, osm_way_id))
stop.set('onestop_id', onestop_id)
stop.set('osm_way_id', osm_way_id)
Expand All @@ -67,7 +39,7 @@ def run(self):
if os.path.exists(stopstxt):
os.unlink(stopstxt)
if os.path.exists(artifact):
os.unlink(artifact)
os.unlink(artifact)
#
self.log("Creating output artifact: %s"%artifact)
gtfsfeed.write(stopstxt, gtfsfeed.stops(), sortkey='stop_id')
Expand All @@ -76,7 +48,7 @@ def run(self):
if os.path.exists(stopstxt):
os.unlink(stopstxt)
self.log("Finished!")

if __name__ == "__main__":
task = FeedEaterArtifact.from_args()
task.run()
21 changes: 15 additions & 6 deletions lib/feedeater/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def __init__(
apitoken=None,
debug=None,
log=None,
quiet=None,
**kwargs
):
self.filename = filename
Expand All @@ -30,13 +31,15 @@ def __init__(
apitoken=apitoken,
debug=debug
)
self.logger = self._log_init(logfile=log, debug=debug)
self.logger = self._log_init(logfile=log, debug=debug, quiet=quiet)

def _log_init(self, logfile=None, debug=False):
def _log_init(self, logfile=None, debug=False, quiet=False):
fmt = '[%(asctime)s] %(message)s'
datefmt = '%Y-%m-%d %H:%M:%S'
logger = logging.getLogger(str(id(self)))
if debug:
if quiet:
logger.setLevel(100)
elif debug:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
Expand All @@ -52,12 +55,13 @@ def _log_init(self, logfile=None, debug=False):

@classmethod
def from_args(cls):
parser = cls().parser()
parser = cls.parser()
args = parser.parse_args()
return cls(**vars(args))

def parser(self):
parser = argparse.ArgumentParser(description=self.__doc__)
@classmethod
def parser(cls):
parser = argparse.ArgumentParser(description=cls.__doc__)
parser.add_argument(
'feedid',
help='Feed IDs'
Expand Down Expand Up @@ -95,6 +99,11 @@ def parser(self):
'--log',
help='Log file'
)
parser.add_argument(
'--quiet',
action='store_true',
help='Quiet; no log output'
)
return parser

def debug(self, msg):
Expand Down
54 changes: 0 additions & 54 deletions lib/feedeater/tyr.py

This file was deleted.

15 changes: 8 additions & 7 deletions lib/feedeater/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,27 @@ def __init__(self, *args, **kwargs):
super(FeedEaterValidate, self).__init__(*args, **kwargs)
self.feedvalidator = kwargs.get('feedvalidator')

def parser(self):
parser = super(FeedEaterValidate, self).parser()
@classmethod
def parser(cls):
parser = super(FeedEaterValidate, cls).parser()
parser.add_argument(
'--feedvalidator',
help='Path to feedvalidator.py'
)
)
return parser

def run(self):
# Validate feeds
self.log("===== Feed: %s ====="%self.feedid)
feed = self.registry.feed(self.feedid)
filename = self.filename or os.path.join(self.workdir, '%s.zip'%feed.onestop())
report = os.path.join(self.workdir, '%s.html'%feed.onestop())
self.log("Validating: %s"%filename)
gtfsfeed = mzgtfs.feed.Feed(filename)
gtfsfeed = mzgtfs.feed.Feed(filename)
validator = mzgtfs.validation.ValidationReport()
# gtfsfeed.validate(validator)
gtfsfeed.validate_feedvalidator(
validator,
validator,
feedvalidator=self.feedvalidator,
report=report,
)
Expand All @@ -41,7 +42,7 @@ def run(self):
for e in validator.exceptions:
self.log("%s: %s"%(e.source, e.message))
self.log("Finished!")

if __name__ == "__main__":
task = FeedEaterValidate.from_args()
task.run()
3 changes: 3 additions & 0 deletions spec/workers/gtfs_feed_artifact_worker_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
describe GtfsFeedArtifactWorker do
# TODO(review): cover the retry path (missing osm_way_ids re-enqueues with
# attempts+1), the abort path after MAX_ATTEMPTS, and the artifact-creation
# path when no osm_way_ids are missing.
pending 'write some specs'
end