Conversation
I did some tests, but it looks like requests are still being processed synchronously 😅
If the requests you're trying don't have any code that yields the fiber, they will behave like that.
@laginha87 Maybe we can try to use
@faustinoaq Are you talking about changing the server to communicate with LSP clients using IPC instead of STDIO, or something else? I'm a little confused.
Hi @keplersj, I think the scry server should be able to process multiple requests concurrently. Currently, if I request a method implementation, I can't get diagnostics or format my code until the first request (go to definition) has finished. This PR isn't working like I thought. I think I don't understand Crystal concurrency yet 😅
Ah. That makes sense. How about refactoring to an event emitter architecture that would complement the event-based nature of LSP? This looks like a promising dependency that would be helpful: hugoabonizio/event_emitter.cr.
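For context, the event-emitter idea boils down to registering handlers per event name and firing them when the event is emitted. A hand-rolled sketch in Crystal (deliberately not the event_emitter.cr shard's actual API, and with made-up event names):

```crystal
class Emitter
  alias Handler = Proc(String, Nil)

  def initialize
    @handlers = Hash(String, Array(Handler)).new { |hash, key| hash[key] = [] of Handler }
  end

  # Register a handler for an event name.
  def on(event : String, &block : String ->)
    @handlers[event] << block
  end

  # Fire every handler registered for the event.
  def emit(event : String, payload : String)
    @handlers[event].each(&.call(payload))
  end
end

emitter = Emitter.new
emitter.on("request") { |body| puts "handling: #{body}" }
emitter.emit("request", "textDocument/formatting")
```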
src/scry.cr
Outdated
end

select
when n = next_request.receive
It doesn't work, because you're blocking here! `select` basically waits for any Channel or IO to unlock, but here you're waiting on only one channel.
You probably want to wait on the channels of all the requests that are currently being executed (each in its own fiber), plus on STDIN, so that as soon as you get a new request, STDIN will unlock, you'll read the request, spawn a fiber, etc. And when a processing fiber finishes, it writes to its channel, the channel unlocks in the select, and you can send back the result.
Not sure if it's clear enough.
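Since Crystal's `select` operates on channel actions, one way to "wait on STDIN" is to give it a dedicated reader fiber that feeds a requests channel, while each handler fiber reports back on a results channel. A minimal sketch of that idea in plain Crystal (generic strings instead of Scry's request/result types, and line-based input instead of LSP's Content-Length framing):

```crystal
requests = Channel(String).new
results  = Channel(String).new

# Reader fiber: blocks on STDIN without blocking anyone else.
spawn do
  while line = STDIN.gets
    requests.send line
  end
end

# Dispatcher fiber: one handler fiber per request, so a slow request
# never delays reading the next one.
spawn do
  loop do
    req = requests.receive
    spawn do
      sleep rand(3).seconds # stands in for slow work (e.g. running the compiler)
      results.send req.upcase
    end
  end
end

# Writer loop: responses are written back in completion order.
loop do
  puts results.receive
end
```

Note that responses then come back in completion order, not request order.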
Thank you @bew for your comment 👍
I'm a bit confused, can you show me a minimal example?
Do you mean something like this:
loop do
  content = Request.new(STDIN).read # Is this blocking?
  spawn do
    # do something with content
  end
  # Where should the channels go?
end
@faustinoaq I think what @bew means is that `select` is blocking, so it won't unblock until the channel receives a message. That's why you are seeing synchronous behavior: it stays blocked until the request is done processing and you do `channel.send`. You can also wait on STDIN in the select so it will unlock when a new request comes in, and you can spawn the new fiber there.
Not sure how an event-emitting architecture would help; I think the correct term is "asynchronous RPC" or something like that.
I believe that an event emitter architecture could help this project, as it's the architecture used in the main implementations by Microsoft. See
Not sure I understand what you mean @keplersj. From my understanding, the Scry server just needs to wait for requests from a text editor (on stdin) and write back a response (on stdout). As the requests are currently synchronous, only one request can be handled at a time. So the improvement would be to handle many requests at the same time, via asynchronous responses. Where would events be useful here?
@faustinoaq I did an example of what I'm talking about: https://gist.github.com/bew/9088cf265bd0ea97ddb89dad7926fd3c I noticed that the keyword
@bew I apologize. It seems I was very confused about what this discussion was about. As you correctly noticed, I was thinking from the perspective of making scry more asynchronous in the general sense and not thinking about asynchronous request handling. Thank you for your patience.
src/scry/concurrent_rpc.cr
Outdated
if channels[channel_index] == input_channel
  if data.nil?
    Log.logger.info "Input closed, no more data to come in current channel..."
`no more data to come from the client`? Because `current channel` means nothing to me.
src/scry/concurrent_rpc.cr
Outdated
@@ -0,0 +1,69 @@
module Scry
  class ConcurrentRpc
    CONTEXT = Context.new
Are you sure it's safe to have the context global to all the fibers?
(not sure what the Context object does in scry)
Well, I tried to use `@context` as well, but it still doesn't work, see the comment below 👇
Hi @bew, I tried every way I know 😅. Your code was very helpful, and I discovered other approaches as well, but none of them works with scry. It looks like LSP is forcing the request to be sync and to receive something before writing to
I tried some other code as well:
It just outputs this:
No initialize request output/received message
With this I can handle the initialize request but nothing more:

def initialize
  content = Request.new(STDIN).read
  request = Message.new(content).parse
  process(request)
end

def requests
  loop do
    content = Request.new(STDIN).read
    request = Message.new(content).parse
    spawn process(request)
  end
end

def process(request)
  results = @context.dispatch(request)
rescue ex
  results = [ResponseMessage.new(ex)]
ensure
  result_channel = Channel(Array(Result)).new
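  # NOTE: send on an unbuffered Channel blocks until another fiber receives;
  # the channel is appended to @channels only after this send completes, so the
  # run loop below never selects on it and this fiber stays blocked.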
  result_channel.send [results].flatten
  @channels << result_channel
end

def run
  spawn requests
  loop do
    index, results = Channel.select(@channels.map &.receive_select_action)
    @channels.delete_at(index)
    if results
      response = Response.new(results)
      response.write(STDOUT)
    elsif @channels.empty?
      break
    end
  end
rescue ex
  Log.logger.error(ex.inspect_with_backtrace) unless Log.logger.nil?
ensure
  Log.logger.info("...your session has ended")
  Log.logger.close unless Log.logger.nil?
end

module Scry
  struct ConcurrentRpc
    private def read(io)
      content = Request.new(io).read
      Message.new(content).parse
    end

    private def write(io, results)
      response = Response.new(results)
      response.write(io)
    end

    def run
      context = Context.new
      chan = Channel(Array(Result)).new
      flag = Channel(Nil).new
      spawn do
        loop do
          request = read(STDIN)
          spawn do
            results = context.dispatch(request)
          rescue ex
            results = [ResponseMessage.new(ex)]
          ensure
            chan.send [results].flatten
          end
          select
          when flag.receive
            sleep 1.millisecond
          else
            sleep 1.millisecond
          end
        end
      end
      loop do
        results = chan.receive
        write(STDOUT, results)
        spawn do
          flag.send nil
        end
      end
    rescue ex
      Log.logger.error(ex.inspect_with_backtrace) unless Log.logger.nil?
    ensure
      Log.logger.info("...your session has ended")
      Log.logger.close unless Log.logger.nil?
    end
  end
end

I don't understand why scry and the LSP client aren't allowing async requests. I tried some basic Crystal code and it looks fine to me:

chan = Channel(String).new

spawn do
  loop do
    request = gets.to_s
    r = rand(10)
    spawn do
      puts "#{request} spawn sleeping #{r}"
      sleep r
      results = request.upcase
      chan.send results
    end
  end
end

loop do
  results = chan.receive
  puts results
end

Outputs:
I tried some code using a
Maybe it happens because scry
Travis is passing because we're testing individual components, but when you try the full compiled binary with an LSP client to process async requests, it doesn't work.
I've found in the specification that it can be parallelized, but it can be "dangerous" when there are destructive requests in the pipeline:
From https://microsoft.github.io/language-server-protocol/specification#actual-protocol
Maybe there is a list of non-destructive requests that can be parallelized safely? At least we should not forget that "minor" behavior.
I think we should "parallelize" diagnostics and implementation; those two features are pretty slow in some projects. Formatting, symbol listing, and completion are fast enough to keep those three features synchronous. So:
In fact I think we should use
BTW, we're already using the crystal command here:
scry/src/scry/environment_config.cr (line 25 in 1bff42e)
@keplersj @kofno @bew @bmulvihill @laginha87 WDYT?
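A rough sketch of that split in plain Crystal, assuming a hypothetical `handle` helper and placeholder feature names (not Scry's actual API or LSP method strings): slow features get their own fiber and report back on a channel, fast ones are answered inline.

```crystal
SLOW = ["diagnostics", "implementation"] # placeholder names, not real LSP method strings

results = Channel(String).new

def handle(feature : String, results : Channel(String))
  if SLOW.includes?(feature)
    spawn do
      sleep 0.5.seconds              # stands in for running the compiler
      results.send "#{feature} done" # delivered whenever it finishes
    end
  else
    puts "#{feature} done"           # fast path answered right away, in order
  end
end

handle("implementation", results)
handle("format", results)
handle("completion", results)
puts results.receive # the slow result arrives later, without holding up the fast ones
```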
So, I think this PR is a bit useless, since concurrent requests are more complex to implement 😅
Should I close this? 😅 It seems parallel requests need a different approach 😅
Force-pushed from fa2a17a to 1c3ccfa
Sorry, I didn't read everything carefully, but I think it's possible to parallelize using Fibers and Channels. Have you tried having a worker pool, with some workers that process the requests the same way, and 2 Channels, 1 for requests and 1 for results?
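For reference, a minimal sketch of that worker-pool shape in plain Crystal (generic strings instead of Scry's actual request/result types):

```crystal
requests = Channel(String).new
results  = Channel(String).new

# A small fixed pool of workers pulling from the requests channel.
4.times do |i|
  spawn do
    loop do
      req = requests.receive
      sleep rand(100).milliseconds # stands in for real processing
      results.send "worker #{i}: #{req.upcase}"
    end
  end
end

# Producer: in Scry this would be the STDIN reader.
spawn do
  %w[format hover definition diagnostics].each { |req| requests.send req }
end

# Consumer: in Scry this would write responses to STDOUT.
4.times { puts results.receive }
```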
@simaoneves there are features coming from the language server protocol that complicate implementing this with a simple solution of worker pools and fibers:
@simaoneves I tried something similar using multiple channels, but I couldn't get it working. If you have a code example or idea, please try this PR with your LSP client and check
Force-pushed from 1700145 to 5c983db
Oh, I just realized Scry requires concurrent requests to support `$/cancelRequest`. So, this 👇 code doesn't work yet, because requests are executed sequentially:

# context.cr

# Used by:
# - $/cancelRequest
private def dispatch_notification(params : CancelParams, msg)
  Log.logger.debug("PROCESSES: #{Scry::PROCESSES}")
  Scry::PROCESSES.each do |owner, process|
    if process.terminated?
      PROCESSES.delete(process)
    elsif owner.to_s == params.id.to_s
      begin
        process.kill
        code = Scry::ErrorCodes::RequestCancelled.value
        error = ResponseError.new("request #{params.id} has been cancelled successfully", nil, code)
        return ResponseMessage.new(error)
      rescue ex
        return ResponseMessage.new(ex)
      end
    end
  end
end

Some process can freeze the server because
I'm going to close this. I already implemented this successfully on my local machine; however, several LSP clients don't support it. Also, the output becomes a headache because parallel responses get mixed up inside the IO. We can reopen this in the future, though 😉
This PR adds concurrent channels to avoid blocking requests, so we can list symbols while searching for an implementation, or format a file while searching for semantic errors, and so on...
Based on this pseudo-code:
This change was suggested by @kofno here and @laginha87 here.