-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement replication validation #22581
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
package launcher_test | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/influxdata/influx-cli/v2/api" | ||
"github.com/influxdata/influxdb/v2" | ||
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher" | ||
"github.com/influxdata/influxdb/v2/kit/feature" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestValidateReplication_Valid(t *testing.T) { | ||
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) { | ||
o.FeatureFlags = map[string]string{feature.ReplicationStreamBackend().Key(): "true"} | ||
}) | ||
defer l.ShutdownOrFail(t, ctx) | ||
client := l.APIClient(t) | ||
|
||
// Create a "remote" connection to the launcher from itself. | ||
remote, err := client.RemoteConnectionsApi.PostRemoteConnection(ctx). | ||
RemoteConnectionCreationRequest(api.RemoteConnectionCreationRequest{ | ||
Name: "self", | ||
OrgID: l.Org.ID.String(), | ||
RemoteURL: l.URL().String(), | ||
RemoteAPIToken: l.Auth.Token, | ||
RemoteOrgID: l.Org.ID.String(), | ||
AllowInsecureTLS: false, | ||
}).Execute() | ||
require.NoError(t, err) | ||
|
||
// Validate the replication before creating it. | ||
createReq := api.ReplicationCreationRequest{ | ||
Name: "test", | ||
OrgID: l.Org.ID.String(), | ||
RemoteID: remote.Id, | ||
LocalBucketID: l.Bucket.ID.String(), | ||
RemoteBucketID: l.Bucket.ID.String(), | ||
MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes, | ||
} | ||
_, err = client.ReplicationsApi.PostReplication(ctx).ReplicationCreationRequest(createReq).Validate(true).Execute() | ||
require.NoError(t, err) | ||
|
||
// Create the replication. | ||
replication, err := client.ReplicationsApi.PostReplication(ctx).ReplicationCreationRequest(createReq).Execute() | ||
require.NoError(t, err) | ||
|
||
// Ensure the replication is marked as valid. | ||
require.NoError(t, client.ReplicationsApi.PostValidateReplicationByID(ctx, replication.Id).Execute()) | ||
|
||
// Create a new auth token that can only write to the bucket. | ||
auth := influxdb.Authorization{ | ||
Status: "active", | ||
OrgID: l.Org.ID, | ||
UserID: l.User.ID, | ||
Permissions: []influxdb.Permission{{ | ||
Action: "write", | ||
Resource: influxdb.Resource{ | ||
Type: influxdb.BucketsResourceType, | ||
ID: &l.Bucket.ID, | ||
OrgID: &l.Org.ID, | ||
}, | ||
}}, | ||
CRUDLog: influxdb.CRUDLog{}, | ||
} | ||
require.NoError(t, l.AuthorizationService(t).CreateAuthorization(ctx, &auth)) | ||
|
||
// Update the remote to use the new token. | ||
_, err = client.RemoteConnectionsApi.PatchRemoteConnectionByID(ctx, remote.Id). | ||
RemoteConnenctionUpdateRequest(api.RemoteConnenctionUpdateRequest{RemoteAPIToken: &auth.Token}). | ||
Execute() | ||
require.NoError(t, err) | ||
|
||
// Ensure the replication is still valid. | ||
require.NoError(t, client.ReplicationsApi.PostValidateReplicationByID(ctx, replication.Id).Execute()) | ||
} | ||
|
||
func TestValidateReplication_Invalid(t *testing.T) { | ||
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) { | ||
o.FeatureFlags = map[string]string{feature.ReplicationStreamBackend().Key(): "true"} | ||
}) | ||
defer l.ShutdownOrFail(t, ctx) | ||
client := l.APIClient(t) | ||
|
||
// Create a "remote" connection to the launcher from itself, | ||
// but with a bad auth token. | ||
remote, err := client.RemoteConnectionsApi.PostRemoteConnection(ctx). | ||
RemoteConnectionCreationRequest(api.RemoteConnectionCreationRequest{ | ||
Name: "self", | ||
OrgID: l.Org.ID.String(), | ||
RemoteURL: l.URL().String(), | ||
RemoteAPIToken: "foo", | ||
RemoteOrgID: l.Org.ID.String(), | ||
AllowInsecureTLS: false, | ||
}).Execute() | ||
require.NoError(t, err) | ||
|
||
// Validate the replication before creating it. This should fail because of the bad | ||
// auth token in the linked remote. | ||
createReq := api.ReplicationCreationRequest{ | ||
Name: "test", | ||
OrgID: l.Org.ID.String(), | ||
RemoteID: remote.Id, | ||
LocalBucketID: l.Bucket.ID.String(), | ||
RemoteBucketID: l.Bucket.ID.String(), | ||
MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes, | ||
} | ||
_, err = client.ReplicationsApi.PostReplication(ctx).ReplicationCreationRequest(createReq).Validate(true).Execute() | ||
require.Error(t, err) | ||
|
||
// Create the replication even though it failed validation. | ||
replication, err := client.ReplicationsApi.PostReplication(ctx).ReplicationCreationRequest(createReq).Execute() | ||
require.NoError(t, err) | ||
|
||
// Ensure the replication is marked as invalid. | ||
require.Error(t, client.ReplicationsApi.PostValidateReplicationByID(ctx, replication.Id).Execute()) | ||
|
||
// Create a new auth token that can only write to the bucket. | ||
auth := influxdb.Authorization{ | ||
Status: "active", | ||
OrgID: l.Org.ID, | ||
UserID: l.User.ID, | ||
Permissions: []influxdb.Permission{{ | ||
Action: "write", | ||
Resource: influxdb.Resource{ | ||
Type: influxdb.BucketsResourceType, | ||
ID: &l.Bucket.ID, | ||
OrgID: &l.Org.ID, | ||
}, | ||
}}, | ||
CRUDLog: influxdb.CRUDLog{}, | ||
} | ||
require.NoError(t, l.AuthorizationService(t).CreateAuthorization(ctx, &auth)) | ||
|
||
// Update the remote to use the new token. | ||
_, err = client.RemoteConnectionsApi.PatchRemoteConnectionByID(ctx, remote.Id). | ||
RemoteConnenctionUpdateRequest(api.RemoteConnenctionUpdateRequest{RemoteAPIToken: &auth.Token}). | ||
Execute() | ||
require.NoError(t, err) | ||
|
||
// Ensure the replication is now valid. | ||
require.NoError(t, client.ReplicationsApi.PostValidateReplicationByID(ctx, replication.Id).Execute()) | ||
|
||
// Create a new bucket. | ||
bucket2 := influxdb.Bucket{ | ||
OrgID: l.Org.ID, | ||
Name: "bucket2", | ||
RetentionPeriod: 0, | ||
ShardGroupDuration: 0, | ||
} | ||
require.NoError(t, l.BucketService(t).CreateBucket(ctx, &bucket2)) | ||
bucket2Id := bucket2.ID.String() | ||
|
||
// Updating the replication to point at the new bucket should fail validation. | ||
_, err = client.ReplicationsApi.PatchReplicationByID(ctx, replication.Id). | ||
ReplicationUpdateRequest(api.ReplicationUpdateRequest{RemoteBucketID: &bucket2Id}). | ||
Validate(true). | ||
Execute() | ||
require.Error(t, err) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,16 +2,57 @@ package internal | |
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net/url" | ||
"runtime" | ||
|
||
"github.com/influxdata/influx-cli/v2/api" | ||
"github.com/influxdata/influxdb/v2" | ||
ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors" | ||
) | ||
|
||
func NewValidator() *stubValidator { | ||
return &stubValidator{} | ||
func NewValidator() *noopWriteValidator { | ||
return &noopWriteValidator{} | ||
} | ||
|
||
type stubValidator struct{} | ||
// noopWriteValidator checks if replication parameters are valid by attempting to write an empty payload | ||
// to the remote host using the configured information. | ||
type noopWriteValidator struct{} | ||
|
||
func (s stubValidator) ValidateReplication(ctx context.Context, config *ReplicationHTTPConfig) error { | ||
return &ierrors.Error{Code: ierrors.ENotImplemented, Msg: "replication validation not implemented"} | ||
var userAgent = fmt.Sprintf( | ||
"influxdb-oss/%s (%s) Sha/%s Date/%s", | ||
influxdb.GetBuildInfo().Version, | ||
runtime.GOOS, | ||
influxdb.GetBuildInfo().Commit, | ||
influxdb.GetBuildInfo().Date) | ||
|
||
func (s noopWriteValidator) ValidateReplication(ctx context.Context, config *ReplicationHTTPConfig) error { | ||
u, err := url.Parse(config.RemoteURL) | ||
if err != nil { | ||
return &ierrors.Error{ | ||
Code: ierrors.EInvalid, | ||
Msg: fmt.Sprintf("host URL %q is invalid", config.RemoteURL), | ||
Err: err, | ||
} | ||
} | ||
params := api.ConfigParams{ | ||
Host: u, | ||
UserAgent: userAgent, | ||
Token: &config.RemoteToken, | ||
AllowInsecureTLS: config.AllowInsecureTLS, | ||
} | ||
client := api.NewAPIClient(api.NewAPIConfig(params)).WriteApi | ||
|
||
noopReq := client.PostWrite(ctx). | ||
Org(config.RemoteOrgID.String()). | ||
Bucket(config.RemoteBucketID.String()). | ||
Body([]byte{}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have we checked this also doesn't error in cloud? Should we maybe add a cloud unit test that this works? It is and odd kind of request to attempt to write with no data, I can see us rejecting or otherwise flagging that it is weird. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've checked (it used to error, but I changed the behavior as part of an EAR last year). Agreed it would be good to have a regression test to make sure things don't revert. Ideally we could instead have/use some sort of "can I?" API where you submit a resource/action and get back a yes/no. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I prefer this kind of check (where we actually do the thing) vs a 'can I' API which might drift from the actual checks on the 'real' API. I just want to make sure it works. |
||
|
||
if err := noopReq.Execute(); err != nil { | ||
return &ierrors.Error{ | ||
Code: ierrors.EInvalid, | ||
Err: err, | ||
} | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would these two tests run faster if they were the same test (since we don't have to setup / tear down the server)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably, though it's not as slow as it could be since they're using the inmem metadata store instead of a real boltDB.