Skip to content

Commit

Permalink
Extract TenantManager out of pusher into its own package
Browse files Browse the repository at this point in the history
The intention here is that it should be possible to use the tenant
manager in more places than just the pusher, and it should be possible
to share the cache.

Signed-off-by: Marcelo E. Magallon <marcelo.magallon@grafana.com>
  • Loading branch information
mem committed Sep 6, 2023
1 parent ae3790a commit 3cbf949
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
3 changes: 2 additions & 1 deletion cmd/synthetic-monitoring-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
pusherV1 "github.com/grafana/synthetic-monitoring-agent/internal/pusher/v1"
pusherV2 "github.com/grafana/synthetic-monitoring-agent/internal/pusher/v2"
"github.com/grafana/synthetic-monitoring-agent/internal/tenants"
"github.com/grafana/synthetic-monitoring-agent/internal/version"
"github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
)
Expand Down Expand Up @@ -208,7 +209,7 @@ func run(args []string, stdout io.Writer) error {
k6Runner = k6runner.New(*k6URI)
}

tm := pusher.NewTenantManager(ctx, synthetic_monitoring.NewTenantsClient(conn), tenantCh, 15*time.Minute)
tm := tenants.NewManager(ctx, synthetic_monitoring.NewTenantsClient(conn), tenantCh, 15*time.Minute)

pusherRegistry := pusher.NewRegistry[pusher.Factory]()
pusherRegistry.MustRegister(pusherV1.Name, pusherV1.NewPublisher)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package pusher
package tenants

import (
"context"
"sync"
"time"

"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
)

type TenantManager struct {
type Manager struct {
tenantCh <-chan sm.Tenant
tenantsClient sm.TenantsClient
timeout time.Duration
tenantsMutex sync.Mutex
tenants map[int64]*tenantInfo
}

var _ TenantProvider = &TenantManager{}
var _ pusher.TenantProvider = &Manager{}

type tenantInfo struct {
mutex sync.Mutex // protects the entire structure
Expand All @@ -31,8 +32,8 @@ type tenantInfo struct {
//
// A new goroutine is started which stops when the provided context is
// cancelled.
func NewTenantManager(ctx context.Context, tenantsClient sm.TenantsClient, tenantCh <-chan sm.Tenant, timeout time.Duration) *TenantManager {
tm := &TenantManager{
func NewManager(ctx context.Context, tenantsClient sm.TenantsClient, tenantCh <-chan sm.Tenant, timeout time.Duration) *Manager {
tm := &Manager{
tenantCh: tenantCh,
tenantsClient: tenantsClient,
timeout: timeout,
Expand All @@ -44,7 +45,7 @@ func NewTenantManager(ctx context.Context, tenantsClient sm.TenantsClient, tenan
return tm
}

func (tm *TenantManager) run(ctx context.Context) {
func (tm *Manager) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
Expand All @@ -56,7 +57,7 @@ func (tm *TenantManager) run(ctx context.Context) {
}
}

func (tm *TenantManager) updateTenant(tenant sm.Tenant) {
func (tm *Manager) updateTenant(tenant sm.Tenant) {
tm.tenantsMutex.Lock()

info, found := tm.tenants[tenant.Id]
Expand Down Expand Up @@ -94,7 +95,7 @@ func (tm *TenantManager) updateTenant(tenant sm.Tenant) {

// GetTenant retrieves the tenant specified by `req`, either from a
// local cache or by making a request to the API.
func (tm *TenantManager) GetTenant(ctx context.Context, req *sm.TenantInfo) (*sm.Tenant, error) {
func (tm *Manager) GetTenant(ctx context.Context, req *sm.TenantInfo) (*sm.Tenant, error) {
tm.tenantsMutex.Lock()
now := time.Now()
info, found := tm.tenants[req.Id]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pusher
package tenants

import (
"context"
Expand Down Expand Up @@ -61,7 +61,7 @@ func TestTenantManagerGetTenant(t *testing.T) {

tenantCh := make(chan sm.Tenant)

tm := NewTenantManager(ctx, &tc, tenantCh, 500*time.Millisecond)
tm := NewManager(ctx, &tc, tenantCh, 500*time.Millisecond)

t1 := tc.tenants[1]

Expand Down

0 comments on commit 3cbf949

Please sign in to comment.