Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement replication validation #22581

Merged
merged 4 commits into from
Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions cmd/influxd/launcher/replication_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package launcher_test

import (
"testing"

"github.com/influxdata/influx-cli/v2/api"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/stretchr/testify/require"
)

func TestValidateReplication_Valid(t *testing.T) {
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
o.FeatureFlags = map[string]string{feature.ReplicationStreamBackend().Key(): "true"}
})
defer l.ShutdownOrFail(t, ctx)
client := l.APIClient(t)

// Create a "remote" connection to the launcher from itself.
remote, err := client.RemoteConnectionsApi.PostRemoteConnection(ctx).
RemoteConnectionCreationRequest(api.RemoteConnectionCreationRequest{
Name: "self",
OrgID: l.Org.ID.String(),
RemoteURL: l.URL().String(),
RemoteAPIToken: l.Auth.Token,
RemoteOrgID: l.Org.ID.String(),
AllowInsecureTLS: false,
}).Execute()
require.NoError(t, err)

// Validate the replication before creating it.
createReq := api.ReplicationCreationRequest{
Name: "test",
OrgID: l.Org.ID.String(),
RemoteID: remote.Id,
LocalBucketID: l.Bucket.ID.String(),
RemoteBucketID: l.Bucket.ID.String(),
MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes,
}
_, err = client.ReplicationsApi.PostReplication(ctx).ReplicationCreationRequest(createReq).Validate(true).Execute()
require.NoError(t, err)

// Create the replication.
replication, err := client.ReplicationsApi.PostReplication(ctx).ReplicationCreationRequest(createReq).Execute()
require.NoError(t, err)

// Ensure the replication is marked as valid.
require.NoError(t, client.ReplicationsApi.PostValidateReplicationByID(ctx, replication.Id).Execute())

// Create a new auth token that can only write to the bucket.
auth := influxdb.Authorization{
Status: "active",
OrgID: l.Org.ID,
UserID: l.User.ID,
Permissions: []influxdb.Permission{{
Action: "write",
Resource: influxdb.Resource{
Type: influxdb.BucketsResourceType,
ID: &l.Bucket.ID,
OrgID: &l.Org.ID,
},
}},
CRUDLog: influxdb.CRUDLog{},
}
require.NoError(t, l.AuthorizationService(t).CreateAuthorization(ctx, &auth))

// Update the remote to use the new token.
_, err = client.RemoteConnectionsApi.PatchRemoteConnectionByID(ctx, remote.Id).
RemoteConnenctionUpdateRequest(api.RemoteConnenctionUpdateRequest{RemoteAPIToken: &auth.Token}).
Execute()
require.NoError(t, err)

// Ensure the replication is still valid.
require.NoError(t, client.ReplicationsApi.PostValidateReplicationByID(ctx, replication.Id).Execute())
}

func TestValidateReplication_Invalid(t *testing.T) {
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would these two tests run faster if they were the same test (since we don't have to setup / tear down the server)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, though it's not as slow as it could be since they're using the inmem metadata store instead of a real boltDB.

o.FeatureFlags = map[string]string{feature.ReplicationStreamBackend().Key(): "true"}
})
defer l.ShutdownOrFail(t, ctx)
client := l.APIClient(t)

// Create a "remote" connection to the launcher from itself,
// but with a bad auth token.
remote, err := client.RemoteConnectionsApi.PostRemoteConnection(ctx).
RemoteConnectionCreationRequest(api.RemoteConnectionCreationRequest{
Name: "self",
OrgID: l.Org.ID.String(),
RemoteURL: l.URL().String(),
RemoteAPIToken: "foo",
RemoteOrgID: l.Org.ID.String(),
AllowInsecureTLS: false,
}).Execute()
require.NoError(t, err)

// Validate the replication before creating it. This should fail because of the bad
// auth token in the linked remote.
createReq := api.ReplicationCreationRequest{
Name: "test",
OrgID: l.Org.ID.String(),
RemoteID: remote.Id,
LocalBucketID: l.Bucket.ID.String(),
RemoteBucketID: l.Bucket.ID.String(),
MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes,
}
_, err = client.ReplicationsApi.PostReplication(ctx).ReplicationCreationRequest(createReq).Validate(true).Execute()
require.Error(t, err)

// Create the replication even though it failed validation.
replication, err := client.ReplicationsApi.PostReplication(ctx).ReplicationCreationRequest(createReq).Execute()
require.NoError(t, err)

// Ensure the replication is marked as invalid.
require.Error(t, client.ReplicationsApi.PostValidateReplicationByID(ctx, replication.Id).Execute())

// Create a new auth token that can only write to the bucket.
auth := influxdb.Authorization{
Status: "active",
OrgID: l.Org.ID,
UserID: l.User.ID,
Permissions: []influxdb.Permission{{
Action: "write",
Resource: influxdb.Resource{
Type: influxdb.BucketsResourceType,
ID: &l.Bucket.ID,
OrgID: &l.Org.ID,
},
}},
CRUDLog: influxdb.CRUDLog{},
}
require.NoError(t, l.AuthorizationService(t).CreateAuthorization(ctx, &auth))

// Update the remote to use the new token.
_, err = client.RemoteConnectionsApi.PatchRemoteConnectionByID(ctx, remote.Id).
RemoteConnenctionUpdateRequest(api.RemoteConnenctionUpdateRequest{RemoteAPIToken: &auth.Token}).
Execute()
require.NoError(t, err)

// Ensure the replication is now valid.
require.NoError(t, client.ReplicationsApi.PostValidateReplicationByID(ctx, replication.Id).Execute())

// Create a new bucket.
bucket2 := influxdb.Bucket{
OrgID: l.Org.ID,
Name: "bucket2",
RetentionPeriod: 0,
ShardGroupDuration: 0,
}
require.NoError(t, l.BucketService(t).CreateBucket(ctx, &bucket2))
bucket2Id := bucket2.ID.String()

// Updating the replication to point at the new bucket should fail validation.
_, err = client.ReplicationsApi.PatchReplicationByID(ctx, replication.Id).
ReplicationUpdateRequest(api.ReplicationUpdateRequest{RemoteBucketID: &bucket2Id}).
Validate(true).
Execute()
require.Error(t, err)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ require (
github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe
github.com/influxdata/flux v0.131.0
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
github.com/influxdata/influx-cli/v2 v2.1.1-0.20210813175002-13799e7662c0
github.com/influxdata/influx-cli/v2 v2.1.1-0.20210924182719-d0640ad6c4d4
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040 // indirect
github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ github.com/influxdata/flux v0.131.0 h1:b6vCQyJorkbUEv25eHbNL1XIjSQ4/XMNFW8p9Uytm
github.com/influxdata/flux v0.131.0/go.mod h1:CKvnYe6FHpTj/E0YGI7TcOZdGiYHoToOPSnoa12RtKI=
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU=
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA=
github.com/influxdata/influx-cli/v2 v2.1.1-0.20210813175002-13799e7662c0 h1:llPYnejbp/s9JkkS2xjSlAsdPKqIAsabhAgiOLV1NHw=
github.com/influxdata/influx-cli/v2 v2.1.1-0.20210813175002-13799e7662c0/go.mod h1:3KoUqKdsfmm7CREOuWnbYJZbl6j2akSdQUaLctE42so=
github.com/influxdata/influx-cli/v2 v2.1.1-0.20210924182719-d0640ad6c4d4 h1:brA9egXkPF/ZGKbPu2Vt7GXJ4cv5Oo6eSff4ykhnwTE=
github.com/influxdata/influx-cli/v2 v2.1.1-0.20210924182719-d0640ad6c4d4/go.mod h1:piIN/dAOSRqdZZc2sHO7CORuWUQ0UXdNrjugF3cEr8k=
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040 h1:MBLCfcSsUyFPDJp6T7EoHp/Ph3Jkrm4EuUKLD2rUWHg=
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8=
github.com/influxdata/influxql v1.1.1-0.20210223160523-b6ab99450c93 h1:4t/8PcmLnI2vrcaHcEKeeLsGxC0WMRaOQdPX9b7DF8Y=
Expand Down
51 changes: 46 additions & 5 deletions replications/internal/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,57 @@ package internal

import (
"context"
"fmt"
"net/url"
"runtime"

"github.com/influxdata/influx-cli/v2/api"
"github.com/influxdata/influxdb/v2"
ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors"
)

func NewValidator() *stubValidator {
return &stubValidator{}
func NewValidator() *noopWriteValidator {
return &noopWriteValidator{}
}

type stubValidator struct{}
// noopWriteValidator checks if replication parameters are valid by attempting to write an empty payload
// to the remote host using the configured information.
type noopWriteValidator struct{}

func (s stubValidator) ValidateReplication(ctx context.Context, config *ReplicationHTTPConfig) error {
return &ierrors.Error{Code: ierrors.ENotImplemented, Msg: "replication validation not implemented"}
var userAgent = fmt.Sprintf(
"influxdb-oss/%s (%s) Sha/%s Date/%s",
influxdb.GetBuildInfo().Version,
runtime.GOOS,
influxdb.GetBuildInfo().Commit,
influxdb.GetBuildInfo().Date)

func (s noopWriteValidator) ValidateReplication(ctx context.Context, config *ReplicationHTTPConfig) error {
u, err := url.Parse(config.RemoteURL)
if err != nil {
return &ierrors.Error{
Code: ierrors.EInvalid,
Msg: fmt.Sprintf("host URL %q is invalid", config.RemoteURL),
Err: err,
}
}
params := api.ConfigParams{
Host: u,
UserAgent: userAgent,
Token: &config.RemoteToken,
AllowInsecureTLS: config.AllowInsecureTLS,
}
client := api.NewAPIClient(api.NewAPIConfig(params)).WriteApi

noopReq := client.PostWrite(ctx).
Org(config.RemoteOrgID.String()).
Bucket(config.RemoteBucketID.String()).
Body([]byte{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we checked this also doesn't error in cloud? Should we maybe add a cloud unit test that this works? It is and odd kind of request to attempt to write with no data, I can see us rejecting or otherwise flagging that it is weird.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've checked (it used to error, but I changed the behavior as part of an EAR last year). Agreed it would be good to have a regression test to make sure things don't revert.

Ideally we could instead have/use some sort of "can I?" API where you submit a resource/action and get back a yes/no.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I prefer this kind of check (where we actually do the thing) vs a 'can I' API which might drift from the actual checks on the 'real' API. I just want to make sure it works.


if err := noopReq.Execute(); err != nil {
return &ierrors.Error{
Code: ierrors.EInvalid,
Err: err,
}
}
return nil
}
5 changes: 4 additions & 1 deletion replications/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ func (s service) ValidateUpdatedReplication(ctx context.Context, id platform.ID,
if err != nil {
return err
}
if request.RemoteBucketID != nil {
baseConfig.RemoteBucketID = *request.RemoteBucketID
}

if request.RemoteID != nil {
if err := s.populateRemoteHTTPConfig(ctx, *request.RemoteID, baseConfig); err != nil {
Expand Down Expand Up @@ -295,7 +298,7 @@ func (s service) ValidateReplication(ctx context.Context, id platform.ID) error
if err := s.validator.ValidateReplication(ctx, config); err != nil {
return &ierrors.Error{
Code: ierrors.EInvalid,
Msg: "remote failed validation",
Msg: "replication failed validation",
Err: err,
}
}
Expand Down