Skip to content

Commit

Permalink
Merge pull request #73 from kayac/fix/fcm-error-handling
Browse files Browse the repository at this point in the history
FCM v1 error handling
  • Loading branch information
fujiwara authored Apr 8, 2024
2 parents 0999386 + 19897bb commit dfa8db0
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 60 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type SectionFCMv1 struct {
Enabled bool
ProjectID string
TokenSource oauth2.TokenSource
Endpoint string
}

// DefaultLoadConfig loads default /etc/gunfish.toml
Expand Down
2 changes: 2 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,5 @@ var (
OutputHookStdout bool
OutputHookStderr bool
)

var RetryBackoff = true
37 changes: 21 additions & 16 deletions fcmv1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ func (c *Client) Send(p Payload) ([]Result, error) {

if body.Error == nil && body.Name != "" {
return []Result{
Result{
{
StatusCode: res.StatusCode,
Token: p.Message.Token,
},
}, nil
} else if body.Error != nil {
return []Result{
Result{
{
StatusCode: res.StatusCode,
Token: p.Message.Token,
Error: body.Error,
Expand All @@ -70,22 +70,28 @@ func (c *Client) NewRequest(p Payload) (*http.Request, error) {
if err != nil {
return nil, err
}
token, err := c.tokenSource.Token()
if err != nil {
return nil, err
var bearer string
if ts := c.tokenSource; ts != nil {
token, err := c.tokenSource.Token()
if err != nil {
return nil, err
}
bearer = token.AccessToken
} else {
bearer = p.Message.Token
}
req, err := http.NewRequest("POST", c.endpoint.String(), bytes.NewReader(data))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+token.AccessToken)
req.Header.Set("Authorization", "Bearer "+bearer)
req.Header.Set("Content-Type", "application/json")

return req, nil
}

// NewClient establishes a http connection with fcm v1
func NewClient(tokenSource oauth2.TokenSource, projectID string, endpoint *url.URL, timeout time.Duration) (*Client, error) {
func NewClient(tokenSource oauth2.TokenSource, projectID string, endpoint string, timeout time.Duration) (*Client, error) {
client := &http.Client{
Timeout: timeout,
}
Expand All @@ -94,16 +100,15 @@ func NewClient(tokenSource oauth2.TokenSource, projectID string, endpoint *url.U
tokenSource: tokenSource,
}

if endpoint != nil {
c.endpoint = endpoint
} else {
ep, err := url.Parse(DefaultFCMEndpoint)
if err != nil {
return nil, err
}
ep.Path = path.Join(ep.Path, projectID, "messages:send")
c.endpoint = ep
if endpoint == "" {
endpoint = DefaultFCMEndpoint
}
ep, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
ep.Path = path.Join(ep.Path, projectID, "messages:send")
c.endpoint = ep

return c, nil
}
3 changes: 3 additions & 0 deletions fcmv1/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ const (
InvalidArgument = "INVALID_ARGUMENT"
Unregistered = "UNREGISTERED"
NotFound = "NOT_FOUND"
Internal = "INTERNAL"
Unavailable = "UNAVAILABLE"
QuotaExceeded = "QUOTA_EXCEEDED"
)

type Error struct {
Expand Down
64 changes: 64 additions & 0 deletions mock/fcmv1_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package mock

import (
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"strings"
"time"

"github.com/kayac/Gunfish/fcmv1"
)

func FCMv1MockServer(projectID string, verbose bool) *http.ServeMux {
mux := http.NewServeMux()
p := fmt.Sprintf("/v1/projects/%s/messages:send", projectID)
log.Println("fcmv1 mock server path:", p)
mux.HandleFunc(p, func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
defer func() {
if verbose {
log.Printf("reqtime:%f proto:%s method:%s path:%s host:%s", reqtime(start), r.Proto, r.Method, r.URL.Path, r.RemoteAddr)
}
}()

// sets the response time from FCM server
time.Sleep(time.Millisecond*200 + time.Millisecond*(time.Duration(rand.Int63n(200)-100)))
token := r.Header.Get("Authorization")
token = strings.TrimPrefix(token, "Bearer ")

w.Header().Set("Content-Type", ApplicationJSON)
switch token {
case fcmv1.InvalidArgument:
createFCMv1ErrorResponse(w, http.StatusBadRequest, fcmv1.InvalidArgument)
case fcmv1.Unregistered:
createFCMv1ErrorResponse(w, http.StatusNotFound, fcmv1.Unregistered)
case fcmv1.Unavailable:
createFCMv1ErrorResponse(w, http.StatusServiceUnavailable, fcmv1.Unavailable)
case fcmv1.Internal:
createFCMv1ErrorResponse(w, http.StatusInternalServerError, fcmv1.Internal)
case fcmv1.QuotaExceeded:
createFCMv1ErrorResponse(w, http.StatusTooManyRequests, fcmv1.QuotaExceeded)
default:
enc := json.NewEncoder(w)
enc.Encode(fcmv1.ResponseBody{
Name: "ok",
})
}
})

return mux
}

func createFCMv1ErrorResponse(w http.ResponseWriter, code int, status string) error {
w.WriteHeader(code)
enc := json.NewEncoder(w)
return enc.Encode(fcmv1.ResponseBody{
Error: &fcmv1.FCMError{
Status: status,
Message: "mock error:" + status,
},
})
}
80 changes: 36 additions & 44 deletions supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"errors"
"fmt"
"io"
"net/http"
"math"
"os"
"os/exec"
"sync"
Expand Down Expand Up @@ -33,15 +33,13 @@ type Supervisor struct {

// Worker sends notification to apns.
type Worker struct {
ac *apns.Client
fcv1 *fcmv1.Client
queue chan Request
respq chan SenderResponse
wgrp *sync.WaitGroup
sn int
id int
errorHandler func(Request, *http.Response, error)
successHandler func(Request, *http.Response)
ac *apns.Client
fcv1 *fcmv1.Client
queue chan Request
respq chan SenderResponse
wgrp *sync.WaitGroup
sn int
id int
}

// SenderResponse is responses to worker from sender.
Expand Down Expand Up @@ -113,17 +111,22 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) {
for cnt := 0; cnt < RetryOnceCount; cnt++ {
select {
case req := <-s.retryq:
reqs := &[]Request{req}
select {
case s.queue <- reqs:
LogWithFields(logrus.Fields{"type": "retry", "resend_cnt": req.Tries}).
Debugf("Enqueue to retry to send notification.")
default:
LogWithFields(logrus.Fields{"type": "retry"}).
Infof("Could not retry to enqueue because the supervisor queue is full.")
var delay time.Duration
if RetryBackoff {
delay = time.Duration(math.Pow(float64(req.Tries), 2)) * 100 * time.Millisecond
}
time.AfterFunc(delay, func() {
reqs := &[]Request{req}
select {
case s.queue <- reqs:
LogWithFields(logrus.Fields{"delay": delay, "type": "retry", "resend_cnt": req.Tries}).
Debugf("Enqueue to retry to send notification.")
default:
LogWithFields(logrus.Fields{"delay": delay, "type": "retry"}).
Infof("Could not retry to enqueue because the supervisor queue is full.")
}
})
default:
break
}
}
case <-s.exit:
Expand Down Expand Up @@ -172,7 +175,7 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) {
return Supervisor{}, errors.New("FCM legacy is not supported")
}
if conf.FCMv1.Enabled {
fcv1, err = fcmv1.NewClient(conf.FCMv1.TokenSource, conf.FCMv1.ProjectID, nil, fcmv1.ClientTimeout)
fcv1, err = fcmv1.NewClient(conf.FCMv1.TokenSource, conf.FCMv1.ProjectID, conf.FCMv1.Endpoint, fcmv1.ClientTimeout)
if err != nil {
LogWithFields(logrus.Fields{
"type": "supervisor",
Expand All @@ -192,7 +195,7 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) {

s.workers = append(s.workers, &worker)
s.wgrp.Add(1)
go s.spawnWorker(worker, conf)
go s.spawnWorker(worker)
LogWithFields(logrus.Fields{
"type": "worker",
"worker_id": i,
Expand Down Expand Up @@ -245,7 +248,7 @@ func (s *Supervisor) Shutdown() {
}).Infoln("Stoped supervisor.")
}

func (s *Supervisor) spawnWorker(w Worker, conf *config.Config) {
func (s *Supervisor) spawnWorker(w Worker) {
atomic.AddInt64(&(srvStats.Workers), 1)
defer func() {
atomic.AddInt64(&(srvStats.Workers), -1)
Expand Down Expand Up @@ -367,23 +370,7 @@ func handleFCMResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Com
if resp.Err != nil {
req := resp.Req
LogWithFields(logf).Warnf("response is nil. reason: %s", resp.Err.Error())
if req.Tries < SendRetryCount {
req.Tries++
atomic.AddInt64(&(srvStats.RetryCount), 1)
logf["resend_cnt"] = req.Tries

select {
case retryq <- req:
LogWithFields(logf).
Debugf("Retry to enqueue into retryq because of http connection error with FCM.")
default:
LogWithFields(logf).
Warnf("Supervisor retry queue is full.")
}
} else {
LogWithFields(logf).
Warnf("Retry count is over than %d. Could not deliver notification.", SendRetryCount)
}
retry(retryq, req, resp.Err, logf)
return
}

Expand All @@ -395,12 +382,17 @@ func handleFCMResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Com
LogWithFields(logf).Info("Succeeded to send a notification")
continue
}
// handle error response each registration_id
atomic.AddInt64(&(srvStats.ErrCount), 1)
switch err.Error() {
case fcmv1.Internal, fcmv1.Unavailable:
LogWithFields(logf).Warn("retrying:", err)
retry(retryq, resp.Req, err, logf)
case fcmv1.QuotaExceeded:
LogWithFields(logf).Warn("retrying after 1 min:", err)
time.AfterFunc(time.Minute, func() { retry(retryq, resp.Req, err, logf) })
case fcmv1.Unregistered, fcmv1.InvalidArgument, fcmv1.NotFound:
LogWithFields(logf).Errorf("calling error hook: %s", err)
atomic.AddInt64(&(srvStats.ErrCount), 1)
onResponse(result, errorResponseHandler.HookCmd(), cmdq)
LogWithFields(logf).Errorf("%s", err)
default:
LogWithFields(logf).Errorf("Unknown error message: %s", err)
}
Expand Down Expand Up @@ -438,7 +430,7 @@ func spawnSender(wq <-chan Request, respq chan<- SenderResponse, wgrp *sync.Wait
no := req.Notification.(apns.Notification)
start := time.Now()
results, err := ac.Send(no)
respTime := time.Now().Sub(start).Seconds()
respTime := time.Since(start).Seconds()
rs := make([]Result, 0, len(results))
for _, v := range results {
rs = append(rs, v)
Expand All @@ -459,7 +451,7 @@ func spawnSender(wq <-chan Request, respq chan<- SenderResponse, wgrp *sync.Wait
p := req.Notification.(fcmv1.Payload)
start := time.Now()
results, err := fcv1.Send(p)
respTime := time.Now().Sub(start).Seconds()
respTime := time.Since(start).Seconds()
rs := make([]Result, 0, len(results))
for _, v := range results {
rs = append(rs, v)
Expand Down
1 change: 1 addition & 0 deletions supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (tr TestResponseHandler) HookCmd() string {
func init() {
logrus.SetLevel(logrus.WarnLevel)
conf.Apns.Host = gunfish.MockServer
gunfish.RetryBackoff = false // for testing
}

func TestEnqueuRequestToSupervisor(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions test/gunfish_test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,9 @@ cert_file = "{{ env `PROJECT_ROOT` `.` }}/test/server.crt"
key_file = "{{ env `PROJECT_ROOT` `.` }}/test/server.key"
request_per_sec = 2000
sender_num = 50

[fcm_v1]
# google_application_credentials = "{{ env `PROJECT_ROOT` `.` }}/credentials.json"
enabled = true
endpoint = "http://localhost:8888/v1/projects"
projectid = "test"
29 changes: 29 additions & 0 deletions test/tools/fcmv1mock/fcmv1mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package main

import (
"flag"
"fmt"
"log"
"net/http"

"github.com/kayac/Gunfish/mock"
)

func main() {
var (
port int
projectID string
verbose bool
)

flag.IntVar(&port, "port", 8888, "fcmv1 mock server port")
flag.StringVar(&projectID, "project-id", "test", "fcmv1 mock project id")
flag.BoolVar(&verbose, "verbose", false, "verbose flag")
flag.Parse()

mux := mock.FCMv1MockServer(projectID, verbose)
log.Println("start fcmv1mock server port:", port, "project_id:", projectID)
if err := http.ListenAndServe(fmt.Sprintf(":%d", port), mux); err != nil {
log.Fatal(err)
}
}

0 comments on commit dfa8db0

Please sign in to comment.