Skip to content

Commit

Permalink
exp/lighthorizon/index/cmd: Fix index single watch, slow down the ret…
Browse files Browse the repository at this point in the history
…ry on not-found ledgers (#4582)
  • Loading branch information
sreuland authored Sep 13, 2022
1 parent 7a5cc31 commit c9c58d2
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 15 deletions.
9 changes: 4 additions & 5 deletions exp/lighthorizon/index/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,16 +294,16 @@ func (builder *IndexBuilder) Watch(ctx context.Context) error {
//
// [1]: https://stellarfoundation.slack.com/archives/C02B04RMK/p1654903342555669

// We sleep with linear backoff starting with 1s. Ledgers get posted
// We sleep with linear backoff starting with 6s. Ledgers get posted
// every 5-7s on average, but to be extra careful, let's give it a full
// minute before we give up entirely.
timedCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

sleepTime := time.Second

sleepTime := (6 * time.Second)
outer:
for {
time.Sleep(sleepTime)
select {
case <-timedCtx.Done():
return errors.Wrap(timedCtx.Err(), "awaiting next ledger failed")
Expand All @@ -318,8 +318,7 @@ func (builder *IndexBuilder) Watch(ctx context.Context) error {
}

if os.IsNotExist(buildErr) {
time.Sleep(sleepTime)
sleepTime += 2
sleepTime += (time.Second * 2)
continue
}

Expand Down
2 changes: 1 addition & 1 deletion support/storage/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func checkResp(r *http.Response) error {
if r.StatusCode >= 200 && r.StatusCode < 400 {
return nil
} else {
return fmt.Errorf("Bad HTTP response '%s' for %s '%s'",
return fmt.Errorf("bad HTTP response '%s' for %s '%s'",
r.Status, r.Request.Method, r.Request.URL.String())
}
}
Expand Down
48 changes: 39 additions & 9 deletions support/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,48 @@ import (
"context"
"io"
"net/http"
"os"
"path"

log "github.com/sirupsen/logrus"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/stellar/go/support/errors"
)

type s3HttpProxy interface {
Send(*s3.GetObjectInput) (io.ReadCloser, error)
}

type defaultS3HttpProxy struct {
*S3Storage
}

func (proxy *defaultS3HttpProxy) Send(params *s3.GetObjectInput) (io.ReadCloser, error) {
req, resp := proxy.svc.GetObjectRequest(params)
if proxy.unsignedRequests {
req.Handlers.Sign.Clear() // makes this request unsigned
}
req.SetContext(proxy.ctx)
logReq(req.HTTPRequest)
err := req.Send()
logResp(req.HTTPResponse)

return resp.Body, err
}

type S3Storage struct {
ctx context.Context
svc s3iface.S3API
bucket string
prefix string
unsignedRequests bool
writeACLrule string
s3Http s3HttpProxy
}

func NewS3Storage(
Expand Down Expand Up @@ -67,19 +91,16 @@ func (b *S3Storage) GetFile(pth string) (io.ReadCloser, error) {
Key: aws.String(key),
}

req, resp := b.svc.GetObjectRequest(params)
if b.unsignedRequests {
req.Handlers.Sign.Clear() // makes this request unsigned
}
req.SetContext(b.ctx)
logReq(req.HTTPRequest)
err := req.Send()
logResp(req.HTTPResponse)
resp, err := b.s3HttpProxy().Send(params)

if err != nil {
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == s3.ErrCodeNoSuchKey {
return nil, os.ErrNotExist
}
return nil, err
}

return resp.Body, nil
return resp, nil
}

func (b *S3Storage) Head(pth string) (*http.Response, error) {
Expand Down Expand Up @@ -237,3 +258,12 @@ func (b *S3Storage) CanListFiles() bool {
func (b *S3Storage) Close() error {
return nil
}

func (b *S3Storage) s3HttpProxy() s3HttpProxy {
if b.s3Http != nil {
return b.s3Http
}
return &defaultS3HttpProxy{
S3Storage: b,
}
}
62 changes: 62 additions & 0 deletions support/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ package storage

import (
"context"
"errors"
"io"
"os"
"strings"
"testing"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/stretchr/testify/assert"
Expand All @@ -19,6 +24,19 @@ type MockS3 struct {
s3iface.S3API
}

type MockS3HttpProxy struct {
mock.Mock
s3HttpProxy
}

func (m *MockS3HttpProxy) Send(input *s3.GetObjectInput) (io.ReadCloser, error) {
args := m.Called(input)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(io.ReadCloser), args.Error(1)
}

func TestWriteACLRuleOverride(t *testing.T) {

mockS3 := &MockS3{}
Expand Down Expand Up @@ -50,3 +68,47 @@ func TestWriteACLRuleDefault(t *testing.T) {
aclRule := s3Storage.GetACLWriteRule()
assert.Equal(t, aclRule, s3.ObjectCannedACLPublicRead)
}

func TestGetFileNotFound(t *testing.T) {
mockS3 := &MockS3{}
mockS3HttpProxy := &MockS3HttpProxy{}

mockS3HttpProxy.On("Send", mock.Anything).Return(nil,
awserr.New(s3.ErrCodeNoSuchKey, "message", errors.New("not found")))

s3Storage := S3Storage{
ctx: context.Background(),
svc: mockS3,
bucket: "bucket",
prefix: "prefix",
unsignedRequests: false,
writeACLrule: "",
s3Http: mockS3HttpProxy,
}

_, err := s3Storage.GetFile("path")

assert.Equal(t, err, os.ErrNotExist)
}

func TestGetFileFound(t *testing.T) {
mockS3 := &MockS3{}
mockS3HttpProxy := &MockS3HttpProxy{}
testCloser := io.NopCloser(strings.NewReader(""))

mockS3HttpProxy.On("Send", mock.Anything).Return(testCloser, nil)

s3Storage := S3Storage{
ctx: context.Background(),
svc: mockS3,
bucket: "bucket",
prefix: "prefix",
unsignedRequests: false,
writeACLrule: "",
s3Http: mockS3HttpProxy,
}

closer, err := s3Storage.GetFile("path")
assert.Nil(t, err)
assert.Equal(t, closer, testCloser)
}

0 comments on commit c9c58d2

Please sign in to comment.