Skip to content

Commit

Permalink
Error on batch insert if InsertOpts.UniqueOpts is specified
Browse files Browse the repository at this point in the history
As I was writing docs for batch insertion today [1], I remembered that
we don't support job uniqueness on batch inserts because the mechanism
uses PG advisory locks, and holding many locks at once could easily lead
to contention and deadlocks.

We may remove this limitation in the future, but I figure that in the
meantime, it might not be the worst idea to error if they're specified
on batch insert so as not to mislead the user that what they expected to
happen happened. That's what this change does.

[1] https://riverqueue.com/docs/batch-job-insertion
  • Loading branch information
brandur committed Nov 11, 2023
1 parent 78dea02 commit b07d2af
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 19 deletions.
46 changes: 27 additions & 19 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,21 +1059,9 @@ type InsertManyParams struct {
// // handle error
// }
func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int64, error) {
if len(params) < 1 {
return 0, errors.New("no jobs to insert")
}

insertParams := make([]*dbadapter.JobInsertParams, len(params))
for i, param := range params {
if err := c.validateJobArgs(param.Args); err != nil {
return 0, err
}

var err error
insertParams[i], err = insertParamsFromArgsAndOptions(param.Args, param.InsertOpts)
if err != nil {
return 0, err
}
insertParams, err := c.insertManyParams(params)
if err != nil {
return 0, err
}

return c.adapter.JobInsertMany(ctx, insertParams)
Expand All @@ -1099,24 +1087,44 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams)
// changes. An inserted job isn't visible to be worked until the transaction
// commits, and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) (int64, error) {
insertParams, err := c.insertManyParams(params)
if err != nil {
return 0, err
}

return c.adapter.JobInsertManyTx(ctx, c.driver.UnwrapTx(tx), insertParams)
}

// Validates input parameters for an a batch insert operation and generates a
// set of batch insert parameters.
func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*dbadapter.JobInsertParams, error) {
if len(params) < 1 {
return 0, errors.New("no jobs to insert")
return nil, errors.New("no jobs to insert")
}

insertParams := make([]*dbadapter.JobInsertParams, len(params))
for i, param := range params {
if err := c.validateJobArgs(param.Args); err != nil {
return 0, err
return nil, err
}

if param.InsertOpts != nil {
// UniqueOpts aren't support for batch inserts because they use PG
// advisory locks to work, and taking many locks simultaneously
// could easily lead to contention and deadlocks.
if !param.InsertOpts.UniqueOpts.isEmpty() {
return nil, errors.New("UniqueOpts are not supported for batch inserts")
}
}

var err error
insertParams[i], err = insertParamsFromArgsAndOptions(param.Args, param.InsertOpts)
if err != nil {
return 0, err
return nil, err
}
}

return c.adapter.JobInsertManyTx(ctx, c.driver.UnwrapTx(tx), insertParams)
return insertParams, nil
}

// Validates job args prior to insertion. Currently, verifies that a worker to
Expand Down
24 changes: 24 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,18 @@ func Test_Client_InsertMany(t *testing.T) {
})
require.NoError(t, err)
})

t.Run("ErrorsOnInsertOptsUniqueOpts", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

count, err := client.InsertMany(ctx, []InsertManyParams{
{Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}},
})
require.EqualError(t, err, "UniqueOpts are not supported for batch inserts")
require.Equal(t, int64(0), count)
})
}

func Test_Client_InsertManyTx(t *testing.T) {
Expand Down Expand Up @@ -809,6 +821,18 @@ func Test_Client_InsertManyTx(t *testing.T) {
})
require.NoError(t, err)
})

t.Run("ErrorsOnInsertOptsUniqueOpts", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{
{Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}},
})
require.EqualError(t, err, "UniqueOpts are not supported for batch inserts")
require.Equal(t, int64(0), count)
})
}

func Test_Client_ErrorHandler(t *testing.T) {
Expand Down

0 comments on commit b07d2af

Please sign in to comment.