diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index f787fc18cd6f..01fdd412288b 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -446,7 +446,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { } info, err := cfg.sqlInstanceProvider.GetInstance(cfg.rpcContext.MasterCtx, base.SQLInstanceID(nodeID)) if err != nil { - return nil, errors.Errorf("unable to look up descriptor for nsql%d", nodeID) + return nil, errors.Wrapf(err, "unable to look up descriptor for nsql%d", nodeID) } return &util.UnresolvedAddr{AddressField: info.InstanceAddr}, nil } diff --git a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_add_column.go b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_add_column.go index 363cf52305f5..d7fcecb78bea 100644 --- a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_add_column.go +++ b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_add_column.go @@ -122,6 +122,7 @@ func alterTableAddColumn( GeneratedAsIdentityType: desc.GeneratedAsIdentityType, PgAttributeNum: desc.GetPGAttributeNum(), }, + unique: d.Unique.IsUnique, } if ptr := desc.GeneratedAsIdentitySequenceOption; ptr != nil { spec.col.GeneratedAsIdentitySequenceOption = *ptr @@ -277,6 +278,7 @@ type addColumnSpec struct { def *scpb.ColumnDefaultExpression onUpdate *scpb.ColumnOnUpdateExpression comment *scpb.ColumnComment + unique bool } // addColumn is a helper function which adds column element targets and ensures @@ -338,6 +340,11 @@ func addColumn(b BuildCtx, spec addColumnSpec, n tree.NodeFormatter) (backing *s // follow-up change in order to get this in. allTargets := b.QueryByID(spec.tbl.TableID) if spec.def == nil && spec.colType.ComputeExpr == nil { + if !spec.colType.IsNullable && spec.unique { + panic(scerrors.NotImplementedErrorf(n, + "`ADD COLUMN NOT NULL UNIQUE` is problematic with "+ + "concurrent insert. See issue #90174")) + } b.Add(&scpb.IndexColumn{ TableID: spec.tbl.TableID, IndexID: existing.IndexID, diff --git a/pkg/sql/schemachanger/scbuild/testdata/unimplemented_alter_table b/pkg/sql/schemachanger/scbuild/testdata/unimplemented_alter_table index 194dd30ffa68..ced3a5569ff8 100644 --- a/pkg/sql/schemachanger/scbuild/testdata/unimplemented_alter_table +++ b/pkg/sql/schemachanger/scbuild/testdata/unimplemented_alter_table @@ -56,6 +56,10 @@ unimplemented ALTER TABLE defaultdb.foo ADD COLUMN p INT, DROP COLUMN o ---- +unimplemented +ALTER TABLE defaultdb.foo ADD COLUMN p INT NOT NULL UNIQUE +---- + unimplemented ALTER TABLE defaultdb.foo DROP CONSTRAINT foobar ---- diff --git a/pkg/sql/sqlinstance/instanceprovider/BUILD.bazel b/pkg/sql/sqlinstance/instanceprovider/BUILD.bazel index 88f6dd183644..40abb9952202 100644 --- a/pkg/sql/sqlinstance/instanceprovider/BUILD.bazel +++ b/pkg/sql/sqlinstance/instanceprovider/BUILD.bazel @@ -23,7 +23,6 @@ go_library( "//pkg/util/stop", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", - "@com_github_cockroachdb_logtags//:logtags", ], ) diff --git a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go index 5cae12392b36..99c152e25bce 100644 --- a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go +++ b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go @@ -13,7 +13,6 @@ package instanceprovider import ( "context" - "sync" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" @@ -28,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" - "github.com/cockroachdb/logtags" ) type writer interface { @@ -45,15 +43,10 @@ type provider struct { instanceAddr string session sqlliveness.Instance locality roachpb.Locality - initOnce sync.Once - initialized chan struct{} instanceID base.SQLInstanceID sessionID sqlliveness.SessionID - initError error - mu struct { - syncutil.Mutex - started bool - } + started bool + stopped syncutil.AtomicBool } // New constructs a new Provider. @@ -76,89 +69,35 @@ func New( session: slProvider, instanceAddr: addr, locality: locality, - initialized: make(chan struct{}), } return p } // Start implements the sqlinstance.Provider interface. func (p *provider) Start(ctx context.Context) error { - if p.started() { - return p.initError - } - // Initialize the instance. We need to do this before starting the reader, so - // that the reader sees the instance. - if err := p.initAndWait(ctx); err != nil { - return err - } - - if err := p.Reader.Start(ctx); err != nil { - p.initOnce.Do(func() { - p.initError = err - close(p.initialized) - }) - } - p.mu.Lock() - defer p.mu.Unlock() - p.mu.started = true - return p.initError -} - -func (p *provider) started() bool { - p.mu.Lock() - defer p.mu.Unlock() - return p.mu.started -} - -// Instance implements the sqlinstance.Provider interface. -func (p *provider) Instance( - ctx context.Context, -) (_ base.SQLInstanceID, _ sqlliveness.SessionID, err error) { - if !p.started() { - return base.SQLInstanceID(0), "", sqlinstance.NotStartedError + if p.started { + return errors.New("already started") } - select { - case <-ctx.Done(): - return base.SQLInstanceID(0), "", ctx.Err() - case <-p.stopper.ShouldQuiesce(): - return base.SQLInstanceID(0), "", stop.ErrUnavailable - case <-p.initialized: - return p.instanceID, p.sessionID, p.initError - } -} + p.started = true -func (p *provider) initAndWait(ctx context.Context) error { - p.maybeInitialize() - select { - case <-ctx.Done(): - return ctx.Err() - case <-p.stopper.ShouldQuiesce(): - return stop.ErrUnavailable - case <-p.initialized: - if p.initError == nil { - log.Ops.Infof(ctx, "created SQL instance %d", p.instanceID) - } else { - log.Ops.Warningf(ctx, "error creating SQL instance: %s", p.initError) + { + // Initialize the instance. We need to do this before starting the reader, + // so that the reader sees the instance. + ctx, cancel := p.stopper.WithCancelOnQuiesce(ctx) + defer cancel() + if err := p.init(ctx); err != nil { + log.Ops.Warningf(ctx, "error creating SQL instance: %s", err) + return err } } - return p.initError -} -func (p *provider) maybeInitialize() { - p.initOnce.Do(func() { - ctx := context.Background() - if err := p.stopper.RunAsyncTask(ctx, "initialize-instance", func(ctx context.Context) { - ctx = logtags.AddTag(ctx, "initialize-instance", nil) - p.initError = p.initialize(ctx) - close(p.initialized) - }); err != nil { - p.initError = err - close(p.initialized) - } - }) + if err := p.Reader.Start(ctx); err != nil { + return err + } + return nil } -func (p *provider) initialize(ctx context.Context) error { +func (p *provider) init(ctx context.Context) error { session, err := p.session.Session(ctx) if err != nil { return errors.Wrap(err, "constructing session") @@ -182,28 +121,26 @@ func (p *provider) initialize(ctx context.Context) error { return nil } -// shutdownSQLInstance shuts down the SQL instance. -func (p *provider) shutdownSQLInstance(ctx context.Context) { - if !p.started() { - return - } - // Initialize initError if shutdownSQLInstance is called - // before initialization of the instance ID - go func() { - p.initOnce.Do(func() { - p.initError = errors.New("instance never initialized") - close(p.initialized) - }) - }() - select { - case <-ctx.Done(): - return - case <-p.initialized: +// ErrProviderShutDown is returned by Instance() if called after the instance ID +// has been released. +var ErrProviderShutDown = errors.New("instance provider shut down") + +// Instance implements the sqlinstance.Provider interface. +func (p *provider) Instance( + ctx context.Context, +) (_ base.SQLInstanceID, _ sqlliveness.SessionID, err error) { + if !p.started { + return base.SQLInstanceID(0), "", sqlinstance.NotStartedError } - // If there is any initialization error, return as there is nothing to do. - if p.initError != nil { - return + if p.stopped.Get() { + return base.SQLInstanceID(0), "", ErrProviderShutDown } + return p.instanceID, p.sessionID, nil +} + +// shutdownSQLInstance releases the instance ID and stops the stopper. +func (p *provider) shutdownSQLInstance(ctx context.Context) { + p.stopped.Set(true) err := p.storage.ReleaseInstanceID(ctx, p.instanceID) if err != nil { log.Ops.Warningf(ctx, "could not release instance id %d", p.instanceID) diff --git a/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go b/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go index 15949afe64f7..a8165d54c117 100644 --- a/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go +++ b/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go @@ -65,7 +65,7 @@ func TestInstanceProvider(t *testing.T) { defer stopper.Stop(ctx) instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, addr) slInstance.Start(ctx) - instanceProvider.InitAndWaitForTest(ctx) + instanceProvider.InitForTest(ctx) instanceID, sessionID, err := instanceProvider.Instance(ctx) require.NoError(t, err) require.Equal(t, expectedInstanceID, instanceID) @@ -89,22 +89,10 @@ func TestInstanceProvider(t *testing.T) { // Verify that the SQL instance is shutdown on session expiry. testutils.SucceedsSoon(t, func() error { - if _, _, err = instanceProvider.Instance(ctx); !errors.Is(err, stop.ErrUnavailable) { + if _, _, err = instanceProvider.Instance(ctx); !errors.Is(err, instanceprovider.ErrProviderShutDown) { return errors.Errorf("sql instance is not shutdown on session expiry") } return nil }) }) - - t.Run("test-shutdown-before-init", func(t *testing.T) { - stopper, slInstance, _, _ := setup(t) - defer stopper.Stop(ctx) - instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, "addr") - slInstance.Start(ctx) - instanceProvider.ShutdownSQLInstanceForTest(ctx) - instanceProvider.InitAndWaitForTest(ctx) - _, _, err := instanceProvider.Instance(ctx) - require.Error(t, err) - require.Equal(t, "instance never initialized", err.Error()) - }) } diff --git a/pkg/sql/sqlinstance/instanceprovider/test_helpers.go b/pkg/sql/sqlinstance/instanceprovider/test_helpers.go index a1eb1c6ea42b..cecb6aef9a95 100644 --- a/pkg/sql/sqlinstance/instanceprovider/test_helpers.go +++ b/pkg/sql/sqlinstance/instanceprovider/test_helpers.go @@ -23,7 +23,7 @@ import ( // and InitAndWaitForTest methods for testing purposes. type TestInstanceProvider interface { sqlinstance.Provider - InitAndWaitForTest(context.Context) + InitForTest(context.Context) ShutdownSQLInstanceForTest(context.Context) } @@ -38,15 +38,14 @@ func NewTestInstanceProvider( stopper: stopper, session: session, instanceAddr: addr, - initialized: make(chan struct{}), + started: true, } - p.mu.started = true return p } // InitAndWaitForTest explicitly calls initAndWait for testing purposes. -func (p *provider) InitAndWaitForTest(ctx context.Context) { - _ = p.initAndWait(ctx) +func (p *provider) InitForTest(ctx context.Context) { + _ = p.init(ctx) } // ShutdownSQLInstanceForTest explicitly calls shutdownSQLInstance for testing purposes. diff --git a/pkg/sql/sqlinstance/instancestorage/instancereader.go b/pkg/sql/sqlinstance/instancestorage/instancereader.go index 5118e76999cb..77c2f5370e16 100644 --- a/pkg/sql/sqlinstance/instancestorage/instancereader.go +++ b/pkg/sql/sqlinstance/instancestorage/instancereader.go @@ -89,7 +89,8 @@ func NewReader( return NewTestingReader(storage, slReader, f, codec, keys.SQLInstancesTableID, clock, stopper) } -// Start initializes the rangefeed for the Reader. +// Start initializes the rangefeed for the Reader. The rangefeed will run until +// the stopper stops. func (r *Reader) Start(ctx context.Context) error { rf := r.maybeStartRangeFeed(ctx) select {