Skip to content

Commit

Permalink
修復重連機制接連失敗的問題
Browse files Browse the repository at this point in the history
  • Loading branch information
eric2788 committed Jan 25, 2022
1 parent eccd6ae commit 3a0fce8
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 38 deletions.
52 changes: 23 additions & 29 deletions crawlers/bilibili/spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
const id = "platforms_crawler"

var (
publisher crawling.Publisher
livedSet = mapset.NewSet()
subRequest *http.Request
publisher crawling.Publisher
livedSet = mapset.NewSet()
listening []string
)

// antiDuplicateLive 基於 LIVE 指令可能會連續發送幾次
Expand Down Expand Up @@ -57,7 +57,7 @@ func handleMessage(b []byte) {
}
}

func createSubscribeRequest(room []string) (url.URL, *http.Request, error) {
func doSubscribeRequest(room []string) (url.URL, error) {

httpUrl := url.URL{
Host: bilibiliYaml.BiliLiveHost,
Expand All @@ -77,11 +77,24 @@ func createSubscribeRequest(room []string) (url.URL, *http.Request, error) {

body := strings.NewReader(form.Encode())
req, err := http.NewRequest(http.MethodPost, httpUrl.String(), body)
if req != nil {
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("Authorization", id)

if err != nil {
return httpUrl, err
}
return httpUrl, req, err

req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("Authorization", id)

resp, err := http.DefaultClient.Do(req)

if err != nil {
return httpUrl, err
}

if resp.StatusCode != 200 {
return httpUrl, fmt.Errorf(resp.Status)
}
return httpUrl, nil
}

func subscribeAll(room []string, ctx context.Context, done context.CancelFunc, p crawling.Publisher) {
Expand All @@ -101,16 +114,8 @@ func subscribeAll(room []string, ctx context.Context, done context.CancelFunc, p
}

logger.Debugf("正在設置訂閱...")
httpUrl, req, err := createSubscribeRequest(room)
subRequest = req

if err != nil {
logger.Errorf("嘗試請求 %s 時出現錯誤: %v", httpUrl.String(), err)
retry()
return
}

_, err = doRequest(req)
httpUrl, err := doSubscribeRequest(room)
listening = room

if err != nil {
logger.Errorf("嘗試設置訂閱時出現錯誤: %v", err)
Expand All @@ -125,17 +130,6 @@ func subscribeAll(room []string, ctx context.Context, done context.CancelFunc, p
unSubscribe(httpUrl)
}

func doRequest(req *http.Request) (*http.Response, error) {
resp, err := http.DefaultClient.Do(req.Clone(context.Background()))
if err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf(resp.Status)
}
return resp, nil
}

func unSubscribe(httpUrl url.URL) {

logger.Debugf("正在清除訂閱...")
Expand Down
4 changes: 2 additions & 2 deletions crawlers/bilibili/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ func retryDelay(ctx context.Context, wg *sync.WaitGroup) {
logger.Warnf("五秒後重連...")
<-time.After(time.Second * 5)
startWebSocket(ctx, wg)
if subRequest != nil {
if listening != nil {
// 重新訂閱
for _, err := doRequest(subRequest); err != nil; {
for _, err := doSubscribeRequest(listening); err != nil; {
logger.Errorf("重新訂閱失敗: %v,五秒後重試...", err)
<-time.After(time.Second * 5)
}
Expand Down
22 changes: 15 additions & 7 deletions crawlers/bilibili/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import (
"time"
)

var testListen = []string{
"876396",
"22571958",
"21320551",
}

func TestWebSocket(t *testing.T) {
// no need load because default settings is valid
// file.LoadYaml("bilibili", bilibiliYaml)
Expand All @@ -18,14 +24,16 @@ func TestWebSocket(t *testing.T) {
wg := &sync.WaitGroup{}
go startWebSocket(ctx, wg)

listen := []string{
"876396",
"22571958",
"21320551",
}

go subscribeAll(listen, ctx, cancel, nil)
go subscribeAll(testListen, ctx, cancel, nil)
<-time.After(time.Second * 30)
cancel()
wg.Wait()
}

func TestReuseRequest(t *testing.T) {
for i := 0; i < 3; i++ {
if _, err := doSubscribeRequest(testListen); err != nil {
t.Fatal(err)
}
}
}

0 comments on commit 3a0fce8

Please sign in to comment.