Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Windows: Event loop based on IOCP #12149

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spec/std/concurrent/select_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ describe "select" do
x.should eq 2
end

it "stress select with send/receive in multiple fibers" do
pending_win32 "stress select with send/receive in multiple fibers" do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FTR in theory nothing is preventing this to work, all of the ingredients are there, but it's still failing.

fibers = 4
msg_per_sender = 1000
ch = Array.new(fibers) { Array.new(fibers) { Channel(Int32).new } }
Expand Down
37 changes: 36 additions & 1 deletion spec/std/socket/tcp_socket_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ describe TCPSocket, tags: "network" do
end
end

pending_win32 "sync from server" do
it "sync from server" do
port = unused_local_port

TCPServer.open("::", port) do |server|
Expand Down Expand Up @@ -155,4 +155,39 @@ describe TCPSocket, tags: "network" do
end
end
end

it "sends and receives messages" do
port = unused_local_port

channel = Channel(Exception?).new
spawn do
TCPServer.open("::", port) do |server|
channel.send nil
sock = server.accept
sock.read_timeout = 3.second
sock.write_timeout = 3.second

sock.gets(4).should eq("ping")
sock << "pong"
channel.send nil
end
rescue exc
channel.send exc
end

if exc = channel.receive
raise exc
end

TCPSocket.open("localhost", port) do |client|
client.read_timeout = 3.second
client.write_timeout = 3.second
client << "ping"
client.gets(4).should eq("pong")
end

if exc = channel.receive
raise exc
end
end
end
1 change: 1 addition & 0 deletions spec/std/socket/unix_server_spec.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{% skip_file if flag?(:win32) %}
require "../spec_helper"
require "socket"
require "../../support/fibers"
Expand Down
1 change: 1 addition & 0 deletions spec/std/socket/unix_socket_spec.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{% skip_file if flag?(:win32) %}
require "spec"
require "socket"
require "../../support/tempfile"
Expand Down
4 changes: 2 additions & 2 deletions spec/win32_std_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require "./std/big/number_spec.cr"
require "./std/bit_array_spec.cr"
require "./std/bool_spec.cr"
require "./std/box_spec.cr"
# require "./std/channel_spec.cr" (failed codegen)
require "./std/channel_spec.cr"
require "./std/char/reader_spec.cr"
require "./std/char_spec.cr"
require "./std/class_spec.cr"
Expand All @@ -27,7 +27,7 @@ require "./std/compress/zip/zip_spec.cr"
require "./std/compress/zlib/reader_spec.cr"
require "./std/compress/zlib/stress_spec.cr"
require "./std/compress/zlib/writer_spec.cr"
# require "./std/concurrent/select_spec.cr" (failed to run)
require "./std/concurrent/select_spec.cr"
require "./std/concurrent_spec.cr"
require "./std/crypto/bcrypt/base64_spec.cr"
require "./std/crypto/bcrypt/password_spec.cr"
Expand Down
34 changes: 28 additions & 6 deletions src/crystal/system/win32/event_loop_iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,29 @@ module Crystal::EventLoop
next_event = @@queue.min_by { |e| e.wake_at }

if next_event
sleep_time = next_event.wake_at - Time.monotonic
now = Time.monotonic

if sleep_time > Time::Span.zero
LibC.Sleep(sleep_time.total_milliseconds)
if next_event.wake_at > now
sleep_time = next_event.wake_at - now
timed_out = IO::Overlapped.wait_queued_completions(sleep_time.total_milliseconds) do |fiber|
Crystal::Scheduler.enqueue fiber
end

return unless timed_out
end

dequeue next_event

Crystal::Scheduler.enqueue next_event.fiber
fiber = next_event.fiber

unless fiber.dead?
if next_event.timeout? && (select_action = fiber.timeout_select_action)
fiber.timeout_select_action = nil
select_action.time_expired(fiber)
else
Crystal::Scheduler.enqueue fiber
end
end
else
Crystal::System.print_error "Warning: No runnables in scheduler. Exiting program.\n"
::exit
Expand Down Expand Up @@ -80,21 +94,29 @@ module Crystal::EventLoop
def self.create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false) : Crystal::Event
Crystal::Event.new(Fiber.current)
end

def self.create_timeout_event(fiber)
Crystal::Event.new(fiber, timeout: true)
end
end

struct Crystal::Event
getter fiber
getter wake_at
getter? timeout

def initialize(@fiber : Fiber)
@wake_at = Time.monotonic
def initialize(@fiber : Fiber, @wake_at = Time.monotonic, *, @timeout = false)
end

# Frees the event
def free : Nil
Crystal::EventLoop.dequeue(self)
end

def delete
free
end

def add(time_span : Time::Span) : Nil
@wake_at = Time.monotonic + time_span
Crystal::EventLoop.enqueue(self)
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/system/win32/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ module Crystal::System::Socket
private def unbuffered_read(slice : Bytes)
wsabuf = wsa_buffer(slice)

bytes_read = overlapped_read(fd, "WSARecv") do |overlapped|
bytes_read = overlapped_operation(fd, "WSARecv", read_timeout, connreset_is_error: false) do |overlapped|
flags = 0_u32
LibC.WSARecv(fd, pointerof(wsabuf), 1, out bytes_received, pointerof(flags), overlapped, nil)
end
Expand Down
Loading