Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

postage batch store #922

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/api/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}

pipe := builder.NewPipelineBuilder(ctx, s.Storer, requestModePut(r), requestEncrypt(r), batch)
// create stamper
// get the postage batch data from batch store - is done under the hood by the following call
stamper := s.postage.GetStamper(batch)

pipe := builder.NewPipelineBuilder(ctx, s.Storer, requestModePut(r), requestEncrypt(r), stamper)
address, err := builder.FeedPipeline(ctx, pipe, r.Body, r.ContentLength)
if err != nil {
logger.Debugf("bytes upload: split write all: %v", err)
Expand Down
64 changes: 64 additions & 0 deletions pkg/postage/batchstore/state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package batchstore

import (
"encoding"
"encoding/binary"
"fmt"
"math/big"

"github.com/ethersphere/bee/pkg/storage"
)

var stateKey = "stateKey"

// state implements BinaryMarshaler interface
var _ encoding.BinaryMarshaler = (*state)(nil)

// state represents the current state of the reserve
type state struct {
block uint64 // the block number of the last postage event
total *big.Int // cumulative amount paid per stamp
price *big.Int // bzz/chunk/block normalised price
}

// MarshalBinary serialises the state to be used by the state store
func (st *state) MarshalBinary() ([]byte, error) {
buf := make([]byte, 9)
binary.BigEndian.PutUint64(buf, st.block)
totalBytes := st.total.Bytes()
if totalBytes > uint8.MaxValue {
return nil, fmt.Errorf("too many bytes?")
}
buf[8] = uint8(len(totalBytes))
buf = append(buf, totalBytes...)
return append(buf, st.price.Bytes()...), nil
}

// UnmarshalBinary deserialises the state to be used by the state store
func (st *state) UnmarshalBinary(buf []byte) error {
st.block = binary.BigEndian.Uint64(buf[:8])
totalLen := int(buf[8])
st.total = new(big.Int).SetBytes(buf[9 : 9+totalLen])
st.price = new(big.Int).SetBytes(buf[9+totalLen:])
return nil
}

// loads the state from statestore, initialises if not found
func (st *state) load(store storage.StateStorer) error {
err := store.Get(stateKey, st)
if err == storage.ErrNotFound {
st.total = big.NewInt(0)
st.price = big.NewInt(0)
return nil
}
// do we want to also persist here? ie. getOrCreate
return err
}

func (st *state) save(store storage.StateStorer) error {
return store.Put(stateKey, st)
}
11 changes: 11 additions & 0 deletions pkg/postage/batchstore/state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package batchstore_test

import "testing"

func TestStateMarshalling(t *testing.T) {

}
126 changes: 126 additions & 0 deletions pkg/postage/batchstore/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package batchstore

import (
"math/big"
"sync"

"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage"
)

var (
batchKeyPrefix = "batchKeyPrefix"
valueKeyPrefix = "valueKeyPrefix"
)

var _ postage.EventUpdater = (*Store)(nil)

// Store is a local store for postage batches
type Store struct {
store storage.StateStorer // state store backend to persist batches
mu sync.Mutex // mutex to lock statestore during atomic changes
state *state // the current state
logger logging.Logger
}

// New constructs a new postage batch store
func New(store storage.StateStorer, logger logging.Logger) (*Store, error) {
// initialise state from statestore or start with 0-s
st := &state{}
if err := st.load(store); err != nil {
return nil, err
}
s := &Store{
store: store,
logger: logger,
}
return s, nil
}

// settle retrieves the current state
// - sets the cumulative outpayment normalised, cno+=price*period
// - sets the new block number
// caller holds the store mutex
func (s *Store) settle(block uint64) {
period := int64(block - s.state.block)
s.state.block = block
s.state.total.Add(s.state.total, new(big.Int).Mul(s.state.price, big.NewInt(period)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this reintroduces a lot of the reorg issues we wanted to get rid of. instead of doing this price update thing here we should refetch this value using the totalOutPayment getter when there is a PriceUpdate event on the oracle contract.

}

func (s *Store) get(id []byte) (*postage.Batch, error) {
b := &postage.Batch{}
err := s.store.Get(batchKey(id), b)
return b, err
}

func (s *Store) put(b *postage.Batch) error {
return s.store.Put(batchKey(b.ID), b)
}

func (s *Store) replace(id []byte, oldValue, newValue *big.Int) error {
err := s.store.Delete(valueKey(oldValue))
if err != nil {
return err
}
return s.store.Put(valueKey(newValue), id)
}

func (s *Store) Create(id []byte, owner []byte, value *big.Int, depth uint8) error {
b := &postage.Batch{
ID: id,
Start: s.state.block,
Owner: owner,
Depth: depth,
}

panic("@zelig - should we have b.Add(value)?")
value, err := s.balance(b, value)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value can cime durectly from the event]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need the relace call bothg in topup and increasedepth

if err != nil {
return err
}
return s.put(b)
}

func (s *Store) TopUp(id []byte, value *big.Int) error {
b, err := s.get(id)
if err != nil {
return err
}
value, err := s.balance(b, value)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value can cime durectly from the event]

if err != nil {
return err
}
return s.put(b)
}

func (s *Store) UpdateDepth(id []byte, depth uint8) error {
b, err := s.get(id)
if err != nil {
return err
}
b.Depth = depth
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the unit of balance? is this total amount paid into it, or per chunk (like the normalised balance). If the latter, changing the depth also changes the balance.

return s.put(b)
}

func (s *Store) UpdatePrice(price *big.Int) error {
s.state.price = price
return nil
}

// batchKey returns the index key for the batch ID used in the by-ID batch index
func batchKey(id []byte) string {
return batchKeyPrefix + string(id)
}

// valueKey returns the index key for the batch value used in the by-value (priority) batch index
func valueKey(v *big.Int) string {
key := make([]byte, 32)
value := v.Bytes()
copy(key[32-len(value):], value)
return valueKeyPrefix + string(key)
}
11 changes: 11 additions & 0 deletions pkg/postage/batchstore/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package batchstore_test

import "testing"

func TestStore(t *testing.T) {

}
41 changes: 41 additions & 0 deletions pkg/postage/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package postage

import (
"math/big"

"github.com/ethereum/go-ethereum/core/types"
)

// EventUpdater interface definitions reflect the updates triggered by events emitted by
// the postage contract on the blockchain
type EventUpdater interface {
Create(id []byte, owner []byte, amount *big.Int, depth uint8) error
TopUp(id []byte, amount *big.Int) error
UpdateDepth(id []byte, depth uint8) error
UpdatePrice(price *big.Int) error
}

// Event is the interface subsuming all postage contract blockchain events
//
// postage contract event | golang Event | Update call on EventUpdater
// ------------------------+---------------------------+---------------------------
// BatchCreated | batchCreatedEvent | Create
// BatchTopUp | batchTopUpEvent | TopUp
// BatchDepthIncrease | batchDepthIncreaseEvent | UpdateDepth
// PriceUpdate | priceUpdateEvent | UpdatePrice
type Event interface {
Update(s EventUpdater) error
}

// Events provides an iterator for postage events
type Events interface {
Each(from uint64, update func(block uint64, ev Event) error) func()
}

// Listener provides a blockchain event iterator
type Listener interface {
// - it starts at block from
// - it terminates with no error when quit channel is closed
// - if the update function returns an error, the call returns with that error
Listen(from uint64, quit chan struct{}, update func(types.Log) error) error
}