@@ -7,12 +7,14 @@ import (
7
7
"context"
8
8
"encoding/binary"
9
9
"fmt"
10
+ "io"
10
11
"net"
11
12
"net/url"
12
13
"path"
13
14
"testing"
14
15
15
16
"github.com/pingcap/errors"
17
+ "github.com/pingcap/failpoint"
16
18
backup "github.com/pingcap/kvproto/pkg/brpb"
17
19
"github.com/pingcap/log"
18
20
berrors "github.com/pingcap/tidb/br/pkg/errors"
@@ -143,6 +145,7 @@ func TestIntegration(t *testing.T) {
143
145
t .Run ("testGetGlobalCheckPointTS" , func (t * testing.T ) { testGetGlobalCheckPointTS (t , metaCli ) })
144
146
t .Run ("TestStreamListening" , func (t * testing.T ) { testStreamListening (t , streamhelper.AdvancerExt {MetaDataClient : metaCli }) })
145
147
t .Run ("TestStreamCheckpoint" , func (t * testing.T ) { testStreamCheckpoint (t , streamhelper.AdvancerExt {MetaDataClient : metaCli }) })
148
+ t .Run ("TestStreamClose" , func (t * testing.T ) { testStreamClose (t , streamhelper.AdvancerExt {MetaDataClient : metaCli }) })
146
149
}
147
150
148
151
func TestChecking (t * testing.T ) {
@@ -320,6 +323,7 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) {
320
323
taskInfo2 := simpleTask (taskName2 , 4 )
321
324
require .NoError (t , metaCli .PutTask (ctx , taskInfo2 ))
322
325
require .NoError (t , metaCli .DeleteTask (ctx , taskName2 ))
326
+
323
327
first := <- ch
324
328
require .Equal (t , first .Type , streamhelper .EventAdd )
325
329
require .Equal (t , first .Name , taskName )
@@ -335,8 +339,44 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) {
335
339
require .Equal (t , forth .Type , streamhelper .EventDel )
336
340
require .Equal (t , forth .Name , taskName2 )
337
341
cancel ()
338
- _ , ok := <- ch
339
- require .False (t , ok )
342
+ fifth , ok := <- ch
343
+ require .True (t , ok )
344
+ require .Equal (t , fifth .Type , streamhelper .EventErr )
345
+ require .Error (t , fifth .Err , context .Canceled )
346
+ item , ok := <- ch
347
+ require .False (t , ok , "%v" , item )
348
+ }
349
+
350
+ func testStreamClose (t * testing.T , metaCli streamhelper.AdvancerExt ) {
351
+ ctx := context .Background ()
352
+ taskName := "close_simple"
353
+ taskInfo := simpleTask (taskName , 4 )
354
+
355
+ require .NoError (t , metaCli .PutTask (ctx , taskInfo ))
356
+ ch := make (chan streamhelper.TaskEvent , 1024 )
357
+ require .NoError (t , metaCli .Begin (ctx , ch ))
358
+ require .NoError (t , metaCli .DeleteTask (ctx , taskName ))
359
+ first := <- ch
360
+ require .Equal (t , first .Type , streamhelper .EventAdd )
361
+ require .Equal (t , first .Name , taskName )
362
+ require .ElementsMatch (t , first .Ranges , simpleRanges (4 ))
363
+ second := <- ch
364
+ require .Equal (t , second .Type , streamhelper .EventDel , "%s" , second )
365
+ require .Equal (t , second .Name , taskName , "%s" , second )
366
+
367
+ require .NoError (t , failpoint .Enable ("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel" , "return" ))
368
+ defer failpoint .Disable ("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel" )
369
+ // We need to make the channel file some events hence we can simulate the closed channel.
370
+ taskName2 := "close_simple2"
371
+ taskInfo2 := simpleTask (taskName2 , 4 )
372
+ require .NoError (t , metaCli .PutTask (ctx , taskInfo2 ))
373
+ require .NoError (t , metaCli .DeleteTask (ctx , taskName2 ))
374
+
375
+ third := <- ch
376
+ require .Equal (t , third .Type , streamhelper .EventErr )
377
+ require .Error (t , third .Err , io .EOF )
378
+ item , ok := <- ch
379
+ require .False (t , ok , "%#v" , item )
340
380
}
341
381
342
382
func testStreamCheckpoint (t * testing.T , metaCli streamhelper.AdvancerExt ) {
0 commit comments