Skip to content

Commit

Permalink
client: Pass header as explicit param in ObjectPutInit
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Baidakov <evgenii@nspcc.io>
  • Loading branch information
smallhive committed Jun 7, 2023
1 parent 8d9a8fe commit e19e49a
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 60 deletions.
42 changes: 24 additions & 18 deletions client/object_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func (x *PrmObjectPutInit) UseSigner(signer neofscrypto.Signer) {
x.signer = signer
}

// Signer returns associated with request signer.
func (x PrmObjectPutInit) Signer() neofscrypto.Signer {
return x.signer
}

// WithBearerToken attaches bearer token to be used for the operation.
// Should be called once before any writing steps.
func (x *PrmObjectPutInit) WithBearerToken(t bearer.Token) {
Expand All @@ -91,6 +96,11 @@ func (x *PrmObjectPutInit) WithinSession(t session.Object) {
x.meta.SetSessionToken(&tv2)
}

// IsSessionSet checks is session within which object should be stored is set.
func (x PrmObjectPutInit) IsSessionSet() bool {
return x.meta.GetSessionToken() != nil
}

// MarkLocal tells the server to execute the operation locally.
func (x *PrmObjectPutInit) MarkLocal() {
x.meta.SetTTL(1)
Expand All @@ -104,9 +114,9 @@ func (x *PrmObjectPutInit) WithXHeaders(hs ...string) {
writeXHeadersToMeta(hs, &x.meta)
}

// WriteHeader writes header of the object. Result means success.
// writeHeader writes header of the object. Result means success.
// Failure reason can be received via Close.
func (x *ObjectWriter) WriteHeader(hdr object.Object) bool {
func (x *ObjectWriter) writeHeader(hdr object.Object) error {
v2Hdr := hdr.ToV2()

x.partInit.SetObjectID(v2Hdr.GetObjectID())
Expand All @@ -119,11 +129,11 @@ func (x *ObjectWriter) WriteHeader(hdr object.Object) bool {
x.err = signServiceMessage(x.signer, &x.req)
if x.err != nil {
x.err = fmt.Errorf("sign message: %w", x.err)
return false
return x.err
}

x.err = x.stream.Write(&x.req)
return x.err == nil
return x.err
}

// WritePayloadChunk writes chunk of the object payload. Result means success.
Expand Down Expand Up @@ -233,7 +243,7 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) {
//
// Returns errors:
// - [ErrMissingSigner]
func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (*ObjectWriter, error) {
func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, prm PrmObjectPutInit) (*ObjectWriter, error) {
var w ObjectWriter

signer, err := c.getSigner(prm.signer)
Expand All @@ -256,6 +266,11 @@ func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (*Obje
w.req.SetBody(new(v2object.PutRequestBody))
c.prepareRequest(&w.req, &prm.meta)

if err = w.writeHeader(hdr); err != nil {
_, _ = w.Close()
return nil, fmt.Errorf("header write: %w", err)
}

return &w, nil
}

Expand All @@ -267,23 +282,14 @@ type objectWriter struct {
func (x *objectWriter) InitDataStream(header object.Object) (io.Writer, error) {
var prm PrmObjectPutInit

stream, err := x.client.ObjectPutInit(x.context, prm)
stream, err := x.client.ObjectPutInit(x.context, header, prm)
if err != nil {
return nil, fmt.Errorf("init object stream: %w", err)
}

if stream.WriteHeader(header) {
return &payloadWriter{
stream: stream,
}, nil
}

_, err = stream.Close()
if err != nil {
return nil, err
}

return nil, errors.New("unexpected error")
return &payloadWriter{
stream: stream,
}, nil
}

type payloadWriter struct {
Expand Down
30 changes: 28 additions & 2 deletions pool/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,49 @@ package pool

import (
"context"
"fmt"

"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/reputation"
"github.com/nspcc-dev/neofs-sdk-go/session"
)

type containerSessionParams interface {
IsSessionSet() bool
WithinSession(session.Object)
}

func (p *Pool) actualSigner(signer neofscrypto.Signer) neofscrypto.Signer {
if signer != nil {
return signer
}

return p.signer
}

// ObjectPutInit initiates writing an object through a remote server using NeoFS API protocol.
//
// See details in [client.Client.ObjectPutInit].
func (p *Pool) ObjectPutInit(ctx context.Context, prm client.PrmObjectPutInit) (*client.ObjectWriter, error) {
func (p *Pool) ObjectPutInit(ctx context.Context, hdr object.Object, prm client.PrmObjectPutInit) (*client.ObjectWriter, error) {
c, err := p.sdkClient()
if err != nil {
return nil, err
}

return c.ObjectPutInit(ctx, prm)
cnr, isSet := hdr.ContainerID()
if !isSet {
return nil, errContainerRequired
}

if err = p.withinContainerSession(ctx, c, cnr, p.actualSigner(prm.Signer()), session.VerbObjectPut, &prm); err != nil {
return nil, fmt.Errorf("session: %w", err)
}

return c.ObjectPutInit(ctx, hdr, prm)
}

// ObjectGetInit initiates reading an object through a remote server using NeoFS API protocol.
Expand Down
64 changes: 31 additions & 33 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,56 +593,54 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
}

start := time.Now()
wObj, err := cl.ObjectPutInit(ctx, cliPrm)
wObj, err := cl.ObjectPutInit(ctx, prm.hdr, cliPrm)
c.incRequests(time.Since(start), methodObjectPut)
c.updateErrorRate(err)
if err != nil {
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
}

if wObj.WriteHeader(prm.hdr) {
sz := prm.hdr.PayloadSize()

if data := prm.hdr.Payload(); len(data) > 0 {
if prm.payload != nil {
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
} else {
prm.payload = bytes.NewReader(data)
sz = uint64(len(data))
}
}
sz := prm.hdr.PayloadSize()

if data := prm.hdr.Payload(); len(data) > 0 {
if prm.payload != nil {
const defaultBufferSizePut = 3 << 20 // configure?

if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
}
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
} else {
prm.payload = bytes.NewReader(data)
sz = uint64(len(data))
}
}

buf := make([]byte, sz)
if prm.payload != nil {
const defaultBufferSizePut = 3 << 20 // configure?

var n int
if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
}

for {
n, err = prm.payload.Read(buf)
if n > 0 {
start = time.Now()
successWrite := wObj.WritePayloadChunk(buf[:n])
c.incRequests(time.Since(start), methodObjectPut)
if !successWrite {
break
}
buf := make([]byte, sz)

continue
}
var n int

if errors.Is(err, io.EOF) {
for {
n, err = prm.payload.Read(buf)
if n > 0 {
start = time.Now()
successWrite := wObj.WritePayloadChunk(buf[:n])
c.incRequests(time.Since(start), methodObjectPut)
if !successWrite {
break
}

c.updateErrorRate(err)
return oid.ID{}, fmt.Errorf("read payload: %w", err)
continue
}

if errors.Is(err, io.EOF) {
break
}

c.updateErrorRate(err)
return oid.ID{}, fmt.Errorf("read payload: %w", err)
}
}

Expand Down
94 changes: 87 additions & 7 deletions pool/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,98 @@ package pool

import (
"context"
"errors"
"fmt"
"math"

"github.com/google/uuid"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/nspcc-dev/neofs-sdk-go/session"
)

// SessionCreate opens a session with the node server on the remote endpoint.
//
// See details in [client.Client.SessionCreate].
func (p *Pool) SessionCreate(ctx context.Context, prm client.PrmSessionCreate) (*client.ResSessionCreate, error) {
c, err := p.sdkClient()
var (
errContainerRequired = errors.New("container required")
)

func initSession(ctx context.Context, dst *session.Object, c *client.Client, dur uint64, signer neofscrypto.Signer) error {
ni, err := c.NetworkInfo(ctx, client.PrmNetworkInfo{})
if err != nil {
return nil, err
return err
}

epoch := ni.CurrentEpoch()

var exp uint64
if math.MaxUint64-epoch < dur {
exp = math.MaxUint64
} else {
exp = epoch + dur
}

var prm client.PrmSessionCreate
prm.SetExp(exp)
prm.UseSigner(signer)

res, err := c.SessionCreate(ctx, prm)
if err != nil {
return err
}

var id uuid.UUID
if err = id.UnmarshalBinary(res.ID()); err != nil {
return fmt.Errorf("invalid session token ID: %w", err)
}

var key neofsecdsa.PublicKey
if err = key.Decode(res.PublicKey()); err != nil {
return fmt.Errorf("invalid public session key: %w", err)
}

return c.SessionCreate(ctx, prm)
dst.SetID(id)
dst.SetAuthKey(&key)
dst.SetExp(exp)

return nil
}

func (p *Pool) withinContainerSession(
ctx context.Context,
c *client.Client,
containerID cid.ID,
signer neofscrypto.Signer,
verb session.ObjectVerb,
params containerSessionParams,
) error {
if params.IsSessionSet() {
return nil
}

cacheKey := formCacheKey(fmt.Sprintf("%p", c), signer)

tok, ok := p.cache.Get(cacheKey)
if !ok {
// init new session
err := initSession(ctx, &tok, c, p.stokenDuration, signer)
if err != nil {
return fmt.Errorf("init session: %w", err)
}

// cache the opened session
p.cache.Put(cacheKey, tok)
}

tok.ForVerb(verb)
tok.BindContainer(containerID)

// sign the token
if err := tok.Sign(signer); err != nil {
return fmt.Errorf("sign token: %w", err)
}

params.WithinSession(tok)

return nil
}

0 comments on commit e19e49a

Please sign in to comment.