Skip to content

Commit

Permalink
PASS project1
Browse files Browse the repository at this point in the history
  • Loading branch information
Metafora072 committed Jul 4, 2024
1 parent b223670 commit f36b0e8
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 8 deletions.
1 change: 0 additions & 1 deletion kv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"google.golang.org/grpc/keepalive"
)


var (
schedulerAddr = flag.String("scheduler", "", "scheduler address")
storeAddr = flag.String("addr", "", "store address")
Expand Down
84 changes: 80 additions & 4 deletions kv/server/raw_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,110 @@ package server
import (
"context"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"

"github.com/pingcap-incubator/tinykv/kv/storage"
)

// // Server is a TinyKV server, it 'faces outwards', sending and receiving messages from clients such as TinySQL.
// type Server struct {
// storage storage.Storage

// // (Used in 4B)
// Latches *latches.Latches

// // coprocessor API handler, out of course scope
// copHandler *coprocessor.CopHandler
// }

// The functions below are Server's Raw API. (implements TinyKvServer).
// Some helper methods can be found in sever.go in the current directory

// RawGet return the corresponding Get response based on RawGetRequest's CF and Key fields
func (server *Server) RawGet(_ context.Context, req *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error) {
// Your Code Here (1).
return nil, nil
reader, err := server.storage.Reader(req.Context)
if err != nil {
return nil, err
}
value, err := reader.GetCF(req.Cf, req.Key)
if err != nil {
return nil, err
}
rawGetResponse := &kvrpcpb.RawGetResponse{
Value : value,
NotFound : (value == nil),
}
return rawGetResponse, nil
}

// RawPut puts the target data into storage and returns the corresponding response
func (server *Server) RawPut(_ context.Context, req *kvrpcpb.RawPutRequest) (*kvrpcpb.RawPutResponse, error) {
// Your Code Here (1).
// Hint: Consider using Storage.Modify to store data to be modified
return nil, nil
batch := storage.Modify {
Data : storage.Put{ Key: req.Key, Value: req.Value, Cf: req.Cf },
}
err := server.storage.Write(req.Context,[]storage.Modify{batch})
if err != nil {
return nil,err
}

rawPutResponse := &kvrpcpb.RawPutResponse{}
return rawPutResponse, nil
}

// RawDelete delete the target data from storage and returns the corresponding response
func (server *Server) RawDelete(_ context.Context, req *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error) {
// Your Code Here (1).
// Hint: Consider using Storage.Modify to store data to be deleted
return nil, nil
batch := storage.Modify {
Data : storage.Delete{ Key: req.Key, Cf: req.Cf },
}
err := server.storage.Write(req.Context,[]storage.Modify{batch})
if err != nil {
return nil,err
}
rawDeleteResponse := &kvrpcpb.RawDeleteResponse{}
return rawDeleteResponse, nil
}

// RawScan scan the data starting from the start key up to limit. and return the corresponding result
func (server *Server) RawScan(_ context.Context, req *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) {
// Your Code Here (1).
// Hint: Consider using reader.IterCF
return nil, nil
reader, err := server.storage.Reader(req.Context)
if err != nil {
return nil,err
}


var kvs []*kvrpcpb.KvPair

it := reader.IterCF(req.Cf)
defer it.Close()

limit := req.Limit

for it.Seek(req.StartKey); it.Valid(); it.Next() {
item := it.Item()

value, _ := item.Value()

cur := &kvrpcpb.KvPair{
Key : item.Key(),
Value : value,
}

kvs = append(kvs,cur)
limit = limit - 1

if limit == 0 {
break
}
}

rawScanResponse := &kvrpcpb.RawScanResponse{
Kvs : kvs,
}
return rawScanResponse, nil
}
111 changes: 108 additions & 3 deletions kv/storage/standalone_storage/standalone_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,34 @@ import (
"github.com/pingcap-incubator/tinykv/kv/config"
"github.com/pingcap-incubator/tinykv/kv/storage"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"

"github.com/pingcap-incubator/tinykv/kv/util/engine_util"
"github.com/Connor1996/badger"
"log"
)

// StandAloneStorage is an implementation of `Storage` for a single-node TinyKV instance. It does not
// communicate with other nodes and all data is stored locally.
type StandAloneStorage struct {
db *badger.DB
// Your Data Here (1).
}


func NewStandAloneStorage(conf *config.Config) *StandAloneStorage {
// Your Code Here (1).
return nil
opts := badger.DefaultOptions
opts.Dir = conf.DBPath
opts.ValueDir = conf.DBPath

db,err := badger.Open(opts)
if err != nil {
log.Fatal(err)
}

return &StandAloneStorage{
db : db,
}
}

func (s *StandAloneStorage) Start() error {
Expand All @@ -24,15 +41,103 @@ func (s *StandAloneStorage) Start() error {

func (s *StandAloneStorage) Stop() error {
// Your Code Here (1).
return nil
return s.db.Close()
}

func (s *StandAloneStorage) Reader(ctx *kvrpcpb.Context) (storage.StorageReader, error) {
// Your Code Here (1).
return nil, nil
txn := s.db.NewTransaction(false)
return &BadgerReader{txn}, nil
}

func (s *StandAloneStorage) Write(ctx *kvrpcpb.Context, batch []storage.Modify) error {
// Your Code Here (1).
for _, modify := range batch {
key, value, cf := modify.Key(), modify.Value(), modify.Cf()
var err error
switch modify.Data.(type) {
case storage.Put:
// func PutCF(engine *badger.DB, cf string, key []byte, val []byte) error {
// return engine.Update(func(txn *badger.Txn) error {
// return txn.Set(KeyWithCF(cf, key), val)
// })
// }
err = engine_util.PutCF(s.db,cf,key,value)
case storage.Delete:
// func DeleteCF(engine *badger.DB, cf string, key []byte) error {
// return engine.Update(func(txn *badger.Txn) error {
// return txn.Delete(KeyWithCF(cf, key))
// })
// }
err = engine_util.DeleteCF(s.db,cf,key)
}
if err != nil {
return err
}
}
return nil
}

// BadgerReader <StorageReader>
type BadgerReader struct {
txn *badger.Txn
}


// type StorageReader interface {
// // When the key doesn't exist, return nil for the value
// GetCF(cf string, key []byte) ([]byte, error)
// IterCF(cf string) engine_util.DBIterator
// Close()
// }
func (br *BadgerReader) GetCF(cf string, key []byte) ([]byte, error) {
//func GetCFFromTxn(txn *badger.Txn, cf string, key []byte) (val []byte, err error)
value, err := engine_util.GetCFFromTxn(br.txn,cf,key)

if err == badger.ErrKeyNotFound {
return nil,nil
}
return value,err
}

// type BadgerIterator struct {
// iter *badger.Iterator
// prefix string
// }
// BadgerIterator <DBIterator>

// func NewCFIterator(cf string, txn *badger.Txn) *BadgerIterator {
// return &BadgerIterator{
// iter: txn.NewIterator(badger.DefaultIteratorOptions),
// prefix: cf + "_",
// }
// }

// type DBIterator interface {
// // Item returns pointer to the current key-value pair.
// Item() DBItem
// // Valid returns false when iteration is done.
// Valid() bool
// // Next would advance the iterator by one. Always check it.Valid() after a Next()
// // to ensure you have access to a valid it.Item().
// Next()
// // Seek would seek to the provided key if present. If absent, it would seek to the next smallest key
// // greater than provided.
// Seek([]byte)

// // Close the iterator
// Close()
// }

func (br *BadgerReader) IterCF(cf string) engine_util.DBIterator {
it := engine_util.NewCFIterator(cf,br.txn)
return it
}

func (br *BadgerReader) Close() {
br.txn.Discard()
}




0 comments on commit f36b0e8

Please sign in to comment.