diff --git a/.evergreen/config.yml b/.evergreen/config.yml
index 55217b80cc..90da768350 100644
--- a/.evergreen/config.yml
+++ b/.evergreen/config.yml
@@ -1362,7 +1362,7 @@ tasks:
       vars:
         MONGO_GO_DRIVER_COMPRESSOR: "snappy"
 
-  # Build with the oldest supported version of Go.
+  # Build the compilecheck submodule with the oldest supported version of Go.
   - name: go1.18-build
     tags: ["compile-check"]
     commands:
@@ -1370,8 +1370,7 @@ tasks:
       - command: subprocess.exec
         params:
           binary: bash
           env:
-            GOROOT: /opt/golang/go1.18
-            add_to_path: [/opt/golang/go1.18/bin]
+            GO_VERSION: "1.18"
          args: [*task-runner, build-compile-check]
 
   # Build with the same Go version that we're using for tests.
@@ -1381,6 +1380,10 @@ tasks:
     - command: subprocess.exec
       params:
         binary: bash
+        # Set the GO_VERSION to empty string to use the Go installation in the
+        # PATH.
+        env:
+          GO_VERSION: ""
       args: [*task-runner, build]
 
   - name: "atlas-test"
diff --git a/.github/workflows/create-release-branch.yml b/.github/workflows/create-release-branch.yml
new file mode 100644
index 0000000000..2d22c042cf
--- /dev/null
+++ b/.github/workflows/create-release-branch.yml
@@ -0,0 +1,55 @@
+name: Create Release Branch
+
+on:
+  workflow_dispatch:
+    inputs:
+      branch_name:
+        description: The name of the new branch
+        required: true
+      version:
+        description: The version to set on the branch
+        required: true
+      base_ref:
+        description: The base reference for the branch
+      push_changes:
+        description: Whether to push the changes
+        default: "true"
+
+concurrency:
+  group: create-branch-${{ github.ref }}
+  cancel-in-progress: true
+
+defaults:
+  run:
+    shell: bash -eux {0}
+
+jobs:
+  create-branch:
+    environment: release
+    runs-on: ubuntu-latest
+    permissions:
+      id-token: write
+      contents: write
+    outputs:
+      version: ${{ steps.pre-publish.outputs.version }}
+    steps:
+      - uses: mongodb-labs/drivers-github-tools/secure-checkout@v2
+        with:
+          app_id: ${{ vars.APP_ID }}
+          private_key: ${{ secrets.APP_PRIVATE_KEY }}
+      - uses: mongodb-labs/drivers-github-tools/setup@v2
+        with:
+          aws_role_arn: ${{ secrets.AWS_ROLE_ARN }}
+          aws_region_name: ${{ vars.AWS_REGION_NAME }}
+          aws_secret_id: ${{ secrets.AWS_SECRET_ID }}
+          artifactory_username: ${{ vars.ARTIFACTORY_USERNAME }}
+      - uses: mongodb-labs/drivers-github-tools/create-branch@v2
+        id: create-branch
+        with:
+          branch_name: ${{ inputs.branch_name }}
+          version: ${{ inputs.version }}
+          base_ref: ${{ inputs.base_ref }}
+          push_changes: ${{ inputs.push_changes }}
+          version_bump_script: "go run ${{ github.action_path }}/bump-version.go"
+          evergreen_project: mongo-go-driver-release
+          release_workflow_path: ./.github/workflows/release.yml
diff --git a/Taskfile.yml b/Taskfile.yml
index 206b31db9f..f22427a640 100644
--- a/Taskfile.yml
+++ b/Taskfile.yml
@@ -18,10 +18,13 @@ tasks:
   check-license: bash etc/check_license.sh
 
   build:
-    deps: [cross-compile, build-tests, build-compile-check, install-libmongocrypt]
+    deps: [install-libmongocrypt]
     cmds:
       - go build ./...
       - go build ${BUILD_TAGS} ./...
+      - task: build-tests
+      - task: build-compile-check
+      - task: cross-compile
 
   build-tests: go test -short ${BUILD_TAGS} -run ^$$ ./...
diff --git a/etc/compile_check.sh b/etc/compile_check.sh
index 69e4395a12..d83a0cc6f1 100755
--- a/etc/compile_check.sh
+++ b/etc/compile_check.sh
@@ -2,32 +2,33 @@
 set -e # exit when any command fails
 set -x # show all commands being run
 
-GC=go
+# Default to Go 1.18 if GO_VERSION is not set.
+#
+# Use the "=" operator (instead of the more common ":-" operator) so that it
+# allows setting GO_VERSION="" to use the Go installation in the PATH, and it
+# sets the GO_VERSION variable if the default is used.
+GC=go${GO_VERSION="1.18"}
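# For reference, a sketch of how the two expansion operators differ; the
# variable name V is illustrative, not part of this script:
#
#   unset V;  echo "go${V="1.18"}"  # prints "go1.18" and assigns V="1.18"
#   V="";     echo "go${V="1.18"}"  # prints "go": the empty value is kept
#   V="";     echo "go${V:-1.18}"   # prints "go1.18": ":-" treats an empty
#                                   # value as unset, and never assigns to V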
 COMPILE_CHECK_DIR="internal/cmd/compilecheck"
 
-# shellcheck disable=SC2034
-DEV_MIN_VERSION=1.19
-
-# version will flatten a version string of upto 4 components for inequality
-# comparison.
-function version {
-    echo "$@" | awk -F. '{ printf("%d%03d%03d%03d\n", $1,$2,$3,$4); }';
-}
 
 # compile_check will attempt to build the internal/test/compilecheck project
 # using the provided Go version. This is to simulate an end-to-end use case.
-# This check will only run on environments where the Go version is greater than
-# or equal to the given version.
 function compile_check {
     # Change the directory to the compilecheck test directory.
-    cd ${COMPILE_CHECK_DIR}
+    pushd ${COMPILE_CHECK_DIR}
 
-    MACHINE_VERSION=`${GC} version | { read _ _ v _; echo ${v#go}; }`
+    # If a custom Go version is set using the GO_VERSION env var (e.g. "1.18"),
+    # add the GOPATH bin directory to PATH and then install that Go version.
+    if [ ! -z "$GO_VERSION" ]; then
+        PATH=$(go env GOPATH)/bin:$PATH
+        export PATH
 
-    # If the version is not 1.13, then run "go mod tidy"
-    if [ "$(version $MACHINE_VERSION)" -ge "$(version 1.15)" ]; then
-        go mod tidy
+        go install golang.org/dl/go$GO_VERSION@latest
+        ${GC} download
     fi
 
+    ${GC} version
+    ${GC} mod tidy
+
     # Check simple build.
     ${GC} build ./...
 
@@ -35,7 +36,7 @@ function compile_check {
     ${GC} build -buildmode=plugin
 
     # Check build with tags.
-    go build $BUILD_TAGS ./...
+    ${GC} build $BUILD_TAGS ./...
 
     # Check build with various architectures.
     GOOS=linux GOARCH=386 ${GC} build ./...
@@ -50,7 +51,7 @@ function compile_check {
     rm compilecheck.so
 
     # Change the directory back to the working directory.
-    cd -
+    popd
 }
 
 compile_check
diff --git a/examples/_logger/logrus/go.mod b/examples/_logger/logrus/go.mod
index 15d893c008..f6ec322f68 100644
--- a/examples/_logger/logrus/go.mod
+++ b/examples/_logger/logrus/go.mod
@@ -13,7 +13,7 @@ require (
 require (
 	github.com/go-logr/logr v1.2.3 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
-	github.com/klauspost/compress v1.13.6 // indirect
+	github.com/klauspost/compress v1.16.7 // indirect
 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
 	github.com/xdg-go/scram v1.1.2 // indirect
 	github.com/xdg-go/stringprep v1.0.4 // indirect
diff --git a/examples/_logger/logrus/go.sum b/examples/_logger/logrus/go.sum
index 58459e878b..8f12f843fb 100644
--- a/examples/_logger/logrus/go.sum
+++ b/examples/_logger/logrus/go.sum
@@ -9,8 +9,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
-github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
-github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
+github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
diff --git a/examples/_logger/zap/go.mod b/examples/_logger/zap/go.mod
index ff361cc0d3..4d47994a73 100644
--- a/examples/_logger/zap/go.mod
+++ b/examples/_logger/zap/go.mod
@@ -13,7 +13,7 @@ require (
 	github.com/go-logr/logr v1.2.2 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
-	github.com/klauspost/compress v1.13.6 // indirect
+	github.com/klauspost/compress v1.16.7 // indirect
 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
 	github.com/xdg-go/scram v1.1.2 // indirect
 	github.com/xdg-go/stringprep v1.0.4 // indirect
diff --git a/examples/_logger/zap/go.sum b/examples/_logger/zap/go.sum
index 609393e164..b86ad07cf7 100644
--- a/examples/_logger/zap/go.sum
+++ b/examples/_logger/zap/go.sum
@@ -11,8 +11,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
-github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
-github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
+github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
diff --git a/examples/_logger/zerolog/go.mod b/examples/_logger/zerolog/go.mod
index 76ab265c76..9913c321c4 100644
--- a/examples/_logger/zerolog/go.mod
+++ b/examples/_logger/zerolog/go.mod
@@ -13,7 +13,7 @@ require (
 require (
 	github.com/go-logr/logr v1.2.3 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
-	github.com/klauspost/compress v1.13.6 // indirect
+	github.com/klauspost/compress v1.16.7 // indirect
 	github.com/mattn/go-colorable v0.1.12 // indirect
 	github.com/mattn/go-isatty v0.0.14 // indirect
 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
diff --git a/examples/_logger/zerolog/go.sum b/examples/_logger/zerolog/go.sum
index 6d8c756356..8cc6562614 100644
--- a/examples/_logger/zerolog/go.sum
+++ b/examples/_logger/zerolog/go.sum
@@ -10,8 +10,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
-github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
-github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
+github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
 github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
 github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
diff --git a/go.mod b/go.mod
index 0060ed272e..a51898d0fd 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@ require (
 	github.com/davecgh/go-spew v1.1.1
 	github.com/golang/snappy v0.0.4
 	github.com/google/go-cmp v0.6.0
-	github.com/klauspost/compress v1.13.6
+	github.com/klauspost/compress v1.16.7
 	github.com/xdg-go/scram v1.1.2
 	github.com/xdg-go/stringprep v1.0.4
 	github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78
diff --git a/go.sum b/go.sum
index 945fecdcf0..e4a93e189c 100644
--- a/go.sum
+++ b/go.sum
@@ -4,8 +4,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
-github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
-github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
+github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
 github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
 github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
diff --git a/internal/cmd/benchmark/go.mod b/internal/cmd/benchmark/go.mod
index 1ebb104c47..f99d93f20c 100644
--- a/internal/cmd/benchmark/go.mod
+++ b/internal/cmd/benchmark/go.mod
@@ -12,7 +12,7 @@ require (
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
-	github.com/klauspost/compress v1.13.6 // indirect
+	github.com/klauspost/compress v1.16.7 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
 	github.com/xdg-go/scram v1.1.2 // indirect
diff --git a/internal/cmd/benchmark/go.sum b/internal/cmd/benchmark/go.sum
index 9954acd3c7..a7b751018d 100644
--- a/internal/cmd/benchmark/go.sum
+++ b/internal/cmd/benchmark/go.sum
@@ -4,8 +4,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
 github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
-github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
-github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
+github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
 github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
diff --git a/internal/cmd/compilecheck/go.mod b/internal/cmd/compilecheck/go.mod
index ef0d68d61e..bbc4aae937 100644
--- a/internal/cmd/compilecheck/go.mod
+++ b/internal/cmd/compilecheck/go.mod
@@ -10,7 +10,7 @@ require go.mongodb.org/mongo-driver/v2 v2.0.0-alpha2
 
 require (
 	github.com/golang/snappy v0.0.4 // indirect
-	github.com/klauspost/compress v1.13.6 // indirect
+	github.com/klauspost/compress v1.16.7 // indirect
 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
 	github.com/xdg-go/scram v1.1.2 // indirect
 	github.com/xdg-go/stringprep v1.0.4 // indirect
diff --git a/internal/cmd/compilecheck/go.sum b/internal/cmd/compilecheck/go.sum
index be87659b51..db8c667547 100644
--- a/internal/cmd/compilecheck/go.sum
+++ b/internal/cmd/compilecheck/go.sum
@@ -2,8 +2,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
-github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
-github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
+github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
 github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
 github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
diff --git a/internal/cmd/faas/awslambda/mongodb/go.mod b/internal/cmd/faas/awslambda/mongodb/go.mod
index 26e4b5922a..a002c13046 100644
--- a/internal/cmd/faas/awslambda/mongodb/go.mod
+++ b/internal/cmd/faas/awslambda/mongodb/go.mod
@@ -10,7 +10,7 @@ require go.mongodb.org/mongo-driver/v2 v2.0.0-00010101000000-000000000000
 
 require (
 	github.com/golang/snappy v0.0.4 // indirect
-	github.com/klauspost/compress v1.13.6 // indirect
+	github.com/klauspost/compress v1.16.7 // indirect
 	github.com/stretchr/testify v1.8.1 // indirect
 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
 	github.com/xdg-go/scram v1.1.2 // indirect
diff --git a/internal/cmd/faas/awslambda/mongodb/go.sum b/internal/cmd/faas/awslambda/mongodb/go.sum
index 6ea66c6c66..0adaac0231 100644
--- a/internal/cmd/faas/awslambda/mongodb/go.sum
+++ b/internal/cmd/faas/awslambda/mongodb/go.sum
@@ -7,8 +7,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
-github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
-github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
+github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/x/mongo/driver/topology/pool_test.go b/x/mongo/driver/topology/pool_test.go
index e627fe8c28..ae9225f33b 100644
--- a/x/mongo/driver/topology/pool_test.go
+++ b/x/mongo/driver/topology/pool_test.go
@@ -22,1208 +22,1212 @@ import (
 	"go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation"
 )
 
-func TestPool(t *testing.T) {
-	t.Run("newPool", func(t *testing.T) {
-		t.Parallel()
+func TestNewPool(t *testing.T) {
+	t.Parallel()
 
-		t.Run("minPoolSize should not exceed maxPoolSize", func(t *testing.T) {
-			t.Parallel()
+	t.Run("minPoolSize should not exceed maxPoolSize", func(t *testing.T) {
+		t.Parallel()
 
-			p := newPool(poolConfig{MinPoolSize: 100, MaxPoolSize: 10})
-			assert.Equalf(t, uint64(10), p.minSize, "expected minSize of a pool not to be greater than maxSize")
+		p := newPool(poolConfig{MinPoolSize: 100, MaxPoolSize: 10})
+		assert.Equalf(t, uint64(10), p.minSize, "expected minSize of a pool not to be greater than maxSize")
 
-			p.close(context.Background())
-		})
-		t.Run("minPoolSize may exceed maxPoolSize of 0", func(t *testing.T) {
-			t.Parallel()
+		p.close(context.Background())
+	})
+	t.Run("minPoolSize may exceed maxPoolSize of 0", func(t *testing.T) {
+		t.Parallel()
 
-			p := newPool(poolConfig{MinPoolSize: 10, MaxPoolSize: 0})
-			assert.Equalf(t, uint64(10), p.minSize, "expected minSize of a pool to be greater than maxSize of 0")
+		p := newPool(poolConfig{MinPoolSize: 10, MaxPoolSize: 0})
+		assert.Equalf(t, uint64(10), p.minSize, "expected minSize of a pool to be greater than maxSize of 0")
 
-			p.close(context.Background())
-		})
-		t.Run("should be paused", func(t *testing.T) {
-			t.Parallel()
+		p.close(context.Background())
+	})
+	t.Run("should be paused", func(t *testing.T) {
+		t.Parallel()
 
-			p := newPool(poolConfig{})
-			assert.Equalf(t, poolPaused, p.getState(), "expected new pool to be paused")
+		p := newPool(poolConfig{})
+		assert.Equalf(t, poolPaused, p.getState(), "expected new pool to be paused")
 
-			p.close(context.Background())
-		})
+		p.close(context.Background())
 	})
-	t.Run("closeConnection", func(t *testing.T) {
-		t.Parallel()
+}
 
-		t.Run("can't close connection from different pool", func(t *testing.T) {
-			t.Parallel()
+func TestPool_closeConnection(t *testing.T) {
+	t.Parallel()
 
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 1, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
+	t.Run("can't close connection from different pool", func(t *testing.T) {
+		t.Parallel()
 
-			p1 := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				ConnectTimeout: defaultConnectionTimeout,
-			})
-			err := p1.ready()
-			require.NoError(t, err)
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})
 
-			c, err := p1.checkOut(context.Background())
-			require.NoError(t, err)
+		p1 := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			ConnectTimeout: defaultConnectionTimeout,
+		})
+		err := p1.ready()
+		require.NoError(t, err)
 
-			p2 := newPool(poolConfig{})
-			err = p2.ready()
-			require.NoError(t, err)
+		c, err := p1.checkOut(context.Background())
+		require.NoError(t, err)
 
-			err = p2.closeConnection(c)
-			assert.Equalf(t, ErrWrongPool, err, "expected ErrWrongPool error")
+		p2 := newPool(poolConfig{})
+		err = p2.ready()
+		require.NoError(t, err)
 
-			p1.close(context.Background())
-			p2.close(context.Background())
-		})
+		err = p2.closeConnection(c)
+		assert.Equalf(t, ErrWrongPool, err, "expected ErrWrongPool error")
+
+		p1.close(context.Background())
+		p2.close(context.Background())
 	})
-	t.Run("close", func(t *testing.T) {
-		t.Parallel()
+}
 
-		t.Run("calling close multiple times does not panic", func(t *testing.T) {
-			t.Parallel()
+func TestPool_close(t *testing.T) {
+	t.Parallel()
 
-			p := newPool(poolConfig{
-				ConnectTimeout: defaultConnectionTimeout,
-			})
-			err := p.ready()
-			require.NoError(t, err)
+	t.Run("calling close multiple times does not panic", func(t *testing.T) {
+		t.Parallel()
 
-			for i := 0; i < 5; i++ {
-				p.close(context.Background())
-			}
+		p := newPool(poolConfig{
+			ConnectTimeout: defaultConnectionTimeout,
 		})
-		t.Run("closes idle connections", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 3, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
-
-			d := newdialer(&net.Dialer{})
-			p := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				ConnectTimeout: defaultConnectionTimeout,
-			}, WithDialer(func(Dialer) Dialer { return d }))
-			err := p.ready()
-			require.NoError(t, err)
-
-			conns := make([]*connection, 3)
-			for i := range conns {
-				conns[i], err = p.checkOut(context.Background())
-				require.NoError(t, err)
-			}
-			for i := range conns {
-				err = p.checkIn(conns[i])
-				require.NoError(t, err)
-			}
-			assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connections")
-			assert.Equalf(t, 0, d.lenclosed(), "should have closed 0 connections")
-			assert.Equalf(t, 3, p.availableConnectionCount(), "should have 3 available connections")
-			assert.Equalf(t, 3, p.totalConnectionCount(), "should have 3 total connections")
+		err := p.ready()
+		require.NoError(t, err)
 
+		for i := 0; i < 5; i++ {
 			p.close(context.Background())
-			assertConnectionsClosed(t, d, 3)
-			assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
-			assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 total connections")
+		}
+	})
+	t.Run("closes idle connections", func(t *testing.T) {
+		t.Parallel()
+
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 3, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
		})
-		t.Run("closes all open connections", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 3, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
 
-			d := newdialer(&net.Dialer{})
-			p := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				ConnectTimeout: defaultConnectionTimeout,
-			}, WithDialer(func(Dialer) Dialer { return d }))
-			err := p.ready()
+		d := newdialer(&net.Dialer{})
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			ConnectTimeout: defaultConnectionTimeout,
+		}, WithDialer(func(Dialer) Dialer { return d }))
+		err := p.ready()
+		require.NoError(t, err)
+
+		conns := make([]*connection, 3)
+		for i := range conns {
+			conns[i], err = p.checkOut(context.Background())
+			require.NoError(t, err)
+		}
+		for i := range conns {
+			err = p.checkIn(conns[i])
 			require.NoError(t, err)
+		}
+		assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connections")
+		assert.Equalf(t, 0, d.lenclosed(), "should have closed 0 connections")
+		assert.Equalf(t, 3, p.availableConnectionCount(), "should have 3 available connections")
+		assert.Equalf(t, 3, p.totalConnectionCount(), "should have 3 total connections")
 
-			conns := make([]*connection, 3)
-			for i := range conns {
-				conns[i], err = p.checkOut(context.Background())
-				require.NoError(t, err)
-			}
-			for i := 0; i < 2; i++ {
-				err = p.checkIn(conns[i])
-				require.NoError(t, err)
-			}
-			assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connections")
-			assert.Equalf(t, 0, d.lenclosed(), "should have closed 0 connections")
-			assert.Equalf(t, 2, p.availableConnectionCount(), "should have 2 available connections")
-			assert.Equalf(t, 3, p.totalConnectionCount(), "should have 3 total connections")
+		p.close(context.Background())
+		assertConnectionsClosed(t, d, 3)
+		assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
+		assert.Equalf(t, 0, p.totalConnectionCount(), "should have 0 total connections")
+	})
+	t.Run("closes all open connections", func(t *testing.T) {
+		t.Parallel()
 
-			p.close(context.Background())
-			assertConnectionsClosed(t, d, 3)
-			assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
-			assert.Equalf(t, 0, p.totalConnectionCount(), "should have 0 total connections")
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 3, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
		})
-		t.Run("no race if connections are also connecting", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 3, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
 
-			p := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				ConnectTimeout: defaultConnectionTimeout,
-			})
-			err := p.ready()
-			require.NoError(t, err)
+		d := newdialer(&net.Dialer{})
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			ConnectTimeout: defaultConnectionTimeout,
+		}, WithDialer(func(Dialer) Dialer { return d }))
+		err := p.ready()
+		require.NoError(t, err)
 
-			_, err = p.checkOut(context.Background())
+		conns := make([]*connection, 3)
+		for i := range conns {
+			conns[i], err = p.checkOut(context.Background())
			require.NoError(t, err)
-
-			closed := make(chan struct{})
-			started := make(chan struct{})
-			go func() {
-				close(started)
-
-				for {
-					select {
-					case <-closed:
-						return
-					default:
-						c, _ := p.checkOut(context.Background())
-						_ = p.checkIn(c)
-						time.Sleep(time.Millisecond)
-					}
-				}
-			}()
-
-			// Wait for the background goroutine to start running before trying to close the
-			// connection pool.
+		}
+		for i := 0; i < 2; i++ {
+			err = p.checkIn(conns[i])
+			require.NoError(t, err)
+		}
+		assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connections")
+		assert.Equalf(t, 0, d.lenclosed(), "should have closed 0 connections")
+		assert.Equalf(t, 2, p.availableConnectionCount(), "should have 2 available connections")
+		assert.Equalf(t, 3, p.totalConnectionCount(), "should have 3 total connections")
 
+		p.close(context.Background())
+		assertConnectionsClosed(t, d, 3)
+		assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
+		assert.Equalf(t, 0, p.totalConnectionCount(), "should have 0 total connections")
+	})
+	t.Run("no race if connections are also connecting", func(t *testing.T) {
+		t.Parallel()
 
-			<-started
-			_, err = p.checkOut(context.Background())
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 3, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})
+
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			ConnectTimeout: defaultConnectionTimeout,
+		})
+		err := p.ready()
+		require.NoError(t, err)
+
+		_, err = p.checkOut(context.Background())
+		require.NoError(t, err)
+
+		closed := make(chan struct{})
+		started := make(chan struct{})
+		go func() {
+			close(started)
+
+			for {
+				select {
+				case <-closed:
+					return
+				default:
+					c, _ := p.checkOut(context.Background())
+					_ = p.checkIn(c)
+					time.Sleep(time.Millisecond)
+				}
+			}
+		}()
+
+		// Wait for the background goroutine to start running before trying to close the
+		// connection pool.
+		<-started
+		_, err = p.checkOut(context.Background())
+		require.NoError(t, err)
+
+		p.close(context.Background())
+
+		close(closed)
+	})
+	t.Run("shuts down gracefully if Context has a deadline", func(t *testing.T) {
+		t.Parallel()
+
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 3, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})
+
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			ConnectTimeout: defaultConnectionTimeout,
+		})
+		err := p.ready()
+		require.NoError(t, err)
+
+		// Check out 2 connections from the pool and add them to a conns slice.
+		conns := make([]*connection, 2)
+		for i := 0; i < 2; i++ {
+			c, err := p.checkOut(context.Background())
			require.NoError(t, err)
+			conns[i] = c
+		}
 
-			p.close(context.Background())
+		// Check out a 3rd connection from the pool and immediately check it back in so there is
+		// a mixture of in-use and idle connections.
+		c, err := p.checkOut(context.Background())
+		require.NoError(t, err)
 
-			close(closed)
-		})
-		t.Run("shuts down gracefully if Context has a deadline", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 3, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
-
-			p := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				ConnectTimeout: defaultConnectionTimeout,
-			})
-			err := p.ready()
-			require.NoError(t, err)
-
-			// Check out 2 connections from the pool and add them to a conns slice.
-			conns := make([]*connection, 2)
-			for i := 0; i < 2; i++ {
-				c, err := p.checkOut(context.Background())
-				require.NoError(t, err)
-				conns[i] = c
-			}
+		err = p.checkIn(c)
+		require.NoError(t, err)
 
-			// Check out a 3rd connection from the pool and immediately check it back in so there is
-			// a mixture of in-use and idle connections.
-			c, err := p.checkOut(context.Background())
-			require.NoError(t, err)
-
-			err = p.checkIn(c)
-			require.NoError(t, err)
-
-			// Start a goroutine that waits for the pool to start closing, then checks in the
-			// 2 in-use connections. Assert that both connections are still connected during
-			// graceful shutdown before they are checked in.
-			go func() {
-				for p.getState() == poolReady {
					time.Sleep(time.Millisecond)
-				}
-				for _, c := range conns {
-					assert.Equalf(t, connConnected, c.state, "expected conn to still be connected")
-
-					err := p.checkIn(c)
-					require.NoError(t, err)
-				}
-			}()
-
-			// Close the pool with a 1-hour graceful shutdown timeout. Expect that the call to
-			// close() returns when all of the connections are checked in. If close() doesn't return
-			// when all of the connections are checked in, the test will time out.
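// A minimal sketch of the graceful-shutdown behavior described above, using
// a hypothetical checkedIn signal channel; it is not the pool's real
// implementation. close() keeps waiting while connections remain checked
// out, and gives up only when the Context expires.
func gracefulClose(ctx context.Context, checkedIn <-chan struct{}, checkedOut int) error {
	for checkedOut > 0 {
		select {
		case <-checkedIn: // a connection was checked back in
			checkedOut--
		case <-ctx.Done(): // deadline reached; stop waiting
			return ctx.Err()
		}
	}
	return nil // all connections returned; safe to tear down
}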
-			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Hour)
-			defer cancel()
-			p.close(ctx)
-		})
-		t.Run("closing a Connection does not cause an error after pool is closed", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 3, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
-
-			p := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				ConnectTimeout: defaultConnectionTimeout,
-			})
-			err := p.ready()
-			require.NoError(t, err)
+		// Start a goroutine that waits for the pool to start closing, then checks in the
+		// 2 in-use connections. Assert that both connections are still connected during
+		// graceful shutdown before they are checked in.
+		go func() {
+			for p.getState() == poolReady {
+				time.Sleep(time.Millisecond)
+			}
+			for _, c := range conns {
+				assert.Equalf(t, connConnected, c.state, "expected conn to still be connected")
 
-			c, err := p.checkOut(context.Background())
-			require.NoError(t, err)
+				err := p.checkIn(c)
+				require.NoError(t, err)
+			}
+		}()
+
+		// Close the pool with a 1-hour graceful shutdown timeout. Expect that the call to
+		// close() returns when all of the connections are checked in. If close() doesn't return
+		// when all of the connections are checked in, the test will time out.
+		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Hour)
+		defer cancel()
+		p.close(ctx)
+	})
+	t.Run("closing a Connection does not cause an error after pool is closed", func(t *testing.T) {
+		t.Parallel()
 
-			p.close(context.Background())
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 3, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})
 
-			c1 := &Connection{connection: c}
-			err = c1.Close()
-			require.NoError(t, err)
-		})
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			ConnectTimeout: defaultConnectionTimeout,
+		})
+		err := p.ready()
+		require.NoError(t, err)
+
+		c, err := p.checkOut(context.Background())
+		require.NoError(t, err)
+
+		p.close(context.Background())
+
+		c1 := &Connection{connection: c}
+		err = c1.Close()
+		require.NoError(t, err)
	})
-	t.Run("ready", func(t *testing.T) {
+}
+
+func TestPool_ready(t *testing.T) {
	t.Parallel()
 
-		t.Run("can ready a paused pool", func(t *testing.T) {
-			t.Parallel()
+	t.Run("can ready a paused pool", func(t *testing.T) {
+		t.Parallel()
 
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 6, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 6, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})
 
-			p := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				ConnectTimeout: defaultConnectionTimeout,
-			})
-			err := p.ready()
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			ConnectTimeout: defaultConnectionTimeout,
+		})
+		err := p.ready()
+		require.NoError(t, err)
+
+		conns := make([]*connection, 3)
+		for i := range conns {
+			conn, err := p.checkOut(context.Background())
			require.NoError(t, err)
-
-			conns := make([]*connection, 3)
-			for i := range conns {
-				conn, err := p.checkOut(context.Background())
-				require.NoError(t, err)
-				conns[i] = conn
-			}
-			assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
-			assert.Equalf(t, 3, p.totalConnectionCount(), "should have 3 total connections")
+			conns[i] = conn
+		}
+		assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
+		assert.Equalf(t, 3, p.totalConnectionCount(), "should have 3 total connections")
 
-			p.clear(nil, nil)
-			for _, conn := range conns {
-				err = p.checkIn(conn)
-				require.NoError(t, err)
-			}
-			assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
-			assert.Equalf(t, 0, p.totalConnectionCount(), "should have 0 total connections")
+		p.clear(nil, nil)
+		for _, conn := range conns {
+			err = p.checkIn(conn)
+			require.NoError(t, err)
+		}
+		assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
+		assert.Equalf(t, 0, p.totalConnectionCount(), "should have 0 total connections")
 
-			err = p.ready()
-			require.NoError(t, err)
+		err = p.ready()
+		require.NoError(t, err)
+
+		for i := 0; i < 3; i++ {
+			_, err := p.checkOut(context.Background())
+			require.NoError(t, err)
+		}
+		assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
+		assert.Equalf(t, 3, p.totalConnectionCount(), "should have 3 total connections")
 
-			for i := 0; i < 3; i++ {
-				_, err := p.checkOut(context.Background())
-				require.NoError(t, err)
-			}
-			assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
-			assert.Equalf(t, 3, p.totalConnectionCount(), "should have 3 total connections")
-
-			p.close(context.Background())
-		})
-		t.Run("calling ready multiple times does not return an error", func(t *testing.T) {
-			t.Parallel()
+		p.close(context.Background())
+	})
+	t.Run("calling ready multiple times does not return an error", func(t *testing.T) {
+		t.Parallel()
 
-			p := newPool(poolConfig{})
-			for i := 0; i < 5; i++ {
-				err := p.ready()
+		p := newPool(poolConfig{})
+		for i := 0; i < 5; i++ {
+			err := p.ready()
			require.NoError(t, err)
-			}
+		}
 
-			p.close(context.Background())
-		})
-		t.Run("can clear and ready multiple times", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 2, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
+		p.close(context.Background())
+	})
+	t.Run("can clear and ready multiple times", func(t *testing.T) {
+		t.Parallel()
 
-			p := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				ConnectTimeout: defaultConnectionTimeout,
-			})
-			err := p.ready()
-			require.NoError(t, err)
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 2, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})
 
-			c, err := p.checkOut(context.Background())
-			require.NoError(t, err)
-			err = p.checkIn(c)
-			require.NoError(t, err)
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			ConnectTimeout: defaultConnectionTimeout,
+		})
+		err := p.ready()
+		require.NoError(t, err)
 
-			for i := 0; i < 100; i++ {
-				err = p.ready()
-				require.NoError(t, err)
+		c, err := p.checkOut(context.Background())
+		require.NoError(t, err)
+		err = p.checkIn(c)
+		require.NoError(t, err)
 
-				p.clear(nil, nil)
-			}
+		for i := 0; i < 100; i++ {
+			err = p.ready()
+			require.NoError(t, err)
 
-			err = p.ready()
-			require.NoError(t, err)
+			p.clear(nil, nil)
+		}
 
-			c, err = p.checkOut(context.Background())
-			require.NoError(t, err)
-			err = p.checkIn(c)
-			require.NoError(t, err)
+		err = p.ready()
+		require.NoError(t, err)
 
-			p.close(context.Background())
-		})
-		t.Run("can clear and ready multiple times concurrently", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 2, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
+		c, err = p.checkOut(context.Background())
+		require.NoError(t, err)
+		err = p.checkIn(c)
+		require.NoError(t, err)
 
-			p := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				ConnectTimeout: defaultConnectionTimeout,
-			})
-			err := p.ready()
-			require.NoError(t, err)
+		p.close(context.Background())
+	})
+	t.Run("can clear and ready multiple times concurrently", func(t *testing.T) {
+		t.Parallel()
 
-			c, err := p.checkOut(context.Background())
-			require.NoError(t, err)
-			err = p.checkIn(c)
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 2, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})
+
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			ConnectTimeout: defaultConnectionTimeout,
+		})
+		err := p.ready()
+		require.NoError(t, err)
+
+		c, err := p.checkOut(context.Background())
+		require.NoError(t, err)
+		err = p.checkIn(c)
+		require.NoError(t, err)
+
+		var wg sync.WaitGroup
+		for i := 0; i < 10; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for i := 0; i < 1000; i++ {
+					err := p.ready()
+					require.NoError(t, err)
+				}
+			}()
+
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for i := 0; i < 1000; i++ {
+					p.clear(errors.New("test error"), nil)
+				}
+			}()
+		}
+
+		wg.Wait()
+		err = p.ready()
+		require.NoError(t, err)
+
+		c, err = p.checkOut(context.Background())
+		require.NoError(t, err)
+		err = p.checkIn(c)
+		require.NoError(t, err)
+
+		p.close(context.Background())
+	})
+}
+
+func TestPool_checkOut(t *testing.T) {
+	t.Parallel()
+
+	t.Run("return error when attempting to create new connection", func(t *testing.T) {
+		t.Parallel()
+
+		dialErr := errors.New("create new connection error")
+		p := newPool(poolConfig{
+			ConnectTimeout: defaultConnectionTimeout,
+		}, WithDialer(func(Dialer) Dialer {
+			return DialerFunc(func(context.Context, string, string) (net.Conn, error) {
+				return nil, dialErr
+			})
+		}))
+		err := p.ready()
+		require.NoError(t, err)
+
+		_, err = p.checkOut(context.Background())
+		var want error = ConnectionError{Wrapped: dialErr, init: true}
+		assert.Equalf(t, want, err, "should return error from calling checkOut()")
+		// If a connection initialization error occurs during checkOut, removing and closing the
+		// failed connection both happen asynchronously with the checkOut. Wait for up to 2s for
+		// the failed connection to be removed from the pool.
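// The assert.Eventuallyf call below polls for a condition instead of
// asserting it immediately, because the failed connection is removed
// asynchronously. Roughly, the polling loop looks like this sketch
// (testify's real implementation is more careful about scheduling):
func eventually(cond func() bool, timeout, tick time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(tick) // wait one tick between checks
	}
	return cond() // one final check at the deadline
}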
+		assert.Eventuallyf(t,
+			func() bool {
+				return p.totalConnectionCount() == 0
+			},
+			2*time.Second,
+			100*time.Millisecond,
+			"expected pool to have 0 total connections within 2s")
+
+		p.close(context.Background())
+	})
+	t.Run("closes perished connections", func(t *testing.T) {
+		t.Parallel()
+
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 2, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})
+
+		d := newdialer(&net.Dialer{})
+		p := newPool(
+			poolConfig{
+				Address:        address.Address(addr.String()),
+				MaxIdleTime:    time.Millisecond,
+				ConnectTimeout: defaultConnectionTimeout,
+			},
+			WithDialer(func(Dialer) Dialer { return d }),
+		)
+		err := p.ready()
+		require.NoError(t, err)
+
+		// Check out a connection and assert that the idle timeout is properly set then check it
+		// back into the pool.
+		c1, err := p.checkOut(context.Background())
+		require.NoError(t, err)
+		assert.Equalf(t, 1, d.lenopened(), "should have opened 1 connection")
+		assert.Equalf(t, 1, p.totalConnectionCount(), "pool should have 1 total connection")
+		assert.Equalf(t, time.Millisecond, c1.idleTimeout, "connection should have a 1ms idle timeout")
+
+		err = p.checkIn(c1)
+		require.NoError(t, err)
+
+		// Sleep for more than the 1ms idle timeout and then try to check out a connection.
+		// Expect that the previously checked-out connection is closed because it's idle and a
+		// new connection is created.
+		time.Sleep(50 * time.Millisecond)
+		c2, err := p.checkOut(context.Background())
+		require.NoError(t, err)
+		// Assert that the connection pointers are not equal. Don't use "assert.NotEqual" because it asserts
+		// non-equality of fields, possibly accessing some fields non-atomically and causing a race condition.
+		assert.True(t, c1 != c2, "expected a new connection on 2nd check out after idle timeout expires")
+		assert.Equalf(t, 2, d.lenopened(), "should have opened 2 connections")
+		assert.Equalf(t, 1, p.totalConnectionCount(), "pool should have 1 total connection")
 
-			assert.Eventuallyf(t,
-				func() bool {
-					return p.totalConnectionCount() == 0
-				},
-				2*time.Second,
-				100*time.Millisecond,
-				"expected pool to have 0 total connections within 10s")
-
-			p.close(context.Background())
-		})
-		t.Run("closes perished connections", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 2, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
-
-			d := newdialer(&net.Dialer{})
-			p := newPool(
-				poolConfig{
-					Address:        address.Address(addr.String()),
-					MaxIdleTime:    time.Millisecond,
-					ConnectTimeout: defaultConnectionTimeout,
-				},
-				WithDialer(func(Dialer) Dialer { return d }),
-			)
-			err := p.ready()
-			require.NoError(t, err)
-
-			// Check out a connection and assert that the idle timeout is properly set then check it
-			// back into the pool.
-			c1, err := p.checkOut(context.Background())
-			require.NoError(t, err)
-			assert.Equalf(t, 1, d.lenopened(), "should have opened 1 connection")
-			assert.Equalf(t, 1, p.totalConnectionCount(), "pool should have 1 total connection")
-			assert.Equalf(t, time.Millisecond, c1.idleTimeout, "connection should have a 1ms idle timeout")
-
-			err = p.checkIn(c1)
-			require.NoError(t, err)
-
-			// Sleep for more than the 1ms idle timeout and then try to check out a connection.
-			// Expect that the previously checked-out connection is closed because it's idle and a
-			// new connection is created.
-			time.Sleep(50 * time.Millisecond)
-			c2, err := p.checkOut(context.Background())
-			require.NoError(t, err)
-			// Assert that the connection pointers are not equal. Don't use "assert.NotEqual" because it asserts
-			// non-equality of fields, possibly accessing some fields non-atomically and causing a race condition.
-			assert.True(t, c1 != c2, "expected a new connection on 2nd check out after idle timeout expires")
-			assert.Equalf(t, 2, d.lenopened(), "should have opened 2 connections")
-			assert.Equalf(t, 1, p.totalConnectionCount(), "pool should have 1 total connection")
+		p.close(context.Background())
+	})
+	t.Run("recycles connections", func(t *testing.T) {
+		t.Parallel()
 
-			p.close(context.Background())
-		})
-		t.Run("recycles connections", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 1, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})
 
-			d := newdialer(&net.Dialer{})
-			p := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				ConnectTimeout: defaultConnectionTimeout,
-			}, WithDialer(func(Dialer) Dialer { return d }))
-			err := p.ready()
-			require.NoError(t, err)
+		d := newdialer(&net.Dialer{})
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			ConnectTimeout: defaultConnectionTimeout,
+		}, WithDialer(func(Dialer) Dialer { return d }))
+		err := p.ready()
+		require.NoError(t, err)
 
-			for i := 0; i < 100; i++ {
-				c, err := p.checkOut(context.Background())
-				require.NoError(t, err)
+		for i := 0; i < 100; i++ {
+			c, err := p.checkOut(context.Background())
+			require.NoError(t, err)
 
-				err = p.checkIn(c)
-				require.NoError(t, err)
-			}
-			assert.Equalf(t, 1, d.lenopened(), "should have opened 1 connection")
+			err = p.checkIn(c)
+			require.NoError(t, err)
+		}
+		assert.Equalf(t, 1, d.lenopened(), "should have opened 1 connection")
 
-			p.close(context.Background())
-		})
-		t.Run("cannot checkOut from closed pool", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 3, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
+		p.close(context.Background())
+	})
+	t.Run("cannot checkOut from closed pool", func(t *testing.T) {
+		t.Parallel()
 
-			p := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				ConnectTimeout: defaultConnectionTimeout,
-			})
-			err := p.ready()
-			require.NoError(t, err)
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 3, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})
 
-			p.close(context.Background())
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			ConnectTimeout: defaultConnectionTimeout,
+		})
+		err := p.ready()
+		require.NoError(t, err)
 
-			_, err = p.checkOut(context.Background())
-			assert.Equalf(
-				t,
-				ErrPoolClosed,
-				err,
-				"expected an error from checkOut() from a closed pool")
-		})
-		t.Run("handshaker i/o fails", func(t *testing.T) {
-			t.Parallel()
-
-			p := newPool(
-				poolConfig{
-					ConnectTimeout: defaultConnectionTimeout,
-				},
-				WithHandshaker(func(Handshaker) Handshaker {
-					return operation.NewHello()
-				}),
-				WithDialer(func(Dialer) Dialer {
-					return DialerFunc(func(context.Context, string, string) (net.Conn, error) {
-						return &writeFailConn{&net.TCPConn{}}, nil
-					})
-				}),
-			)
-			err := p.ready()
-			require.NoError(t, err)
+		p.close(context.Background())
 
-			_, err = p.checkOut(context.Background())
-			assert.IsTypef(t, ConnectionError{}, err, "expected a ConnectionError")
-			if err, ok := err.(ConnectionError); ok {
-				assert.Containsf(
-					t,
-					err.Unwrap().Error(),
-					"unable to write wire message to network: Write error",
-					"expected error to contain string")
-			}
-			assert.Equalf(t, 0, p.availableConnectionCount(), "pool should have 0 available connections")
-			// On connect() failure, the connection is removed and closed after delivering the error
-			// to checkOut(), so it may still count toward the total connection count briefly. Wait
-			// up to 100ms for the total connection count to reach 0.
-			assert.Eventually(t,
-				func() bool {
-					return p.totalConnectionCount() == 0
-				},
-				100*time.Millisecond,
-				1*time.Millisecond,
-				"expected pool to have 0 total connections within 100ms")
+		_, err = p.checkOut(context.Background())
+		assert.Equalf(
+			t,
+			ErrPoolClosed,
+			err,
+			"expected an error from checkOut() from a closed pool")
+	})
+	t.Run("handshaker i/o fails", func(t *testing.T) {
+		t.Parallel()
 
-			p.close(context.Background())
-		})
-		// Test that if a checkOut() times out, it returns a WaitQueueTimeout error that wraps a
-		// context.DeadlineExceeded error.
-		t.Run("wait queue timeout error", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 1, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
+		p := newPool(
+			poolConfig{
+				ConnectTimeout: defaultConnectionTimeout,
+			},
+			WithHandshaker(func(Handshaker) Handshaker {
+				return operation.NewHello()
+			}),
+			WithDialer(func(Dialer) Dialer {
+				return DialerFunc(func(context.Context, string, string) (net.Conn, error) {
+					return &writeFailConn{&net.TCPConn{}}, nil
+				})
+			}),
+		)
+		err := p.ready()
+		require.NoError(t, err)
 
-			p := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				MaxPoolSize:    1,
-				ConnectTimeout: defaultConnectionTimeout,
-			})
-			err := p.ready()
-			require.NoError(t, err)
+		_, err = p.checkOut(context.Background())
+		assert.IsTypef(t, ConnectionError{}, err, "expected a ConnectionError")
+		if err, ok := err.(ConnectionError); ok {
+			assert.Containsf(
+				t,
+				err.Unwrap().Error(),
+				"unable to write wire message to network: Write error",
+				"expected error to contain string")
+		}
+		assert.Equalf(t, 0, p.availableConnectionCount(), "pool should have 0 available connections")
+		// On connect() failure, the connection is removed and closed after delivering the error
+		// to checkOut(), so it may still count toward the total connection count briefly. Wait
+		// up to 100ms for the total connection count to reach 0.
+		assert.Eventually(t,
+			func() bool {
+				return p.totalConnectionCount() == 0
+			},
+			100*time.Millisecond,
+			1*time.Millisecond,
+			"expected pool to have 0 total connections within 100ms")
 
-			// check out first connection.
-			_, err = p.checkOut(context.Background())
-			require.NoError(t, err)
+		p.close(context.Background())
+	})
+	// Test that if a checkOut() times out, it returns a WaitQueueTimeout error that wraps a
+	// context.DeadlineExceeded error.
+	t.Run("wait queue timeout error", func(t *testing.T) {
+		t.Parallel()
 
-			// Set a short timeout and check out again.
-			ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
-			defer cancel()
-			_, err = p.checkOut(ctx)
-			assert.NotNilf(t, err, "expected a WaitQueueTimeout error")
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})
 
-			// Assert that error received is WaitQueueTimeoutError with context deadline exceeded.
-			assert.IsTypef(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError")
-			if err, ok := err.(WaitQueueTimeoutError); ok {
-				assert.Equalf(t, context.DeadlineExceeded, err.Unwrap(), "expected wrapped error to be a context.Timeout")
-				assert.Containsf(t, err.Error(), "timed out", `expected error message to contain "timed out"`)
-			}
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			MaxPoolSize:    1,
+			ConnectTimeout: defaultConnectionTimeout,
+		})
+		err := p.ready()
+		require.NoError(t, err)
 
-			p.close(context.Background())
-		})
-		// Test that an indefinitely blocked checkOut() doesn't cause the wait queue to overflow
-		// if there are many other checkOut() calls that time out. This tests a scenario where a
-		// wantConnQueue may grow unbounded while a checkOut() is blocked, even if all subsequent
-		// checkOut() calls time out (due to the behavior of wantConnQueue.cleanFront()).
-		t.Run("wait queue doesn't overflow", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 1, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
+		// check out first connection.
+		_, err = p.checkOut(context.Background())
+		require.NoError(t, err)
 
-			p := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				MaxPoolSize:    1,
-				ConnectTimeout: defaultConnectionTimeout,
-			})
-			err := p.ready()
-			require.NoError(t, err)
+		// Set a short timeout and check out again.
+		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+		defer cancel()
+		_, err = p.checkOut(ctx)
+		assert.NotNilf(t, err, "expected a WaitQueueTimeout error")
+
+		// Assert that error received is WaitQueueTimeoutError with context deadline exceeded.
+		assert.IsTypef(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError")
+		if err, ok := err.(WaitQueueTimeoutError); ok {
+			assert.Equalf(t, context.DeadlineExceeded, err.Unwrap(), "expected wrapped error to be context.DeadlineExceeded")
+			assert.Containsf(t, err.Error(), "timed out", `expected error message to contain "timed out"`)
+		}
 
-			// Check out the 1 connection that the pool will create.
-			c, err := p.checkOut(context.Background())
-			require.NoError(t, err)
+		p.close(context.Background())
+	})
+	// Test that an indefinitely blocked checkOut() doesn't cause the wait queue to overflow
+	// if there are many other checkOut() calls that time out. This tests a scenario where a
+	// wantConnQueue may grow unbounded while a checkOut() is blocked, even if all subsequent
+	// checkOut() calls time out (due to the behavior of wantConnQueue.cleanFront()).
+	t.Run("wait queue doesn't overflow", func(t *testing.T) {
+		t.Parallel()
 
-			// Start a goroutine that tries to check out another connection with no timeout. Expect
-			// this goroutine to block (wait in the wait queue) until the checked-out connection is
-			// checked-in. Assert that there is no error once checkOut() finally does return.
-			var wg sync.WaitGroup
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})
 
-				_, err := p.checkOut(context.Background())
-				require.NoError(t, err)
-			}()
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			MaxPoolSize:    1,
+			ConnectTimeout: defaultConnectionTimeout,
+		})
+		err := p.ready()
+		require.NoError(t, err)
+
+		// Check out the 1 connection that the pool will create.
+		c, err := p.checkOut(context.Background())
+		require.NoError(t, err)
+
+		// Start a goroutine that tries to check out another connection with no timeout. Expect
+		// this goroutine to block (wait in the wait queue) until the checked-out connection is
+		// checked-in. Assert that there is no error once checkOut() finally does return.
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			_, err := p.checkOut(context.Background())
+			require.NoError(t, err)
+		}()
 
-			// Run lots of check-out attempts with a low timeout and assert that each one fails with
-			// a WaitQueueTimeout error. Expect no other errors or panics.
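// Rough sketch of the wantConnQueue behavior this test guards against, with
// hypothetical types: expired waiters are pruned only from the front of the
// queue, so a single long-lived waiter at the head can let an unbounded tail
// of timed-out waiters accumulate behind it.
type waiter struct{ expired bool }

type waitQueue struct{ items []*waiter }

func (q *waitQueue) cleanFront() {
	// Pruning stops at the first non-expired waiter, even if many expired
	// waiters sit behind it.
	for len(q.items) > 0 && q.items[0].expired {
		q.items = q.items[1:]
	}
}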
+		// Run lots of check-out attempts with a low timeout and assert that each one fails with
+		// a WaitQueueTimeout error. Expect no other errors or panics.
+		for i := 0; i < 50000; i++ {
+			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond)
+			_, err := p.checkOut(ctx)
+			cancel()
+			assert.NotNilf(t, err, "expected a WaitQueueTimeout error")
+			assert.IsTypef(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError")
+		}
 
-			for i := 0; i < 50000; i++ {
-				ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond)
-				_, err := p.checkOut(ctx)
-				cancel()
-				assert.NotNilf(t, err, "expected a WaitQueueTimeout error")
-				assert.IsTypef(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError")
-			}
-
-			// Check-in the connection we checked out earlier and wait for the checkOut() goroutine
-			// to resume.
			err = p.checkIn(c)
			require.NoError(t, err)
-			wg.Wait()
+		// Check-in the connection we checked out earlier and wait for the checkOut() goroutine
+		// to resume.
+		wg.Wait()
 
-			p.close(context.Background())
-		})
-		// Test that checkOut() on a full connection pool creates and returns a new connection
-		// immediately as soon as the pool is no longer full.
-		t.Run("should return a new connection as soon as the pool isn't full", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 3, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
+		p.close(context.Background())
+	})
 
-			d := newdialer(&net.Dialer{})
-			p := newPool(
-				poolConfig{
-					Address:        address.Address(addr.String()),
-					MaxPoolSize:    2,
-					ConnectTimeout: defaultConnectionTimeout,
-				},
-				WithDialer(func(Dialer) Dialer { return d }),
-			)
-			err := p.ready()
-			require.NoError(t, err)
-
-			// Check out two connections (MaxPoolSize) so that subsequent checkOut() calls should
-			// block until a connection is checked back in or removed from the pool.
-			c, err := p.checkOut(context.Background())
-			require.NoError(t, err)
-			_, err = p.checkOut(context.Background())
-			require.NoError(t, err)
-			assert.Equalf(t, 2, d.lenopened(), "should have opened 2 connection")
-			assert.Equalf(t, 2, p.totalConnectionCount(), "pool should have 2 total connection")
-			assert.Equalf(t, 0, p.availableConnectionCount(), "pool should have 0 idle connection")
-
-			// Run a checkOut() with timeout and expect it to time out because the pool is at
-			// MaxPoolSize and no connections are checked in or removed from the pool.
-			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
-			defer cancel()
-			_, err = p.checkOut(ctx)
-			assert.Equalf(
-				t,
-				context.DeadlineExceeded,
-				err.(WaitQueueTimeoutError).Wrapped,
-				"expected wrapped error to be a context.DeadlineExceeded")
-
-			// Start a goroutine that closes one of the checked-out connections and checks it in.
-			// Expect that the checked-in connection is closed and allows blocked checkOut() to
-			// complete. Assert that the time between checking in the closed connection and when the
-			// checkOut() completes is within 100ms.
-			var start time.Time
-			go func() {
-				c.close()
-				start = time.Now()
-				err := p.checkIn(c)
-				require.NoError(t, err)
-			}()
-			_, err = p.checkOut(context.Background())
-			require.NoError(t, err)
-			assert.WithinDurationf(
-				t,
-				time.Now(),
-				start,
-				100*time.Millisecond,
-				"expected checkOut to complete within 100ms of checking in a closed connection")
 
-			assert.Equalf(t, 1, d.lenclosed(), "should have closed 1 connection")
-			assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connection")
-			assert.Equalf(t, 2, p.totalConnectionCount(), "pool should have 2 total connection")
-			assert.Equalf(t, 0, p.availableConnectionCount(), "pool should have 0 idle connection")
-
-			p.close(context.Background())
-		})
-		t.Run("canceled context in wait queue", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 1, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
-
-			p := newPool(poolConfig{
-				Address:        address.Address(addr.String()),
-				MaxPoolSize:    1,
-				ConnectTimeout: defaultConnectionTimeout,
-			})
-			err := p.ready()
-			require.NoError(t, err)
-
-			// Check out first connection.
-			_, err = p.checkOut(context.Background())
-			require.NoError(t, err)
-
-			// Use a canceled context to check out another connection.
-			cancelCtx, cancel := context.WithCancel(context.Background())
-			cancel()
-			_, err = p.checkOut(cancelCtx)
-			assert.NotNilf(t, err, "expected a non-nil error")
-
-			// Assert that error received is WaitQueueTimeoutError with context canceled.
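// The assertions below rely on Go 1.13-style error wrapping: the pool's
// timeout error exposes the Context error via Unwrap(). A tiny illustration
// with a hypothetical wrapper type:
type wrappedTimeoutErr struct{ cause error }

func (e wrappedTimeoutErr) Error() string { return "timed out: " + e.cause.Error() }

// Unwrap lets errors.Is / errors.As see the cause, so
// errors.Is(wrappedTimeoutErr{cause: context.Canceled}, context.Canceled)
// reports true.
func (e wrappedTimeoutErr) Unwrap() error { return e.cause }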
-			assert.IsTypef(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError")
-			if err, ok := err.(WaitQueueTimeoutError); ok {
-				assert.Equalf(t, context.Canceled, err.Unwrap(), "expected wrapped error to be a context.Canceled")
-				assert.Containsf(t, err.Error(), "canceled", `expected error message to contain "canceled"`)
-			}
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})

-			p.close(context.Background())
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			MaxPoolSize:    1,
+			ConnectTimeout: defaultConnectionTimeout,
		})
-		t.Run("discards connections closed by the server side", func(t *testing.T) {
-			t.Parallel()
+		err := p.ready()
+		require.NoError(t, err)

-			cleanup := make(chan struct{})
-			defer close(cleanup)
+		// Check out the first connection.
+		_, err = p.checkOut(context.Background())
+		require.NoError(t, err)
+
+		// Set a short timeout and check out again.
+		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+		defer cancel()
+		_, err = p.checkOut(ctx)
+		assert.NotNilf(t, err, "expected a WaitQueueTimeout error")
+
+		// Assert that the received error is a WaitQueueTimeoutError wrapping context.DeadlineExceeded.
+		assert.IsTypef(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError")
+		if err, ok := err.(WaitQueueTimeoutError); ok {
+			assert.Equalf(t, context.DeadlineExceeded, err.Unwrap(), "expected wrapped error to be context.DeadlineExceeded")
+			assert.Containsf(t, err.Error(), "timed out", `expected error message to contain "timed out"`)
+		}

-			ncs := make(chan net.Conn, 2)
-			addr := bootstrapConnections(t, 2, func(nc net.Conn) {
-				// Send all "server-side" connections to a channel so we can
-				// interact with them during the test.
-				ncs <- nc
+		p.close(context.Background())
+	})
+	// Test that an indefinitely blocked checkOut() doesn't cause the wait queue to overflow
+	// if there are many other checkOut() calls that time out. This tests a scenario where a
+	// wantConnQueue may grow unbounded while a checkOut() is blocked, even if all subsequent
+	// checkOut() calls time out (due to the behavior of wantConnQueue.cleanFront()).
+	t.Run("wait queue doesn't overflow", func(t *testing.T) {
+		t.Parallel()

-				<-cleanup
-				_ = nc.Close()
-			})
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})

-			d := newdialer(&net.Dialer{})
-			p := newPool(poolConfig{
-				Address: address.Address(addr.String()),
-			}, WithDialer(func(Dialer) Dialer { return d }))
-			err := p.ready()
-			require.NoError(t, err)
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			MaxPoolSize:    1,
+			ConnectTimeout: defaultConnectionTimeout,
+		})
+		err := p.ready()
+		require.NoError(t, err)

-			// Add 1 idle connection to the pool by checking-out and checking-in
-			// a connection.
-			conn, err := p.checkOut(context.Background())
-			require.NoError(t, err)
-			err = p.checkIn(conn)
-			require.NoError(t, err)
-			assertConnectionsOpened(t, d, 1)
-			assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool")
-			assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool")
-
-			// Make that connection appear as if it's been idle for a minute.
-			conn.idleStart.Store(time.Now().Add(-1 * time.Minute))
-
-			// Close the "server-side" of the connection we just created. 
The idle - // connection in the pool is now unusable because the "server-side" - // closed it. - nc := <-ncs - err = nc.Close() - require.NoError(t, err) + // Check out the 1 connection that the pool will create. + c, err := p.checkOut(context.Background()) + require.NoError(t, err) - // In a separate goroutine, write a valid wire message to the 2nd - // connection that's about to be created. Stop waiting for a 2nd - // connection after 100ms to prevent leaking a goroutine. - go func() { - select { - case nc := <-ncs: - _, err := nc.Write([]byte{5, 0, 0, 0, 0}) - require.NoError(t, err, "Write error") - case <-time.After(100 * time.Millisecond): - } - }() + // Start a goroutine that tries to check out another connection with no timeout. Expect + // this goroutine to block (wait in the wait queue) until the checked-out connection is + // checked-in. Assert that there is no error once checkOut() finally does return. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() - // Check out a connection and try to read from it. Expect the pool to - // discard the connection that was closed by the "server-side" and - // return a newly created connection instead. - conn, err = p.checkOut(context.Background()) - require.NoError(t, err) - msg, err := conn.readWireMessage(context.Background()) + _, err := p.checkOut(context.Background()) require.NoError(t, err) - assert.Equal(t, []byte{5, 0, 0, 0, 0}, msg) + }() - err = p.checkIn(conn) - require.NoError(t, err) + // Run lots of check-out attempts with a low timeout and assert that each one fails with + // a WaitQueueTimeout error. Expect no other errors or panics. + for i := 0; i < 50000; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond) + _, err := p.checkOut(ctx) + cancel() + assert.NotNilf(t, err, "expected a WaitQueueTimeout error") + assert.IsTypef(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError") + } - assertConnectionsOpened(t, d, 2) - assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool") - assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool") + // Check-in the connection we checked out earlier and wait for the checkOut() goroutine + // to resume. + err = p.checkIn(c) + require.NoError(t, err) + wg.Wait() - p.close(context.Background()) - }) + p.close(context.Background()) }) - t.Run("checkIn", func(t *testing.T) { + // Test that checkOut() on a full connection pool creates and returns a new connection + // immediately as soon as the pool is no longer full. 
+ t.Run("should return a new connection as soon as the pool isn't full", func(t *testing.T) { t.Parallel() - t.Run("cannot return same connection to pool twice", func(t *testing.T) { - t.Parallel() - - cleanup := make(chan struct{}) - defer close(cleanup) - addr := bootstrapConnections(t, 1, func(nc net.Conn) { - <-cleanup - _ = nc.Close() - }) + cleanup := make(chan struct{}) + defer close(cleanup) + addr := bootstrapConnections(t, 3, func(nc net.Conn) { + <-cleanup + _ = nc.Close() + }) - p := newPool(poolConfig{ + d := newdialer(&net.Dialer{}) + p := newPool( + poolConfig{ Address: address.Address(addr.String()), + MaxPoolSize: 2, ConnectTimeout: defaultConnectionTimeout, - }) - err := p.ready() - require.NoError(t, err) + }, + WithDialer(func(Dialer) Dialer { return d }), + ) + err := p.ready() + require.NoError(t, err) - c, err := p.checkOut(context.Background()) - require.NoError(t, err) - assert.Equalf(t, 0, p.availableConnectionCount(), "should be no idle connections in pool") - assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool") + // Check out two connections (MaxPoolSize) so that subsequent checkOut() calls should + // block until a connection is checked back in or removed from the pool. + c, err := p.checkOut(context.Background()) + require.NoError(t, err) + _, err = p.checkOut(context.Background()) + require.NoError(t, err) + assert.Equalf(t, 2, d.lenopened(), "should have opened 2 connection") + assert.Equalf(t, 2, p.totalConnectionCount(), "pool should have 2 total connection") + assert.Equalf(t, 0, p.availableConnectionCount(), "pool should have 0 idle connection") + + // Run a checkOut() with timeout and expect it to time out because the pool is at + // MaxPoolSize and no connections are checked in or removed from the pool. + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) + defer cancel() + _, err = p.checkOut(ctx) + assert.Equalf( + t, + context.DeadlineExceeded, + err.(WaitQueueTimeoutError).Wrapped, + "expected wrapped error to be a context.DeadlineExceeded") + + // Start a goroutine that closes one of the checked-out connections and checks it in. + // Expect that the checked-in connection is closed and allows blocked checkOut() to + // complete. Assert that the time between checking in the closed connection and when the + // checkOut() completes is within 100ms. 
+		var start time.Time
+		go func() {
+			c.close()
+			start = time.Now()
+			err := p.checkIn(c)
+			require.NoError(t, err)
+		}()
+		_, err = p.checkOut(context.Background())
+		require.NoError(t, err)
+		assert.WithinDurationf(
+			t,
+			time.Now(),
+			start,
+			100*time.Millisecond,
+			"expected checkOut to complete within 100ms of checking in a closed connection")
+
+		assert.Equalf(t, 1, d.lenclosed(), "should have closed 1 connection")
+		assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connections")
+		assert.Equalf(t, 2, p.totalConnectionCount(), "pool should have 2 total connections")
+		assert.Equalf(t, 0, p.availableConnectionCount(), "pool should have 0 idle connections")

-			err = p.checkIn(c)
-			require.NoError(t, err)
+		p.close(context.Background())
+	})
+	t.Run("canceled context in wait queue", func(t *testing.T) {
+		t.Parallel()

-			err = p.checkIn(c)
-			assert.NotNilf(t, err, "expected an error trying to return the same conn to the pool twice")
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})

-			assert.Equalf(t, 1, p.availableConnectionCount(), "should have returned 1 idle connection to the pool")
-			assert.Equalf(t, 1, p.totalConnectionCount(), "should have 1 total connection in pool")
+		p := newPool(poolConfig{
+			Address:        address.Address(addr.String()),
+			MaxPoolSize:    1,
+			ConnectTimeout: defaultConnectionTimeout,
+		})
+		err := p.ready()
+		require.NoError(t, err)

-			p.close(context.Background())
+		// Check out the first connection.
+		_, err = p.checkOut(context.Background())
+		require.NoError(t, err)
+
+		// Use a canceled context to check out another connection.
+		cancelCtx, cancel := context.WithCancel(context.Background())
+		cancel()
+		_, err = p.checkOut(cancelCtx)
+		assert.NotNilf(t, err, "expected a non-nil error")
+
+		// Assert that the received error is a WaitQueueTimeoutError wrapping context.Canceled.
+		assert.IsTypef(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError")
+		if err, ok := err.(WaitQueueTimeoutError); ok {
+			assert.Equalf(t, context.Canceled, err.Unwrap(), "expected wrapped error to be context.Canceled")
+			assert.Containsf(t, err.Error(), "canceled", `expected error message to contain "canceled"`)
+		}
+
+		p.close(context.Background())
+	})
+	t.Run("discards connections closed by the server side", func(t *testing.T) {
+		t.Parallel()
+
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+
+		ncs := make(chan net.Conn, 2)
+		addr := bootstrapConnections(t, 2, func(nc net.Conn) {
+			// Send all "server-side" connections to a channel so we can
+			// interact with them during the test. 
+ ncs <- nc + + <-cleanup + _ = nc.Close() }) - t.Run("closes connections if the pool is closed", func(t *testing.T) { - t.Parallel() - - cleanup := make(chan struct{}) - defer close(cleanup) - addr := bootstrapConnections(t, 1, func(nc net.Conn) { - <-cleanup - _ = nc.Close() - }) - d := newdialer(&net.Dialer{}) - p := newPool(poolConfig{ - Address: address.Address(addr.String()), - ConnectTimeout: defaultConnectionTimeout, - }, WithDialer(func(Dialer) Dialer { return d })) - err := p.ready() - require.NoError(t, err) + d := newdialer(&net.Dialer{}) + p := newPool(poolConfig{ + Address: address.Address(addr.String()), + }, WithDialer(func(Dialer) Dialer { return d })) + err := p.ready() + require.NoError(t, err) - c, err := p.checkOut(context.Background()) - require.NoError(t, err) - assert.Equalf(t, 0, d.lenclosed(), "should have closed 0 connections") - assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 idle connections in pool") - assert.Equalf(t, 1, p.totalConnectionCount(), "should have 1 total connection in pool") + // Add 1 idle connection to the pool by checking-out and checking-in + // a connection. + conn, err := p.checkOut(context.Background()) + require.NoError(t, err) + err = p.checkIn(conn) + require.NoError(t, err) + assertConnectionsOpened(t, d, 1) + assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool") + assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool") + + // Make that connection appear as if it's been idle for a minute. + conn.idleStart.Store(time.Now().Add(-1 * time.Minute)) + + // Close the "server-side" of the connection we just created. The idle + // connection in the pool is now unusable because the "server-side" + // closed it. + nc := <-ncs + err = nc.Close() + require.NoError(t, err) + + // In a separate goroutine, write a valid wire message to the 2nd + // connection that's about to be created. Stop waiting for a 2nd + // connection after 100ms to prevent leaking a goroutine. + go func() { + select { + case nc := <-ncs: + _, err := nc.Write([]byte{5, 0, 0, 0, 0}) + require.NoError(t, err, "Write error") + case <-time.After(100 * time.Millisecond): + } + }() - p.close(context.Background()) + // Check out a connection and try to read from it. Expect the pool to + // discard the connection that was closed by the "server-side" and + // return a newly created connection instead. 
+ conn, err = p.checkOut(context.Background()) + require.NoError(t, err) + msg, err := conn.readWireMessage(context.Background()) + require.NoError(t, err) + assert.Equal(t, []byte{5, 0, 0, 0, 0}, msg) - err = p.checkIn(c) - require.NoError(t, err) - assert.Equalf(t, 1, d.lenclosed(), "should have closed 1 connection") - assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 idle connections in pool") - assert.Equalf(t, 0, p.totalConnectionCount(), "should have 0 total connection in pool") + err = p.checkIn(conn) + require.NoError(t, err) + + assertConnectionsOpened(t, d, 2) + assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool") + assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool") + + p.close(context.Background()) + }) +} + +func TestPool_checkIn(t *testing.T) { + t.Parallel() + + t.Run("cannot return same connection to pool twice", func(t *testing.T) { + t.Parallel() + + cleanup := make(chan struct{}) + defer close(cleanup) + addr := bootstrapConnections(t, 1, func(nc net.Conn) { + <-cleanup + _ = nc.Close() }) - t.Run("can't checkIn a connection from different pool", func(t *testing.T) { - t.Parallel() - - cleanup := make(chan struct{}) - defer close(cleanup) - addr := bootstrapConnections(t, 1, func(nc net.Conn) { - <-cleanup - _ = nc.Close() - }) - p1 := newPool(poolConfig{ - Address: address.Address(addr.String()), - ConnectTimeout: defaultConnectionTimeout, - }) - err := p1.ready() - require.NoError(t, err) + p := newPool(poolConfig{ + Address: address.Address(addr.String()), + ConnectTimeout: defaultConnectionTimeout, + }) + err := p.ready() + require.NoError(t, err) - c, err := p1.checkOut(context.Background()) - require.NoError(t, err) + c, err := p.checkOut(context.Background()) + require.NoError(t, err) + assert.Equalf(t, 0, p.availableConnectionCount(), "should be no idle connections in pool") + assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool") - p2 := newPool(poolConfig{}) - err = p2.ready() - require.NoError(t, err) + err = p.checkIn(c) + require.NoError(t, err) + + err = p.checkIn(c) + assert.NotNilf(t, err, "expected an error trying to return the same conn to the pool twice") - err = p2.checkIn(c) - assert.Equalf(t, ErrWrongPool, err, "expected ErrWrongPool error") + assert.Equalf(t, 1, p.availableConnectionCount(), "should have returned 1 idle connection to the pool") + assert.Equalf(t, 1, p.totalConnectionCount(), "should have 1 total connection in pool") - p1.close(context.Background()) - p2.close(context.Background()) + p.close(context.Background()) + }) + t.Run("closes connections if the pool is closed", func(t *testing.T) { + t.Parallel() + + cleanup := make(chan struct{}) + defer close(cleanup) + addr := bootstrapConnections(t, 1, func(nc net.Conn) { + <-cleanup + _ = nc.Close() }) - t.Run("bumps the connection idle deadline", func(t *testing.T) { - t.Parallel() - - cleanup := make(chan struct{}) - defer close(cleanup) - addr := bootstrapConnections(t, 1, func(nc net.Conn) { - <-cleanup - _ = nc.Close() - }) - d := newdialer(&net.Dialer{}) - p := newPool(poolConfig{ - Address: address.Address(addr.String()), - MaxIdleTime: 100 * time.Millisecond, - ConnectTimeout: defaultConnectionTimeout, - }, WithDialer(func(Dialer) Dialer { return d })) - err := p.ready() - require.NoError(t, err) - defer p.close(context.Background()) + d := newdialer(&net.Dialer{}) + p := newPool(poolConfig{ + Address: address.Address(addr.String()), + ConnectTimeout: 
defaultConnectionTimeout, + }, WithDialer(func(Dialer) Dialer { return d })) + err := p.ready() + require.NoError(t, err) - c, err := p.checkOut(context.Background()) - require.NoError(t, err) + c, err := p.checkOut(context.Background()) + require.NoError(t, err) + assert.Equalf(t, 0, d.lenclosed(), "should have closed 0 connections") + assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 idle connections in pool") + assert.Equalf(t, 1, p.totalConnectionCount(), "should have 1 total connection in pool") - // Sleep for 110ms, which will exceed the 100ms connection idle timeout. Then check the - // connection back in and expect that it is not closed because checkIn() should bump the - // connection idle deadline. - time.Sleep(110 * time.Millisecond) - err = p.checkIn(c) - require.NoError(t, err) + p.close(context.Background()) - assert.Equalf(t, 0, d.lenclosed(), "should have closed 0 connections") - assert.Equalf(t, 1, p.availableConnectionCount(), "should have 1 idle connections in pool") - assert.Equalf(t, 1, p.totalConnectionCount(), "should have 1 total connection in pool") + err = p.checkIn(c) + require.NoError(t, err) + assert.Equalf(t, 1, d.lenclosed(), "should have closed 1 connection") + assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 idle connections in pool") + assert.Equalf(t, 0, p.totalConnectionCount(), "should have 0 total connection in pool") + }) + t.Run("can't checkIn a connection from different pool", func(t *testing.T) { + t.Parallel() + + cleanup := make(chan struct{}) + defer close(cleanup) + addr := bootstrapConnections(t, 1, func(nc net.Conn) { + <-cleanup + _ = nc.Close() }) - t.Run("sets minPoolSize connection idle deadline", func(t *testing.T) { - t.Parallel() - - cleanup := make(chan struct{}) - defer close(cleanup) - addr := bootstrapConnections(t, 4, func(nc net.Conn) { - <-cleanup - _ = nc.Close() - }) - d := newdialer(&net.Dialer{}) - p := newPool(poolConfig{ - Address: address.Address(addr.String()), - MinPoolSize: 3, - MaxIdleTime: 10 * time.Millisecond, - ConnectTimeout: defaultConnectionTimeout, - }, WithDialer(func(Dialer) Dialer { return d })) - err := p.ready() - require.NoError(t, err) - defer p.close(context.Background()) + p1 := newPool(poolConfig{ + Address: address.Address(addr.String()), + ConnectTimeout: defaultConnectionTimeout, + }) + err := p1.ready() + require.NoError(t, err) - // Wait for maintain() to open 3 connections. - assertConnectionsOpened(t, d, 3) + c, err := p1.checkOut(context.Background()) + require.NoError(t, err) - // Sleep for 100ms, which will exceed the 10ms connection idle timeout, then try to check - // out a connection. Expect that all minPoolSize connections checked into the pool by - // maintain() have passed their idle deadline, so checkOut() closes all 3 connections - // and tries to create a new connection. 
- time.Sleep(100 * time.Millisecond) - _, err = p.checkOut(context.Background()) - require.NoError(t, err) + p2 := newPool(poolConfig{}) + err = p2.ready() + require.NoError(t, err) - assertConnectionsClosed(t, d, 3) - assert.Equalf(t, 4, d.lenopened(), "should have opened 4 connections") - assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 idle connections in pool") - assert.Equalf(t, 1, p.totalConnectionCount(), "should have 1 total connection in pool") + err = p2.checkIn(c) + assert.Equalf(t, ErrWrongPool, err, "expected ErrWrongPool error") + + p1.close(context.Background()) + p2.close(context.Background()) + }) + t.Run("bumps the connection idle deadline", func(t *testing.T) { + t.Parallel() + + cleanup := make(chan struct{}) + defer close(cleanup) + addr := bootstrapConnections(t, 1, func(nc net.Conn) { + <-cleanup + _ = nc.Close() }) + + d := newdialer(&net.Dialer{}) + p := newPool(poolConfig{ + Address: address.Address(addr.String()), + MaxIdleTime: 100 * time.Millisecond, + ConnectTimeout: defaultConnectionTimeout, + }, WithDialer(func(Dialer) Dialer { return d })) + err := p.ready() + require.NoError(t, err) + defer p.close(context.Background()) + + c, err := p.checkOut(context.Background()) + require.NoError(t, err) + + // Sleep for 110ms, which will exceed the 100ms connection idle timeout. Then check the + // connection back in and expect that it is not closed because checkIn() should bump the + // connection idle deadline. + time.Sleep(110 * time.Millisecond) + err = p.checkIn(c) + require.NoError(t, err) + + assert.Equalf(t, 0, d.lenclosed(), "should have closed 0 connections") + assert.Equalf(t, 1, p.availableConnectionCount(), "should have 1 idle connections in pool") + assert.Equalf(t, 1, p.totalConnectionCount(), "should have 1 total connection in pool") }) - t.Run("maintain", func(t *testing.T) { + t.Run("sets minPoolSize connection idle deadline", func(t *testing.T) { t.Parallel() - t.Run("creates MinPoolSize connections shortly after calling ready", func(t *testing.T) { - t.Parallel() + cleanup := make(chan struct{}) + defer close(cleanup) + addr := bootstrapConnections(t, 4, func(nc net.Conn) { + <-cleanup + _ = nc.Close() + }) - cleanup := make(chan struct{}) - defer close(cleanup) - addr := bootstrapConnections(t, 3, func(nc net.Conn) { - <-cleanup - _ = nc.Close() - }) + d := newdialer(&net.Dialer{}) + p := newPool(poolConfig{ + Address: address.Address(addr.String()), + MinPoolSize: 3, + MaxIdleTime: 10 * time.Millisecond, + ConnectTimeout: defaultConnectionTimeout, + }, WithDialer(func(Dialer) Dialer { return d })) + err := p.ready() + require.NoError(t, err) + defer p.close(context.Background()) - d := newdialer(&net.Dialer{}) - p := newPool(poolConfig{ - Address: address.Address(addr.String()), - MinPoolSize: 3, - ConnectTimeout: defaultConnectionTimeout, - }, WithDialer(func(Dialer) Dialer { return d })) - err := p.ready() - require.NoError(t, err) + // Wait for maintain() to open 3 connections. + assertConnectionsOpened(t, d, 3) - assertConnectionsOpened(t, d, 3) - assert.Equalf(t, 3, p.availableConnectionCount(), "should be 3 idle connections in pool") - assert.Equalf(t, 3, p.totalConnectionCount(), "should be 3 total connection in pool") + // Sleep for 100ms, which will exceed the 10ms connection idle timeout, then try to check + // out a connection. Expect that all minPoolSize connections checked into the pool by + // maintain() have passed their idle deadline, so checkOut() closes all 3 connections + // and tries to create a new connection. 
+ time.Sleep(100 * time.Millisecond) + _, err = p.checkOut(context.Background()) + require.NoError(t, err) - p.close(context.Background()) + assertConnectionsClosed(t, d, 3) + assert.Equalf(t, 4, d.lenopened(), "should have opened 4 connections") + assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 idle connections in pool") + assert.Equalf(t, 1, p.totalConnectionCount(), "should have 1 total connection in pool") + }) +} + +func TestPool_maintain(t *testing.T) { + t.Parallel() + + t.Run("creates MinPoolSize connections shortly after calling ready", func(t *testing.T) { + t.Parallel() + + cleanup := make(chan struct{}) + defer close(cleanup) + addr := bootstrapConnections(t, 3, func(nc net.Conn) { + <-cleanup + _ = nc.Close() }) - t.Run("when MinPoolSize > MaxPoolSize should not exceed MaxPoolSize connections", func(t *testing.T) { - t.Parallel() - - cleanup := make(chan struct{}) - defer close(cleanup) - addr := bootstrapConnections(t, 20, func(nc net.Conn) { - <-cleanup - _ = nc.Close() - }) - d := newdialer(&net.Dialer{}) - p := newPool(poolConfig{ - Address: address.Address(addr.String()), - MinPoolSize: 20, - MaxPoolSize: 2, - ConnectTimeout: defaultConnectionTimeout, - }, WithDialer(func(Dialer) Dialer { return d })) - err := p.ready() - require.NoError(t, err) + d := newdialer(&net.Dialer{}) + p := newPool(poolConfig{ + Address: address.Address(addr.String()), + MinPoolSize: 3, + ConnectTimeout: defaultConnectionTimeout, + }, WithDialer(func(Dialer) Dialer { return d })) + err := p.ready() + require.NoError(t, err) - assertConnectionsOpened(t, d, 2) - assert.Equalf(t, 2, p.availableConnectionCount(), "should be 2 idle connections in pool") - assert.Equalf(t, 2, p.totalConnectionCount(), "should be 2 total connection in pool") + assertConnectionsOpened(t, d, 3) + assert.Equalf(t, 3, p.availableConnectionCount(), "should be 3 idle connections in pool") + assert.Equalf(t, 3, p.totalConnectionCount(), "should be 3 total connection in pool") - p.close(context.Background()) + p.close(context.Background()) + }) + t.Run("when MinPoolSize > MaxPoolSize should not exceed MaxPoolSize connections", func(t *testing.T) { + t.Parallel() + + cleanup := make(chan struct{}) + defer close(cleanup) + addr := bootstrapConnections(t, 20, func(nc net.Conn) { + <-cleanup + _ = nc.Close() }) - t.Run("removes perished connections", func(t *testing.T) { - t.Parallel() - - cleanup := make(chan struct{}) - defer close(cleanup) - addr := bootstrapConnections(t, 5, func(nc net.Conn) { - <-cleanup - _ = nc.Close() - }) - d := newdialer(&net.Dialer{}) - p := newPool(poolConfig{ - Address: address.Address(addr.String()), - // Set the pool's maintain interval to 10ms so that it allows the test to run quickly. - MaintainInterval: 10 * time.Millisecond, - ConnectTimeout: defaultConnectionTimeout, - }, WithDialer(func(Dialer) Dialer { return d })) - err := p.ready() - require.NoError(t, err) + d := newdialer(&net.Dialer{}) + p := newPool(poolConfig{ + Address: address.Address(addr.String()), + MinPoolSize: 20, + MaxPoolSize: 2, + ConnectTimeout: defaultConnectionTimeout, + }, WithDialer(func(Dialer) Dialer { return d })) + err := p.ready() + require.NoError(t, err) - // Check out and check in 3 connections. Assert that there are 3 total and 3 idle - // connections in the pool. 
- conns := make([]*connection, 3) - for i := range conns { - conns[i], err = p.checkOut(context.Background()) - require.NoError(t, err) - } - for _, c := range conns { - err = p.checkIn(c) - require.NoError(t, err) - } - assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connections") - assert.Equalf(t, 3, p.availableConnectionCount(), "should be 3 idle connections in pool") - assert.Equalf(t, 3, p.totalConnectionCount(), "should be 3 total connection in pool") - - // Manually make two of the connections in the idle connections stack perished due to - // passing the connection's idle deadline. Assert that maintain() closes the two - // perished connections and removes them from the pool. - p.idleMu.Lock() - for i := 0; i < 2; i++ { - p.idleConns[i].idleTimeout = time.Millisecond - p.idleConns[i].idleStart.Store(time.Now().Add(-1 * time.Hour)) - } - p.idleMu.Unlock() - assertConnectionsClosed(t, d, 2) - assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool") - assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool") + assertConnectionsOpened(t, d, 2) + assert.Equalf(t, 2, p.availableConnectionCount(), "should be 2 idle connections in pool") + assert.Equalf(t, 2, p.totalConnectionCount(), "should be 2 total connection in pool") - p.close(context.Background()) + p.close(context.Background()) + }) + t.Run("removes perished connections", func(t *testing.T) { + t.Parallel() + + cleanup := make(chan struct{}) + defer close(cleanup) + addr := bootstrapConnections(t, 5, func(nc net.Conn) { + <-cleanup + _ = nc.Close() }) - t.Run("removes perished connections and replaces them to maintain MinPoolSize", func(t *testing.T) { - t.Parallel() - - cleanup := make(chan struct{}) - defer close(cleanup) - addr := bootstrapConnections(t, 5, func(nc net.Conn) { - <-cleanup - _ = nc.Close() - }) - d := newdialer(&net.Dialer{}) - p := newPool(poolConfig{ - Address: address.Address(addr.String()), - MinPoolSize: 3, - // Set the pool's maintain interval to 10ms so that it allows the test to run quickly. - MaintainInterval: 10 * time.Millisecond, - ConnectTimeout: defaultConnectionTimeout, - }, WithDialer(func(Dialer) Dialer { return d })) - err := p.ready() + d := newdialer(&net.Dialer{}) + p := newPool(poolConfig{ + Address: address.Address(addr.String()), + // Set the pool's maintain interval to 10ms so that it allows the test to run quickly. + MaintainInterval: 10 * time.Millisecond, + ConnectTimeout: defaultConnectionTimeout, + }, WithDialer(func(Dialer) Dialer { return d })) + err := p.ready() + require.NoError(t, err) + + // Check out and check in 3 connections. Assert that there are 3 total and 3 idle + // connections in the pool. 
+		conns := make([]*connection, 3)
+		for i := range conns {
+			conns[i], err = p.checkOut(context.Background())
			require.NoError(t, err)
-			assertConnectionsOpened(t, d, 3)
-			assert.Equalf(t, 3, p.availableConnectionCount(), "should be 3 idle connections in pool")
-			assert.Equalf(t, 3, p.totalConnectionCount(), "should be 3 total connection in pool")
-
-			p.idleMu.Lock()
-			for i := 0; i < 2; i++ {
-				p.idleConns[i].idleTimeout = time.Millisecond
-				p.idleConns[i].idleStart.Store(time.Now().Add(-1 * time.Hour))
-			}
-			p.idleMu.Unlock()
-			assertConnectionsClosed(t, d, 2)
-			assertConnectionsOpened(t, d, 5)
-			assert.Equalf(t, 3, p.availableConnectionCount(), "should be 3 idle connections in pool")
-			assert.Equalf(t, 3, p.totalConnectionCount(), "should be 3 total connection in pool")
+		}
+		for _, c := range conns {
+			err = p.checkIn(c)
+			require.NoError(t, err)
+		}
+		assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connections")
+		assert.Equalf(t, 3, p.availableConnectionCount(), "should be 3 idle connections in pool")
+		assert.Equalf(t, 3, p.totalConnectionCount(), "should be 3 total connections in pool")
+
+		// Manually perish two of the connections in the idle connections stack by moving
+		// their idle start time past the idle deadline. Assert that maintain() closes the
+		// two perished connections and removes them from the pool.
+		p.idleMu.Lock()
+		for i := 0; i < 2; i++ {
+			p.idleConns[i].idleTimeout = time.Millisecond
+			p.idleConns[i].idleStart.Store(time.Now().Add(-1 * time.Hour))
+		}
+		p.idleMu.Unlock()
+		assertConnectionsClosed(t, d, 2)
+		assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connection in pool")
+		assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool")

-			p.close(context.Background())
+		p.close(context.Background())
+	})
+	t.Run("removes perished connections and replaces them to maintain MinPoolSize", func(t *testing.T) {
+		t.Parallel()
+
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+		addr := bootstrapConnections(t, 5, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
		})
+
+		d := newdialer(&net.Dialer{})
+		p := newPool(poolConfig{
+			Address:     address.Address(addr.String()),
+			MinPoolSize: 3,
+			// Set the pool's maintain interval to 10ms so that the test runs quickly.
+			MaintainInterval: 10 * time.Millisecond,
+			ConnectTimeout:   defaultConnectionTimeout,
+		}, WithDialer(func(Dialer) Dialer { return d }))
+		err := p.ready()
+		require.NoError(t, err)
+		assertConnectionsOpened(t, d, 3)
+		assert.Equalf(t, 3, p.availableConnectionCount(), "should be 3 idle connections in pool")
+		assert.Equalf(t, 3, p.totalConnectionCount(), "should be 3 total connections in pool")
+
+		p.idleMu.Lock()
+		for i := 0; i < 2; i++ {
+			p.idleConns[i].idleTimeout = time.Millisecond
+			p.idleConns[i].idleStart.Store(time.Now().Add(-1 * time.Hour))
+		}
+		p.idleMu.Unlock()
+		assertConnectionsClosed(t, d, 2)
+		assertConnectionsOpened(t, d, 5)
+		assert.Equalf(t, 3, p.availableConnectionCount(), "should be 3 idle connections in pool")
+		assert.Equalf(t, 3, p.totalConnectionCount(), "should be 3 total connections in pool")
+
+		p.close(context.Background())
	})
}
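Note on the "wait queue doesn't overflow" test: it exercises the front-only cleaning behavior of the pool's wait queue. The sketch below is a minimal model of that behavior, assuming a queue shaped like net/http's wantConnQueue, from which the driver's queue is adapted; the wantConn, waiting, and cleanFront names here are illustrative, not the driver's exact API.

package main

import "fmt"

// wantConn models a single blocked checkOut() waiting for a connection.
type wantConn struct {
	ready chan struct{} // closed when the want is satisfied or abandoned
	done  bool          // true once the waiter has timed out or been satisfied
}

// waiting reports whether the waiter is still interested in a connection.
func (w *wantConn) waiting() bool { return !w.done }

// wantConnQueue is a FIFO of waiters.
type wantConnQueue struct {
	items []*wantConn
}

func (q *wantConnQueue) pushBack(w *wantConn) { q.items = append(q.items, w) }

// cleanFront pops waiters off the front of the queue that are no longer
// waiting (e.g. their context timed out). It only inspects the front, so it
// stops at the first waiter that is still waiting.
func (q *wantConnQueue) cleanFront() {
	for len(q.items) > 0 && !q.items[0].waiting() {
		q.items = q.items[1:]
	}
}

func main() {
	q := &wantConnQueue{}
	// A blocked checkOut() with no timeout sits at the head of the queue.
	q.pushBack(&wantConn{ready: make(chan struct{})})
	// Subsequent checkOut() calls time out but stay queued behind the head.
	for i := 0; i < 5; i++ {
		q.pushBack(&wantConn{ready: make(chan struct{}), done: true})
	}
	q.cleanFront()
	fmt.Println(len(q.items)) // 6: the still-waiting head blocks all cleaning
}

Because cleanFront stops at the first waiter that is still waiting, a single indefinitely blocked checkOut() at the head can keep arbitrarily many timed-out waiters queued behind it; the test's 50,000 short-timeout check-out attempts verify that this scenario no longer grows the queue without bound.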
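The perished-connection tests above manipulate a connection's idleTimeout and idleStart fields directly. The following is a minimal sketch of the idle-deadline check they exercise, assuming a connection that records its last check-in time in an atomic.Value and perishes once it has been idle longer than idleTimeout; the helper names bumpIdleStart and idleTimeoutExpired are illustrative.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type connection struct {
	idleTimeout time.Duration
	idleStart   atomic.Value // time.Time of the last check-in
}

// bumpIdleStart resets the idle clock, which is what checkIn() effectively
// does when it returns a connection to the pool.
func (c *connection) bumpIdleStart() { c.idleStart.Store(time.Now()) }

// idleTimeoutExpired reports whether the connection has sat idle past its
// deadline; maintain() closes and removes such perished connections.
func (c *connection) idleTimeoutExpired() bool {
	if c.idleTimeout == 0 {
		return false // no idle limit configured
	}
	start, ok := c.idleStart.Load().(time.Time)
	return ok && time.Since(start) > c.idleTimeout
}

func main() {
	c := &connection{idleTimeout: time.Millisecond}
	c.bumpIdleStart()
	fmt.Println(c.idleTimeoutExpired()) // false: just checked in

	// Same trick as the tests: backdate idleStart to force perishing.
	c.idleStart.Store(time.Now().Add(-1 * time.Hour))
	fmt.Println(c.idleTimeoutExpired()) // true: maintain() would close it
}

Resetting the idle clock on check-in is also what the "bumps the connection idle deadline" test verifies: sleeping past MaxIdleTime and then checking the connection in must not close it, because the deadline is measured from the last check-in rather than from checkout.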