Skip to content

Commit

Permalink
groot/riofs/plugin/http: first stab at a caching+concurrent http-reader
Browse files Browse the repository at this point in the history
  • Loading branch information
sbinet committed Mar 11, 2022
1 parent 53a256f commit 0e0b993
Show file tree
Hide file tree
Showing 6 changed files with 704 additions and 1 deletion.
17 changes: 16 additions & 1 deletion groot/riofs/plugin/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"io"
"net/http"
"os"
"runtime"

"go-hep.org/x/hep/groot/internal/httpio"
"go-hep.org/x/hep/groot/riofs"
)

Expand All @@ -19,6 +21,20 @@ func init() {
}

// openFile opens the remote file located at path for reading.
//
// It first tries httpio.Open; on success the resulting reader is wrapped in
// a preader (using runtime.NumCPU workers) and then in an rcache. If either
// step fails, openFile falls back to tmpFileFrom(path).
func openFile(path string) (riofs.Reader, error) {
	remote, err := httpio.Open(path)
	if err != nil {
		// HTTP server may not support accept-range.
		return tmpFileFrom(path)
	}

	workers := runtime.NumCPU()
	cache, err := rcacheOf(&preader{r: remote, n: workers})
	if err == nil {
		return cache, nil
	}

	// The cache could not be created: release the remote reader
	// (best effort, its error is not actionable here) and fall back.
	_ = remote.Close()
	return tmpFileFrom(path)
}

func tmpFileFrom(path string) (riofs.Reader, error) {
resp, err := http.Get(path)
if err != nil {
return nil, err
Expand All @@ -40,7 +56,6 @@ func openFile(path string) (riofs.Reader, error) {
return nil, err
}
return &tmpFile{f}, nil

}

// tmpFile wraps a regular os.File to automatically remove it when closed.
Expand Down
98 changes: 98 additions & 0 deletions groot/riofs/plugin/http/preader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright ©2022 The go-hep Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package http

import (
"sync"

"go-hep.org/x/hep/groot/riofs"
)

// preader wraps a reader and serves large ReadAt requests by issuing
// several ReadAt calls on the wrapped reader from concurrent goroutines.
type preader struct {
// r is the wrapped reader that all calls are forwarded to.
r reader
n int // number of worker goroutines used by pread
}

// Compile-time check that *preader implements riofs.Reader.
var (
_ riofs.Reader = (*preader)(nil)
)

// blkSize is the size, in bytes (1 MiB), of the blocks a large ReadAt
// request is cut into. Requests of at most blkSize bytes are forwarded
// to the wrapped reader unchanged.
const blkSize = 1 * 1024 * 1024 // TODO(sbinet): adjust size for multiple payloads?

// Close closes the wrapped reader and reports its error, if any.
func (r *preader) Close() error {
	err := r.r.Close()
	return err
}

// Read forwards the sequential read to the wrapped reader.
func (r *preader) Read(p []byte) (int, error) {
	n, err := r.r.Read(p)
	return n, err
}

// ReadAt reads len(p) bytes starting at offset off.
//
// Buffers strictly larger than blkSize are handed to pread, which reads
// them block by block from several goroutines. Anything smaller goes
// straight to the wrapped reader.
func (r *preader) ReadAt(p []byte, off int64) (int, error) {
	if len(p) > blkSize {
		return r.pread(p, off)
	}
	return r.r.ReadAt(p, off)
}

// pread fills p with the bytes found at offset off of the wrapped reader,
// reading blkSize-sized blocks concurrently.
//
// p is cut into consecutive blocks of blkSize bytes (the last one may be
// shorter). Each block is read with one ReadAt call on the wrapped reader,
// directly into the matching sub-slice of p. At most r.n goroutines run at
// once; the worker count is clamped to [1, number of blocks] so that a
// zero or negative r.n still makes progress and no idle goroutine is started.
//
// It returns the number of contiguous bytes, counted from the start of p,
// that were read successfully, together with the error of the first block
// (in offset order) that failed. Blocks located after a failed block may
// have been written to p but are not accounted for in the returned count.
func (r *preader) pread(p []byte, off int64) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}

	nblks := (len(p) + blkSize - 1) / blkSize

	nwrk := r.n
	if nwrk > nblks {
		nwrk = nblks
	}
	if nwrk < 1 {
		nwrk = 1
	}

	// The queue is sized to hold every block index, so it can be filled
	// and closed up front without a dedicated producer goroutine.
	wrk := make(chan int, nblks)
	for i := 0; i < nblks; i++ {
		wrk <- i
	}
	close(wrk)

	// Each worker only writes to the out entry and to the region of p
	// that belong to the block it is processing: no locking is needed.
	out := make([]pread, nblks)

	var wg sync.WaitGroup
	wg.Add(nwrk)
	for i := 0; i < nwrk; i++ {
		go func() {
			defer wg.Done()
			for blk := range wrk {
				beg := blk * blkSize
				end := beg + blkSize
				if end > len(p) {
					end = len(p)
				}
				out[blk].n, out[blk].err = r.r.ReadAt(p[beg:end], off+int64(beg))
			}
		}()
	}
	wg.Wait()

	n := 0
	for _, o := range out {
		n += o.n
		if o.err != nil {
			// Stop at the first failure so that n only covers
			// a contiguous prefix of p.
			return n, o.err
		}
	}
	return n, nil
}

// pread holds the outcome of a single ReadAt call issued on behalf of
// (*preader).pread.
type pread struct {
// n is the number of bytes reported by the ReadAt call.
n int
// err is the error reported by the ReadAt call, if any.
err error
}
111 changes: 111 additions & 0 deletions groot/riofs/plugin/http/rcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright ©2022 The go-hep Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package http

import (
"io"
"os"
"sync"

"golang.org/x/sync/errgroup"
)

// reader is the set of methods required from the source that rcache and
// preader wrap: sequential reads, reads at an offset, and closing.
type reader interface {
io.Reader
io.ReaderAt
io.Closer
}

// store is the local storage an rcache keeps fetched data in.
// rcacheOf uses the file returned by os.CreateTemp as the store.
type store interface {
io.Reader
io.ReaderAt
io.Writer
io.WriterAt
io.Closer

// Name returns the path handed to os.RemoveAll when the rcache is closed.
Name() string
}

// rcache is a read cache sitting in front of a reader: data requested
// through ReadAt is fetched from r, written to o at the same offset and
// then served from o.
type rcache struct {
// r is the source missing data is fetched from.
r reader
// o is the local store holding the data fetched so far.
o store

// mu protects sps.
mu sync.RWMutex
// sps records which spans have already been written to o.
sps spans
}

// rcacheOf returns a read cache for r, backed by a freshly created
// temporary file in the default temporary directory.
//
// The error from os.CreateTemp is returned unchanged when the backing
// file cannot be created; r is left untouched in that case.
func rcacheOf(r reader) (*rcache, error) {
	const pattern = "riofs-remote-"

	tmp, err := os.CreateTemp("", pattern)
	if err != nil {
		return nil, err
	}

	rc := &rcache{r: r}
	rc.o = tmp
	return rc, nil
}

// Close closes the source reader and the local store, then removes the
// store from disk (best effort: the removal error is ignored).
//
// The error from closing the source takes precedence over the error from
// closing the store.
func (r *rcache) Close() error {
	errSrc := r.r.Close()
	errStore := r.o.Close()
	_ = os.RemoveAll(r.o.Name())

	if errSrc == nil {
		return errStore
	}
	return errSrc
}

// Read forwards the sequential read to the source reader. The local
// store is neither consulted nor updated.
func (r *rcache) Read(p []byte) (int, error) {
	n, err := r.r.Read(p)
	return n, err
}

// ReadAt reads len(p) bytes starting at offset off.
//
// The parts of the requested range that are not yet in the local store
// are fetched concurrently from the source (one goroutine per missing
// span) and written to the store. Once they are all available, the whole
// range is read back from the store into p.
//
// While fetching, p is only used as scratch space: missing spans are laid
// out back to back from the start of p, and the final read from the store
// overwrites all of it.
//
// NOTE(review): any fetch error, including a short read at the end of
// the source, is reported as (0, err) — confirm this is intended.
func (r *rcache) ReadAt(p []byte, off int64) (int, error) {
	missing := r.split(span{off: off, len: int64(len(p))})
	if len(missing) > 0 {
		var (
			grp errgroup.Group
			pos int64
		)
		for _, m := range missing {
			m := m
			buf := p[pos : pos+m.len]
			pos += m.len
			grp.Go(func() error {
				return r.fetch(buf, m)
			})
		}

		if err := grp.Wait(); err != nil {
			return 0, err
		}
	}

	return r.o.ReadAt(p, off)
}

// split returns the parts of sp that are not yet recorded as present in
// the local store. The list of cached spans is read under the read lock.
func (r *rcache) split(sp span) []span {
	r.mu.RLock()
	missing := split(sp, r.sps)
	r.mu.RUnlock()

	return missing
}

// fetch reads len(p) bytes at offset sp.off from the source into p,
// writes them to the local store at the same offset and, on success,
// records sp as cached.
//
// Nothing is recorded when either the read or the write fails; the
// corresponding error is returned as is.
func (r *rcache) fetch(p []byte, sp span) error {
	if _, err := r.r.ReadAt(p, sp.off); err != nil {
		return err
	}
	if _, err := r.o.WriteAt(p, sp.off); err != nil {
		return err
	}

	r.mu.Lock()
	r.sps.add(sp)
	r.mu.Unlock()

	return nil
}
113 changes: 113 additions & 0 deletions groot/riofs/plugin/http/span.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright ©2022 The go-hep Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package http

import "sort"

// span describes the half-open byte range [off, off+len).
type span struct {
// off is the offset of the first byte of the range.
off int64
// len is the number of bytes in the range.
len int64
}

// split returns the sub-ranges of sp that are not covered by any span of
// sps, in increasing offset order.
//
// sps is expected to be sorted by offset and free of overlaps, which is
// what spans.add maintains. A nil result means sp is entirely covered.
// An empty (or negative-length) sp has nothing missing and also yields nil,
// whatever the content of sps.
func split(sp span, sps []span) []span {
	if sp.len <= 0 {
		return nil
	}

	var (
		missing []span
		beg     = sp.off          // start of the part of sp still to be resolved
		end     = sp.off + sp.len // end of the requested range
	)

	for _, v := range sps {
		vbeg := v.off
		vend := v.off + v.len

		if vend <= beg {
			// v lies entirely before what remains of sp.
			continue
		}
		if end <= vbeg {
			// v lies entirely after sp: the remainder is missing.
			break
		}
		if beg < vbeg {
			// Gap between the current position and the start of v.
			missing = append(missing, span{off: beg, len: vbeg - beg})
		}
		if end <= vend {
			// v covers sp up to its end: nothing is left.
			return missing
		}
		beg = vend
	}

	return append(missing, span{off: beg, len: end - beg})
}

// spans is a list of span values. Its add method keeps the list sorted by
// offset and merges spans that overlap or touch.
type spans []span

// consolidate merges, in place, all the spans of the list that overlap or
// touch each other (a span ending exactly where the next one starts is
// merged with it).
//
// The list must already be sorted by increasing offset. The merge walks
// the list front to back and compares each span against the end of the
// span accumulated so far, so a span that covers several of the following
// ones absorbs all of them.
func (p *spans) consolidate() {
	list := *p
	if len(list) < 2 {
		return
	}

	// out shares its backing array with list. It never grows past the
	// element currently being visited, so appending to it is safe.
	out := list[:1]
	for _, sp := range list[1:] {
		last := &out[len(out)-1]
		lend := last.off + last.len
		if lend < sp.off {
			// Gap between the accumulated span and sp: keep sp.
			out = append(out, sp)
			continue
		}
		if end := sp.off + sp.len; end > lend {
			last.len = end - last.off
		}
	}
	*p = out
}

// remove deletes the span at index i, shifting the following spans down
// by one position and shortening the list by one element.
func (p *spans) remove(i int) {
	list := *p
	copy(list[i:], list[i+1:])
	*p = list[:len(list)-1]
}

// add inserts sp into the list, re-sorts the list by increasing offset
// (spans starting at the same offset are ordered by increasing end) and
// then consolidates it.
func (p *spans) add(sp span) {
	list := append(*p, sp)
	*p = list

	sort.Slice(list, func(i, j int) bool {
		a, b := list[i], list[j]
		if a.off != b.off {
			return a.off < b.off
		}
		return a.off+a.len < b.off+b.len
	})

	p.consolidate()
}
Loading

0 comments on commit 0e0b993

Please sign in to comment.