Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(contrib/registry/consul): add consul registry support #4016

Merged
merged 28 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/consul/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: '3.7'
services:

consul-server:
image: loads/consul:1.15
image: consul:1.15
container_name: consul-server
restart: always
volumes:
Expand All @@ -17,7 +17,7 @@ services:
command: "agent"

consul-client:
image: loads/consul:1.15
image: consul:1.15
container_name: consul-client
restart: always
volumes:
Expand Down
40 changes: 38 additions & 2 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ on:
- feature/**
- enhance/**
- fix/**
- feat/**
pull_request:
branches:
- master
Expand All @@ -22,12 +23,13 @@ on:
- feature/**
- enhance/**
- fix/**
- feat/**

jobs:
golangci:
strategy:
matrix:
go-version: [ '1.20','1.21.4','1.22','1.23' ]
go-version: [ 'stable' ]
name: golangci-lint
runs-on: ubuntu-latest
steps:
Expand All @@ -42,4 +44,38 @@ jobs:
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.62.2
args: --timeout 3m0s
only-new-issues: true
github-token: ${{ secrets.GITHUB_TOKEN }}
args: --timeout 3m0s --fix
- name: Install gci
run: go install github.com/daixiang0/gci@latest
- name: Run gci
run: |
gci write --custom-order \
--skip-generated \
--skip-vendor \
-s standard \
-s blank \
-s default \
-s dot \
-s "prefix(github.com/gogf/gf/v2)" \
-s "prefix(github.com/gogf/gf/cmd)" \
-s "prefix(github.com/gogf/gf/contrib)" \
-s "prefix(github.com/gogf/gf/example)" \
./
- name: Check for changes
id: check_changes
run: |
if [[ -n "$(git status --porcelain)" ]]; then
echo "HAS_CHANGES=true" >> $GITHUB_ENV
else
echo "HAS_CHANGES=false" >> $GITHUB_ENV
fi
- name: Commit and push changes
if: env.HAS_CHANGES == 'true'
run: |
git config --global user.name "github-actions[bot]"
git config --global user.email "github-actions[bot]@users.noreply.github.com"
git add .
git commit -m "Apply gci import order changes"
git push
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ linters-settings:
custom-order: true
# Drops lexical ordering for custom sections.
# Default: false
no-lex-order: true
no-lex-order: false
# https://golangci-lint.run/usage/linters/#revive
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md
revive:
Expand Down
104 changes: 104 additions & 0 deletions contrib/registry/consul/README.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# GoFrame Consul Registry

Use `consul` as service registration and discovery management.

## Installation
```bash
go get -u github.com/gogf/gf/contrib/registry/consul/v2
```
suggested using `go.mod`:
```bash
require github.com/gogf/gf/contrib/registry/consul/v2 latest
```

## Example

### HTTP Server
```go
package main

import (
"context"

"github.com/gogf/gf/contrib/registry/consul/v2"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/net/gsvc"
)

func main() {
registry, err := consul.New(consul.WithAddress("127.0.0.1:8500"))
if err != nil {
g.Log().Fatal(context.Background(), err)
}
gsvc.SetRegistry(registry)

s := g.Server("hello.svc")
s.BindHandler("/", func(r *ghttp.Request) {
g.Log().Info(r.Context(), "request received")
r.Response.Write("Hello world")
})
s.Run()
}
```

### HTTP Client
```go
package main

import (
"context"
"fmt"
"time"

"github.com/gogf/gf/contrib/registry/consul/v2"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gsel"
"github.com/gogf/gf/v2/net/gsvc"
"github.com/gogf/gf/v2/os/gctx"
)

func main() {
registry, err := consul.New(consul.WithAddress("127.0.0.1:8500"))
if err != nil {
g.Log().Fatal(context.Background(), err)
}
gsvc.SetRegistry(registry)
gsel.SetBuilder(gsel.NewBuilderRoundRobin())

client := g.Client()
for i := 0; i < 100; i++ {
res, err := client.Get(gctx.New(), "http://hello.svc/")
if err != nil {
panic(err)
}
fmt.Println(res.ReadAllString())
res.Close()
time.Sleep(time.Second)
}
}
```

## Configuration Options

The registry supports the following configuration options:

- `WithAddress(address string)`: Sets the Consul server address (default: "127.0.0.1:8500")
- `WithToken(token string)`: Sets the ACL token for Consul authentication

## Features

- Service registration with TTL health check
- Service discovery with health status filtering
- Service metadata support
- Watch support for service changes
- Consul ACL token support

## Requirements

- Go 1.18 or higher
- Consul 1.0 or higher

## License

`GoFrame Consul` is licensed under the [MIT License](../../../LICENSE), 100% free and open-source, forever.
199 changes: 199 additions & 0 deletions contrib/registry/consul/consul.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

// Package consul implements service Registry and Discovery using consul.
package consul

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/hashicorp/consul/api"

"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/net/gsvc"
)

const (
// DefaultTTL is the default TTL for service registration
DefaultTTL = 20 * time.Second

// DefaultHealthCheckInterval is the default interval for health check
DefaultHealthCheckInterval = 10 * time.Second
)

var (
_ gsvc.Registry = (*Registry)(nil)
)

// Registry implements gsvc.Registry interface using consul.
type Registry struct {
client *api.Client // Consul client
address string // Consul address
options map[string]string // Additional options
mu sync.RWMutex // Mutex for thread safety
}

// Option is the configuration option type for registry.
type Option func(r *Registry)

// WithAddress sets the address for consul client.
func WithAddress(address string) Option {
return func(r *Registry) {
r.mu.Lock()
r.address = address
r.mu.Unlock()
}
}

// WithToken sets the ACL token for consul client.
func WithToken(token string) Option {
return func(r *Registry) {
r.mu.Lock()
r.options["token"] = token
r.mu.Unlock()
}
}

// New creates and returns a new Registry.
func New(opts ...Option) (gsvc.Registry, error) {
r := &Registry{
address: "127.0.0.1:8500",
options: make(map[string]string),
}

// Apply options
for _, opt := range opts {
opt(r)
}

// Create consul config
config := api.DefaultConfig()
r.mu.RLock()
config.Address = r.address
if token, ok := r.options["token"]; ok {
config.Token = token
}
r.mu.RUnlock()

// Create consul client
client, err := api.NewClient(config)
if err != nil {
return nil, err
}
r.client = client

return r, nil
}

// Register registers a service to consul.
func (r *Registry) Register(ctx context.Context, service gsvc.Service) (gsvc.Service, error) {
metadata := service.GetMetadata()
if metadata == nil {
metadata = make(map[string]interface{})
}

// Convert metadata to string map
meta := make(map[string]string)
if len(metadata) > 0 {
metadataBytes, err := json.Marshal(metadata)
if err != nil {
return nil, gerror.Wrap(err, "failed to marshal metadata")
}
meta["metadata"] = string(metadataBytes)
}

// Add version to meta
meta["version"] = service.GetVersion()

endpoints := service.GetEndpoints()
if len(endpoints) == 0 {
return nil, gerror.New("no endpoints found in service")
}

// Create service ID
serviceID := fmt.Sprintf("%s-%s-%s:%d", service.GetName(), service.GetVersion(), endpoints[0].Host(), endpoints[0].Port())

// Create registration
reg := &api.AgentServiceRegistration{
ID: serviceID,
Name: service.GetName(),
Tags: []string{service.GetVersion()},
Meta: meta,
Address: endpoints[0].Host(),
Port: endpoints[0].Port(),
}

// Add health check
checkID := fmt.Sprintf("service:%s", serviceID)
reg.Check = &api.AgentServiceCheck{
CheckID: checkID,
TTL: DefaultTTL.String(),
DeregisterCriticalServiceAfter: "1m",
}

// Register service
if err := r.client.Agent().ServiceRegister(reg); err != nil {
return nil, gerror.Wrap(err, "failed to register service")
}

// Start TTL health check
if err := r.client.Agent().PassTTL(checkID, ""); err != nil {
// Try to deregister service if health check fails
_ = r.client.Agent().ServiceDeregister(serviceID)
return nil, gerror.Wrap(err, "failed to pass TTL health check")
}

// Start TTL health check goroutine
go r.ttlHealthCheck(serviceID)

return service, nil
}

// Deregister deregisters a service from consul.
func (r *Registry) Deregister(ctx context.Context, service gsvc.Service) error {
endpoints := service.GetEndpoints()
if len(endpoints) == 0 {
return gerror.New("no endpoints found in service")
}

// Create service ID
serviceID := fmt.Sprintf("%s-%s-%s:%d", service.GetName(), service.GetVersion(), endpoints[0].Host(), endpoints[0].Port())

return r.client.Agent().ServiceDeregister(serviceID)
}

// ttlHealthCheck maintains the TTL health check for a service
func (r *Registry) ttlHealthCheck(serviceID string) {
ticker := time.NewTicker(DefaultHealthCheckInterval)
defer ticker.Stop()

checkID := fmt.Sprintf("service:%s", serviceID)
for range ticker.C {
if err := r.client.Agent().PassTTL(checkID, ""); err != nil {
return
}
}
}

// GetAddress returns the consul address
func (r *Registry) GetAddress() string {
r.mu.RLock()
defer r.mu.RUnlock()
return r.address
}

// Watch creates and returns a watcher for specified service.
func (r *Registry) Watch(ctx context.Context, key string) (gsvc.Watcher, error) {
watcher, err := newWatcher(r, key)
if err != nil {
return nil, err
}
return watcher, nil
}
Loading
Loading