Skip to content

Commit

Permalink
Merge pull request #32 from moul/dev/moul/future
Browse files Browse the repository at this point in the history
  • Loading branch information
moul authored Dec 29, 2020
2 parents f809d12 + d3a4840 commit 5fe5fb3
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 3 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func CaptureStdoutAndStderr() (func() string, error)
CaptureStdoutAndStderr temporarily pipes os.Stdout and os.Stderr into a
buffer.
func CheckErr(err error)
CheckErr panics if the passed error is not nil.
func CombineFuncs(left func(), right ...func()) func()
CombineFuncs create a chain of functions. This can be particularly useful
for creating cleanup function progressively. It solves the infinite loop you
Expand All @@ -64,9 +67,16 @@ func ExecStandaloneOutputs(cmd *exec.Cmd) ([]byte, []byte, error)
standard error.
func ExpandUser(path string) (string, error)
func FanIn(chans ...<-chan interface{}) <-chan interface{}
FanIn merges multiple input chans events into one.
func FileExists(path string) bool
FileExists checks whether a path exists and is a regular file.
func Future(fn func() (interface{}, error)) <-chan FutureRet
Future starts running the given function in background and return a chan
that will return the result of the execution.
func JSON(input interface{}) string
JSON returns a JSON representation of the passed input.
Expand Down Expand Up @@ -134,6 +144,12 @@ func WaitForCtrlC()
TYPES
type FutureRet struct {
Ret interface{}
Err error
}
FutureRet is a generic struct returned by Future.
type MutexMap struct {
// Has unexported fields.
}
Expand All @@ -154,6 +170,7 @@ type UniqueChild interface {
auto-kill itself when its context is done.
func NewUniqueChild(ctx context.Context) UniqueChild
NewUniqueChild instantiates and returns a UniqueChild manager.
```

Expand Down
31 changes: 30 additions & 1 deletion concurrency.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package u

import "context"
import (
"context"
"sync"
)

// UniqueChild is a goroutine manager (parent) that can only have one child at a time.
// When you call UniqueChild.SetChild(), UniqueChild cancels the previous child context (if any), then run a new child.
Expand All @@ -10,6 +13,7 @@ type UniqueChild interface {
CloseChild()
}

// NewUniqueChild instantiates and returns a UniqueChild manager.
func NewUniqueChild(ctx context.Context) UniqueChild { return &uniqueChild{ctx: ctx} }

type uniqueChild struct {
Expand All @@ -33,3 +37,28 @@ func (parent *uniqueChild) CloseChild() {
parent.lastChildCancelFn()
}
}

// FanIn merges multiple input chans events into one.
func FanIn(chans ...<-chan interface{}) <-chan interface{} {
merged := make(chan interface{})
var wg sync.WaitGroup
wg.Add(len(chans))

output := func(c <-chan interface{}) {
for item := range c {
merged <- item
}
wg.Done()
}

for _, ch := range chans {
go output(ch)
}

go func() {
wg.Wait()
close(merged)
}()

return merged
}
45 changes: 45 additions & 0 deletions concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package u_test
import (
"context"
"fmt"
"sort"
"strings"
"time"

"moul.io/u"
Expand Down Expand Up @@ -81,3 +83,46 @@ func ExampleUniqueChild_CloseChild() {

// Output: A
}

func ExampleFanIn() {
ch1 := make(chan interface{})
ch2 := make(chan interface{})
ch3 := make(chan interface{})
merged := u.FanIn(ch1, ch2, ch3)
done := make(chan bool)
received := []string{}

go func() {
for item := range merged {
fmt.Println("tick")
received = append(received, fmt.Sprintf("%v", item))
}
done <- true
}()

ch1 <- 1
ch2 <- 2
ch3 <- 3
close(ch1)
ch2 <- 4
ch2 <- 5
ch3 <- 6
close(ch2)
ch3 <- 7
close(ch3)

<-done

sort.Strings(received)
fmt.Println(strings.Join(received, ", "))

// Output:
// tick
// tick
// tick
// tick
// tick
// tick
// tick
// 1, 2, 3, 4, 5, 6, 7
}
2 changes: 0 additions & 2 deletions depaware.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ moul.io/u dependencies: (generated by github.com/tailscale/depaware)
path from archive/zip
path/filepath from io/ioutil+
reflect from encoding/binary+
LD runtime/cgo
sort from compress/flate+
strconv from compress/flate+
strings from archive/zip+
Expand All @@ -39,4 +38,3 @@ moul.io/u dependencies: (generated by github.com/tailscale/depaware)
unicode from bytes+
unicode/utf16 from encoding/json+
unicode/utf8 from archive/zip+
unsafe from hash/crc32+
16 changes: 16 additions & 0 deletions pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,19 @@ func CheckErr(err error) {
panic(err)
}
}

// Future starts running the given function in background and return a chan that will return the result of the execution.
func Future(fn func() (interface{}, error)) <-chan FutureRet {
c := make(chan FutureRet, 1)
go func() {
ret, err := fn()
c <- FutureRet{Ret: ret, Err: err}
}()
return c
}

// FutureRet is a generic struct returned by Future.
type FutureRet struct {
Ret interface{}
Err error
}
18 changes: 18 additions & 0 deletions pattern_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package u_test
import (
"fmt"
"net/http"
"time"

"moul.io/u"
)
Expand All @@ -19,3 +20,20 @@ func ExampleCheckErr() {
_, err := http.Get("http://foo.bar")
u.CheckErr(err) // panic
}

func ExampleFuture() {
future := u.Future(func() (interface{}, error) {
time.Sleep(100 * time.Millisecond)
return "foobar", nil
})

// here, we can do some stuff

ret := <-future
fmt.Println("Ret:", ret.Ret)
fmt.Println("Err:", ret.Err)

// Output:
// Ret: foobar
// Err: <nil>
}

0 comments on commit 5fe5fb3

Please sign in to comment.