This repository has been archived by the owner on Jun 1, 2023. It is now read-only.

Implements concurrent requests #48

Closed
wants to merge 12 commits into from

Conversation

faustinoaq
Member

This PR adds concurrent channels to avoid blocking requests, so we can list symbols while searching for an implementation, or format a file while searching for semantic errors, and so on...

Based on this pseudo-code:

ch = Channel(Nil).new

loop do
  # Handle each incoming "request" in its own fiber.
  spawn do
    i = rand(10)
    puts "Receiving and processing request of #{i} seconds..."
    sleep i
    ch.send nil # signal that this request is done
  end

  # Non-blocking check for a finished request (select with else doesn't block).
  select
  when ch.receive
    puts "Request has been finished!"
    next
  else
    puts "sleeping..."
    sleep 1
  end
end

This change was suggested by @kofno here and by @laginha87 here

@faustinoaq faustinoaq requested a review from a team February 25, 2018 15:06
@faustinoaq
Member Author

I did some tests, but it looks like requests are still being processed synchronously 😅

@laginha87
Contributor

If the requests you're trying don't have any code that yields the fiber, it will behave like that.
I think the only operation that might yield automatically in scry is file reads.
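
A minimal sketch of that behavior (illustrative only, not scry code): a fiber that never reaches a yield point runs to completion before any other fiber is scheduled.

spawn do
  puts "fiber A: starting busy work"
  sum = 0_i64
  10_000_000.times { |i| sum += i } # pure CPU work: no yield point, so B can't run yet
  puts "fiber A: done (sum=#{sum})"
end

spawn do
  puts "fiber B: I only run after A finishes or yields"
end

sleep 1 # yield the main fiber so the scheduler can run A, then B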

@faustinoaq
Member Author

@laginha87 Maybe we can try to use HTTP instead of STDIN to communicate between a client and the scry server, WDYT?

@keplersj
Contributor

@faustinoaq Are you talking about changing the server to communicate with LSP clients using IPC instead of STDIO, or something else? I'm a little confused.

@faustinoaq
Member Author

Are you talking about changing the server to communicate with LSP clients using IPC instead of STDIO, or something else?

Hi @keplersj, I think the scry server should be able to process multiple requests concurrently. Currently, if I request a method implementation, I can't get diagnostics or format my code until the first request (go to definition) has finished.

This PR isn't working like I thought it would. I don't think I understand Crystal concurrency yet 😅

@keplersj
Contributor

Ah. That makes sense. How about refactoring to use an event emitter architecture that would complement the event-based nature of LSP? This looks like a promising dependency that could help: hugoabonizio/event_emitter.cr.

src/scry.cr Outdated
end

select
when n = next_request.receive
Contributor

@bew bew Feb 27, 2018


It doesn't work, because you're blocking here!

select basically waits for any Channel or IO to unlock, but here you're waiting on only one channel.

You probably want to wait on the channels of all the requests that are currently being executed (each in its own fiber), plus on STDIN, so that as soon as you get a new request, STDIN unlocks and you read the request, spawn a fiber, etc. And when a processing fiber finishes, it writes to its channel, the channel unlocks in the select, and you can send back the result.

Not sure if it's clear enough.

Member Author

@faustinoaq faustinoaq Feb 27, 2018


Thank you @bew for your comment 👍

I'm a bit confused, can you show me a minimal example?

Do you mean something like this:

loop do
  content = Request.new(STDIN).read # Is this blocking?
  spawn do
    # do something with content
  end
  # Where should the channels go?
end

Contributor

@bmulvihill bmulvihill Feb 27, 2018


@faustinoaq I think what @bew means is that select is blocking: it won't unblock until the channel receives a message. That's why you're seeing synchronous behavior; nothing unblocks until the request is done processing and you do channel.send. You can also wait on STDIN in the select, so it unlocks when a new request comes in and you can spawn the new fiber there.
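
A sketch of that pattern (illustrative, not scry's actual code): a dedicated fiber turns the blocking STDIN reads into channel messages, so a single select can react both to new requests and to finished ones.

input = Channel(String).new
results = Channel(String).new

# Reader fiber: converts blocking STDIN reads into channel sends.
spawn do
  while line = STDIN.gets
    input.send line
  end
end

loop do
  select
  when req = input.receive
    # Handle each request in its own fiber; upcase stands in for real work.
    spawn { results.send req.upcase }
  when res = results.receive
    puts res # write the response back as soon as it's ready
  end
end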

@bew
Contributor

bew commented Feb 27, 2018

Not sure how an event emitting architecture would help; I think the correct term is "asynchronous RPC" or something like that.

@keplersj
Contributor

I believe that an event emitter architecture could help this project, as it's the architecture used in the main implementations by Microsoft. See interface Connection in vscode-languageserver-node/server. This architecture aids concurrency in handling requests and is familiar to anyone experienced with the protocol from the VSCode and Atom side of things, where this event-driven architecture is used to drive client creation.

@bew
Contributor

bew commented Feb 27, 2018

Not sure I understand what you mean @keplersj. From my understanding, the Scry server just needs to wait for requests from a text editor (on stdin) and write back a response (on stdout).

As the requests are currently synchronous, only one request can be handled at a time. So the improvement would be to handle many requests at the same time, via asynchronous responses.

Where would events be useful here?
Or are you talking in a general sense about the internal handling of a specific request? (In that case the two approaches are complementary, IMO.)

@bew
Contributor

bew commented Feb 27, 2018

@faustinoaq I made an example of what I'm talking about: https://gist.github.com/bew/9088cf265bd0ea97ddb89dad7926fd3c

I noticed that the select keyword wasn't going to work, because with the keyword you can't specify an array of all the channels you want to monitor. So I fell back to raw Channel.select (what the select keyword uses under the hood).
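
A minimal version of that idea (illustrative; the real code is in the gist above): Channel.select takes an array of select actions, so the set of monitored channels can grow and shrink at runtime.

channels = Array.new(3) { Channel(String).new }

channels.each_with_index do |ch, i|
  spawn do
    sleep rand(3) # simulate work of varying duration
    ch.send "result #{i}"
  end
end

3.times do
  # Blocks until ANY of the channels is ready, then returns the winning
  # channel's index and the received value.
  index, value = Channel.select(channels.map(&.receive_select_action))
  puts "channel #{index} -> #{value}"
end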

@keplersj
Contributor

@bew I apologize. It seems I was very confused about what this discussion was about. As you correctly noticed, I was thinking from the perspective of making scry more asynchronous in the general sense, not about asynchronous request handling. Thank you for your patience.


if channels[channel_index] == input_channel
if data.nil?
Log.logger.info "Input closed, no more data to come in current channel..."
Contributor

@bew bew Mar 4, 2018


"no more data to come from the client"? Because "current channel" means nothing to me.

@@ -0,0 +1,69 @@
module Scry
class ConcurrentRpc
CONTEXT = Context.new
Contributor


Are you sure it's safe to have the context global to all the fibers?

(not sure what the Context object does in scry)

Member Author


Well, I tried to use @context as well, but it still doesn't work; see the comment below 👇

@faustinoaq
Member Author

faustinoaq commented Mar 4, 2018

Hi @bew, I tried every way I know 😅. Your code was very helpful and I discovered other approaches as well, but none of them works with scry. It looks like LSP forces the requests to be synchronous: the client waits to receive something before writing to STDIN again, so when I try to read STDIN without writing to STDOUT first (using spawn to handle the request in the background), the channel receive blocks and nothing is received.

I tried some other code as well:

  1. Code based on comment suggestion: 97b44dc

It just outputs this:

I, [2018-03-03 20:13:35 -05:00 #23054]  INFO -- : Scry is looking into your code...
D, [2018-03-03 20:13:35 -05:00 #23054] DEBUG -- : Content-Length: 1156
D, [2018-03-03 20:13:35 -05:00 #23054] DEBUG -- : 
D, [2018-03-03 20:13:35 -05:00 #23054] DEBUG -- : {"jsonrpc":"2.0","id":0,"method":"initialize","params":{"processId":23010,"rootPath":"/home/main/Projects/foo",

No initialize response was output or received.

  2. Code trying to use one channel (I had to use synchronous code for initialize to get it working):

With this I can handle the initialize request but nothing more:

def initialize
  content = Request.new(STDIN).read
  request = Message.new(content).parse
  process(request)
end

def requests
  loop do
    content = Request.new(STDIN).read
    request = Message.new(content).parse
    spawn process(request)
  end
end

def process(request)
  results = @context.dispatch(request)
rescue ex
  results = [ResponseMessage.new(ex)]
ensure
  result_channel = Channel(Array(Result)).new
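  # NOTE: this send blocks (the channel is unbuffered) before the channel is
  # ever appended to @channels, so run's Channel.select can never receive it.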
  result_channel.send [results].flatten
  @channels << result_channel
end

def run
  spawn requests

  loop do
    index, results = Channel.select(@channels.map &.receive_select_action)

    @channels.delete_at(index)

    if results
      response = Response.new(results)
      response.write(STDOUT)
    elsif @channels.empty?
      break
    end
  end
rescue ex
  Log.logger.error(ex.inspect_with_backtrace) unless Log.logger.nil?
ensure
  Log.logger.info("...your session has ended")
  Log.logger.close unless Log.logger.nil?
end
  3. I tried some tricks using select to get it working, but requests are synchronized as before:
module Scry
  struct ConcurrentRpc
    private def read(io)
      content = Request.new(io).read
      Message.new(content).parse
    end

    private def write(io, results)
      response = Response.new(results)
      response.write(io)
    end

    def run
      context = Context.new
      chan = Channel(Array(Result)).new
      flag = Channel(Nil).new
      spawn do
        loop do
          request = read(STDIN)
          spawn do
            results = context.dispatch(request)
          rescue ex
            results = [ResponseMessage.new(ex)]
          ensure
            chan.send [results].flatten
          end
          select
          when flag.receive
            sleep 1.millisecond
          else
            sleep 1.millisecond
          end
        end
      end
      loop do
        results = chan.receive
        write(STDOUT, results)
        spawn do
          flag.send nil
        end
      end
    rescue ex
      Log.logger.error(ex.inspect_with_backtrace) unless Log.logger.nil?
    ensure
      Log.logger.info("...your session has ended")
      Log.logger.close unless Log.logger.nil?
    end
  end
end

I don't understand why scry and the LSP client aren't allowing async requests. I tried some basic Crystal code and it looks fine to me:

chan = Channel(String).new

spawn do
  loop do
    request = gets.to_s
    r = rand(10)
    spawn do
      puts "#{request} spawn sleeping #{r}"
      sleep r
      results = request.upcase
      chan.send results
    end
  end
end

loop do
  results = chan.receive
  puts results
end

Outputs:

a
a spawn sleeping 3
b
b spawn sleeping 2
c
c spawn sleeping 1
C
B
A
d
d spawn sleeping 1
e
e spawn sleeping 7
f
f spawn sleeping 3
D
F
E

I tried some code using a channels array and Channel.select(channels.map &.receive_select_action); the example runs, but when I try it on scry it doesn't work.

Maybe it happens because scry's Request.new(STDIN).read reads the input twice? See: https://github.com/crystal-lang-tools/scry/blob/master/src/scry/request.cr#L13 (both the headers and the content use @io.read)

@faustinoaq
Member Author

Travis is passing because we're testing individual components, but when you try the full compiled binary with an LSP client processing async requests, it doesn't work.

@bew
Contributor

bew commented Mar 4, 2018

I've found in the specification that requests can be parallelized, but it can be "dangerous" when there are destructive requests in the pipeline:

Responses to requests should be sent in roughly the same order as the requests appear on the server or client side. So for example if a server receives a textDocument/completion request and then a textDocument/signatureHelp request it will usually first return the response for the textDocument/completion and then the response for textDocument/signatureHelp.

However, the server may decide to use a parallel execution strategy and may wish to return responses in a different order than the requests were received. The server may do so as long as this reordering doesn't affect the correctness of the responses. For example, reordering the result of textDocument/completion and textDocument/signatureHelp is allowed, as each of these requests usually won't affect the output of the other. On the other hand, the server most likely should not reorder textDocument/definition and textDocument/rename requests, since executing the latter may affect the result of the former.

From https://microsoft.github.io/language-server-protocol/specification#actual-protocol

Maybe there is a list of non-destructive requests that can be parallelized safely? At least we should not forget that 'minor' behavior.
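
One way to honor that constraint would be an allow-list; a hypothetical sketch (PARALLEL_SAFE, request.method, and process are made-up names, not scry's API):

# Only requests whose reordering can't affect correctness are dispatched
# to background fibers; everything else stays on the sequential path.
PARALLEL_SAFE = Set{
  "textDocument/completion",
  "textDocument/signatureHelp",
  "textDocument/documentSymbol",
}

def dispatch(request)
  if PARALLEL_SAFE.includes?(request.method)
    spawn { process(request) } # safe to reorder
  else
    process(request) # ordering-sensitive: handle inline, in arrival order
  end
end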

@faustinoaq
Member Author

I think we should "parallelize" diagnostics and implementations; those two features are pretty slow in some projects.

Formatting, symbol listing, and completion are fast enough to stay synchronous.

So:

| feature | behavior | why? |
| --- | --- | --- |
| diagnostics | async | is slow |
| implementations | async | is slow |
| formatting | sync | is fast enough |
| symbols listing | sync | is fast enough |
| completion | sync | is fast enough |

In fact, I think we should use Process.run or Process.new to execute crystal tool implementations (go to definition) and crystal build --no-codegen (diagnostics) instead of implementing them in pure Crystal using require "compiler/crystal/**". This would free up a lot of memory for scry and make the scry binary very small.

BTW, we're already using the crystal command here:

Process.run("crystal", ["env"], output: io)

@keplersj @kofno @bew @bmulvihill @laginha87 WDYT?

@faustinoaq
Member Author

So, I think this PR is a bit useless, since concurrent requests are more complex to implement than this 😅

@faustinoaq
Member Author

faustinoaq commented Mar 5, 2018

Should I close this? 😅 It seems parallel requests need a different approach 😅

@faustinoaq faustinoaq force-pushed the master branch 2 times, most recently from fa2a17a to 1c3ccfa Compare March 7, 2018 13:03
@simaoneves
Contributor

simaoneves commented Mar 7, 2018

Sorry, I didn't read everything carefully, but I think it's possible to parallelize using fibers and channels. Have you tried a worker pool, with some workers that all process requests the same way, and two channels, one for requests and one for results?
Two requests arrive; the first worker picks the first one off the requests channel, and the second worker picks the second request off the same channel.
When they finish, they send the results to the results channel (in any order). In the main fiber you listen on the results channel and send each result back to the editor (you can reorder here if you really want to respond in the same order the requests came in). Just a thought :)
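
A sketch of that worker-pool idea (names are illustrative, not scry's API):

requests = Channel(String).new
results = Channel(String).new

# Each worker competes for the next request; whichever fiber is free wins.
4.times do |id|
  spawn do
    loop do
      req = requests.receive
      sleep rand(3) # stand-in for real request processing
      results.send "worker #{id} handled #{req}"
    end
  end
end

spawn { %w(req-a req-b req-c).each { |r| requests.send r } }

3.times { puts results.receive } # responses arrive in completion order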

@laginha87
Contributor

laginha87 commented Mar 7, 2018

@simaoneves there are features of the language server protocol that complicate a simple worker-pool-with-fibers solution:

  1. Some requests must stay sequential, because the second request depends on the output of the first.
  2. Some requests are cancelable.

@faustinoaq
Member Author

@simaoneves I tried something similar using multiple channels, but I couldn't get it working.

If you have a code example or idea, please try this PR with your LSP client and check the scry.out file generated in your temp directory.

@faustinoaq
Member Author

Oh, I just realized Scry requires concurrent requests to support cancelRequest.

So, this 👇 code doesn't work yet, because requests are executed sequentially:

# context.cr
# Used by:
# - $/cancelRequest
private def dispatch_notification(params : CancelParams, msg)
  Log.logger.debug("PROCESSES: #{Scry::PROCESSES}")
  Scry::PROCESSES.each do |owner, process|
    if process.terminated?
      PROCESSES.delete(owner) # the registry is keyed by owner (the request id)
    elsif owner.to_s == params.id.to_s
      begin
        process.kill
        code = Scry::ErrorCodes::RequestCancelled.value
        error = ResponseError.new("request #{params.id} has been cancelled successfully", nil, code)
        return ResponseMessage.new(error)
      rescue ex
        return ResponseMessage.new(ex)
      end
    end
  end
end

A process can freeze the server, because cancelRequest itself is blocked by the very process it is supposed to cancel.
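
A hypothetical way around that (sketch only: request_id, cursor, file, and respond are made-up, and the Scry::PROCESSES registry is assumed from the snippet above): run the slow tool in a child process and wait for it in a fiber, so the main loop stays free to read a later $/cancelRequest and kill the process.

process = Process.new("crystal", ["tool", "implementations", "-c", cursor, file],
  output: Process::Redirect::Pipe)
Scry::PROCESSES[request_id] = process # register it so cancelRequest can find it

spawn do
  output = process.output.gets_to_end
  status = process.wait
  # Send a normal response only if cancelRequest didn't kill the process.
  respond(request_id, output) if status.success?
end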

@faustinoaq
Member Author

I'm going to close this. I already implemented it successfully on my local machine, but several LSP clients don't support it, and the output becomes a headache because parallel responses get mixed up in the IO.

We can reopen this in the future, though 😉

@faustinoaq faustinoaq closed this Jun 14, 2018
@faustinoaq faustinoaq deleted the fa/implement-concurrent-requests branch June 14, 2018 13:28
@faustinoaq faustinoaq removed the request for review from a team June 14, 2018 13:28