Skip to content

Commit

Permalink
Handle transport differently for client and servers, works best with s…
Browse files Browse the repository at this point in the history
  • Loading branch information
runningman84 committed Jul 8, 2016
1 parent 9a84d21 commit 56c4518
Showing 1 changed file with 29 additions and 2 deletions.
31 changes: 29 additions & 2 deletions lib/sensu/transport/snssqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,36 @@ def connected?; @connected; end

def connect(settings)
@settings = settings
@settings[:max_number_of_messages] = 10 if @settings[:max_number_of_messages].nil?
@settings[:wait_time_seconds] = 2 if @settings[:wait_time_seconds].nil?
@settings[:mode] = "unknown" if @settings[:mode].nil?

@connected = true
@results_callback = proc {}
@keepalives_callback = proc {}
@sqs = Aws::SQS::Client.new(region: @settings[:region])
@sns = Aws::SNS::Client.new(region: @settings[:region])
@sqs = nil
@sns = nil

unless @settings[:consuming_sqs_queue_url].nil?
@sqs = Aws::SQS::Client.new(region: @settings[:region])
else
self.logger.debug("sqs client disabled becase of missing consuming_sqs_queue_url")
end
unless @settings[:publishing_sns_topic_arn].nil?
@sns = Aws::SNS::Client.new(region: @settings[:region])
else
self.logger.debug("sns client disabled becase of missing publishing_sns_topic_arn")
end

case @settings[:mode]
when 'client'
raise("transport does not work without sns client, please specify publishing_sns_topic_arn") if @sns.nil?
when 'server'
raise("transport does not work without sqs client, please specify consuming_sqs_queue_url") if @sqs.nil?
end

self.logger.info("transport running in #{@settings[:mode]} mode")

# connect to statsd, if necessary
@statsd = nil
if !@settings[:statsd_addr].nil? and @settings[:statsd_addr] != ""
Expand Down Expand Up @@ -104,6 +128,7 @@ def subscribe(type, pipe, funnel = nil, options = {}, &callback)

# acknowledge will delete the given message from the SQS queue.
def acknowledge(info, &callback)
return if @sqs.nil?
EM.defer {
@sqs.delete_message(
queue_url: @settings[:consuming_sqs_queue_url],
Expand Down Expand Up @@ -143,6 +168,7 @@ def do_all_the_time(&blk)
end

def send_message(msg, attributes, &callback)
return if @sns.nil?
resp = @sns.publish(
target_arn: @settings[:publishing_sns_topic_arn],
message: msg,
Expand All @@ -157,6 +183,7 @@ def send_message(msg, attributes, &callback)
# receive_messages returns an array of SQS messages
# for the consuming queue
def receive_messages
return [] if @sqs.nil?
begin
resp = @sqs.receive_message(
message_attribute_names: PIPE_ARR,
Expand Down

0 comments on commit 56c4518

Please sign in to comment.