From 25a652fb1120e79cf570f18861eba1bdac470624 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Fri, 1 Dec 2023 13:36:29 +0100 Subject: [PATCH 1/2] feat(store/nats-js-kv): Add kv based natsjs store Signed-off-by: jkoberg --- .golangci.yaml | 8 +- go.work | 3 +- v4/store/nats-js-kv/README.md | 79 +++++ v4/store/nats-js-kv/context.go | 18 ++ v4/store/nats-js-kv/go.mod | 66 ++++ v4/store/nats-js-kv/go.sum | 241 ++++++++++++++ v4/store/nats-js-kv/helpers_test.go | 184 +++++++++++ v4/store/nats-js-kv/keys.go | 109 +++++++ v4/store/nats-js-kv/nats.go | 479 ++++++++++++++++++++++++++++ v4/store/nats-js-kv/nats_test.go | 336 +++++++++++++++++++ v4/store/nats-js-kv/options.go | 75 +++++ v4/store/nats-js-kv/test_data.go | 138 ++++++++ 12 files changed, 1731 insertions(+), 5 deletions(-) create mode 100644 v4/store/nats-js-kv/README.md create mode 100644 v4/store/nats-js-kv/context.go create mode 100644 v4/store/nats-js-kv/go.mod create mode 100644 v4/store/nats-js-kv/go.sum create mode 100644 v4/store/nats-js-kv/helpers_test.go create mode 100644 v4/store/nats-js-kv/keys.go create mode 100644 v4/store/nats-js-kv/nats.go create mode 100644 v4/store/nats-js-kv/nats_test.go create mode 100644 v4/store/nats-js-kv/options.go create mode 100644 v4/store/nats-js-kv/test_data.go diff --git a/.golangci.yaml b/.golangci.yaml index ddff21fa..602941a1 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -112,10 +112,10 @@ linters-settings: min-occurrences: 3 depguard: rules: - main: - deny: - - pkg: "github.com/golang/protobuf/proto" - desc: not allowed + main: + deny: + - pkg: "github.com/golang/protobuf/proto" + desc: not allowed misspell: # Correct spellings using locale preferences for US or UK. # Default is to use a neutral variety of English. diff --git a/go.work b/go.work index 0f32c94a..d2685597 100644 --- a/go.work +++ b/go.work @@ -1,4 +1,4 @@ -go 1.18 +go 1.21 use ( ./v4/acme/certmagic @@ -84,6 +84,7 @@ use ( ./v4/store/memory ./v4/store/mysql ./v4/store/nats-js + ./v4/store/nats-js-kv ./v4/store/redis ./v4/sync/consul ./v4/sync/etcd diff --git a/v4/store/nats-js-kv/README.md b/v4/store/nats-js-kv/README.md new file mode 100644 index 00000000..84db5e6d --- /dev/null +++ b/v4/store/nats-js-kv/README.md @@ -0,0 +1,79 @@ +# NATS JetStream Key Value Store Plugin + +This plugin uses the NATS JetStream [KeyValue Store](https://docs.nats.io/nats-concepts/jetstream/key-value-store) to implement the Go-Micro store interface. + +You can use this plugin like any other store plugin. +To start a local NATS JetStream server run `nats-server -js`. + +To manually create a new storage object call: + +```go +natsjskv.NewStore(opts ...store.Option) +``` + +The Go-Micro store interface uses databases and tables to store keys. These translate +to buckets (key value stores) and key prefixes. If no database (bucket name) is provided, "default" will be used. + +You can call `Write` with any arbitrary database name, and if a bucket with that name does not exist yet, +it will be automatically created. + +If a table name is provided, it will use it to prefix the key as `_`. + +To delete a bucket, and all the key/value pairs in it, pass the `DeleteBucket` option to the `Delete` +method, then they key name will be interpreted as a bucket name, and the bucket will be deleted. + +Next to the default store options, a few NATS specific options are available: + + +```go +// NatsOptions accepts nats.Options +NatsOptions(opts nats.Options) + +// JetStreamOptions accepts multiple nats.JSOpt +JetStreamOptions(opts ...nats.JSOpt) + +// KeyValueOptions accepts multiple nats.KeyValueConfig +// This will create buckets with the provided configs at initialization. +// +// type KeyValueConfig struct { +// Bucket string +// Description string +// MaxValueSize int32 +// History uint8 +// TTL time.Duration +// MaxBytes int64 +// Storage StorageType +// Replicas int +// Placement *Placement +// RePublish *RePublish +// Mirror *StreamSource +// Sources []*StreamSource +} +KeyValueOptions(cfg ...*nats.KeyValueConfig) + +// DefaultTTL sets the default TTL to use for new buckets +// By default no TTL is set. +// +// TTL ON INDIVIDUAL WRITE CALLS IS NOT SUPPORTED, only bucket wide TTL. +// Either set a default TTL with this option or provide bucket specific options +// with ObjectStoreOptions +DefaultTTL(ttl time.Duration) + +// DefaultMemory sets the default storage type to memory only. +// +// The default is file storage, persisting storage between service restarts. +// Be aware that the default storage location of NATS the /tmp dir is, and thus +// won't persist reboots. +DefaultMemory() + +// DefaultDescription sets the default description to use when creating new +// buckets. The default is "Store managed by go-micro" +DefaultDescription(text string) + +// DeleteBucket will use the key passed to Delete as a bucket (database) name, +// and delete the bucket. +// This option should not be combined with the store.DeleteFrom option, as +// that will overwrite the delete action. +DeleteBucket() +``` + diff --git a/v4/store/nats-js-kv/context.go b/v4/store/nats-js-kv/context.go new file mode 100644 index 00000000..e5753f24 --- /dev/null +++ b/v4/store/nats-js-kv/context.go @@ -0,0 +1,18 @@ +package natsjskv + +import ( + "context" + + "go-micro.dev/v4/store" +) + +// setStoreOption returns a function to setup a context with given value. +func setStoreOption(k, v interface{}) store.Option { + return func(o *store.Options) { + if o.Context == nil { + o.Context = context.Background() + } + + o.Context = context.WithValue(o.Context, k, v) + } +} diff --git a/v4/store/nats-js-kv/go.mod b/v4/store/nats-js-kv/go.mod new file mode 100644 index 00000000..736074ff --- /dev/null +++ b/v4/store/nats-js-kv/go.mod @@ -0,0 +1,66 @@ +module github.com/go-micro/plugins/v4/store/nats-js-kv + +go 1.21 + +require ( + github.com/cornelk/hashmap v1.0.8 + github.com/nats-io/nats-server/v2 v2.8.4 +) + +require ( + github.com/Microsoft/go-winio v0.5.2 // indirect + github.com/ProtonMail/go-crypto v0.0.0-20220824120805-4b6e5c587895 // indirect + github.com/acomagu/bufpipe v1.0.3 // indirect + github.com/bitly/go-simplejson v0.5.0 // indirect + github.com/cloudflare/circl v1.2.0 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/emirpasic/gods v1.18.1 // indirect + github.com/fsnotify/fsnotify v1.5.4 // indirect + github.com/go-git/gcfg v1.5.0 // indirect + github.com/go-git/go-billy/v5 v5.3.1 // indirect + github.com/go-git/go-git/v5 v5.4.2 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/google/go-cmp v0.5.6 // indirect + github.com/imdario/mergo v0.3.13 // indirect + github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect + github.com/kevinburke/ssh_config v1.2.0 // indirect + github.com/klauspost/compress v1.17.0 // indirect + github.com/minio/highwayhash v1.0.2 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect + github.com/nats-io/nkeys v0.4.5 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/nxadm/tail v1.4.8 // indirect + github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/sergi/go-diff v1.2.0 // indirect + github.com/test-go/testify v1.1.4 // indirect + github.com/urfave/cli/v2 v2.14.0 // indirect + github.com/xanzy/ssh-agent v0.3.2 // indirect + github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect + golang.org/x/mod v0.8.0 // indirect + golang.org/x/sys v0.5.0 // indirect + golang.org/x/text v0.13.0 // indirect + golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect + golang.org/x/tools v0.6.0 // indirect + golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect + gopkg.in/warnings.v0 v0.1.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +require ( + github.com/google/uuid v1.3.0 + github.com/miekg/dns v1.1.50 // indirect + github.com/nats-io/nats.go v1.31.0 + github.com/pkg/errors v0.9.1 + github.com/stretchr/testify v1.7.1 + go-micro.dev/v4 v4.9.0 + golang.org/x/crypto v0.6.0 // indirect + golang.org/x/net v0.6.0 // indirect + golang.org/x/sync v0.1.0 // indirect + google.golang.org/protobuf v1.28.1 // indirect +) diff --git a/v4/store/nats-js-kv/go.sum b/v4/store/nats-js-kv/go.sum new file mode 100644 index 00000000..b2e17e36 --- /dev/null +++ b/v4/store/nats-js-kv/go.sum @@ -0,0 +1,241 @@ +github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= +github.com/Microsoft/go-winio v0.4.16/go.mod h1:XB6nPKklQyQ7GC9LdcBEcBl8PF76WugXOPRXwdLnMv0= +github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= +github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= +github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo= +github.com/ProtonMail/go-crypto v0.0.0-20220824120805-4b6e5c587895 h1:NsReiLpErIPzRrnogAXYwSoU7txA977LjDGrbkewJbg= +github.com/ProtonMail/go-crypto v0.0.0-20220824120805-4b6e5c587895/go.mod h1:UBYPn8k0D56RtnR8RFQMjmh4KrZzWJ5o7Z9SYjossQ8= +github.com/acomagu/bufpipe v1.0.3 h1:fxAGrHZTgQ9w5QqVItgzwj235/uYZYgbXitB+dLupOk= +github.com/acomagu/bufpipe v1.0.3/go.mod h1:mxdxdup/WdsKVreO5GpW4+M/1CE2sMG4jeGJ2sYmHc4= +github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo4JG6LR5AXSUEsOjtdm0kw0FtQtMJA= +github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= +github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y= +github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= +github.com/bwesterb/go-ristretto v1.2.1/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= +github.com/cloudflare/circl v1.1.0/go.mod h1:prBCrKB9DV4poKZY1l9zBXg2QJY7mvgRvtMxxK7fi4I= +github.com/cloudflare/circl v1.2.0 h1:NheeISPSUcYftKlfrLuOo4T62FkmD4t4jviLfFFYaec= +github.com/cloudflare/circl v1.2.0/go.mod h1:Ch2UgYr6ti2KTtlejELlROl0YIYj7SLjAC8M+INXlMk= +github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc= +github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k= +github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= +github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= +github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= +github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= +github.com/gliderlabs/ssh v0.2.2 h1:6zsha5zo/TWhRhwqCD3+EarCAgZ2yN28ipRnGPnwkI0= +github.com/gliderlabs/ssh v0.2.2/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= +github.com/go-git/gcfg v1.5.0 h1:Q5ViNfGF8zFgyJWPqYwA7qGFoMTEiBmdlkcfRmpIMa4= +github.com/go-git/gcfg v1.5.0/go.mod h1:5m20vg6GwYabIxaOonVkTdrILxQMpEShl1xiMF4ua+E= +github.com/go-git/go-billy/v5 v5.2.0/go.mod h1:pmpqyWchKfYfrkb/UVH4otLvyi/5gJlGI4Hb3ZqZ3W0= +github.com/go-git/go-billy/v5 v5.3.1 h1:CPiOUAzKtMRvolEKw+bG1PLRpT7D3LIs3/3ey4Aiu34= +github.com/go-git/go-billy/v5 v5.3.1/go.mod h1:pmpqyWchKfYfrkb/UVH4otLvyi/5gJlGI4Hb3ZqZ3W0= +github.com/go-git/go-git-fixtures/v4 v4.2.1 h1:n9gGL1Ct/yIw+nfsfr8s4+sbhT+Ncu2SubfXjIWgci8= +github.com/go-git/go-git-fixtures/v4 v4.2.1/go.mod h1:K8zd3kDUAykwTdDCr+I0per6Y6vMiRR/nnVTBtavnB0= +github.com/go-git/go-git/v5 v5.4.2 h1:BXyZu9t0VkbiHtqrsvdq39UDhGJTl1h55VW6CSC4aY4= +github.com/go-git/go-git/v5 v5.4.2/go.mod h1:gQ1kArt6d+n+BGd+/B/I74HwRTLhth2+zti4ihgckDc= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk= +github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= +github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= +github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= +github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= +github.com/kevinburke/ssh_config v0.0.0-20201106050909-4977a11b4351/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= +github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4= +github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= +github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= +github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/matryer/is v1.2.0 h1:92UTHpy8CDwaJ08GqLDzhhuixiBUUD1p3AU6PHddz4A= +github.com/matryer/is v1.2.0/go.mod h1:2fLPjFQM9rhQ15aVEtbuwhJinnOqrmgXPNdZsdwlWXA= +github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA= +github.com/miekg/dns v1.1.50/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I= +github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4= +github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= +github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g= +github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= +github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw= +github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= +github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= +github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE= +github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU= +github.com/urfave/cli/v2 v2.14.0 h1:sFRL29Dm9JhXSMYb96raDeo/Q/JRyPXPs8u+4CkMlI8= +github.com/urfave/cli/v2 v2.14.0/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI= +github.com/xanzy/ssh-agent v0.3.0/go.mod h1:3s9xbODqPuuhK9JV1R321M/FlMZSBvE5aY6eAcqrDh0= +github.com/xanzy/ssh-agent v0.3.2 h1:eKj4SX2Fe7mui28ZgnFW5fmTz1EIr7ugo5s6wDxdHBM= +github.com/xanzy/ssh-agent v0.3.2/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw= +github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= +github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go-micro.dev/v4 v4.9.0 h1:pd1CpqMT9hA47jSmX8mfdGK865PkMh95Rwj5RdfqPqE= +go-micro.dev/v4 v4.9.0/go.mod h1:Ju8HrZ5hQSF+QguZ2QUs9Kbe42MHP1tJa/fpP5g07Cs= +golang.org/x/crypto v0.0.0-20190219172222-a4c6cb3142f2/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM= +golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210326060303-6b1517762897/go.mod h1:uSPa2vr4CLtc/ILN5odXGNXS6mhrKVzTaCXzk9m6W3k= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b h1:ZmngSVLe/wycRns9MKikG9OWIEjGcGAkacif7oYQaUY= +golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde h1:ejfdSekXMDxDLbRrJMwUk6KnSLZ2McaUCVcIKM+N6jc= +golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210502180810-71e4cd670f79/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220315194320-039c03cc5b86/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f h1:uF6paiQQebLeSXkrTqHqz0MXhXXS1KgF41eUdBNvxK0= +golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME= +gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/v4/store/nats-js-kv/helpers_test.go b/v4/store/nats-js-kv/helpers_test.go new file mode 100644 index 00000000..a5317e93 --- /dev/null +++ b/v4/store/nats-js-kv/helpers_test.go @@ -0,0 +1,184 @@ +package natsjskv + +import ( + "context" + "fmt" + "net" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + nserver "github.com/nats-io/nats-server/v2/server" + "github.com/pkg/errors" + "github.com/test-go/testify/require" + "go-micro.dev/v4/store" +) + +func testSetup(ctx context.Context, t *testing.T, opts ...store.Option) store.Store { + t.Helper() + + var err error + var s store.Store + for i := 0; i < 5; i++ { + nCtx, cancel := context.WithCancel(ctx) + addr := startNatsServer(nCtx, t) + + opts = append(opts, store.Nodes(addr)) + s = NewStore(opts...) + + err = s.Init() + if err != nil { + t.Log(errors.Wrap(err, "Error: Server initialization failed, restarting server")) + cancel() + if err = s.Close(); err != nil { + t.Logf("Failed to close store: %v", err) + } + time.Sleep(time.Second) + continue + } + + go func() { + <-ctx.Done() + cancel() + if err = s.Close(); err != nil { + t.Logf("Failed to close store: %v", err) + } + }() + + return s + } + t.Error(errors.Wrap(err, "Store initialization failed")) + return s +} + +func startNatsServer(ctx context.Context, t *testing.T) string { + t.Helper() + natsAddr := getFreeLocalhostAddress() + natsPort, err := strconv.Atoi(strings.Split(natsAddr, ":")[1]) + if err != nil { + t.Logf("Failed to parse port from address: %v", err) + } + + clusterName := "gomicro-store-test-cluster" + + // start the NATS with JetStream server + go natsServer(ctx, + t, + &nserver.Options{ + Host: strings.Split(natsAddr, ":")[0], + Port: natsPort, + Cluster: nserver.ClusterOpts{ + Name: clusterName, + }, + }, + ) + + time.Sleep(2 * time.Second) + + return natsAddr +} + +func getFreeLocalhostAddress() string { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return "" + } + + addr := l.Addr().String() + if err := l.Close(); err != nil { + return addr + } + return addr +} + +func natsServer(ctx context.Context, t *testing.T, opts *nserver.Options) { + t.Helper() + + opts.TLSTimeout = 180 + server, err := nserver.NewServer( + opts, + ) + require.NoError(t, err) + if err != nil { + return + } + defer server.Shutdown() + + server.SetLoggerV2( + NewLogWrapper(), + false, false, false, + ) + + tmpdir := t.TempDir() + natsdir := filepath.Join(tmpdir, "nats-js") + jsConf := &nserver.JetStreamConfig{ + StoreDir: natsdir, + } + + // first start NATS + go server.Start() + time.Sleep(time.Second) + + // second start JetStream + err = server.EnableJetStream(jsConf) + require.NoError(t, err) + if err != nil { + return + } + + // This fixes some issues where tests fail because directory cleanup fails + t.Cleanup(func() { + contents, err := filepath.Glob(natsdir + "/*") + if err != nil { + t.Logf("Failed to glob directory: %v", err) + } + for _, item := range contents { + if err := os.RemoveAll(item); err != nil { + t.Logf("Failed to remove file: %v", err) + } + } + if err := os.RemoveAll(natsdir); err != nil { + t.Logf("Failed to remove directory: %v", err) + } + }) + + <-ctx.Done() +} + +func NewLogWrapper() *LogWrapper { + return &LogWrapper{} +} + +type LogWrapper struct { +} + +// Noticef logs a notice statement. +func (l *LogWrapper) Noticef(_ string, _ ...interface{}) { +} + +// Warnf logs a warning statement. +func (l *LogWrapper) Warnf(format string, v ...interface{}) { + fmt.Printf(format+"\n", v...) +} + +// Fatalf logs a fatal statement. +func (l *LogWrapper) Fatalf(format string, v ...interface{}) { + fmt.Printf(format+"\n", v...) +} + +// Errorf logs an error statement. +func (l *LogWrapper) Errorf(format string, v ...interface{}) { + fmt.Printf(format+"\n", v...) +} + +// Debugf logs a debug statement. +func (l *LogWrapper) Debugf(_ string, _ ...interface{}) { +} + +// Tracef logs a trace statement. +func (l *LogWrapper) Tracef(format string, v ...interface{}) { + fmt.Printf(format+"\n", v...) +} diff --git a/v4/store/nats-js-kv/keys.go b/v4/store/nats-js-kv/keys.go new file mode 100644 index 00000000..8fd6f1aa --- /dev/null +++ b/v4/store/nats-js-kv/keys.go @@ -0,0 +1,109 @@ +package natsjskv + +import ( + "encoding/base32" + "strings" +) + +// NatsKey is a convenience function to create a key for the nats kv store. +func NatsKey(table, microkey string) string { + return NewKey(table, microkey, "").NatsKey() +} + +// MicroKey is a convenience function to create a key for the micro interface. +func MicroKey(table, natskey string) string { + return NewKey(table, "", natskey).MicroKey() +} + +// MicroKeyFilter is a convenience function to create a key for the micro interface. +// It returns false if the key does not match the table, prefix or suffix. +func MicroKeyFilter(table, natskey string, prefix, suffix string) (string, bool) { + k := NewKey(table, "", natskey) + return k.MicroKey(), k.Check(table, prefix, suffix) +} + +// Key represents a key in the store. +// They are used to convert nats keys (base64 encoded) to micro keys (plain text - no table prefix) and vice versa. +type Key struct { + // Plain is the plain key as requested by the go-micro interface. + Plain string + // Full is the full key including the table prefix. + Full string + // Encoded is the base64 encoded key as used by the nats kv store. + Encoded string +} + +// NewKey creates a new key. Either plain or encoded must be set. +func NewKey(table string, plain, encoded string) *Key { + k := &Key{ + Plain: plain, + Encoded: encoded, + } + + switch { + case k.Plain != "": + k.Full = getKey(k.Plain, table) + k.Encoded = encode(k.Full) + case k.Encoded != "": + k.Full = decode(k.Encoded) + k.Plain = trimKey(k.Full, table) + } + + return k +} + +// NatsKey returns a key the nats kv store can work with. +func (k *Key) NatsKey() string { + return k.Encoded +} + +// MicroKey returns a key the micro interface can work with. +func (k *Key) MicroKey() string { + return k.Plain +} + +// Check returns false if the key does not match the table, prefix or suffix. +func (k *Key) Check(table, prefix, suffix string) bool { + if table != "" && k.Full != getKey(k.Plain, table) { + return false + } + + if prefix != "" && !strings.HasPrefix(k.Plain, prefix) { + return false + } + + if suffix != "" && !strings.HasSuffix(k.Plain, suffix) { + return false + } + + return true +} + +func encode(s string) string { + return base32.StdEncoding.EncodeToString([]byte(s)) +} + +func decode(s string) string { + b, err := base32.StdEncoding.DecodeString(s) + if err != nil { + return s + } + + return string(b) +} + +func getKey(key, table string) string { + if table != "" { + return table + "_" + key + } + + return key +} + +func trimKey(key, table string) string { + if table != "" { + return strings.TrimPrefix(key, table+"_") + } + + return key +} diff --git a/v4/store/nats-js-kv/nats.go b/v4/store/nats-js-kv/nats.go new file mode 100644 index 00000000..aa7d5045 --- /dev/null +++ b/v4/store/nats-js-kv/nats.go @@ -0,0 +1,479 @@ +// Package natsjskv is a go-micro store plugin for NATS JetStream Key-Value store. +package natsjskv + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/cornelk/hashmap" + "github.com/nats-io/nats.go" + "github.com/pkg/errors" + "go-micro.dev/v4/store" + "go-micro.dev/v4/util/cmd" +) + +var ( + // ErrBucketNotFound is returned when the requested bucket does not exist. + ErrBucketNotFound = errors.New("Bucket (database) not found") +) + +// KeyValueEnvelope is the data structure stored in the key value store. +type KeyValueEnvelope struct { + Key string `json:"key"` + Data []byte `json:"data"` + Metadata map[string]interface{} `json:"metadata"` +} + +type natsStore struct { + sync.Once + sync.RWMutex + + ttl time.Duration + storageType nats.StorageType + description string + + opts store.Options + nopts nats.Options + jsopts []nats.JSOpt + kvConfigs []*nats.KeyValueConfig + + conn *nats.Conn + js nats.JetStreamContext + buckets *hashmap.Map[string, nats.KeyValue] +} + +func init() { + cmd.DefaultStores["natsjskv"] = NewStore +} + +// NewStore will create a new NATS JetStream Object Store. +func NewStore(opts ...store.Option) store.Store { + options := store.Options{ + Nodes: []string{}, + Database: "default", + Table: "", + Context: context.Background(), + } + + n := &natsStore{ + description: "KeyValue storage administered by go-micro store plugin", + opts: options, + jsopts: []nats.JSOpt{}, + kvConfigs: []*nats.KeyValueConfig{}, + buckets: hashmap.New[string, nats.KeyValue](), + storageType: nats.FileStorage, + } + + n.setOption(opts...) + + return n +} + +// Init initializes the store. It must perform any required setup on the +// backing storage implementation and check that it is ready for use, +// returning any errors. +func (n *natsStore) Init(opts ...store.Option) error { + n.setOption(opts...) + + // Connect to NATS servers + conn, err := n.nopts.Connect() + if err != nil { + return errors.Wrap(err, "Failed to connect to NATS Server") + } + + // Create JetStream context + js, err := conn.JetStream(n.jsopts...) + if err != nil { + return errors.Wrap(err, "Failed to create JetStream context") + } + + n.conn = conn + n.js = js + + // Create default config if no configs present + if len(n.kvConfigs) == 0 { + if _, err := n.mustGetBucketByName(n.opts.Database); err != nil { + return err + } + } + + // Create kv store buckets + for _, cfg := range n.kvConfigs { + if _, err := n.mustGetBucket(cfg); err != nil { + return err + } + } + + return nil +} + +func (n *natsStore) setOption(opts ...store.Option) { + for _, o := range opts { + o(&n.opts) + } + + n.Once.Do(func() { + n.nopts = nats.GetDefaultOptions() + }) + + // Extract options from context + if nopts, ok := n.opts.Context.Value(natsOptionsKey{}).(nats.Options); ok { + n.nopts = nopts + } + + if jsopts, ok := n.opts.Context.Value(jsOptionsKey{}).([]nats.JSOpt); ok { + n.jsopts = append(n.jsopts, jsopts...) + } + + if cfg, ok := n.opts.Context.Value(kvOptionsKey{}).([]*nats.KeyValueConfig); ok { + n.kvConfigs = append(n.kvConfigs, cfg...) + } + + if ttl, ok := n.opts.Context.Value(ttlOptionsKey{}).(time.Duration); ok { + n.ttl = ttl + } + + if sType, ok := n.opts.Context.Value(memoryOptionsKey{}).(nats.StorageType); ok { + n.storageType = sType + } + + if text, ok := n.opts.Context.Value(descriptionOptionsKey{}).(string); ok { + n.description = text + } + + // Assign store option server addresses to nats options + if len(n.opts.Nodes) > 0 { + n.nopts.Url = "" + n.nopts.Servers = n.opts.Nodes + } + + if len(n.nopts.Servers) == 0 && n.nopts.Url == "" { + n.nopts.Url = nats.DefaultURL + } +} + +// Options allows you to view the current options. +func (n *natsStore) Options() store.Options { + return n.opts +} + +// Read takes a single key name and optional ReadOptions. It returns matching []*Record or an error. +func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { + if err := n.initConn(); err != nil { + return nil, err + } + + opt := store.ReadOptions{} + + for _, o := range opts { + o(&opt) + } + + if opt.Database == "" { + opt.Database = n.opts.Database + } + + if opt.Table == "" { + opt.Table = n.opts.Table + } + + bucket, ok := n.buckets.Get(opt.Database) + if !ok { + return nil, ErrBucketNotFound + } + + keys, err := n.natsKeys(bucket, opt.Table, key, opt.Prefix, opt.Suffix) + if err != nil { + return nil, err + } + + records := make([]*store.Record, 0, len(keys)) + + for _, key := range keys { + rec, ok, err := n.getRecord(bucket, key) + if err != nil { + return nil, err + } + + if ok { + records = append(records, rec) + } + } + + return enforceLimits(records, opt.Limit, opt.Offset), nil +} + +// Write writes a record to the store, and returns an error if the record was not written. +func (n *natsStore) Write(rec *store.Record, opts ...store.WriteOption) error { + if err := n.initConn(); err != nil { + return err + } + + opt := store.WriteOptions{} + for _, o := range opts { + o(&opt) + } + + if opt.Database == "" { + opt.Database = n.opts.Database + } + + if opt.Table == "" { + opt.Table = n.opts.Table + } + + store, err := n.mustGetBucketByName(opt.Database) + if err != nil { + return err + } + + b, err := json.Marshal(KeyValueEnvelope{ + Key: rec.Key, + Data: rec.Value, + Metadata: rec.Metadata, + }) + if err != nil { + return errors.Wrap(err, "Failed to marshal object") + } + + if _, err := store.Put(NatsKey(opt.Table, rec.Key), b); err != nil { + return errors.Wrapf(err, "Failed to store data in bucket '%s'", NatsKey(opt.Table, rec.Key)) + } + + return nil +} + +// Delete removes the record with the corresponding key from the store. +func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error { + if err := n.initConn(); err != nil { + return err + } + + opt := store.DeleteOptions{} + + for _, o := range opts { + o(&opt) + } + + if opt.Database == "" { + opt.Database = n.opts.Database + } + + if opt.Table == "" { + opt.Table = n.opts.Table + } + + if opt.Table == "DELETE_BUCKET" { + n.buckets.Del(key) + + if err := n.js.DeleteKeyValue(key); err != nil { + return errors.Wrap(err, "Failed to delete bucket") + } + + return nil + } + + store, ok := n.buckets.Get(opt.Database) + if !ok { + return ErrBucketNotFound + } + + if err := store.Delete(NatsKey(opt.Table, key)); err != nil { + return errors.Wrap(err, "Failed to delete data") + } + + return nil +} + +// List returns any keys that match, or an empty list with no error if none matched. +func (n *natsStore) List(opts ...store.ListOption) ([]string, error) { + if err := n.initConn(); err != nil { + return nil, err + } + + opt := store.ListOptions{} + for _, o := range opts { + o(&opt) + } + + if opt.Database == "" { + opt.Database = n.opts.Database + } + + if opt.Table == "" { + opt.Table = n.opts.Table + } + + store, ok := n.buckets.Get(opt.Database) + if !ok { + return nil, ErrBucketNotFound + } + + keys, err := n.microKeys(store, opt.Table, opt.Prefix, opt.Suffix) + if err != nil { + return nil, errors.Wrap(err, "Failed to list keys in bucket") + } + + return enforceLimits(keys, opt.Limit, opt.Offset), nil +} + +// Close the store. +func (n *natsStore) Close() error { + n.conn.Close() + return nil +} + +// String returns the name of the implementation. +func (n *natsStore) String() string { + return "NATS JetStream KeyValueStore" +} + +// thread safe way to initialize the connection. +func (n *natsStore) initConn() error { + if n.hasConn() { + return nil + } + + n.Lock() + defer n.Unlock() + + // check if conn was initialized meanwhile + if n.conn != nil { + return nil + } + + return n.Init() +} + +// thread safe way to check if n is initialized. +func (n *natsStore) hasConn() bool { + n.RLock() + defer n.RUnlock() + + return n.conn != nil +} + +// mustGetDefaultBucket returns the bucket with the given name creating it with default configuration if needed. +func (n *natsStore) mustGetBucketByName(name string) (nats.KeyValue, error) { + return n.mustGetBucket(&nats.KeyValueConfig{ + Bucket: name, + Description: n.description, + TTL: n.ttl, + Storage: n.storageType, + }) +} + +// mustGetBucket creates a new bucket if it does not exist yet. +func (n *natsStore) mustGetBucket(kv *nats.KeyValueConfig) (nats.KeyValue, error) { + if store, ok := n.buckets.Get(kv.Bucket); ok { + return store, nil + } + + store, err := n.js.KeyValue(kv.Bucket) + if err != nil { + if !errors.Is(err, nats.ErrBucketNotFound) { + return nil, errors.Wrapf(err, "Failed to get bucket (%s)", kv.Bucket) + } + + store, err = n.js.CreateKeyValue(kv) + if err != nil { + return nil, errors.Wrapf(err, "Failed to create bucket (%s)", kv.Bucket) + } + } + + n.buckets.Set(kv.Bucket, store) + + return store, nil +} + +// getRecord returns the record with the given key from the nats kv store. +func (n *natsStore) getRecord(bucket nats.KeyValue, key string) (*store.Record, bool, error) { + obj, err := bucket.Get(key) + if errors.Is(err, nats.ErrKeyNotFound) { + return nil, false, nil + } else if err != nil { + return nil, false, errors.Wrap(err, "Failed to get object from bucket") + } + + var kv KeyValueEnvelope + if err := json.Unmarshal(obj.Value(), &kv); err != nil { + return nil, false, errors.Wrap(err, "Failed to unmarshal object") + } + + if obj.Operation() != nats.KeyValuePut { + return nil, false, nil + } + + return &store.Record{ + Key: kv.Key, + Value: kv.Data, + Metadata: kv.Metadata, + }, true, nil +} + +func (n *natsStore) natsKeys(bucket nats.KeyValue, table, key string, prefix, suffix bool) ([]string, error) { + if !suffix && !prefix { + return []string{NatsKey(table, key)}, nil + } + + toS := func(s string, b bool) string { + if b { + return s + } + + return "" + } + + keys, _, err := n.getKeys(bucket, table, toS(key, prefix), toS(key, suffix)) + + return keys, err +} + +func (n *natsStore) microKeys(bucket nats.KeyValue, table, prefix, suffix string) ([]string, error) { + _, keys, err := n.getKeys(bucket, table, prefix, suffix) + + return keys, err +} + +func (n *natsStore) getKeys(bucket nats.KeyValue, table string, prefix, suffix string) ([]string, []string, error) { + names, err := bucket.Keys(nats.IgnoreDeletes()) + if errors.Is(err, nats.ErrKeyNotFound) { + return []string{}, []string{}, nil + } else if err != nil { + return []string{}, []string{}, errors.Wrap(err, "Failed to list objects") + } + + natsKeys := make([]string, 0, len(names)) + microKeys := make([]string, 0, len(names)) + + for _, k := range names { + mkey, ok := MicroKeyFilter(table, k, prefix, suffix) + if !ok { + continue + } + + natsKeys = append(natsKeys, k) + microKeys = append(microKeys, mkey) + } + + return natsKeys, microKeys, nil +} + +// enforces offset and limit without causing a panic. +func enforceLimits[V any](recs []V, limit, offset uint) []V { + l := uint(len(recs)) + + from := offset + if from > l { + from = l + } + + to := l + if limit > 0 && offset+limit < l { + to = offset + limit + } + + return recs[from:to] +} diff --git a/v4/store/nats-js-kv/nats_test.go b/v4/store/nats-js-kv/nats_test.go new file mode 100644 index 00000000..c6e5bc8b --- /dev/null +++ b/v4/store/nats-js-kv/nats_test.go @@ -0,0 +1,336 @@ +package natsjskv + +import ( + "context" + "reflect" + "testing" + "time" + + "github.com/google/uuid" + "github.com/nats-io/nats.go" + "github.com/pkg/errors" + "go-micro.dev/v4/store" +) + +func TestNats(t *testing.T) { + // Setup without calling Init on purpose + var err error + var cancel func() + var ctx context.Context + for i := 0; i < 5; i++ { + ctx, cancel = context.WithCancel(context.Background()) + addr := startNatsServer(ctx, t) + s := NewStore(store.Nodes(addr)) + + // Test String method + t.Log("Testing:", s.String()) + + err = basicTest(t, s) + if err != nil { + t.Log(err) + continue + } + + // Test reading non-existing key + r, err := s.Read("this-is-a-random-key") + if err != nil { + t.Fatal(err) + } + if len(r) > 0 { + t.Fatal("Lenth should be 0") + } + err = s.Close() + if err != nil { + t.Logf("Failed to close store: %v", err) + } + cancel() + return + } + cancel() + t.Fatal(err) +} + +func TestOptions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + s := testSetup(ctx, t, + DefaultMemory(), + + // Having a non-default description will trigger nats.ErrStreamNameAlreadyInUse + // since the buckets have been created in previous tests with a different description. + // + // NOTE: this is only the case with a manually set up server, not with current + // test setup, where new servers are started for each test. + DefaultDescription("My fancy description"), + + // Option has no effect in this context, just to test setting the option + JetStreamOptions(nats.PublishAsyncMaxPending(256)), + + // Sets a custom NATS client name, just to test the NatsOptions() func + NatsOptions(nats.Options{Name: "Go NATS Store Plugin Tests Client"}), + + KeyValueOptions(&nats.KeyValueConfig{ + Bucket: "TestBucketName", + Description: "This bucket is not used", + TTL: 5 * time.Minute, + MaxBytes: 1024, + Storage: nats.MemoryStorage, + Replicas: 1, + }), + ) + defer cancel() + + if err := basicTest(t, s); err != nil { + t.Fatal(err) + } +} + +func TestTTL(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + ttl := 500 * time.Millisecond + s := testSetup(ctx, t, + DefaultTTL(ttl), + + // Since these buckets will be new they will have the new description + DefaultDescription("My fancy description"), + ) + defer cancel() + + // Use a uuid to make sure a new bucket is created when using local server + id := uuid.New().String() + for _, r := range table { + if err := s.Write(r.Record, store.WriteTo(r.Database+id, r.Table)); err != nil { + t.Fatal(err) + } + } + + time.Sleep(ttl * 2) + + for _, r := range table { + res, err := s.Read(r.Record.Key, store.ReadFrom(r.Database+id, r.Table)) + if err != nil { + t.Fatal(err) + } + if len(res) > 0 { + t.Fatal("Fetched record while it should have expired") + } + } +} + +func TestMetaData(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + s := testSetup(ctx, t) + defer cancel() + + record := store.Record{ + Key: "KeyOne", + Value: []byte("Some value"), + Metadata: map[string]interface{}{ + "meta-one": "val", + "meta-two": 5, + }, + Expiry: 0, + } + bucket := "meta-data-test" + if err := s.Write(&record, store.WriteTo(bucket, "")); err != nil { + t.Fatal(err) + } + + r, err := s.Read(record.Key, store.ReadFrom(bucket, "")) + if err != nil { + t.Fatal(err) + } + if len(r) == 0 { + t.Fatal("No results found") + } + + m := r[0].Metadata + if m["meta-one"].(string) != record.Metadata["meta-one"].(string) || + m["meta-two"].(float64) != float64(record.Metadata["meta-two"].(int)) { + t.Fatalf("Metadata does not match: (%+v) != (%+v)", m, record.Metadata) + } +} + +func TestDelete(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + s := testSetup(ctx, t) + defer cancel() + + for _, r := range table { + if err := s.Write(r.Record, store.WriteTo(r.Database, r.Table)); err != nil { + t.Fatal(err) + } + + if err := s.Delete(r.Record.Key, store.DeleteFrom(r.Database, r.Table)); err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) + + res, err := s.Read(r.Record.Key, store.ReadFrom(r.Database, r.Table)) + if err != nil { + t.Fatal(err) + } + if len(res) > 0 { + t.Fatalf("Failed to delete %s:%s from %s %s (len: %d)", r.Record.Key, r.Record.Value, r.Database, r.Table, len(res)) + } + } +} + +func TestList(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + s := testSetup(ctx, t) + defer cancel() + + for _, r := range table { + if err := s.Write(r.Record, store.WriteTo(r.Database, r.Table)); err != nil { + t.Fatal(err) + } + } + + l := []struct { + Database string + Table string + Length int + Prefix string + Suffix string + Offset int + Limit int + }{ + {Length: 7}, + {Database: "prefix-test", Length: 7}, + {Database: "prefix-test", Offset: 2, Length: 5}, + {Database: "prefix-test", Offset: 2, Limit: 3, Length: 3}, + {Database: "prefix-test", Table: "names", Length: 3}, + {Database: "prefix-test", Table: "cities", Length: 4}, + {Database: "prefix-test", Table: "cities", Suffix: "City", Length: 3}, + {Database: "prefix-test", Table: "cities", Suffix: "City", Limit: 2, Length: 2}, + {Database: "prefix-test", Table: "cities", Suffix: "City", Offset: 1, Length: 2}, + {Prefix: "test", Length: 1}, + {Table: "some_table", Prefix: "test", Suffix: "test", Length: 2}, + } + + for i, entry := range l { + // Test listing keys + keys, err := s.List( + store.ListFrom(entry.Database, entry.Table), + store.ListPrefix(entry.Prefix), + store.ListSuffix(entry.Suffix), + store.ListOffset(uint(entry.Offset)), + store.ListLimit(uint(entry.Limit)), + ) + if err != nil { + t.Fatal(err) + } + if len(keys) != entry.Length { + t.Fatalf("Length of returned keys is invalid for test %d - %+v (%d)", i+1, entry, len(keys)) + } + + // Test reading keys + if entry.Prefix != "" || entry.Suffix != "" { + var key string + options := []store.ReadOption{ + store.ReadFrom(entry.Database, entry.Table), + store.ReadLimit(uint(entry.Limit)), + store.ReadOffset(uint(entry.Offset)), + } + if entry.Prefix != "" { + key = entry.Prefix + options = append(options, store.ReadPrefix()) + } + if entry.Suffix != "" { + key = entry.Suffix + options = append(options, store.ReadSuffix()) + } + r, err := s.Read(key, options...) + if err != nil { + t.Fatal(err) + } + if len(r) != entry.Length { + t.Fatalf("Length of read keys is invalid for test %d - %+v (%d)", i+1, entry, len(r)) + } + } + } +} + +func TestDeleteBucket(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + s := testSetup(ctx, t) + defer cancel() + + for _, r := range table { + if err := s.Write(r.Record, store.WriteTo(r.Database, r.Table)); err != nil { + t.Fatal(err) + } + } + + bucket := "prefix-test" + if err := s.Delete(bucket, DeleteBucket()); err != nil { + t.Fatal(err) + } + + keys, err := s.List(store.ListFrom(bucket, "")) + if err != nil && !errors.Is(err, ErrBucketNotFound) { + t.Fatalf("Failed to delete bucket: %v", err) + } + + if len(keys) > 0 { + t.Fatal("Length of key list should be 0 after bucket deletion") + } + + r, err := s.Read("", store.ReadPrefix(), store.ReadFrom(bucket, "")) + if err != nil && !errors.Is(err, ErrBucketNotFound) { + t.Fatalf("Failed to delete bucket: %v", err) + } + if len(r) > 0 { + t.Fatal("Length of record list should be 0 after bucket deletion", len(r)) + } +} + +func TestEnforceLimits(t *testing.T) { + s := []string{"a", "b", "c", "d"} + var testCasts = []struct { + Alias string + Offset uint + Limit uint + Expected []string + }{ + {"plain", 0, 0, []string{"a", "b", "c", "d"}}, + {"offset&limit-1", 1, 3, []string{"b", "c", "d"}}, + {"offset&limit-2", 1, 1, []string{"b"}}, + {"offset=length", 4, 0, []string{}}, + {"offset>length", 222, 0, []string{}}, + {"limit>length", 0, 36, []string{"a", "b", "c", "d"}}, + } + for _, tc := range testCasts { + actual := enforceLimits(s, tc.Limit, tc.Offset) + if !reflect.DeepEqual(actual, tc.Expected) { + t.Fatalf("%s: Expected %v, got %v", tc.Alias, tc.Expected, actual) + } + } +} + +func basicTest(t *testing.T, s store.Store) error { + t.Helper() + for _, test := range table { + if err := s.Write(test.Record, store.WriteTo(test.Database, test.Table)); err != nil { + return errors.Wrap(err, "Failed to write record in basic test") + } + r, err := s.Read(test.Record.Key, store.ReadFrom(test.Database, test.Table)) + if err != nil { + return errors.Wrap(err, "Failed to read record in basic test") + } + if len(r) == 0 { + t.Fatalf("No results found for %s (%s) %s", test.Record.Key, test.Database, test.Table) + } + + key := test.Record.Key + val1 := string(test.Record.Value) + + key2 := r[0].Key + val2 := string(r[0].Value) + if val1 != val2 { + t.Fatalf("Value not equal for (%s: %s) != (%s: %s)", key, val1, key2, val2) + } + } + return nil +} diff --git a/v4/store/nats-js-kv/options.go b/v4/store/nats-js-kv/options.go new file mode 100644 index 00000000..424ce947 --- /dev/null +++ b/v4/store/nats-js-kv/options.go @@ -0,0 +1,75 @@ +package natsjskv + +import ( + "time" + + "github.com/nats-io/nats.go" + "go-micro.dev/v4/store" +) + +// store.Option. +type natsOptionsKey struct{} +type jsOptionsKey struct{} +type kvOptionsKey struct{} +type ttlOptionsKey struct{} +type memoryOptionsKey struct{} +type descriptionOptionsKey struct{} + +// NatsOptions accepts nats.Options. +func NatsOptions(opts nats.Options) store.Option { + return setStoreOption(natsOptionsKey{}, opts) +} + +// JetStreamOptions accepts multiple nats.JSOpt. +func JetStreamOptions(opts ...nats.JSOpt) store.Option { + return setStoreOption(jsOptionsKey{}, opts) +} + +// KeyValueOptions accepts multiple nats.KeyValueConfig +// This will create buckets with the provided configs at initialization. +func KeyValueOptions(cfg ...*nats.KeyValueConfig) store.Option { + return setStoreOption(kvOptionsKey{}, cfg) +} + +// DefaultTTL sets the default TTL to use for new buckets +// +// By default no TTL is set. +// +// TTL ON INDIVIDUAL WRITE CALLS IS NOT SUPPORTED, only bucket wide TTL. +// Either set a default TTL with this option or provide bucket specific options +// +// with ObjectStoreOptions +func DefaultTTL(ttl time.Duration) store.Option { + return setStoreOption(ttlOptionsKey{}, ttl) +} + +// DefaultMemory sets the default storage type to memory only. +// +// The default is file storage, persisting storage between service restarts. +// +// Be aware that the default storage location of NATS the /tmp dir is, and thus +// +// won't persist reboots. +func DefaultMemory() store.Option { + return setStoreOption(memoryOptionsKey{}, nats.MemoryStorage) +} + +// DefaultDescription sets the default description to use when creating new +// +// buckets. The default is "Store managed by go-micro" +func DefaultDescription(text string) store.Option { + return setStoreOption(descriptionOptionsKey{}, text) +} + +// DeleteBucket will use the key passed to Delete as a bucket (database) name, +// +// and delete the bucket. +// +// This option should not be combined with the store.DeleteFrom option, as +// +// that will overwrite the delete action. +func DeleteBucket() store.DeleteOption { + return func(d *store.DeleteOptions) { + d.Table = "DELETE_BUCKET" + } +} diff --git a/v4/store/nats-js-kv/test_data.go b/v4/store/nats-js-kv/test_data.go new file mode 100644 index 00000000..8bdfb55c --- /dev/null +++ b/v4/store/nats-js-kv/test_data.go @@ -0,0 +1,138 @@ +package natsjskv + +import "go-micro.dev/v4/store" + +type test struct { + Record *store.Record + Database string + Table string +} + +var ( + table = []test{ + { + Record: &store.Record{ + Key: "One", + Value: []byte("First value"), + }, + }, + { + Record: &store.Record{ + Key: "Two", + Value: []byte("Second value"), + }, + Table: "prefix_test", + }, + { + Record: &store.Record{ + Key: "Third", + Value: []byte("Third value"), + }, + Database: "new-bucket", + }, + { + Record: &store.Record{ + Key: "Four", + Value: []byte("Fourth value"), + }, + Database: "new-bucket", + Table: "prefix_test", + }, + { + Record: &store.Record{ + Key: "empty-value", + Value: []byte{}, + }, + Database: "new-bucket", + }, + { + Record: &store.Record{ + Key: "Alex", + Value: []byte("Some value"), + }, + Database: "prefix-test", + Table: "names", + }, + { + Record: &store.Record{ + Key: "Jones", + Value: []byte("Some value"), + }, + Database: "prefix-test", + Table: "names", + }, + { + Record: &store.Record{ + Key: "Adrianna", + Value: []byte("Some value"), + }, + Database: "prefix-test", + Table: "names", + }, + { + Record: &store.Record{ + Key: "MexicoCity", + Value: []byte("Some value"), + }, + Database: "prefix-test", + Table: "cities", + }, + { + Record: &store.Record{ + Key: "HoustonCity", + Value: []byte("Some value"), + }, + Database: "prefix-test", + Table: "cities", + }, + { + Record: &store.Record{ + Key: "ZurichCity", + Value: []byte("Some value"), + }, + Database: "prefix-test", + Table: "cities", + }, + { + Record: &store.Record{ + Key: "Helsinki", + Value: []byte("Some value"), + }, + Database: "prefix-test", + Table: "cities", + }, + { + Record: &store.Record{ + Key: "testKeytest", + Value: []byte("Some value"), + }, + Table: "some_table", + }, + { + Record: &store.Record{ + Key: "testSecondtest", + Value: []byte("Some value"), + }, + Table: "some_table", + }, + { + Record: &store.Record{ + Key: "lalala", + Value: []byte("Some value"), + }, + Table: "some_table", + }, + { + Record: &store.Record{ + Key: "testAnothertest", + Value: []byte("Some value"), + }, + }, + { + Record: &store.Record{ + Key: "FobiddenCharactersAreAllowed:|@..+", + Value: []byte("data no matter"), + }, + }, + } +) From 729fa0bddf975b86c1fb373f571ae3849faf1303 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Tue, 12 Dec 2023 14:34:54 +0100 Subject: [PATCH 2/2] feat(store/natsjskv): make encoding of keys configurable Signed-off-by: jkoberg --- v4/store/nats-js-kv/helpers_test.go | 2 +- v4/store/nats-js-kv/keys.go | 44 ++++++++++++++++++----------- v4/store/nats-js-kv/nats.go | 15 ++++++---- v4/store/nats-js-kv/nats_test.go | 5 +++- v4/store/nats-js-kv/options.go | 8 ++++++ 5 files changed, 50 insertions(+), 24 deletions(-) diff --git a/v4/store/nats-js-kv/helpers_test.go b/v4/store/nats-js-kv/helpers_test.go index a5317e93..eff394bf 100644 --- a/v4/store/nats-js-kv/helpers_test.go +++ b/v4/store/nats-js-kv/helpers_test.go @@ -26,7 +26,7 @@ func testSetup(ctx context.Context, t *testing.T, opts ...store.Option) store.St nCtx, cancel := context.WithCancel(ctx) addr := startNatsServer(nCtx, t) - opts = append(opts, store.Nodes(addr)) + opts = append(opts, store.Nodes(addr), EncodeKeys()) s = NewStore(opts...) err = s.Init() diff --git a/v4/store/nats-js-kv/keys.go b/v4/store/nats-js-kv/keys.go index 8fd6f1aa..8eb2869c 100644 --- a/v4/store/nats-js-kv/keys.go +++ b/v4/store/nats-js-kv/keys.go @@ -6,24 +6,24 @@ import ( ) // NatsKey is a convenience function to create a key for the nats kv store. -func NatsKey(table, microkey string) string { - return NewKey(table, microkey, "").NatsKey() +func (n *natsStore) NatsKey(table, microkey string) string { + return n.NewKey(table, microkey, "").NatsKey() } // MicroKey is a convenience function to create a key for the micro interface. -func MicroKey(table, natskey string) string { - return NewKey(table, "", natskey).MicroKey() +func (n *natsStore) MicroKey(table, natskey string) string { + return n.NewKey(table, "", natskey).MicroKey() } // MicroKeyFilter is a convenience function to create a key for the micro interface. // It returns false if the key does not match the table, prefix or suffix. -func MicroKeyFilter(table, natskey string, prefix, suffix string) (string, bool) { - k := NewKey(table, "", natskey) +func (n *natsStore) MicroKeyFilter(table, natskey string, prefix, suffix string) (string, bool) { + k := n.NewKey(table, "", natskey) return k.MicroKey(), k.Check(table, prefix, suffix) } // Key represents a key in the store. -// They are used to convert nats keys (base64 encoded) to micro keys (plain text - no table prefix) and vice versa. +// They are used to convert nats keys (base32 encoded) to micro keys (plain text - no table prefix) and vice versa. type Key struct { // Plain is the plain key as requested by the go-micro interface. Plain string @@ -34,7 +34,7 @@ type Key struct { } // NewKey creates a new key. Either plain or encoded must be set. -func NewKey(table string, plain, encoded string) *Key { +func (n *natsStore) NewKey(table string, plain, encoded string) *Key { k := &Key{ Plain: plain, Encoded: encoded, @@ -43,9 +43,9 @@ func NewKey(table string, plain, encoded string) *Key { switch { case k.Plain != "": k.Full = getKey(k.Plain, table) - k.Encoded = encode(k.Full) + k.Encoded = encode(k.Full, n.encoding) case k.Encoded != "": - k.Full = decode(k.Encoded) + k.Full = decode(k.Encoded, n.encoding) k.Plain = trimKey(k.Full, table) } @@ -79,17 +79,27 @@ func (k *Key) Check(table, prefix, suffix string) bool { return true } -func encode(s string) string { - return base32.StdEncoding.EncodeToString([]byte(s)) +func encode(s string, alg string) string { + switch alg { + case "base32": + return base32.StdEncoding.EncodeToString([]byte(s)) + default: + return s + } } -func decode(s string) string { - b, err := base32.StdEncoding.DecodeString(s) - if err != nil { +func decode(s string, alg string) string { + switch alg { + case "base32": + b, err := base32.StdEncoding.DecodeString(s) + if err != nil { + return s + } + + return string(b) + default: return s } - - return string(b) } func getKey(key, table string) string { diff --git a/v4/store/nats-js-kv/nats.go b/v4/store/nats-js-kv/nats.go index aa7d5045..13ae81d2 100644 --- a/v4/store/nats-js-kv/nats.go +++ b/v4/store/nats-js-kv/nats.go @@ -30,6 +30,7 @@ type natsStore struct { sync.Once sync.RWMutex + encoding string ttl time.Duration storageType nats.StorageType description string @@ -143,6 +144,10 @@ func (n *natsStore) setOption(opts ...store.Option) { n.description = text } + if encoding, ok := n.opts.Context.Value(keyEncodeOptionsKey{}).(string); ok { + n.encoding = encoding + } + // Assign store option server addresses to nats options if len(n.opts.Nodes) > 0 { n.nopts.Url = "" @@ -238,8 +243,8 @@ func (n *natsStore) Write(rec *store.Record, opts ...store.WriteOption) error { return errors.Wrap(err, "Failed to marshal object") } - if _, err := store.Put(NatsKey(opt.Table, rec.Key), b); err != nil { - return errors.Wrapf(err, "Failed to store data in bucket '%s'", NatsKey(opt.Table, rec.Key)) + if _, err := store.Put(n.NatsKey(opt.Table, rec.Key), b); err != nil { + return errors.Wrapf(err, "Failed to store data in bucket '%s'", n.NatsKey(opt.Table, rec.Key)) } return nil @@ -280,7 +285,7 @@ func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error { return ErrBucketNotFound } - if err := store.Delete(NatsKey(opt.Table, key)); err != nil { + if err := store.Delete(n.NatsKey(opt.Table, key)); err != nil { return errors.Wrap(err, "Failed to delete data") } @@ -415,7 +420,7 @@ func (n *natsStore) getRecord(bucket nats.KeyValue, key string) (*store.Record, func (n *natsStore) natsKeys(bucket nats.KeyValue, table, key string, prefix, suffix bool) ([]string, error) { if !suffix && !prefix { - return []string{NatsKey(table, key)}, nil + return []string{n.NatsKey(table, key)}, nil } toS := func(s string, b bool) string { @@ -449,7 +454,7 @@ func (n *natsStore) getKeys(bucket nats.KeyValue, table string, prefix, suffix s microKeys := make([]string, 0, len(names)) for _, k := range names { - mkey, ok := MicroKeyFilter(table, k, prefix, suffix) + mkey, ok := n.MicroKeyFilter(table, k, prefix, suffix) if !ok { continue } diff --git a/v4/store/nats-js-kv/nats_test.go b/v4/store/nats-js-kv/nats_test.go index c6e5bc8b..5f248f5c 100644 --- a/v4/store/nats-js-kv/nats_test.go +++ b/v4/store/nats-js-kv/nats_test.go @@ -20,7 +20,7 @@ func TestNats(t *testing.T) { for i := 0; i < 5; i++ { ctx, cancel = context.WithCancel(context.Background()) addr := startNatsServer(ctx, t) - s := NewStore(store.Nodes(addr)) + s := NewStore(store.Nodes(addr), EncodeKeys()) // Test String method t.Log("Testing:", s.String()) @@ -76,6 +76,9 @@ func TestOptions(t *testing.T) { Storage: nats.MemoryStorage, Replicas: 1, }), + + // Encode keys to avoid character limitations + EncodeKeys(), ) defer cancel() diff --git a/v4/store/nats-js-kv/options.go b/v4/store/nats-js-kv/options.go index 424ce947..a9c9ca61 100644 --- a/v4/store/nats-js-kv/options.go +++ b/v4/store/nats-js-kv/options.go @@ -14,6 +14,7 @@ type kvOptionsKey struct{} type ttlOptionsKey struct{} type memoryOptionsKey struct{} type descriptionOptionsKey struct{} +type keyEncodeOptionsKey struct{} // NatsOptions accepts nats.Options. func NatsOptions(opts nats.Options) store.Option { @@ -61,6 +62,13 @@ func DefaultDescription(text string) store.Option { return setStoreOption(descriptionOptionsKey{}, text) } +// EncodeKeys will "base32" encode the keys. +// This is to work around limited characters usable as keys for the natsjs kv store. +// See details here: https://docs.nats.io/nats-concepts/subjects#characters-allowed-for-subject-names +func EncodeKeys() store.Option { + return setStoreOption(keyEncodeOptionsKey{}, "base32") +} + // DeleteBucket will use the key passed to Delete as a bucket (database) name, // // and delete the bucket.