From 3a0fce858e674e196368335bca63de3d14ff4af6 Mon Sep 17 00:00:00 2001 From: eric2788 Date: Tue, 25 Jan 2022 22:22:15 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=BE=A9=E9=87=8D=E9=80=A3=E6=A9=9F?= =?UTF-8?q?=E5=88=B6=E6=8E=A5=E9=80=A3=E5=A4=B1=E6=95=97=E7=9A=84=E5=95=8F?= =?UTF-8?q?=E9=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crawlers/bilibili/spider.go | 52 +++++++++++++---------------- crawlers/bilibili/websocket.go | 4 +-- crawlers/bilibili/websocket_test.go | 22 ++++++++---- 3 files changed, 40 insertions(+), 38 deletions(-) diff --git a/crawlers/bilibili/spider.go b/crawlers/bilibili/spider.go index 0626919..e126433 100644 --- a/crawlers/bilibili/spider.go +++ b/crawlers/bilibili/spider.go @@ -15,9 +15,9 @@ import ( const id = "platforms_crawler" var ( - publisher crawling.Publisher - livedSet = mapset.NewSet() - subRequest *http.Request + publisher crawling.Publisher + livedSet = mapset.NewSet() + listening []string ) // antiDuplicateLive 基於 LIVE 指令可能會連續發送幾次 @@ -57,7 +57,7 @@ func handleMessage(b []byte) { } } -func createSubscribeRequest(room []string) (url.URL, *http.Request, error) { +func doSubscribeRequest(room []string) (url.URL, error) { httpUrl := url.URL{ Host: bilibiliYaml.BiliLiveHost, @@ -77,11 +77,24 @@ func createSubscribeRequest(room []string) (url.URL, *http.Request, error) { body := strings.NewReader(form.Encode()) req, err := http.NewRequest(http.MethodPost, httpUrl.String(), body) - if req != nil { - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - req.Header.Set("Authorization", id) + + if err != nil { + return httpUrl, err } - return httpUrl, req, err + + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Authorization", id) + + resp, err := http.DefaultClient.Do(req) + + if err != nil { + return httpUrl, err + } + + if resp.StatusCode != 200 { + return httpUrl, fmt.Errorf(resp.Status) + } + return httpUrl, nil } func subscribeAll(room []string, ctx context.Context, done context.CancelFunc, p crawling.Publisher) { @@ -101,16 +114,8 @@ func subscribeAll(room []string, ctx context.Context, done context.CancelFunc, p } logger.Debugf("正在設置訂閱...") - httpUrl, req, err := createSubscribeRequest(room) - subRequest = req - - if err != nil { - logger.Errorf("嘗試請求 %s 時出現錯誤: %v", httpUrl.String(), err) - retry() - return - } - - _, err = doRequest(req) + httpUrl, err := doSubscribeRequest(room) + listening = room if err != nil { logger.Errorf("嘗試設置訂閱時出現錯誤: %v", err) @@ -125,17 +130,6 @@ func subscribeAll(room []string, ctx context.Context, done context.CancelFunc, p unSubscribe(httpUrl) } -func doRequest(req *http.Request) (*http.Response, error) { - resp, err := http.DefaultClient.Do(req.Clone(context.Background())) - if err != nil { - return nil, err - } - if resp.StatusCode != 200 { - return nil, fmt.Errorf(resp.Status) - } - return resp, nil -} - func unSubscribe(httpUrl url.URL) { logger.Debugf("正在清除訂閱...") diff --git a/crawlers/bilibili/websocket.go b/crawlers/bilibili/websocket.go index edfc091..642c25a 100644 --- a/crawlers/bilibili/websocket.go +++ b/crawlers/bilibili/websocket.go @@ -88,9 +88,9 @@ func retryDelay(ctx context.Context, wg *sync.WaitGroup) { logger.Warnf("五秒後重連...") <-time.After(time.Second * 5) startWebSocket(ctx, wg) - if subRequest != nil { + if listening != nil { // 重新訂閱 - for _, err := doRequest(subRequest); err != nil; { + for _, err := doSubscribeRequest(listening); err != nil; { logger.Errorf("重新訂閱失敗: %v,五秒後重試...", err) <-time.After(time.Second * 5) } diff --git a/crawlers/bilibili/websocket_test.go b/crawlers/bilibili/websocket_test.go index 8bffad9..5708068 100644 --- a/crawlers/bilibili/websocket_test.go +++ b/crawlers/bilibili/websocket_test.go @@ -8,6 +8,12 @@ import ( "time" ) +var testListen = []string{ + "876396", + "22571958", + "21320551", +} + func TestWebSocket(t *testing.T) { // no need load because default settings is valid // file.LoadYaml("bilibili", bilibiliYaml) @@ -18,14 +24,16 @@ func TestWebSocket(t *testing.T) { wg := &sync.WaitGroup{} go startWebSocket(ctx, wg) - listen := []string{ - "876396", - "22571958", - "21320551", - } - - go subscribeAll(listen, ctx, cancel, nil) + go subscribeAll(testListen, ctx, cancel, nil) <-time.After(time.Second * 30) cancel() wg.Wait() } + +func TestReuseRequest(t *testing.T) { + for i := 0; i < 3; i++ { + if _, err := doSubscribeRequest(testListen); err != nil { + t.Fatal(err) + } + } +}