Skip to content
This repository has been archived by the owner on Jun 1, 2023. It is now read-only.

Implements concurrent requests #48

Closed
wants to merge 12 commits into from
39 changes: 2 additions & 37 deletions src/scry.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,13 @@ require "./scry/context"
require "./scry/message"
require "./scry/response"
require "./scry/environment_config"
require "./scry/concurrent_rpc"

module Scry
def self.start
Log.logger.info("Scry is looking into your code...")

at_exit do
Log.logger.info("...your session has ended")
end

EnvironmentConfig.new.run

next_request = Channel(Int32).new(10)
context = Context.new

loop do |i|
spawn do
Log.logger.debug("Scry is listening request ##{i}...")
content = Request.new(STDIN).read
request = Message.new(content).parse
results = context.dispatch(request)
rescue ex
results = [ResponseMessage.new(ex)]
ensure
response = Response.new([results].flatten)
response.write(STDOUT)
next_request.send i
end

select
when n = next_request.receive
Log.logger.debug("Scry has processed request ##{n}!")
next
else
sleep 1
end
end
rescue ex
Log.logger.error(
%(#{ex.message || "Unknown error"}\n#{ex.backtrace.join('\n')})
) unless Log.logger.nil?
ensure
Log.logger.close unless Log.logger.nil?
ConcurrentRpc.new.run
end
end

Expand Down
69 changes: 69 additions & 0 deletions src/scry/concurrent_rpc.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
module Scry
class ConcurrentRpc
CONTEXT = Context.new
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure it's safe to have the context global to all the fibers?

(not sure what the Context object does in scry)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well,I tried to use @context as well, but still doesn't work, see comment below 👇


private def get_requests(reader, input_channel)
Log.logger.info "getting request..."
content = Request.new(reader).read
Log.logger.info "getting content..."
input_channel.send content
end

private def handle_request(content, result_channel)
Log.logger.info "Processing request..."
Log.logger.info content
request = Message.new(content).parse
results = CONTEXT.dispatch(request)
rescue ex
results = [ResponseMessage.new(ex)]
ensure
Log.logger.info results
Log.logger.info "End Processing request..."
result_channel.send [results].flatten
end

def run
Log.logger.info "server has started"
input_channel = Channel(String | Nil).new

spawn get_requests(reader, input_channel)

channels = [] of Channel(String | Nil) | Channel(Array(Result))
channels << input_channel

until channels.empty?
channel_index, data = Channel.select(channels.map &.receive_select_action)
Log.logger.info "data received"
Log.logger.info channel_index
Log.logger.info data

if channels[channel_index] == input_channel
if data.nil?
Log.logger.info "Input closed, no more data to come in current channel..."
Copy link
Contributor

@bew bew Mar 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no more data to come from the client ?
Because current channel means nothing to me

channels.delete_at(channel_index)
next
end

content = data.as(String)
result_channel = Channel(Array(Result)).new

spawn handle_request(content, result_channel)

channels << result_channel
else
results = data.as(Array(Result))
Log.logger.info "Printing result output..."
Log.logger.info results
response = Response.new(results)
response.write(writer)
channels.delete_at(channel_index)
end
end
rescue ex
Log.logger.error(ex.inspect_with_backtrace) unless Log.logger.nil?
ensure
Log.logger.info("...your session has ended")
Log.logger.close unless Log.logger.nil?
end
end
end