Skip to content

Commit

Permalink
feat(ants): add initial version of ants (#268)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed May 27, 2024
1 parent c430fa8 commit d0a2280
Show file tree
Hide file tree
Showing 15 changed files with 1,365 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"--fast"
],
"cSpell.words": [
"Alloc",
"Assistable",
"binaryheap",
"bodyclose",
Expand All @@ -16,6 +17,7 @@
"deadcode",
"deepcopy",
"depguard",
"Dietoad",
"dogsled",
"dotenv",
"dupl",
Expand All @@ -24,6 +26,7 @@
"errgroup",
"exportloopref",
"extendio",
"fasthttp",
"fieldalignment",
"fortytw",
"ginkgolinter",
Expand All @@ -49,11 +52,13 @@
"leaktest",
"linters",
"lorax",
"malloc",
"mattn",
"mohae",
"nakedret",
"nolint",
"nolintlint",
"Nonblocking",
"notif",
"onecontext",
"onsi",
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Please refer to the documentation already available on [___RxGo___](https://gith

+ unit testing with [Ginkgo](https://onsi.github.io/ginkgo/)/[Gomega](https://onsi.github.io/gomega/)
+ implemented with [🐍 Cobra](https://cobra.dev/) cli framework, assisted by [🐲 Cobrass](https://github.com/snivilised/cobrass)
+ uses a heavily modified version of [🐜🐜🐜 ants](https://github.com/panjf2000/ants)
+ i18n with [go-i18n](https://github.com/nicksnyder/go-i18n)
+ linting configuration and pre-commit hooks, (see: [linting-golang](https://freshman.tech/linting-golang/)).

Expand Down
4 changes: 4 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ tasks:
cmds:
- ginkgo -p ./...

ta:
cmds:
- ginkgo -p ./internal/ants

ti:
cmds:
- go test ./i18n
Expand Down
21 changes: 21 additions & 0 deletions internal/ants/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2018 Andy Pan

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
140 changes: 140 additions & 0 deletions internal/ants/ants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// MIT License

// Copyright (c) 2018 Andy Pan

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package ants

import (
"errors"
"log"
"math"
"os"
"runtime"
"time"
)

const (
// DefaultAntsPoolSize is the default capacity for a default goroutine pool.
DefaultAntsPoolSize = math.MaxInt32

// DefaultCleanIntervalTime is the interval time to clean up goroutines.
DefaultCleanIntervalTime = time.Second
)

const (
// OPENED represents that the pool is opened.
OPENED = iota

// CLOSED represents that the pool is closed.
CLOSED
)

var (
// ErrLackPoolFunc will be returned when invokers don't provide function for pool.
ErrLackPoolFunc = errors.New("must provide function for pool")

// ErrInvalidPoolExpiry will be returned when setting a negative number as the periodic duration to purge goroutines.
ErrInvalidPoolExpiry = errors.New("invalid expiry for pool")

// ErrPoolClosed will be returned when submitting task to a closed pool.
ErrPoolClosed = errors.New("this pool has been closed")

// ErrPoolOverload will be returned when the pool is full and no workers available.
ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set")

// ErrInvalidPreAllocSize will be returned when trying to set up a negative capacity under PreAlloc mode.
ErrInvalidPreAllocSize = errors.New("can not set up a negative capacity under PreAlloc mode")

// ErrTimeout will be returned after the operations timed out.
ErrTimeout = errors.New("operation timed out")

// ErrInvalidPoolIndex will be returned when trying to retrieve a pool with an invalid index.
ErrInvalidPoolIndex = errors.New("invalid pool index")

// ErrInvalidLoadBalancingStrategy will be returned when trying to create a MultiPool with an invalid load-balancing strategy.
ErrInvalidLoadBalancingStrategy = errors.New("invalid load-balancing strategy")

// workerChanCap determines whether the channel of a worker should be a buffered channel
// to get the best performance. Inspired by fasthttp at
// https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
workerChanCap = func() int {
// Use blocking channel if GOMAXPROCS=1.
// This switches context from sender to receiver immediately,
// which results in higher performance (under go1.5 at least).
if runtime.GOMAXPROCS(0) == 1 {
return 0
}

// Use non-blocking workerChan if GOMAXPROCS>1,
// since otherwise the sender might be dragged down if the receiver is CPU-bound.
return 1
}()

// log.Lmsgprefix is not available in go1.13, just make an identical value for it.
logLmsgprefix = 64
defaultLogger = Logger(log.New(os.Stderr, "[ants]: ", log.LstdFlags|logLmsgprefix|log.Lmicroseconds))

// Init an instance pool when importing ants.
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
)

const nowTimeUpdateInterval = 500 * time.Millisecond

// Logger is used for logging formatted messages.
type Logger interface {
// Printf must have the same semantics as log.Printf.
Printf(format string, args ...interface{})
}

// Submit submits a task to pool.
func Submit(task func()) error {
return defaultAntsPool.Submit(task)
}

// Running returns the number of the currently running goroutines.
func Running() int {
return defaultAntsPool.Running()
}

// Cap returns the capacity of this default pool.
func Cap() int {
return defaultAntsPool.Cap()
}

// Free returns the available goroutines to work.
func Free() int {
return defaultAntsPool.Free()
}

// Release Closes the default pool.
func Release() {
defaultAntsPool.Release()
}

// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func ReleaseTimeout(timeout time.Duration) error {
return defaultAntsPool.ReleaseTimeout(timeout)
}

// Reboot reboots the default pool.
func Reboot() {
defaultAntsPool.Reboot()
}
13 changes: 13 additions & 0 deletions internal/ants/ants_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ants_test

import (
"testing"

. "github.com/onsi/ginkgo/v2" //nolint:revive // ok
. "github.com/onsi/gomega" //nolint:revive // ok
)

func TestAnts(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Ants Suite")
}
142 changes: 142 additions & 0 deletions internal/ants/ants_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package ants_test

import (
"runtime"
"sync"
"time"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ok
. "github.com/onsi/gomega" //nolint:revive // ok

"github.com/snivilised/lorax/internal/ants"
)

var _ = Describe("Ants", func() {
Context("Submit", func() {
When("default pool", func() {
It("🧪 should: run ok", func() {
// TestAntsPool
defer leaktest.Check(GinkgoT())()
defer ants.Release()

var (
wg sync.WaitGroup
err error
)

for i := 0; i < n; i++ {
wg.Add(1)
err = ants.Submit(func() {
demoFunc()
wg.Done()
})
}
wg.Wait()

GinkgoWriter.Printf("pool, capacity:%d", ants.Cap())
GinkgoWriter.Printf("pool, running workers number:%d", ants.Running())
GinkgoWriter.Printf("pool, free workers number:%d", ants.Free())

mem := runtime.MemStats{}
runtime.ReadMemStats(&mem)
curMem = mem.TotalAlloc/MiB - curMem

Expect(err).To(Succeed())
GinkgoWriter.Printf("memory usage:%d MB", curMem)
})
})

When("non-blocking", func() {
It("🧪 should: not fail", func() {
// TestNonblockingSubmit
// ??? defer leaktest.Check(GinkgoT())()

poolSize := 10
p, err := ants.NewPool(poolSize, ants.WithNonblocking(true))
Expect(err).To(Succeed(), "create TimingPool failed")

defer p.Release()

for i := 0; i < poolSize-1; i++ {
Expect(p.Submit(longRunningFunc)).To(Succeed(),
"nonblocking submit when pool is not full shouldn't return error",
)
}
ch := make(chan struct{})
ch1 := make(chan struct{})
f := func() {
<-ch
close(ch1)
}
// p is full now.
Expect(p.Submit(f)).To(Succeed(),
"nonblocking submit when pool is not full shouldn't return error",
)
Expect(p.Submit(demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()),
"nonblocking submit when pool is full should get an ErrPoolOverload",
)

// interrupt f to get an available worker
close(ch)
<-ch1
Expect(p.Submit(demoFunc)).To(Succeed(),
"nonblocking submit when pool is not full shouldn't return error",
)
})
})

When("max blocking", func() {
It("🧪 should: not fail", func() {
// TestMaxBlockingSubmit
// ??? defer leaktest.Check(GinkgoT())()

poolSize := 10
p, err := ants.NewPool(poolSize, ants.WithMaxBlockingTasks(1))
Expect(err).To(Succeed(), "create TimingPool failed")

defer p.Release()

for i := 0; i < poolSize-1; i++ {
Expect(p.Submit(longRunningFunc)).To(Succeed(),
"blocking submit when pool is not full shouldn't return error",
)
}
ch := make(chan struct{})
f := func() {
<-ch
}
// p is full now.
Expect(p.Submit(f)).To(Succeed(),
"nonblocking submit when pool is not full shouldn't return error",
)

var wg sync.WaitGroup
wg.Add(1)
errCh := make(chan error, 1)
go func() {
// should be blocked. blocking num == 1
if err := p.Submit(demoFunc); err != nil {
errCh <- err
}
wg.Done()
}()
time.Sleep(1 * time.Second)
// already reached max blocking limit
Expect(p.Submit(demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()),
"blocking submit when pool reach max blocking submit should return ErrPoolOverload",
)

// interrupt f to make blocking submit successful.
close(ch)
wg.Wait()
select {
case <-errCh:
// t.Fatalf("blocking submit when pool is full should not return error")
Fail("blocking submit when pool is full should not return error")
default:
}
})
})
})
})
Loading

0 comments on commit d0a2280

Please sign in to comment.