Skip to content

Commit

Permalink
slicer: Reuse PayloadStream for Slice method
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Baidakov <evgenii@nspcc.io>
  • Loading branch information
smallhive committed Jul 11, 2023
1 parent ea65cad commit 8358b6a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 141 deletions.
162 changes: 21 additions & 141 deletions object/slicer/slicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,6 @@ func NewSession(w ObjectWriter, token session.Object) *Slicer {
return slicer
}

// fillChildMetadata stamps the shared per-stream metadata onto a child
// object of the split: current format version, container, creation epoch,
// owner, regular object type and the slicer's session token.
func (x *Slicer) fillChildMetadata(child *object.Object, cnr cid.ID, epoch uint64, owner user.ID) {
	child.SetType(object.TypeRegular)
	child.SetContainerID(cnr)
	child.SetCreationEpoch(epoch)
	child.SetSessionToken(x.sessionToken)
	child.SetOwnerID(&owner)
	ver := version.Current()
	child.SetVersion(&ver)
}

// childPayloadSizeLimit returns configured size limit of the child object's
// payload which defaults to 1MB.
func childPayloadSizeLimit(opts Options) uint64 {
Expand Down Expand Up @@ -146,149 +135,36 @@ func headerData(header object.Object) (cid.ID, user.ID, error) {
func (x *Slicer) Slice(header object.Object, data io.Reader, signer neofscrypto.Signer, opts Options) (oid.ID, error) {
var rootID oid.ID

containerID, owner, err := headerData(header)
if err != nil {
return rootID, err
}

if x.sessionToken != nil {
header.SetSessionToken(x.sessionToken)
// session issuer is a container owner.
issuer := x.sessionToken.Issuer()
owner = issuer
header.SetOwnerID(&owner)
}

// fill extra info to the header.
currentVersion := version.Current()
header.SetVersion(&currentVersion)
header.SetCreationEpoch(opts.currentNeoFSEpoch)

objectPayloadLimit := childPayloadSizeLimit(opts)

var offset uint64
var isSplit bool
var childMeta dynamicObjectMetadata
var writtenChildren []oid.ID
var childHeader object.Object
rootMeta := newDynamicObjectMetadata(opts.withHomoChecksum)
bChunk := make([]byte, objectPayloadLimit+1)
var n int
bChunk := make([]byte, objectPayloadLimit)

writer, err := x.InitPayloadStream(header, signer, opts)
if err != nil {
return rootID, fmt.Errorf("init writter: %w", err)
}

for {
n, err := data.Read(bChunk[offset:])
if err == nil {
if last := offset + uint64(n); last <= objectPayloadLimit {
rootMeta.accumulateNextPayloadChunk(bChunk[offset:last])
if isSplit {
childMeta.accumulateNextPayloadChunk(bChunk[offset:last])
}
offset = last
// data is not over, and we expect more bytes to form next object
continue
}
} else {
n, err = data.Read(bChunk)
if err != nil {
if !errors.Is(err, io.EOF) {
return rootID, fmt.Errorf("read payload chunk: %w", err)
}

// there will be no more data

toSend := offset + uint64(n)
if toSend <= objectPayloadLimit {
rootID, err = flushObjectMetadata(signer, rootMeta, &header)
if err != nil {
return rootID, fmt.Errorf("form root object: %w", err)
}

if isSplit {
// when splitting, root object's header is written into its last child
childHeader.SetParent(&header)
childHeader.SetPreviousID(writtenChildren[len(writtenChildren)-1])

childID, err := writeInMemObject(signer, x.w, childHeader, bChunk[:toSend], childMeta)
if err != nil {
return rootID, fmt.Errorf("write child object: %w", err)
}

writtenChildren = append(writtenChildren, childID)
} else {
// root object is single (full < limit), so send it directly
rootID, err = writeInMemObject(signer, x.w, header, bChunk[:toSend], rootMeta)
if err != nil {
return rootID, fmt.Errorf("write single root object: %w", err)
}

return rootID, nil
}

break
}

// otherwise, form penultimate object, then do one more iteration for
// simplicity: according to io.Reader, we'll get io.EOF again, but the overflow
// will no longer occur, so we'll finish the loop
}

// according to buffer size, here we can overflow the object payload limit, e.g.
// 1. full=11B,limit=10B,read=11B (no objects created yet)
// 2. full=21B,limit=10B,read=11B (one object has been already sent with size=10B)

toSend := offset + uint64(n)
overflow := toSend > objectPayloadLimit
if overflow {
toSend = objectPayloadLimit
}

// we could read some data even in case of io.EOF, so don't forget pick up the tail
if n > 0 {
rootMeta.accumulateNextPayloadChunk(bChunk[offset:toSend])
if isSplit {
childMeta.accumulateNextPayloadChunk(bChunk[offset:toSend])
}
}

if overflow {
isSplitCp := isSplit // we modify it in next condition below but need after it
if !isSplit {
// we send only child object below, but we can get here at the beginning (see
// option 1 described above), so we need to pre-init child resources
isSplit = true
x.fillChildMetadata(&childHeader, containerID, opts.currentNeoFSEpoch, owner)

childHeader.SetSplitID(object.NewSplitID())
childMeta = rootMeta
// we do shallow copy of rootMeta because below we take this into account and do
// not corrupt it
} else {
childHeader.SetPreviousID(writtenChildren[len(writtenChildren)-1])
}
// no more data to read

childID, err := writeInMemObject(signer, x.w, childHeader, bChunk[:toSend], childMeta)
if err != nil {
return rootID, fmt.Errorf("write child object: %w", err)
if err = writer.Close(); err != nil {
return rootID, fmt.Errorf("writer close: %w", err)
}

writtenChildren = append(writtenChildren, childID)

// shift overflow bytes to the beginning
if !isSplitCp {
childMeta = newDynamicObjectMetadata(opts.withHomoChecksum) // to avoid rootMeta corruption
}
childMeta.reset()
childMeta.accumulateNextPayloadChunk(bChunk[toSend:])
rootMeta.accumulateNextPayloadChunk(bChunk[toSend:])
offset = uint64(copy(bChunk, bChunk[toSend:]))
rootID = writer.ID()
break
}
}

// linking object
childMeta.reset()
childHeader.ResetPreviousID()
childHeader.SetChildren(writtenChildren...)

_, err = writeInMemObject(signer, x.w, childHeader, nil, childMeta)
if err != nil {
return rootID, fmt.Errorf("write linking object: %w", err)
if _, err = writer.Write(bChunk[:n]); err != nil {
return oid.ID{}, err
}
}

return rootID, nil
Expand All @@ -310,6 +186,10 @@ func (x *Slicer) InitPayloadStream(header object.Object, signer neofscrypto.Sign
header.SetOwnerID(&owner)
}

header.SetCreationEpoch(opts.currentNeoFSEpoch)
currentVersion := version.Current()
header.SetVersion(&currentVersion)

res := &PayloadWriter{
isHeaderWriteStep: true,
headerObject: header,
Expand Down
6 changes: 6 additions & 0 deletions object/slicer/slicer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,12 @@ func testSlicer(tb testing.TB, size, sizeLimit uint64) {
// check writer with random written chunk's size
checker.chainCollector = newChainCollector(tb)

// we need to recreate the object, because it may already carry a splitID and other split-related info.
hdr = object.Object{}
hdr.SetContainerID(checker.input.container)
hdr.SetOwnerID(&checker.input.owner)
hdr.SetAttributes(in.attributes...)

w, err := s.InitPayloadStream(hdr, in.signer, opts)
require.NoError(tb, err)

Expand Down

0 comments on commit 8358b6a

Please sign in to comment.