From db28365132e9ba9c6742a191cce8c0fe42aa9707 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20M=C3=BCller?= Date: Tue, 6 Apr 2021 00:42:05 +0200 Subject: [PATCH 1/6] Implement event loop stub for win32 --- spec/std/concurrent_spec.cr | 18 ++++++++++ src/crystal/system/win32/event_loop_iocp.cr | 39 +++++++++++++-------- src/windows_stubs.cr | 8 ----- 3 files changed, 43 insertions(+), 22 deletions(-) diff --git a/spec/std/concurrent_spec.cr b/spec/std/concurrent_spec.cr index d3b8994f0768..eec86ac08bed 100644 --- a/spec/std/concurrent_spec.cr +++ b/spec/std/concurrent_spec.cr @@ -66,4 +66,22 @@ describe "concurrent" do it "accepts method call with receiver" do typeof(spawn String.new) end + + it "schedules intermitting sleeps" do + chan = Channel(Int32).new + spawn do + 3.times do |i| + sleep 20.milliseconds + chan.send (i + 1) + end + end + spawn do + 2.times do |i| + sleep 50.milliseconds + chan.send (i + 1) * 10 + end + end + + Array(Int32).new(5, 0).map!{ chan.receive }.should eq [1,2,10,3,20] + end end diff --git a/src/crystal/system/win32/event_loop_iocp.cr b/src/crystal/system/win32/event_loop_iocp.cr index 1a80eab6c1b2..723baa07fc52 100644 --- a/src/crystal/system/win32/event_loop_iocp.cr +++ b/src/crystal/system/win32/event_loop_iocp.cr @@ -1,14 +1,22 @@ require "crystal/system/print_error" module Crystal::EventLoop - @@queue = Deque(Fiber).new + @@queue = Deque(Event).new # Runs the event loop. def self.run_once : Nil - next_fiber = @@queue.pop? + next_event = @@queue.min_by { |e| e.wake_at } - if next_fiber - Crystal::Scheduler.enqueue next_fiber + if next_event + sleep_time = next_event.wake_at - Time.monotonic + + if sleep_time > Time::Span.zero + LibC.Sleep(sleep_time.milliseconds) + end + + dequeue next_event + + Crystal::Scheduler.enqueue next_event.fiber else Crystal::System.print_error "Warning: No runnables in scheduler. Exiting program.\n" ::exit @@ -19,20 +27,18 @@ module Crystal::EventLoop def self.after_fork : Nil end - def self.enqueue(fiber : Fiber) - unless @@queue.includes?(fiber) - @@queue << fiber + def self.enqueue(event : Event) + unless @@queue.includes?(event) + @@queue << event end end - def self.dequeue(fiber : Fiber) - @@queue.delete(fiber) + def self.dequeue(event : Event) + @@queue.delete(event) end # Create a new resume event for a fiber. def self.create_resume_event(fiber : Fiber) : Crystal::Event - enqueue(fiber) - Crystal::Event.new(fiber) end @@ -48,15 +54,20 @@ module Crystal::EventLoop end struct Crystal::Event + getter fiber + getter wake_at + def initialize(@fiber : Fiber) + @wake_at = Time.monotonic end # Frees the event def free : Nil - Crystal::EventLoop.dequeue(@fiber) + Crystal::EventLoop.dequeue(self) end - def add(time_span : Time::Span?) : Nil - Crystal::EventLoop.enqueue(@fiber) + def add(time_span : Time::Span) : Nil + @wake_at = Time.monotonic + time_span + Crystal::EventLoop.enqueue(self) end end diff --git a/src/windows_stubs.cr b/src/windows_stubs.cr index d55c53099c4f..01783ab9e71b 100644 --- a/src/windows_stubs.cr +++ b/src/windows_stubs.cr @@ -89,11 +89,3 @@ end enum Signal KILL = 0 end - -def sleep(seconds : Number) - sleep(seconds.seconds) -end - -def sleep(time : Time::Span) - LibC.Sleep(time.total_milliseconds) -end From d8337ac8012655ae2680c9c76c896d08796bdfb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20M=C3=BCller?= Date: Tue, 6 Apr 2021 14:24:36 +0200 Subject: [PATCH 2/6] crystal tool format --- spec/std/concurrent_spec.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/std/concurrent_spec.cr b/spec/std/concurrent_spec.cr index eec86ac08bed..53dcff261070 100644 --- a/spec/std/concurrent_spec.cr +++ b/spec/std/concurrent_spec.cr @@ -82,6 +82,6 @@ describe "concurrent" do end end - Array(Int32).new(5, 0).map!{ chan.receive }.should eq [1,2,10,3,20] + Array(Int32).new(5, 0).map! { chan.receive }.should eq [1, 2, 10, 3, 20] end end From 16f2101ca8ad888beaa3657bfa311f61a2e800aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20M=C3=BCller?= Date: Tue, 6 Apr 2021 19:28:33 +0200 Subject: [PATCH 3/6] Update src/crystal/system/win32/event_loop_iocp.cr Co-authored-by: Sijawusz Pur Rahnama --- src/crystal/system/win32/event_loop_iocp.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crystal/system/win32/event_loop_iocp.cr b/src/crystal/system/win32/event_loop_iocp.cr index 723baa07fc52..f1bd1f4be697 100644 --- a/src/crystal/system/win32/event_loop_iocp.cr +++ b/src/crystal/system/win32/event_loop_iocp.cr @@ -11,7 +11,7 @@ module Crystal::EventLoop sleep_time = next_event.wake_at - Time.monotonic if sleep_time > Time::Span.zero - LibC.Sleep(sleep_time.milliseconds) + LibC.Sleep(sleep_time.total_milliseconds) end dequeue next_event From dd91a27cf5c6e5210282954d9214e731ccf55b80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20M=C3=BCller?= Date: Tue, 6 Apr 2021 20:31:51 +0200 Subject: [PATCH 4/6] Increase sleep time in concurrent spec --- spec/std/concurrent_spec.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/std/concurrent_spec.cr b/spec/std/concurrent_spec.cr index 53dcff261070..01b3c8788e7d 100644 --- a/spec/std/concurrent_spec.cr +++ b/spec/std/concurrent_spec.cr @@ -71,13 +71,13 @@ describe "concurrent" do chan = Channel(Int32).new spawn do 3.times do |i| - sleep 20.milliseconds + sleep 40.milliseconds chan.send (i + 1) end end spawn do 2.times do |i| - sleep 50.milliseconds + sleep 100.milliseconds chan.send (i + 1) * 10 end end From 731ca69fa4b83c3bd42c6803ba0eaa114a6fa839 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20M=C3=BCller?= Date: Tue, 6 Apr 2021 21:19:52 +0200 Subject: [PATCH 5/6] Update spec/std/concurrent_spec.cr Co-authored-by: Sijawusz Pur Rahnama --- spec/std/concurrent_spec.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/std/concurrent_spec.cr b/spec/std/concurrent_spec.cr index 01b3c8788e7d..96e83d1e363e 100644 --- a/spec/std/concurrent_spec.cr +++ b/spec/std/concurrent_spec.cr @@ -82,6 +82,6 @@ describe "concurrent" do end end - Array(Int32).new(5, 0).map! { chan.receive }.should eq [1, 2, 10, 3, 20] + Array(Int32).new(5) { chan.receive }.should eq [1, 2, 10, 3, 20] end end From c343a8afd1bdf40a95274b26e4b1780ec311c3b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20M=C3=BCller?= Date: Fri, 28 May 2021 22:52:01 +0200 Subject: [PATCH 6/6] Disable spec on darwin --- spec/std/concurrent_spec.cr | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/spec/std/concurrent_spec.cr b/spec/std/concurrent_spec.cr index 96e83d1e363e..644c79310aa8 100644 --- a/spec/std/concurrent_spec.cr +++ b/spec/std/concurrent_spec.cr @@ -67,21 +67,26 @@ describe "concurrent" do typeof(spawn String.new) end - it "schedules intermitting sleeps" do - chan = Channel(Int32).new - spawn do - 3.times do |i| - sleep 40.milliseconds - chan.send (i + 1) + {% if flag?(:darwin) %} + pending "schedules intermitting sleeps" + # TODO: This spec fails on darwin, even with highly increased sleep times. Needs investigation. + {% else %} + it "schedules intermitting sleeps" do + chan = Channel(Int32).new + spawn do + 3.times do |i| + sleep 40.milliseconds + chan.send (i + 1) + end end - end - spawn do - 2.times do |i| - sleep 100.milliseconds - chan.send (i + 1) * 10 + spawn do + 2.times do |i| + sleep 100.milliseconds + chan.send (i + 1) * 10 + end end - end - Array(Int32).new(5) { chan.receive }.should eq [1, 2, 10, 3, 20] - end + Array(Int32).new(5) { chan.receive }.should eq [1, 2, 10, 3, 20] + end + {% end %} end