Skip to content

Commit

Permalink
Traversal-based car creation (#269)
Browse files Browse the repository at this point in the history
Add a writing/creation API in the car v2 package. Allows creation of file and streaming car files in a selector order.
  • Loading branch information
willscott committed Nov 30, 2021
1 parent fa995b9 commit c35591a
Show file tree
Hide file tree
Showing 18 changed files with 690 additions and 66 deletions.
9 changes: 6 additions & 3 deletions v2/blockstore/insertionindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,13 @@ func (ii *insertionIndex) GetAll(c cid.Cid, fn func(uint64) bool) error {
return nil
}

func (ii *insertionIndex) Marshal(w io.Writer) error {
func (ii *insertionIndex) Marshal(w io.Writer) (uint64, error) {
l := uint64(0)
if err := binary.Write(w, binary.LittleEndian, int64(ii.items.Len())); err != nil {
return err
return l, err
}
l += 8

var err error
iter := func(i llrb.Item) bool {
if err = cbor.Encode(w, i.(recordDigest).Record); err != nil {
Expand All @@ -115,7 +118,7 @@ func (ii *insertionIndex) Marshal(w io.Writer) error {
return true
}
ii.items.AscendGreaterOrEqual(ii.items.Min(), iter)
return err
return l, err
}

func (ii *insertionIndex) Unmarshal(r io.Reader) error {
Expand Down
2 changes: 1 addition & 1 deletion v2/blockstore/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (b *ReadWrite) Finalize() error {
if err != nil {
return err
}
if err := index.WriteTo(fi, internalio.NewOffsetWriter(b.f, int64(b.header.IndexOffset))); err != nil {
if _, err := index.WriteTo(fi, internalio.NewOffsetWriter(b.f, int64(b.header.IndexOffset))); err != nil {
return err
}
if _, err := b.header.WriteTo(internalio.NewOffsetWriter(b.f, carv2.PragmaSize)); err != nil {
Expand Down
12 changes: 5 additions & 7 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,19 @@ go 1.16

require (
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-cid v0.0.8-0.20210716091050-de6c03deae1c
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-ipfs-blockstore v1.0.3
github.com/ipfs/go-ipld-cbor v0.0.5
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-merkledag v0.3.2
github.com/klauspost/cpuid/v2 v2.0.8 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.14.0
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211119015904-892d2f92ba8b
github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61
github.com/multiformats/go-multihash v0.0.15
github.com/multiformats/go-multihash v0.1.0
github.com/multiformats/go-varint v0.0.6
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e // indirect
github.com/stretchr/testify v1.7.0
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a // indirect
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
golang.org/x/exp v0.0.0-20210615023648-acb5c1269671
Expand Down
104 changes: 82 additions & 22 deletions v2/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion v2/index/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func ExampleWriteTo() {
panic(err)
}
}()
err = index.WriteTo(idx, f)
_, err = index.WriteTo(idx, f)
if err != nil {
panic(err)
}
Expand Down
16 changes: 11 additions & 5 deletions v2/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/ipfs/go-cid"
)

// CarIndexNone is a sentinal value used as a multicodec code for the index indicating no index.
const CarIndexNone = 0x300000

type (
// Record is a pre-processed record of a car item and location.
Record struct {
Expand All @@ -40,7 +43,7 @@ type (
Codec() multicodec.Code

// Marshal encodes the index in serial form.
Marshal(w io.Writer) error
Marshal(w io.Writer) (uint64, error)
// Unmarshal decodes the index from its serial form.
Unmarshal(r io.Reader) error

Expand Down Expand Up @@ -118,13 +121,16 @@ func New(codec multicodec.Code) (Index, error) {
// WriteTo writes the given idx into w.
// The written bytes include the index encoding.
// This can then be read back using index.ReadFrom
func WriteTo(idx Index, w io.Writer) error {
func WriteTo(idx Index, w io.Writer) (uint64, error) {
buf := make([]byte, binary.MaxVarintLen64)
b := varint.PutUvarint(buf, uint64(idx.Codec()))
if _, err := w.Write(buf[:b]); err != nil {
return err
n, err := w.Write(buf[:b])
if err != nil {
return uint64(n), err
}
return idx.Marshal(w)

l, err := idx.Marshal(w)
return uint64(n) + l, err
}

// ReadFrom reads index from r.
Expand Down
6 changes: 4 additions & 2 deletions v2/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func TestWriteTo(t *testing.T) {
destF, err := os.Create(dest)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, destF.Close()) })
require.NoError(t, WriteTo(wantIdx, destF))
_, err = WriteTo(wantIdx, destF)
require.NoError(t, err)

// Seek to the beginning of the written out file.
_, err = destF.Seek(0, io.SeekStart)
Expand All @@ -126,6 +127,7 @@ func TestMarshalledIndexStartsWithCodec(t *testing.T) {

// Assert the first two bytes are the corresponding multicodec code.
buf := new(bytes.Buffer)
require.NoError(t, WriteTo(wantIdx, buf))
_, err = WriteTo(wantIdx, buf)
require.NoError(t, err)
require.Equal(t, varint.ToUvarint(uint64(multicodec.CarIndexSorted)), buf.Bytes()[:2])
}
28 changes: 17 additions & 11 deletions v2/index/indexsorted.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,18 @@ func (r recordSet) Swap(i, j int) {
r[i], r[j] = r[j], r[i]
}

func (s *singleWidthIndex) Marshal(w io.Writer) error {
func (s *singleWidthIndex) Marshal(w io.Writer) (uint64, error) {
l := uint64(0)
if err := binary.Write(w, binary.LittleEndian, s.width); err != nil {
return err
return 0, err
}
l += 4
if err := binary.Write(w, binary.LittleEndian, int64(len(s.index))); err != nil {
return err
return l, err
}
// TODO: we could just w.Write(s.index) here and avoid overhead
_, err := io.Copy(w, bytes.NewBuffer(s.index))
return err
l += 8
n, err := w.Write(s.index)
return l + uint64(n), err
}

func (s *singleWidthIndex) Unmarshal(r io.Reader) error {
Expand Down Expand Up @@ -158,10 +160,12 @@ func (m *multiWidthIndex) Codec() multicodec.Code {
return multicodec.CarIndexSorted
}

func (m *multiWidthIndex) Marshal(w io.Writer) error {
func (m *multiWidthIndex) Marshal(w io.Writer) (uint64, error) {
l := uint64(0)
if err := binary.Write(w, binary.LittleEndian, int32(len(*m))); err != nil {
return err
return l, err
}
l += 4

// The widths are unique, but ranging over a map isn't deterministic.
// As per the CARv2 spec, we must order buckets by digest length.
Expand All @@ -176,11 +180,13 @@ func (m *multiWidthIndex) Marshal(w io.Writer) error {

for _, width := range widths {
bucket := (*m)[width]
if err := bucket.Marshal(w); err != nil {
return err
n, err := bucket.Marshal(w)
l += n
if err != nil {
return l, err
}
}
return nil
return l, nil
}

func (m *multiWidthIndex) Unmarshal(r io.Reader) error {
Expand Down
20 changes: 12 additions & 8 deletions v2/index/mhindexsorted.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ func newMultiWidthCodedIndex() *multiWidthCodedIndex {
}
}

func (m *multiWidthCodedIndex) Marshal(w io.Writer) error {
func (m *multiWidthCodedIndex) Marshal(w io.Writer) (uint64, error) {
if err := binary.Write(w, binary.LittleEndian, m.code); err != nil {
return err
return 8, err
}
return m.multiWidthIndex.Marshal(w)
n, err := m.multiWidthIndex.Marshal(w)
return 8 + n, err
}

func (m *multiWidthCodedIndex) Unmarshal(r io.Reader) error {
Expand All @@ -59,22 +60,25 @@ func (m *MultihashIndexSorted) Codec() multicodec.Code {
return multicodec.CarMultihashIndexSorted
}

func (m *MultihashIndexSorted) Marshal(w io.Writer) error {
func (m *MultihashIndexSorted) Marshal(w io.Writer) (uint64, error) {
if err := binary.Write(w, binary.LittleEndian, int32(len(*m))); err != nil {
return err
return 4, err
}
// The codes are unique, but ranging over a map isn't deterministic.
// As per the CARv2 spec, we must order buckets by digest length.
// TODO update CARv2 spec to reflect this for the new index type.
codes := m.sortedMultihashCodes()
l := uint64(4)

for _, code := range codes {
mwci := (*m)[code]
if err := mwci.Marshal(w); err != nil {
return err
n, err := mwci.Marshal(w)
l += n
if err != nil {
return l, err
}
}
return nil
return l, nil
}

func (m *MultihashIndexSorted) sortedMultihashCodes() []uint64 {
Expand Down
2 changes: 1 addition & 1 deletion v2/index/mhindexsorted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestMultiWidthCodedIndex_MarshalUnmarshal(t *testing.T) {

// Marshal the index.
buf := new(bytes.Buffer)
err = subject.Marshal(buf)
_, err = subject.Marshal(buf)
require.NoError(t, err)

// Unmarshal it back to another instance of mh sorted index.
Expand Down
61 changes: 61 additions & 0 deletions v2/internal/loader/counting_loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package loader

import (
"bytes"
"io"

"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/linking"
"github.com/multiformats/go-varint"
)

// counter tracks how much data has been read.
type counter struct {
totalRead uint64
}

func (c *counter) Size() uint64 {
return c.totalRead
}

// ReadCounter provides an externally consumable interface to the
// additional data tracked about the linksystem.
type ReadCounter interface {
Size() uint64
}

type countingReader struct {
r io.Reader
c *counter
}

func (c *countingReader) Read(p []byte) (int, error) {
n, err := c.r.Read(p)
c.c.totalRead += uint64(n)
return n, err
}

// CountingLinkSystem wraps an ipld linksystem with to track the size of
// data loaded in a `counter` object. Each time nodes are loaded from the
// link system which trigger block reads, the size of the block as it would
// appear in a CAR file is added to the counter (included the size of the
// CID and the varint length for the block data).
func CountingLinkSystem(ls ipld.LinkSystem) (ipld.LinkSystem, ReadCounter) {
c := counter{}
clc := ls
clc.StorageReadOpener = func(lc linking.LinkContext, l ipld.Link) (io.Reader, error) {
r, err := ls.StorageReadOpener(lc, l)
if err != nil {
return nil, err
}
buf := bytes.NewBuffer(nil)
n, err := buf.ReadFrom(r)
if err != nil {
return nil, err
}
size := varint.ToUvarint(uint64(n) + uint64(len(l.Binary())))
c.totalRead += uint64(len(size)) + uint64(len(l.Binary()))
return &countingReader{buf, &c}, nil
}
return clc, &c
}
Loading

0 comments on commit c35591a

Please sign in to comment.