Skip to content

Commit

Permalink
Develop (#24)
Browse files Browse the repository at this point in the history
* Return placeholder image faster

* add new metrics

* Optimization merge transformations before performing

* Placeholder as structure
  • Loading branch information
aldor007 authored Aug 5, 2018
1 parent 426ff95 commit 115ffff
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 87 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
* 0.12.0
* Feature: Add new monitoring metrics (time of image generation and count of it)
* Feature: Do error placeholder in background (returns it faster to user)
* Feature: Try to merge transformations before performing them
* 0.11.2
* Bugfix: Fix compress plugin (don't compress on range or condition)
* 0.11.1
Expand Down
19 changes: 16 additions & 3 deletions cmd/mort/mort.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

const (
// Version of mort
Version = "0.11.2"
Version = "0.12.0"
// BANNER just fancy command line banner
BANNER = `
/\/\ ___ _ __| |_
Expand Down Expand Up @@ -117,19 +117,32 @@ func configureMonitoring(mortConfig *config.Config) {
p.RegisterHistogramVec("storage_time", prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "mort_storage_time",
Help: "mort storage times",
Buckets: []float64{10, 50, 100, 200, 300, 400, 500, 1000, 2000, 3000, 4000, 5000, 6000, 10000, 30000, 60000, 70000, 80000},
Buckets: []float64{10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500., 1000., 2000., 3000., 4000., 5000., 6000., 10000., 30000., 60000., 70000., 80000.},
},
[]string{"method", "storage"},
))

p.RegisterHistogramVec("response_time", prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "mort_response_time",
Help: "mort response times",
Buckets: []float64{10, 50, 100, 200, 300, 400, 500, 1000, 2000, 3000, 4000, 5000, 6000, 10000, 30000, 60000, 70000, 80000},
Buckets: []float64{10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500., 1000., 2000., 3000., 4000., 5000., 6000., 10000., 30000., 60000., 70000., 80000.},
},
[]string{"method"},
))

p.RegisterCounterVec("request_type", prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "mort_request_type_count",
Help: "mort count of given request type",
},
[]string{"type"},
))

p.RegisterHistogram("generation_time", prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "mort_generation_time",
Help: "mort generation times",
Buckets: []float64{10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500., 1000., 2000., 3000., 4000., 5000., 6000., 10000., 30000., 60000., 70000., 80000.},
}))

monitoring.RegisterReporter(p)
}
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/aldor007/mort/pkg/helpers"
"github.com/aldor007/mort/pkg/monitoring"
"net/http"
)

// Config contains configuration for buckets etc
Expand Down Expand Up @@ -245,12 +246,14 @@ func (c *Config) validateServer() error {
c.Server.QueueLen = 5
}

if c.Server.Placeholder != "" {
var err error
c.Server.PlaceholderBuf, err = helpers.FetchObject(c.Server.Placeholder)
if c.Server.PlaceholderStr != "" {
buf, err := helpers.FetchObject(c.Server.PlaceholderStr)
if err != nil {
return err
}

c.Server.Placeholder.Buf = buf
c.Server.Placeholder.ContentType = http.DetectContentType(buf)
}

return nil
Expand Down
7 changes: 5 additions & 2 deletions pkg/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ type Server struct {
QueueLen int `yaml:"queueLen"`
Listen []string `yaml:"listens"`
Monitoring string `yaml:"monitoring"`
Placeholder string `yaml:"placeholder"`
PlaceholderStr string `yaml:"placeholder"`
Plugins map[string]interface{} `yaml:"plugins,omitempty"`
PlaceholderBuf []byte
Placeholder struct {
Buf []byte
ContentType string
} `yaml:"-"`
}
3 changes: 3 additions & 0 deletions pkg/engine/image_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func NewImageEngine(res *response.Response) *ImageEngine {

// Process main ImageEngine function that create new image (stored in response object)
func (c *ImageEngine) Process(obj *object.FileObject, trans []transforms.Transforms) (*response.Response, error) {
t := monitoring.Report().Timer("generation_time")
defer t.Done()

buf, err := c.parent.ReadBody()
if err != nil {
return response.NewError(500, err), err
Expand Down
4 changes: 2 additions & 2 deletions pkg/monitoring/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ type Reporter interface {

// Timer is used for time measurement
type Timer struct {
start time.Time
start time.Time
metric string
done func(start time.Time, metric string)
done func(start time.Time, metric string)
}

// Done end time measurement
Expand Down
2 changes: 1 addition & 1 deletion pkg/monitoring/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (p *PrometheusReporter) Histogram(metric string, val float64) {
func (p *PrometheusReporter) Timer(metric string) Timer {
t := Timer{time.Now(), metric, func(start time.Time, metricName string) {
timeDiff := time.Since(start)
p.Histogram(metricName, float64(timeDiff.Nanoseconds()/1000.0))
p.Histogram(metricName, float64(timeDiff.Nanoseconds())/1000.0)
}}

return t
Expand Down
79 changes: 30 additions & 49 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,53 +111,37 @@ func (r *RequestProcessor) processChan(ctx context.Context) {
}

func (r *RequestProcessor) replyWithError(obj *object.FileObject, sc int, err error) *response.Response {
if !obj.HasTransform() || obj.Debug || r.serverConfig.Placeholder == "" {
if !obj.HasTransform() || obj.Debug || r.serverConfig.PlaceholderStr == "" {
return response.NewError(sc, err)
}

key := r.serverConfig.Placeholder + strconv.FormatUint(obj.Transforms.Hash().Sum64(), 16)
key := r.serverConfig.PlaceholderStr + strconv.FormatUint(obj.Transforms.Hash().Sum64(), 16)
if cacheRes := r.fetchResponseFromCache(key, true); cacheRes != nil {
cacheRes.StatusCode = sc
return cacheRes
}

lockResult, locked := r.collapse.Lock(key)
if locked {
defer r.collapse.Release(key)
monitoring.Log().Info("Lock acquired for error response", zap.String("obj.Key", obj.Key))
parent := response.NewBuf(200, r.serverConfig.PlaceholderBuf)
transformsTab := []transforms.Transforms{obj.Transforms}

eng := engine.NewImageEngine(parent)
res, errProcess := eng.Process(obj, transformsTab)

if errProcess != nil {
return response.NewError(sc, err)
}

res.StatusCode = sc
resCpy, errCpy := res.Copy()
if errCpy != nil {
r.cache.Set(key, resCpy, time.Minute*10)
}
return res
}

timer := time.NewTimer(r.lockTimeout)

for {
go func() {
lockData, locked := r.collapse.Lock(key)
if locked {
defer r.collapse.Release(key)
monitoring.Log().Info("Lock acquired for error response", zap.String("obj.Key", obj.Key))
parent := response.NewBuf(200, r.serverConfig.Placeholder.Buf)
transformsTab := []transforms.Transforms{obj.Transforms}

eng := engine.NewImageEngine(parent)
res, _ := eng.Process(obj, transformsTab)
monitoring.Report().Inc("cache_ratio;status:set")
r.cache.Set(key, res, time.Minute*10)
} else {
lockData.Cancel <- true

select {
case <-timer.C:
return response.NewError(sc, err)
default:
if cacheRes := r.fetchResponseFromCache(key, false); cacheRes != nil {
lockResult.Cancel <- true
return cacheRes
}
}
}
}()

res := response.NewBuf(sc, r.serverConfig.Placeholder.Buf)
res.SetContentType(r.serverConfig.Placeholder.ContentType)
return res
}

func (r *RequestProcessor) process(req *http.Request, obj *object.FileObject) *response.Response {
Expand Down Expand Up @@ -331,6 +315,7 @@ func (r *RequestProcessor) handleGET(req *http.Request, obj *object.FileObject)

} else {
if res.StatusCode == 200 {
monitoring.Report().Inc("request_type;type:download")
if obj.CheckParent && parentObj != nil && parentRes.StatusCode == 200 {
return res
}
Expand Down Expand Up @@ -375,17 +360,6 @@ func (r *RequestProcessor) handleNotFound(obj, parentObj *object.FileObject, tra

defer parentRes.Close()

transLen := len(transformsTab)
if transLen > 1 {
// revers order of transforms
for i := 0; i < len(transformsTab)/2; i++ {
j := len(transformsTab) - i - 1
transformsTab[i], transformsTab[j] = transformsTab[j], transformsTab[i]
}

}

monitoring.Log().Info("Performing transforms", zap.String("obj.Bucket", obj.Bucket), zap.String("obj.Key", obj.Key), zap.Int("transformsLen", len(transformsTab)))
return r.processImage(obj, parentRes, transformsTab)
} else if obj.HasTransform() {
parentRes.Close()
Expand Down Expand Up @@ -430,7 +404,8 @@ func handleS3Get(req *http.Request, obj *object.FileObject) *response.Response {

}

func (r *RequestProcessor) processImage(obj *object.FileObject, parent *response.Response, transforms []transforms.Transforms) *response.Response {
func (r *RequestProcessor) processImage(obj *object.FileObject, parent *response.Response, transformsTab []transforms.Transforms) *response.Response {
monitoring.Report().Inc("request_type;type:transform")
ctx := obj.Ctx
taked := r.throttler.Take(ctx)
if !taked {
Expand All @@ -440,14 +415,20 @@ func (r *RequestProcessor) processImage(obj *object.FileObject, parent *response
}
defer r.throttler.Release()

transformsLen := len(transformsTab)
mergedTrans := transforms.Merge(transformsTab)
mergedLen := len(mergedTrans)

monitoring.Log().Info("Performing transforms", zap.String("obj.Bucket", obj.Bucket), zap.String("obj.Key", obj.Key), zap.Int("transformsLen", transformsLen), zap.Int("mergedLen", mergedLen))
eng := engine.NewImageEngine(parent)
res, err := eng.Process(obj, transforms)
res, err := eng.Process(obj, mergedTrans)
if err != nil {
return response.NewError(400, err)
}

resCpy, err := res.Copy()
if err == nil {
monitoring.Report().Inc("cache_ratio;status:set")
r.cache.Set(obj.Key, resCpy, time.Minute*2)
go func(objS object.FileObject, resS *response.Response) {
storage.Set(&objS, resS.Headers, resS.ContentLength, resS.Stream())
Expand Down
98 changes: 98 additions & 0 deletions pkg/transforms/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,101 @@ func TestTransforms_Watermark(t *testing.T) {

assert.NotNil(t, err)
}

func TestTransforms_Merge_Resize(t *testing.T) {
tab := make([]Transforms, 2)
tab[0].Resize(100, 0, false)

tab[1].Resize(0, 300, true)

result := Merge(tab)

assert.Equal(t, len(result), 1)
assert.Equal(t, result[0].width, 100)
assert.Equal(t, result[0].height, 300)
assert.Equal(t, result[0].enlarge, true)
}

func TestTransforms_Merge_Crop(t *testing.T) {
tab := make([]Transforms, 2)
tab[0].Crop(4444, 0, "smart", false)

tab[1].Crop(0, 120, "smart", false)

result := Merge(tab)

assert.Equal(t, len(result), 1)
assert.Equal(t, result[0].width, 4444)
assert.Equal(t, result[0].height, 120)
assert.Equal(t, result[0].enlarge, false)
assert.Equal(t, result[0].crop, true)
assert.Equal(t, result[0].gravity, bimg.GravitySmart)
}

func TestTransforms_Merge_Blur(t *testing.T) {
tab := make([]Transforms, 3)
tab[0].Blur(1., 3.)
tab[1].Blur(2., 4.)
tab[2].Blur(3., 3.)

result := Merge(tab)

assert.Equal(t, len(result), 1)
assert.Equal(t, result[0].blur.sigma, 6.)
assert.Equal(t, result[0].blur.minAmpl, 10.)
}

func TestTransforms_Merge_Single(t *testing.T) {
tab := make([]Transforms, 1)
tab[0].Blur(1., 3.)

result := Merge(tab)

assert.Equal(t, len(result), 1)
assert.Equal(t, result[0].blur.sigma, 1.)
assert.Equal(t, result[0].blur.minAmpl, 3.)
}

func TestTransforms_Merge_MultiTrans(t *testing.T) {
tab := make([]Transforms, 4)
tab[0].Blur(1., 3.)
tab[0].Quality(10)
tab[1].Interlace()
tab[2].StripMetadata()
tab[3].Format("webp")

result := Merge(tab)

assert.Equal(t, len(result), 1)
assert.Equal(t, result[0].blur.sigma, 1.)
assert.Equal(t, result[0].blur.minAmpl, 3.)
assert.Equal(t, result[0].interlace, true)
assert.Equal(t, result[0].stripMetadata, true)
assert.Equal(t, result[0].format, bimg.WEBP)
assert.Equal(t, result[0].FormatStr, "webp")
assert.Equal(t, result[0].quality, 10)
}

func TestTransforms_Merge_Empty(t *testing.T) {
tab := make([]Transforms, 1)

result := Merge(tab)

assert.Equal(t, len(result), 1)
assert.Equal(t, result[0].NotEmpty, false)
}

func TestTransforms_Merge_Watermark(t *testing.T) {
tab := make([]Transforms, 3)
tab[0].Blur(1., 3.)
tab[0].Watermark("image2", "top-left", 2.)
tab[1].Watermark("image", "top-left", 2.)
tab[2].Blur(3., 3.)

result := Merge(tab)

assert.Equal(t, len(result), 2)
assert.Equal(t, result[0].blur.sigma, 3.)
assert.Equal(t, result[0].blur.minAmpl, 3.)
assert.Equal(t, result[1].watermark.image, "image2")
}
Loading

0 comments on commit 115ffff

Please sign in to comment.