Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dispatch queue #215

Merged
merged 1 commit into from
Jan 12, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 11 additions & 46 deletions watcher_fsevents_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ package notify

/*
#include <CoreServices/CoreServices.h>
#include <dispatch/dispatch.h>

typedef void (*CFRunLoopPerformCallBack)(void*);

void gosource(void *);
void gostream(uintptr_t, uintptr_t, size_t, uintptr_t, uintptr_t, uintptr_t);

static FSEventStreamRef EventStreamCreate(FSEventStreamContext * context, uintptr_t info, CFArrayRef paths, FSEventStreamEventId since, CFTimeInterval latency, FSEventStreamCreateFlags flags) {
Expand All @@ -27,7 +25,6 @@ import "C"
import (
"errors"
"os"
"runtime"
"sync"
"sync/atomic"
"unsafe"
Expand All @@ -42,45 +39,18 @@ var (
since = uint64(C.FSEventsGetCurrentEventId())
)

var runloop C.CFRunLoopRef // global runloop which all streams are registered with
var wg sync.WaitGroup // used to wait until the runloop starts

// source is used for synchronization purposes - it signals when runloop has
// started and is ready via the wg. It also serves purpose of a dummy source,
// thanks to it the runloop does not return as it also has at least one source
// registered.
var source = C.CFRunLoopSourceCreate(C.kCFAllocatorDefault, 0, &C.CFRunLoopSourceContext{
perform: (C.CFRunLoopPerformCallBack)(C.gosource),
})
// global dispatch queue which all streams are registered with
var q C.dispatch_queue_t = C.dispatch_queue_create(
C.CString("com.github.rjeczalik.notify"),
(C.dispatch_queue_attr_t)(C.DISPATCH_QUEUE_SERIAL),
)

// Errors returned when FSEvents functions fail.
var (
errCreate = os.NewSyscallError("FSEventStreamCreate", errors.New("NULL"))
errStart = os.NewSyscallError("FSEventStreamStart", errors.New("false"))
)

// initializes the global runloop and ensures any created stream awaits its
// readiness.
func init() {
wg.Add(1)
go func() {
// There is exactly one run loop per thread. Lock this goroutine to its
// thread to ensure that it's not rescheduled on a different thread while
// setting up the run loop.
runtime.LockOSThread()
runloop = C.CFRunLoopGetCurrent()
C.CFRunLoopAddSource(runloop, source, C.kCFRunLoopDefaultMode)
C.CFRunLoopRun()
panic("runloop has just unexpectedly stopped")
}()
C.CFRunLoopSourceSignal(source)
}

//export gosource
func gosource(unsafe.Pointer) {
wg.Done()
}

//export gostream
func gostream(_, info uintptr, n C.size_t, paths, flags, ids uintptr) {
const (
Expand Down Expand Up @@ -143,8 +113,7 @@ func (r *streamFuncRegistry) delete(id uintptr) {
delete(r.m, id)
}

// Stream represents single watch-point which listens for events scheduled by
// the global runloop.
// Stream represents a single watch-point which listens for events scheduled on the global dispatch queue.
type stream struct {
path string
ref C.FSEventStreamRef
Expand All @@ -160,39 +129,35 @@ func newStream(path string, fn streamFunc) *stream {
}
}

// Start creates a FSEventStream for the given path and schedules it with
// global runloop. It's a nop if the stream was already started.
// Start creates a FSEventStream for the given path and schedules on the global dispatch queue.
// It's a nop if the stream was already started.
func (s *stream) Start() error {
if s.ref != nilstream {
return nil
}
wg.Wait()
p := C.CFStringCreateWithCStringNoCopy(C.kCFAllocatorDefault, C.CString(s.path), C.kCFStringEncodingUTF8, C.kCFAllocatorDefault)
path := C.CFArrayCreate(C.kCFAllocatorDefault, (*unsafe.Pointer)(unsafe.Pointer(&p)), 1, nil)
ctx := C.FSEventStreamContext{}
ref := C.EventStreamCreate(&ctx, C.uintptr_t(s.info), path, C.FSEventStreamEventId(atomic.LoadUint64(&since)), latency, flags)
if ref == nilstream {
return errCreate
}
C.FSEventStreamScheduleWithRunLoop(ref, runloop, C.kCFRunLoopDefaultMode)
C.FSEventStreamSetDispatchQueue(ref, q)
if C.FSEventStreamStart(ref) == C.Boolean(0) {
C.FSEventStreamInvalidate(ref)
return errStart
}
C.CFRunLoopWakeUp(runloop)
s.ref = ref
return nil
}

// Stop stops underlying FSEventStream and unregisters it from global runloop.
// Stop stops underlying FSEventStream and unregisters it from the global dispatch queue.
func (s *stream) Stop() {
if s.ref == nilstream {
return
}
wg.Wait()
C.FSEventStreamStop(s.ref)
C.FSEventStreamInvalidate(s.ref)
C.CFRunLoopWakeUp(runloop)
s.ref = nilstream
streamFuncs.delete(s.info)
}