Skip to content

Commit

Permalink
Merge #90108 #90180
Browse files Browse the repository at this point in the history
90108: sqlinstance: simplify instance provider code r=andreimatei a=andreimatei

Before this patch, the instance provider was seemingly protecting
against different methods being called concurrently with or before
Start(). This protection was not necessary, as Start() is run to
completion before the provider is used by anyone else. Also, part of the
protection was broken: a bunch of unrelated code chunks were guarded by
a sync.Once and at least one instance (the error handling after
p.Reader.Start()) was running clearly after the Once was used by
p.initAndWait() - and so it was erroneously never running. Also, the
code was bizarre - initialization was executed in an async task, but the
caller was immediately waiting for it.

This patch removes all such protection, assuming that Start() runs
before anything else.

Release note: None
Epic: None

90180: scbuildstmt: fallback for `ADD COLUMN NOT NULL UNIQUE` r=Xiang-Gu a=Xiang-Gu

An issue (#90174) was recently discovered when we have concurrent `add column not null unique` and inserts. This PR falls back to the old schema changer for `add column unique` and `add column not null unique`.

Informs #90174
Release note: None

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
Co-authored-by: Xiang Gu <xiang@cockroachlabs.com>
  • Loading branch information
3 people committed Oct 18, 2022
3 parents f143b63 + f064d9f + ed33a4c commit fbb330c
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 121 deletions.
2 changes: 1 addition & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}
info, err := cfg.sqlInstanceProvider.GetInstance(cfg.rpcContext.MasterCtx, base.SQLInstanceID(nodeID))
if err != nil {
return nil, errors.Errorf("unable to look up descriptor for nsql%d", nodeID)
return nil, errors.Wrapf(err, "unable to look up descriptor for nsql%d", nodeID)
}
return &util.UnresolvedAddr{AddressField: info.InstanceAddr}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func alterTableAddColumn(
GeneratedAsIdentityType: desc.GeneratedAsIdentityType,
PgAttributeNum: desc.GetPGAttributeNum(),
},
unique: d.Unique.IsUnique,
}
if ptr := desc.GeneratedAsIdentitySequenceOption; ptr != nil {
spec.col.GeneratedAsIdentitySequenceOption = *ptr
Expand Down Expand Up @@ -277,6 +278,7 @@ type addColumnSpec struct {
def *scpb.ColumnDefaultExpression
onUpdate *scpb.ColumnOnUpdateExpression
comment *scpb.ColumnComment
unique bool
}

// addColumn is a helper function which adds column element targets and ensures
Expand Down Expand Up @@ -338,6 +340,11 @@ func addColumn(b BuildCtx, spec addColumnSpec, n tree.NodeFormatter) (backing *s
// follow-up change in order to get this in.
allTargets := b.QueryByID(spec.tbl.TableID)
if spec.def == nil && spec.colType.ComputeExpr == nil {
if !spec.colType.IsNullable && spec.unique {
panic(scerrors.NotImplementedErrorf(n,
"`ADD COLUMN NOT NULL UNIQUE` is problematic with "+
"concurrent insert. See issue #90174"))
}
b.Add(&scpb.IndexColumn{
TableID: spec.tbl.TableID,
IndexID: existing.IndexID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ unimplemented
ALTER TABLE defaultdb.foo ADD COLUMN p INT, DROP COLUMN o
----

unimplemented
ALTER TABLE defaultdb.foo ADD COLUMN p INT NOT NULL UNIQUE
----

unimplemented
ALTER TABLE defaultdb.foo DROP CONSTRAINT foobar
----
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/sqlinstance/instanceprovider/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
],
)

Expand Down
135 changes: 36 additions & 99 deletions pkg/sql/sqlinstance/instanceprovider/instanceprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package instanceprovider

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -28,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

type writer interface {
Expand All @@ -45,15 +43,10 @@ type provider struct {
instanceAddr string
session sqlliveness.Instance
locality roachpb.Locality
initOnce sync.Once
initialized chan struct{}
instanceID base.SQLInstanceID
sessionID sqlliveness.SessionID
initError error
mu struct {
syncutil.Mutex
started bool
}
started bool
stopped syncutil.AtomicBool
}

// New constructs a new Provider.
Expand All @@ -76,89 +69,35 @@ func New(
session: slProvider,
instanceAddr: addr,
locality: locality,
initialized: make(chan struct{}),
}
return p
}

// Start implements the sqlinstance.Provider interface.
func (p *provider) Start(ctx context.Context) error {
if p.started() {
return p.initError
}
// Initialize the instance. We need to do this before starting the reader, so
// that the reader sees the instance.
if err := p.initAndWait(ctx); err != nil {
return err
}

if err := p.Reader.Start(ctx); err != nil {
p.initOnce.Do(func() {
p.initError = err
close(p.initialized)
})
}
p.mu.Lock()
defer p.mu.Unlock()
p.mu.started = true
return p.initError
}

func (p *provider) started() bool {
p.mu.Lock()
defer p.mu.Unlock()
return p.mu.started
}

// Instance implements the sqlinstance.Provider interface.
func (p *provider) Instance(
ctx context.Context,
) (_ base.SQLInstanceID, _ sqlliveness.SessionID, err error) {
if !p.started() {
return base.SQLInstanceID(0), "", sqlinstance.NotStartedError
if p.started {
return errors.New("already started")
}
select {
case <-ctx.Done():
return base.SQLInstanceID(0), "", ctx.Err()
case <-p.stopper.ShouldQuiesce():
return base.SQLInstanceID(0), "", stop.ErrUnavailable
case <-p.initialized:
return p.instanceID, p.sessionID, p.initError
}
}
p.started = true

func (p *provider) initAndWait(ctx context.Context) error {
p.maybeInitialize()
select {
case <-ctx.Done():
return ctx.Err()
case <-p.stopper.ShouldQuiesce():
return stop.ErrUnavailable
case <-p.initialized:
if p.initError == nil {
log.Ops.Infof(ctx, "created SQL instance %d", p.instanceID)
} else {
log.Ops.Warningf(ctx, "error creating SQL instance: %s", p.initError)
{
// Initialize the instance. We need to do this before starting the reader,
// so that the reader sees the instance.
ctx, cancel := p.stopper.WithCancelOnQuiesce(ctx)
defer cancel()
if err := p.init(ctx); err != nil {
log.Ops.Warningf(ctx, "error creating SQL instance: %s", err)
return err
}
}
return p.initError
}

func (p *provider) maybeInitialize() {
p.initOnce.Do(func() {
ctx := context.Background()
if err := p.stopper.RunAsyncTask(ctx, "initialize-instance", func(ctx context.Context) {
ctx = logtags.AddTag(ctx, "initialize-instance", nil)
p.initError = p.initialize(ctx)
close(p.initialized)
}); err != nil {
p.initError = err
close(p.initialized)
}
})
if err := p.Reader.Start(ctx); err != nil {
return err
}
return nil
}

func (p *provider) initialize(ctx context.Context) error {
func (p *provider) init(ctx context.Context) error {
session, err := p.session.Session(ctx)
if err != nil {
return errors.Wrap(err, "constructing session")
Expand All @@ -182,28 +121,26 @@ func (p *provider) initialize(ctx context.Context) error {
return nil
}

// shutdownSQLInstance shuts down the SQL instance.
func (p *provider) shutdownSQLInstance(ctx context.Context) {
if !p.started() {
return
}
// Initialize initError if shutdownSQLInstance is called
// before initialization of the instance ID
go func() {
p.initOnce.Do(func() {
p.initError = errors.New("instance never initialized")
close(p.initialized)
})
}()
select {
case <-ctx.Done():
return
case <-p.initialized:
// ErrProviderShutDown is returned by Instance() if called after the instance ID
// has been released.
var ErrProviderShutDown = errors.New("instance provider shut down")

// Instance implements the sqlinstance.Provider interface.
func (p *provider) Instance(
ctx context.Context,
) (_ base.SQLInstanceID, _ sqlliveness.SessionID, err error) {
if !p.started {
return base.SQLInstanceID(0), "", sqlinstance.NotStartedError
}
// If there is any initialization error, return as there is nothing to do.
if p.initError != nil {
return
if p.stopped.Get() {
return base.SQLInstanceID(0), "", ErrProviderShutDown
}
return p.instanceID, p.sessionID, nil
}

// shutdownSQLInstance releases the instance ID and stops the stopper.
func (p *provider) shutdownSQLInstance(ctx context.Context) {
p.stopped.Set(true)
err := p.storage.ReleaseInstanceID(ctx, p.instanceID)
if err != nil {
log.Ops.Warningf(ctx, "could not release instance id %d", p.instanceID)
Expand Down
16 changes: 2 additions & 14 deletions pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestInstanceProvider(t *testing.T) {
defer stopper.Stop(ctx)
instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, addr)
slInstance.Start(ctx)
instanceProvider.InitAndWaitForTest(ctx)
instanceProvider.InitForTest(ctx)
instanceID, sessionID, err := instanceProvider.Instance(ctx)
require.NoError(t, err)
require.Equal(t, expectedInstanceID, instanceID)
Expand All @@ -89,22 +89,10 @@ func TestInstanceProvider(t *testing.T) {

// Verify that the SQL instance is shutdown on session expiry.
testutils.SucceedsSoon(t, func() error {
if _, _, err = instanceProvider.Instance(ctx); !errors.Is(err, stop.ErrUnavailable) {
if _, _, err = instanceProvider.Instance(ctx); !errors.Is(err, instanceprovider.ErrProviderShutDown) {
return errors.Errorf("sql instance is not shutdown on session expiry")
}
return nil
})
})

t.Run("test-shutdown-before-init", func(t *testing.T) {
stopper, slInstance, _, _ := setup(t)
defer stopper.Stop(ctx)
instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, "addr")
slInstance.Start(ctx)
instanceProvider.ShutdownSQLInstanceForTest(ctx)
instanceProvider.InitAndWaitForTest(ctx)
_, _, err := instanceProvider.Instance(ctx)
require.Error(t, err)
require.Equal(t, "instance never initialized", err.Error())
})
}
9 changes: 4 additions & 5 deletions pkg/sql/sqlinstance/instanceprovider/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// and InitAndWaitForTest methods for testing purposes.
type TestInstanceProvider interface {
sqlinstance.Provider
InitAndWaitForTest(context.Context)
InitForTest(context.Context)
ShutdownSQLInstanceForTest(context.Context)
}

Expand All @@ -38,15 +38,14 @@ func NewTestInstanceProvider(
stopper: stopper,
session: session,
instanceAddr: addr,
initialized: make(chan struct{}),
started: true,
}
p.mu.started = true
return p
}

// InitAndWaitForTest explicitly calls initAndWait for testing purposes.
func (p *provider) InitAndWaitForTest(ctx context.Context) {
_ = p.initAndWait(ctx)
func (p *provider) InitForTest(ctx context.Context) {
_ = p.init(ctx)
}

// ShutdownSQLInstanceForTest explicitly calls shutdownSQLInstance for testing purposes.
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/sqlinstance/instancestorage/instancereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ func NewReader(
return NewTestingReader(storage, slReader, f, codec, keys.SQLInstancesTableID, clock, stopper)
}

// Start initializes the rangefeed for the Reader.
// Start initializes the rangefeed for the Reader. The rangefeed will run until
// the stopper stops.
func (r *Reader) Start(ctx context.Context) error {
rf := r.maybeStartRangeFeed(ctx)
select {
Expand Down

0 comments on commit fbb330c

Please sign in to comment.