Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
diPhantxm committed Aug 26, 2024
1 parent c8b373a commit e641e31
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 22 deletions.
10 changes: 4 additions & 6 deletions router/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,8 +665,6 @@ func (rst *RelayStateImpl) ProcCopy(stmt *lyx.Copy, data *pgproto3.CopyData) err
rst.Client().RLock()
defer rst.Client().RUnlock()

spqrlog.Zero.Debug().Interface("copy stmt", stmt).Msg("here201")

// Read delimiter from COPY options
delimiter := byte(';')
for _, opt := range stmt.Options {
Expand Down Expand Up @@ -698,8 +696,6 @@ func (rst *RelayStateImpl) ProcCopy(stmt *lyx.Copy, data *pgproto3.CopyData) err
return err
}

spqrlog.Zero.Debug().Interface("value", valueClause.Values).Msg("here")

smt, ok := r.(routingstate.ShardMatchState)
if !ok {
return fmt.Errorf("multishard copy is not supported")
Expand Down Expand Up @@ -809,6 +805,7 @@ func (rst *RelayStateImpl) ProcQuery(query pgproto3.FrontendMessage, waitForResp
q := rst.qp.Stmt().(*lyx.Copy)

if err := func() error {
buf := []byte{}
for {
cpMsg, err := rst.Client().Receive()
if err != nil {
Expand All @@ -817,10 +814,11 @@ func (rst *RelayStateImpl) ProcQuery(query pgproto3.FrontendMessage, waitForResp

switch msg := cpMsg.(type) {
case *pgproto3.CopyData:
if err := rst.ProcCopy(q, msg); err != nil {
buf = append(buf, msg.Data...)
case *pgproto3.CopyDone, *pgproto3.CopyFail:
if err := rst.ProcCopy(q, &pgproto3.CopyData{Data: buf}); err != nil {
return err
}
case *pgproto3.CopyDone, *pgproto3.CopyFail:
if err := rst.ProcCopyComplete(&cpMsg); err != nil {
return err
}
Expand Down
36 changes: 26 additions & 10 deletions test/regress/tests/router/expected/copy_routing.out
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ ALTER DISTRIBUTION ds1 ATTACH RELATION copy_test DISTRIBUTION KEY id;
\c regress
CREATE TABLE copy_test (id int);
NOTICE: send query to shard(s) : sh1,sh2
COPY copy_test FROM STDIN WHERE id <= 10;
NOTICE: send query to shard(s) : sh1
COPY copy_test(id) FROM STDIN WHERE id <= 10;
NOTICE: send query to shard(s) : sh1,sh2
SELECT * FROM copy_test WHERE id <= 10;
NOTICE: send query to shard(s) : sh1
id
Expand All @@ -47,21 +47,37 @@ NOTICE: send query to shard(s) : sh1
5
(5 rows)

COPY copy_test FROM STDIN WHERE id <= 30;
NOTICE: send query to shard(s) : sh2
SELECT * FROM copy_test WHERE id <= 30 ORDER BY copy_test;
NOTICE: send query to shard(s) : sh2
COPY copy_test(id) FROM STDIN;
NOTICE: send query to shard(s) : sh1,sh2
ERROR: client processing error: multishard copy is not supported, tx status IDLE
SELECT * FROM copy_test;
NOTICE: send query to shard(s) : sh1,sh2
id
----
1
2
3
4
5
(5 rows)

COPY copy_test(id) FROM STDIN;
NOTICE: send query to shard(s) : sh1,sh2
SELECT * FROM copy_test;
NOTICE: send query to shard(s) : sh1,sh2
id
----
1
2
3
4
5
12
22
23
(8 rows)
41
42
43
44
45
(10 rows)

DROP TABLE copy_test;
NOTICE: send query to shard(s) : sh1,sh2
Expand Down
19 changes: 13 additions & 6 deletions test/regress/tests/router/sql/copy_routing.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,17 @@ ALTER DISTRIBUTION ds1 ATTACH RELATION copy_test DISTRIBUTION KEY id;
\c regress
CREATE TABLE copy_test (id int);

COPY copy_test FROM STDIN WHERE id <= 10;
COPY copy_test(id) FROM STDIN WHERE id <= 10;
1
2
3
4
5
12
3434
43
\.

SELECT * FROM copy_test WHERE id <= 10;

COPY copy_test FROM STDIN WHERE id <= 30;
COPY copy_test(id) FROM STDIN;
1
2
3
Expand All @@ -35,7 +32,17 @@ COPY copy_test FROM STDIN WHERE id <= 30;
43
\.

SELECT * FROM copy_test WHERE id <= 30 ORDER BY copy_test;
SELECT * FROM copy_test;

COPY copy_test(id) FROM STDIN;
41
42
43
44
45
\.

SELECT * FROM copy_test;

DROP TABLE copy_test;

Expand Down

0 comments on commit e641e31

Please sign in to comment.