Skip to content

Commit

Permalink
fix(price): update routines concurrency logic (bittorrent#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
laocheng-cheng authored and N3frill committed Jan 13, 2022
1 parent 78dfb6b commit e6c42fe
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 37 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ cmd/btfs/fs-repo-migrations.bk
cmd/btfs/update-darwin-amd64

cmd/.DS_Store
cmd/btfs/s
cmd/btfs/s
cmd/btfs/btfs
16 changes: 12 additions & 4 deletions core/commands/storage/upload/upload/do_pay.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,10 @@ func payInCheque(rss *sessions.RenterSession) error {
return err
}

//this is old price's rate [Compatible with older versions]
rateObj, err := chain.SettleObject.OracleService.CurrentRate()
realAmount, err := getRealAmount(c.SignedGuardContract.Amount)
if err != nil {
return err
}
amount := c.SignedGuardContract.Amount
realAmount := big.NewInt(0).Mul(big.NewInt(amount), rateObj)

host := c.SignedGuardContract.HostPid
contractId := c.SignedGuardContract.ContractId
Expand All @@ -41,3 +38,14 @@ func payInCheque(rss *sessions.RenterSession) error {

return nil
}

func getRealAmount(amount int64) (*big.Int, error) {
//this is old price's rate [Compatible with older versions]
rateObj, err := chain.SettleObject.OracleService.CurrentRate()
if err != nil {
return nil, err
}

realAmount := big.NewInt(0).Mul(big.NewInt(amount), rateObj)
return realAmount, nil
}
24 changes: 20 additions & 4 deletions core/commands/storage/upload/upload/do_submit.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package upload

import (
"math/big"

"context"
"fmt"
"github.com/bittorrent/go-btfs/settlement/swap/vault"

"github.com/bittorrent/go-btfs/chain"
Expand Down Expand Up @@ -43,12 +43,28 @@ func doSubmit(rss *sessions.RenterSession) error {
return err
}

AvailableBalance, err := chain.SettleObject.VaultService.AvailableBalance(rss.Ctx)
err = checkAvailableBalance(rss.Ctx, amount)
if err != nil {
return err
}

return nil
}

func checkAvailableBalance(ctx context.Context, amount int64) error {
realAmount, err := getRealAmount(amount)
if err != nil {
return err
}

AvailableBalance, err := chain.SettleObject.VaultService.AvailableBalance(ctx)
if err != nil {
return err
}

if AvailableBalance.Cmp(big.NewInt(amount)) < 0 {
fmt.Printf("check, balance=%v, realAmount=%v \n", AvailableBalance, realAmount)
if AvailableBalance.Cmp(realAmount) < 0 {
fmt.Println("check, err: ", vault.ErrInsufficientFunds)
return vault.ErrInsufficientFunds
}
return nil
Expand Down
29 changes: 13 additions & 16 deletions core/commands/storage/upload/upload/recieve_cheque.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,23 @@ var StorageUploadChequeCmd = &cmds.Command{

encodedCheque := req.Arguments[0]
contractId := req.Arguments[2]

fmt.Printf("receive cheque, requestPid:%s contractId:%+v \n", requestPid.String(), contractId)

go func() {
// decode and deal the cheque
err = swapprotocol.SwapProtocol.Handler(context.Background(), requestPid.String(), encodedCheque, price)
if err != nil {
fmt.Println("swapprotocol.SwapProtocol.Handler, error:", err)
return
}
// decode and deal the cheque
err = swapprotocol.SwapProtocol.Handler(context.Background(), requestPid.String(), encodedCheque, price)
if err != nil {
fmt.Println("receive cheque, swapprotocol.SwapProtocol.Handler, error:", err)
return err
}

// if receive cheque of contractId, set shard paid status.
if len(contractId) > 0 {
err := setPaidStatus(ctxParams, contractId)
if err != nil {
fmt.Println("setPaidStatus: contractId error:", contractId, err)
return
}
// if receive cheque of contractId, set shard paid status.
if len(contractId) > 0 {
err := setPaidStatus(ctxParams, contractId)
if err != nil {
fmt.Println("receive cheque, setPaidStatus: contractId error:", contractId, err)
return err
}
}()
}

return nil
},
Expand Down
11 changes: 10 additions & 1 deletion core/commands/storage/upload/upload/upload_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@ import (

func UploadShard(rss *sessions.RenterSession, hp helper.IHostsProvider, price int64, shardSize int64,
storageLength int,
offlineSigning bool, renterId peer.ID, fileSize int64, shardIndexes []int, rp *RepairParams) {
offlineSigning bool, renterId peer.ID, fileSize int64, shardIndexes []int, rp *RepairParams) error {

expectTotalPay := helper.TotalPay(shardSize, price, storageLength) * int64(len(rss.ShardHashes))
err := checkAvailableBalance(rss.Ctx, expectTotalPay)
if err != nil {
return err
}

for index, shardHash := range rss.ShardHashes {
go func(i int, h string) {
err := backoff.Retry(func() error {
Expand Down Expand Up @@ -144,4 +151,6 @@ func UploadShard(rss *sessions.RenterSession, hp helper.IHostsProvider, price in
}
}
}(rss, len(rss.ShardHashes))

return nil
}
6 changes: 3 additions & 3 deletions settlement/swap/swapprotocol/swapprotocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *Service) Handler(ctx context.Context, requestPid string, encodedCheque

// InitiateCheque attempts to send a cheque to a peer.
func (s *Service) EmitCheque(ctx context.Context, peer string, amount *big.Int, contractId string, issue IssueFunc) (balance *big.Int, err error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

sentAmount := amount
Expand All @@ -123,7 +123,7 @@ func (s *Service) EmitCheque(ctx context.Context, peer string, amount *big.Int,
log.Warnf("get handshakeInfo from peer %v (%v) error", peerhostPid)
return ErrGetBeneficiary
}
ctx, _ := context.WithTimeout(context.Background(), 20*time.Second)
ctx, _ := context.WithTimeout(context.Background(), 60*time.Second)
ctxParams, err := helper.ExtractContextParams(Req, Env)
if err != nil {
return err
Expand Down Expand Up @@ -199,7 +199,7 @@ func (s *Service) EmitCheque(ctx context.Context, peer string, amount *big.Int,
wg.Add(1)
go func() {
err = func() error {
ctx, _ := context.WithTimeout(context.Background(), 20*time.Second)
ctx, _ := context.WithTimeout(context.Background(), 60*time.Second)
ctxParams, err := helper.ExtractContextParams(Req, Env)
if err != nil {
return err
Expand Down
11 changes: 3 additions & 8 deletions settlement/swap/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,6 @@ func lastIssuedChequeKey(beneficiary common.Address) string {
}

func (s *service) reserveTotalIssued(ctx context.Context, amount *big.Int) (*big.Int, error) {
s.lock.Lock()
defer s.lock.Unlock()

availableBalance, err := s.AvailableBalance(ctx)
if err != nil {
return nil, err
Expand All @@ -218,8 +215,6 @@ func (s *service) reserveTotalIssued(ctx context.Context, amount *big.Int) (*big
}

func (s *service) unreserveTotalIssued(amount *big.Int) {
s.lock.Lock()
defer s.lock.Unlock()
s.totalIssuedReserved = s.totalIssuedReserved.Sub(s.totalIssuedReserved, amount)
}

Expand All @@ -228,6 +223,9 @@ func (s *service) unreserveTotalIssued(amount *big.Int) {
// The available balance which is available after sending the cheque is passed
// to the caller for it to be communicated over metrics.
func (s *service) Issue(ctx context.Context, beneficiary common.Address, amount *big.Int, sendChequeFunc SendChequeFunc) (*big.Int, error) {
s.lock.Lock()
defer s.lock.Unlock()

availableBalance, err := s.reserveTotalIssued(ctx, amount)
if err != nil {
return nil, err
Expand Down Expand Up @@ -278,9 +276,6 @@ func (s *service) Issue(ctx context.Context, beneficiary common.Address, amount
return nil, err
}

s.lock.Lock()
defer s.lock.Unlock()

// store the history issued cheque
err = s.chequeStore.StoreSendChequeRecord(s.address, beneficiary, amount)
if err != nil {
Expand Down

0 comments on commit e6c42fe

Please sign in to comment.