Skip to content

Commit

Permalink
Expose the ability to iterate over records in MultihashIndexSorted
Browse files Browse the repository at this point in the history
Implement an API that allows iteration over multihashes and offsets in
`MultihashIndexSorted`. The API is specific to this index type;
therefore, the type is now exported along with its constructor function.

Write test that asserts `GetAll` and `ForEach` behave consistently.

Relates to #95
  • Loading branch information
masih committed Sep 7, 2021
1 parent ccffb5c commit 1398bba
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 17 deletions.
4 changes: 2 additions & 2 deletions v2/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type (
// Load inserts a number of records into the index.
Load([]Record) error

// Get looks up all blocks matching a given CID,
// GetAll looks up all blocks matching a given CID,
// calling a function for each one of their offsets.
//
// If the function returns false, GetAll stops.
Expand Down Expand Up @@ -73,7 +73,7 @@ func New(codec multicodec.Code) (Index, error) {
case multicodec.CarIndexSorted:
return newSorted(), nil
case multicodec.CarMultihashIndexSorted:
return newMultihashSorted(), nil
return NewMultihashSorted(), nil
default:
return nil, fmt.Errorf("unknwon index codec: %v", codec)
}
Expand Down
26 changes: 26 additions & 0 deletions v2/index/indexsorted.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/multiformats/go-multihash"
)

var _ Index = (*multiWidthIndex)(nil)

type (
digestRecord struct {
digest []byte
Expand Down Expand Up @@ -120,6 +122,21 @@ func (s *singleWidthIndex) Load(items []Record) error {
return nil
}

// forEachDigest walks the packed index buffer one fixed-width record at a
// time and hands each record's digest and offset to f.
//
// Every record is s.width bytes long: the digest comes first and the final
// 8 bytes hold the offset as a little-endian uint64. Bytes left over after the
// last whole record are not visited. The digest passed to f is a sub-slice of
// the index buffer, not a copy.
//
// Iteration stops at the first non-nil error returned by f, and that error is
// returned to the caller.
func (s *singleWidthIndex) forEachDigest(f func(digest []byte, offset uint64) error) error {
	const offsetSize = 8
	recordSize := int(s.width)
	recordCount := len(s.index) / recordSize
	for n := 0; n < recordCount; n++ {
		start := n * recordSize
		end := start + recordSize
		split := end - offsetSize // boundary between digest and offset bytes
		if err := f(s.index[start:split], binary.LittleEndian.Uint64(s.index[split:end])); err != nil {
			return err
		}
	}
	return nil
}

func (m *multiWidthIndex) GetAll(c cid.Cid, fn func(uint64) bool) error {
d, err := multihash.Decode(c.Hash())
if err != nil {
Expand Down Expand Up @@ -210,6 +227,15 @@ func (m *multiWidthIndex) Load(items []Record) error {
return nil
}

// forEachDigest visits every digest/offset pair held by each single-width
// index in m, delegating to that index's own forEachDigest.
//
// m is ranged over directly, so the single-width indices are visited in
// whatever order that range yields. The first non-nil error coming back from
// a delegate aborts the walk and is returned unchanged.
func (m *multiWidthIndex) forEachDigest(f func(digest []byte, offset uint64) error) error {
	for _, single := range *m {
		err := single.forEachDigest(f)
		if err != nil {
			return err
		}
	}
	return nil
}

func newSorted() Index {
m := make(multiWidthIndex)
return &m
Expand Down
46 changes: 34 additions & 12 deletions v2/index/mhindexsorted.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"github.com/multiformats/go-multihash"
)

var _ Index = (*MultihashIndexSorted)(nil)

type (
// multihashIndexSorted maps multihash code (i.e. hashing algorithm) to multiWidthCodedIndex.
// MultihashIndexSorted maps multihash code (i.e. hashing algorithm) to multiWidthCodedIndex.
// This index ignores any Record with multihash.IDENTITY.
multihashIndexSorted map[uint64]*multiWidthCodedIndex
MultihashIndexSorted map[uint64]*multiWidthCodedIndex
// multiWidthCodedIndex stores multihash code for each multiWidthIndex.
multiWidthCodedIndex struct {
multiWidthIndex
Expand Down Expand Up @@ -41,11 +43,21 @@ func (m *multiWidthCodedIndex) Unmarshal(r io.Reader) error {
return m.multiWidthIndex.Unmarshal(r)
}

func (m *multihashIndexSorted) Codec() multicodec.Code {
// forEach calls f once for every digest stored in the embedded
// multiWidthIndex, after re-encoding the digest as a multihash using this
// index's code. An encoding failure, or a non-nil error from f, is returned
// to the underlying forEachDigest walk.
func (m *multiWidthCodedIndex) forEach(f func(mh multihash.Multihash, offset uint64) error) error {
	visit := func(digest []byte, offset uint64) error {
		encoded, err := multihash.Encode(digest, m.code)
		if err != nil {
			return err
		}
		return f(encoded, offset)
	}
	return m.multiWidthIndex.forEachDigest(visit)
}

// Codec reports the multicodec code identifying this index format, which is
// always multicodec.CarMultihashIndexSorted.
func (m *MultihashIndexSorted) Codec() multicodec.Code {
	return multicodec.CarMultihashIndexSorted
}

func (m *multihashIndexSorted) Marshal(w io.Writer) error {
func (m *MultihashIndexSorted) Marshal(w io.Writer) error {
if err := binary.Write(w, binary.LittleEndian, int32(len(*m))); err != nil {
return err
}
Expand All @@ -63,7 +75,7 @@ func (m *multihashIndexSorted) Marshal(w io.Writer) error {
return nil
}

func (m *multihashIndexSorted) sortedMultihashCodes() []uint64 {
func (m *MultihashIndexSorted) sortedMultihashCodes() []uint64 {
codes := make([]uint64, 0, len(*m))
for code := range *m {
codes = append(codes, code)
Expand All @@ -74,7 +86,7 @@ func (m *multihashIndexSorted) sortedMultihashCodes() []uint64 {
return codes
}

func (m *multihashIndexSorted) Unmarshal(r io.Reader) error {
func (m *MultihashIndexSorted) Unmarshal(r io.Reader) error {
var l int32
if err := binary.Read(r, binary.LittleEndian, &l); err != nil {
return err
Expand All @@ -89,11 +101,11 @@ func (m *multihashIndexSorted) Unmarshal(r io.Reader) error {
return nil
}

func (m *multihashIndexSorted) put(mwci *multiWidthCodedIndex) {
// put stores mwci in the index under its own multihash code, replacing any
// entry previously stored under that code.
func (m *MultihashIndexSorted) put(mwci *multiWidthCodedIndex) {
	byCode := *m
	byCode[mwci.code] = mwci
}

func (m *multihashIndexSorted) Load(records []Record) error {
func (m *MultihashIndexSorted) Load(records []Record) error {
// TODO optimize load by avoiding multihash decoding twice.
// This implementation decodes multihashes twice: once here to group by code, and once in the
// internals of multiWidthIndex to group by digest length. The code can be optimized by
Expand Down Expand Up @@ -132,7 +144,7 @@ func (m *multihashIndexSorted) Load(records []Record) error {
return nil
}

func (m *multihashIndexSorted) GetAll(cid cid.Cid, f func(uint64) bool) error {
func (m *MultihashIndexSorted) GetAll(cid cid.Cid, f func(uint64) bool) error {
hash := cid.Hash()
dmh, err := multihash.Decode(hash)
if err != nil {
Expand All @@ -145,14 +157,24 @@ func (m *multihashIndexSorted) GetAll(cid cid.Cid, f func(uint64) bool) error {
return mwci.GetAll(cid, f)
}

func (m *multihashIndexSorted) get(dmh *multihash.DecodedMultihash) (*multiWidthCodedIndex, error) {
// ForEach calls f for every multihash and its associated offset stored by this index.
//
// Multihash codes are visited in the order produced by sortedMultihashCodes
// rather than by ranging over the underlying map, so the sequence of codes is
// the same on every call for the same index contents. The order of entries
// within a single code is determined by that code's multiWidthCodedIndex.
//
// Iteration stops at the first non-nil error returned by f (or raised while
// re-encoding a digest), and that error is returned.
func (m *MultihashIndexSorted) ForEach(f func(mh multihash.Multihash, offset uint64) error) error {
	for _, code := range m.sortedMultihashCodes() {
		// Every code returned by sortedMultihashCodes is a key of *m.
		if err := (*m)[code].forEach(f); err != nil {
			return err
		}
	}
	return nil
}

// get returns the coded index registered for the hash code of dmh, or
// ErrNotFound when no index is registered for that code.
func (m *MultihashIndexSorted) get(dmh *multihash.DecodedMultihash) (*multiWidthCodedIndex, error) {
	codedIdx, found := (*m)[dmh.Code]
	if !found {
		return nil, ErrNotFound
	}
	return codedIdx, nil
}

func newMultihashSorted() Index {
index := make(multihashIndexSorted)
// NewMultihashSorted returns a pointer to a new, empty MultihashIndexSorted.
func NewMultihashSorted() *MultihashIndexSorted {
	empty := MultihashIndexSorted{}
	return &empty
}
51 changes: 48 additions & 3 deletions v2/index_gen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,53 @@ func TestMultihashIndexSortedConsistencyWithIndexSorted(t *testing.T) {
}
}

func generateMultihashSortedIndex(t *testing.T, path string) index.Index {
// TestMultihashSorted_ForEachIsConsistentWithGetAll checks that, for every
// block in the sample CARv1 file whose multihash is not IDENTITY, ForEach
// reports the block's multihash with the same offset that GetAll yields first
// for the block's CID.
func TestMultihashSorted_ForEachIsConsistentWithGetAll(t *testing.T) {
	const carPath = "testdata/sample-v1.car"
	carFile, err := os.Open(carPath)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, carFile.Close()) })

	blocks, err := carv2.NewBlockReader(carFile)
	require.NoError(t, err)
	subject := generateMultihashSortedIndex(t, carPath)

	// Record everything ForEach reports, keyed by the multihash's string form.
	offsetsByMh := make(map[string]uint64)
	err = subject.ForEach(func(mh multihash.Multihash, offset uint64) error {
		offsetsByMh[mh.String()] = offset
		return nil
	})
	require.NoError(t, err)

	for {
		blk, err := blocks.Next()
		if err == io.EOF {
			break
		}
		require.NoError(t, err)

		blkCid := blk.Cid()
		wantMh := blkCid.Hash()
		decoded, err := multihash.Decode(wantMh)
		require.NoError(t, err)
		if decoded.Code == multihash.IDENTITY {
			// Identity multihashes are skipped, matching the original check.
			continue
		}

		// Only the first offset reported by GetAll is compared.
		var wantOffset uint64
		err = subject.GetAll(blkCid, func(offset uint64) bool {
			wantOffset = offset
			return false
		})
		require.NoError(t, err)

		gotOffset, ok := offsetsByMh[wantMh.String()]
		require.True(t, ok)
		require.Equal(t, wantOffset, gotOffset)
	}
}

func generateMultihashSortedIndex(t *testing.T, path string) *index.MultihashIndexSorted {
f, err := os.Open(path)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, f.Close()) })
Expand All @@ -219,8 +265,7 @@ func generateMultihashSortedIndex(t *testing.T, path string) index.Index {
require.NoError(t, err)
require.Equal(t, uint64(1), header.Version)

idx, err := index.New(multicodec.CarMultihashIndexSorted)
require.NoError(t, err)
idx := index.NewMultihashSorted()
records := make([]index.Record, 0)

var sectionOffset int64
Expand Down

0 comments on commit 1398bba

Please sign in to comment.