diff --git a/cmd/commands.go b/cmd/commands.go new file mode 100644 index 00000000..1c2bfcae --- /dev/null +++ b/cmd/commands.go @@ -0,0 +1,21 @@ +package main + +import "github.com/spf13/cobra" + +var ( + rootCmd = &cobra.Command{ + Use: "silo", + Short: "silo storage engine.", + Long: ``, + SilenceErrors: true, + SilenceUsage: true, + } +) + +func init() { + // rootCmd.PersistentFlags().StringVarP(&Input, "input", "i", "", "Input file name") +} + +func Execute() error { + return rootCmd.Execute() +} diff --git a/cmd/connect.go b/cmd/connect.go new file mode 100644 index 00000000..f547281e --- /dev/null +++ b/cmd/connect.go @@ -0,0 +1,125 @@ +package main + +import ( + "context" + "fmt" + "net" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/loopholelabs/silo/internal/expose" + "github.com/loopholelabs/silo/pkg/storage" + "github.com/loopholelabs/silo/pkg/storage/modules" + "github.com/loopholelabs/silo/pkg/storage/protocol" + "github.com/loopholelabs/silo/pkg/storage/sources" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spf13/cobra" +) + +var ( + cmdConnect = &cobra.Command{ + Use: "connect", + Short: "Start up connect", + Long: ``, + Run: runConnect, + } +) + +var connect_addr string +var connect_dev string +var connect_size int + +func init() { + rootCmd.AddCommand(cmdConnect) + cmdConnect.Flags().StringVarP(&connect_addr, "addr", "a", "localhost:5170", "Address to serve from") + cmdConnect.Flags().StringVarP(&connect_dev, "dev", "d", "", "Device eg nbd1") + cmdConnect.Flags().IntVarP(&connect_size, "size", "s", 1024*1024*10, "Size") +} + +func runConnect(ccmd *cobra.Command, args []string) { + fmt.Printf("Starting silo connect %s at %s size %d\n", connect_dev, connect_addr, connect_size) + + // Setup some statistics output + http.Handle("/metrics", promhttp.Handler()) + go http.ListenAndServe(":4114", nil) + + block_size := 4096 + + var p storage.ExposedStorage + + cr := func(s int) storage.StorageProvider { + return sources.NewMemoryStorage(s) + } + // Setup some sharded memory storage (for concurrent write speed) + destStorage := modules.NewShardedStorage(connect_size, connect_size/1024, cr) + // Wrap it in metrics + destWaiting := modules.NewWaitingCache(destStorage, block_size) + destStorageMetrics := modules.NewMetrics(destWaiting) + + con, err := net.Dial("tcp", connect_addr) + if err != nil { + panic("Error connecting") + } + + pro := protocol.NewProtocolRW(context.TODO(), con, con) + dest := modules.NewFromProtocol(777, destStorageMetrics, pro) + + go pro.Handle() + + go dest.HandleSend(context.TODO()) + go dest.HandleReadAt() + go dest.HandleWriteAt() + + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + + if connect_dev != "" { + fmt.Printf("\nShutting down cleanly...\n") + shutdown(connect_dev, p) + } + destStorageMetrics.ShowStats("Source") + os.Exit(1) + }() + + if connect_dev != "" { + d := expose.NewDispatch() + p, err = setup(connect_dev, d, destStorageMetrics, false) + if err != nil { + fmt.Printf("Error during setup %v\n", err) + return + } + fmt.Printf("Ready on %s...\n", connect_dev) + } + + ticker := time.NewTicker(time.Second) + + for { + select { + case <-ticker.C: + // Show some stats... + destStorageMetrics.ShowStats("Dest") + + s := destStorageMetrics.Snapshot() + prom_read_ops.Set(float64(s.Read_ops)) + prom_read_bytes.Set(float64(s.Read_bytes)) + prom_read_time.Set(float64(s.Read_time)) + prom_read_errors.Set(float64(s.Read_errors)) + + prom_write_ops.Set(float64(s.Write_ops)) + prom_write_bytes.Set(float64(s.Write_bytes)) + prom_write_time.Set(float64(s.Write_time)) + prom_write_errors.Set(float64(s.Write_errors)) + + prom_flush_ops.Set(float64(s.Flush_ops)) + prom_flush_time.Set(float64(s.Flush_time)) + prom_flush_errors.Set(float64(s.Flush_errors)) + + } + } + +} diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 00000000..e435a4f4 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,10 @@ +package main + +import "fmt" + +func main() { + err := Execute() + if err != nil && err.Error() != "" { + fmt.Println(err) + } +} diff --git a/cmd/serve.go b/cmd/serve.go new file mode 100644 index 00000000..1871cac7 --- /dev/null +++ b/cmd/serve.go @@ -0,0 +1,254 @@ +package main + +import ( + "context" + "fmt" + "net" + "net/http" + "os" + "os/exec" + "os/signal" + "syscall" + "time" + + "github.com/loopholelabs/silo/internal/expose" + "github.com/loopholelabs/silo/pkg/storage" + "github.com/loopholelabs/silo/pkg/storage/blocks" + "github.com/loopholelabs/silo/pkg/storage/modules" + "github.com/loopholelabs/silo/pkg/storage/protocol" + "github.com/loopholelabs/silo/pkg/storage/sources" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spf13/cobra" +) + +var ( + cmdServe = &cobra.Command{ + Use: "serve", + Short: "Start up serve", + Long: ``, + Run: runServe, + } +) + +var serve_addr string +var serve_dev string +var serve_size int + +func init() { + rootCmd.AddCommand(cmdServe) + cmdServe.Flags().StringVarP(&serve_addr, "addr", "a", ":5170", "Address to serve from") + cmdServe.Flags().StringVarP(&serve_dev, "dev", "d", "", "Device eg nbd1") + cmdServe.Flags().IntVarP(&serve_size, "size", "s", 1024*1024*10, "Size") +} + +var ( + prom_read_ops = promauto.NewGauge(prometheus.GaugeOpts{Name: "silo_read_ops", Help: "silo_read_ops"}) + prom_read_bytes = promauto.NewGauge(prometheus.GaugeOpts{Name: "silo_read_bytes", Help: "silo_read_bytes"}) + prom_read_time = promauto.NewGauge(prometheus.GaugeOpts{Name: "silo_read_time", Help: "silo_read_time"}) + prom_read_errors = promauto.NewGauge(prometheus.GaugeOpts{Name: "silo_read_errors", Help: "silo_read_errors"}) + + prom_write_ops = promauto.NewGauge(prometheus.GaugeOpts{Name: "silo_write_ops", Help: "silo_write_ops"}) + prom_write_bytes = promauto.NewGauge(prometheus.GaugeOpts{Name: "silo_write_bytes", Help: "silo_write_bytes"}) + prom_write_time = promauto.NewGauge(prometheus.GaugeOpts{Name: "silo_write_time", Help: "silo_write_time"}) + prom_write_errors = promauto.NewGauge(prometheus.GaugeOpts{Name: "silo_write_errors", Help: "silo_write_errors"}) + + prom_flush_ops = promauto.NewGauge(prometheus.GaugeOpts{Name: "silo_flush_ops", Help: "silo_flush_ops"}) + prom_flush_time = promauto.NewGauge(prometheus.GaugeOpts{Name: "silo_flush_time", Help: "silo_flush_time"}) + prom_flush_errors = promauto.NewGauge(prometheus.GaugeOpts{Name: "silo_flush_errors", Help: "silo_flush_errors"}) +) + +func runServe(ccmd *cobra.Command, args []string) { + fmt.Printf("Starting silo serve %s at %s size %d\n", serve_dev, serve_addr, serve_size) + + // Setup some statistics output + http.Handle("/metrics", promhttp.Handler()) + go http.ListenAndServe(":2112", nil) + + block_size := 4096 + num_blocks := (serve_size + block_size - 1) / block_size + + var p storage.ExposedStorage + + cr := func(s int) storage.StorageProvider { + return sources.NewMemoryStorage(s) + } + // Setup some sharded memory storage (for concurrent write speed) + source := modules.NewShardedStorage(serve_size, serve_size/1024, cr) + // Wrap it in metrics + + sourceMetrics := modules.NewMetrics(source) + sourceDirty := modules.NewFilterReadDirtyTracker(sourceMetrics, block_size) + sourceMonitor := modules.NewVolatilityMonitor(sourceDirty, block_size, 10*time.Second) + sourceStorage := modules.NewLockable(sourceMonitor) + + // Start monitoring blocks. + orderer := blocks.NewPriorityBlockOrder(num_blocks, sourceMonitor) + + for i := 0; i < num_blocks; i++ { + orderer.Add(i) + } + + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + + if serve_dev != "" { + fmt.Printf("\nShutting down cleanly...\n") + shutdown(serve_dev, p) + } + sourceMetrics.ShowStats("Source") + os.Exit(1) + }() + + if serve_dev != "" { + var err error + d := expose.NewDispatch() + p, err = setup(serve_dev, d, sourceStorage, true) + if err != nil { + fmt.Printf("Error during setup %v\n", err) + return + } + fmt.Printf("Ready...\n") + } + + // TODO: Setup listener here. When client connects, migrate to it. + + l, err := net.Listen("tcp", serve_addr) + if err != nil { + if serve_dev != "" { + fmt.Printf("\nShutting down cleanly...\n") + shutdown(serve_dev, p) + } + panic("Listener issue...") + } + + go func() { + fmt.Printf("Waiting for connection...\n") + c, err := l.Accept() + if err == nil { + fmt.Printf("GOT CONNECTION\n") + // Now we can migrate to the client... + + locker := func() { + // This could be used to pause VM/consumer etc... + sourceStorage.Lock() + } + unlocker := func() { + // Restart consumer + sourceStorage.Unlock() + } + + pro := protocol.NewProtocolRW(context.TODO(), c, c) + dest := modules.NewToProtocol(uint64(serve_size), 777, pro) + + go pro.Handle() + + mig := storage.NewMigrator(sourceDirty, + dest, + block_size, + locker, + unlocker, + orderer) + + // Now do the migration... + err = mig.Migrate() + fmt.Printf("MIGRATION DONE %v\n", err) + } + }() + + ticker := time.NewTicker(time.Second) + + for { + select { + case <-ticker.C: + // Show some stats... + sourceMetrics.ShowStats("Source") + fmt.Printf("Volatility %d\n", sourceMonitor.GetTotalVolatility()) + + s := sourceMetrics.Snapshot() + prom_read_ops.Set(float64(s.Read_ops)) + prom_read_bytes.Set(float64(s.Read_bytes)) + prom_read_time.Set(float64(s.Read_time)) + prom_read_errors.Set(float64(s.Read_errors)) + + prom_write_ops.Set(float64(s.Write_ops)) + prom_write_bytes.Set(float64(s.Write_bytes)) + prom_write_time.Set(float64(s.Write_time)) + prom_write_errors.Set(float64(s.Write_errors)) + + prom_flush_ops.Set(float64(s.Flush_ops)) + prom_flush_time.Set(float64(s.Flush_time)) + prom_flush_errors.Set(float64(s.Flush_errors)) + + } + } +} + +/** + * Setup a disk + * + */ +func setup(device string, dispatch expose.NBDDispatcher, prov storage.StorageProvider, server bool) (storage.ExposedStorage, error) { + p, err := expose.NewNBD(dispatch, fmt.Sprintf("/dev/%s", device)) + if err != nil { + return nil, err + } + + go func() { + err := p.Handle(prov) + if err != nil { + fmt.Printf("p.Handle returned %v\n", err) + } + }() + + p.WaitReady() + + err = os.Mkdir(fmt.Sprintf("/mnt/mount%s", device), 0600) + if err != nil { + return nil, fmt.Errorf("Error mkdir %v", err) + } + + if server { + cmd := exec.Command("mkfs.ext4", fmt.Sprintf("/dev/%s", device)) + err = cmd.Run() + if err != nil { + return nil, fmt.Errorf("Error mkfs.ext4 %v", err) + } + + cmd = exec.Command("mount", fmt.Sprintf("/dev/%s", device), fmt.Sprintf("/mnt/mount%s", device)) + err = cmd.Run() + if err != nil { + return nil, fmt.Errorf("Error mount %v", err) + } + } else { + cmd := exec.Command("mount", "-r", fmt.Sprintf("/dev/%s", device), fmt.Sprintf("/mnt/mount%s", device)) + err = cmd.Run() + if err != nil { + return nil, fmt.Errorf("Error mount %v", err) + } + } + + return p, nil +} + +func shutdown(device string, p storage.ExposedStorage) error { + fmt.Printf("shutdown %s\n", device) + cmd := exec.Command("umount", fmt.Sprintf("/dev/%s", device)) + err := cmd.Run() + if err != nil { + return err + } + err = os.Remove(fmt.Sprintf("/mnt/mount%s", device)) + if err != nil { + return err + } + + err = p.Shutdown() + if err != nil { + return err + } + return nil +} diff --git a/go.mod b/go.mod index 01bfff2b..80184e21 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,19 @@ go 1.20 require github.com/stretchr/testify v1.8.4 require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_golang v1.18.0 // indirect + github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/common v0.45.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect + github.com/spf13/cobra v1.8.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/sys v0.15.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index fa4b6e68..7e8dc71a 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,39 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= +github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= +github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= +github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/pkg/storage/modules/from_protocol.go b/pkg/storage/modules/from_protocol.go index 5d689e40..88bed543 100644 --- a/pkg/storage/modules/from_protocol.go +++ b/pkg/storage/modules/from_protocol.go @@ -1,21 +1,45 @@ package modules import ( + "context" + "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/protocol" ) +type sendData struct { + id uint32 + data []byte +} + type FromProtocol struct { - dev uint32 - prov storage.StorageProvider - protocol protocol.Protocol + dev uint32 + prov storage.StorageProvider + protocol protocol.Protocol + send_queue chan sendData } func NewFromProtocol(dev uint32, prov storage.StorageProvider, protocol protocol.Protocol) *FromProtocol { return &FromProtocol{ - dev: dev, - prov: prov, - protocol: protocol, + dev: dev, + prov: prov, + protocol: protocol, + send_queue: make(chan sendData), + } +} + +// Send packets out +func (fp *FromProtocol) HandleSend(ctx context.Context) error { + for { + select { + case s := <-fp.send_queue: + _, err := fp.protocol.SendPacket(fp.dev, s.id, s.data) + if err != nil { + return err + } + case <-ctx.Done(): + return nil + } } } @@ -26,25 +50,25 @@ func (fp *FromProtocol) HandleReadAt() error { if err != nil { return err } + offset, length, err := protocol.DecodeReadAt(data) + if err != nil { + return err + } // Handle them in goroutines - go func(gdata []byte, gid uint32) { - offset, length, err := protocol.DecodeReadAt(gdata) - if err != nil { - // TODO: Report this to somewhere - } - buff := make([]byte, length) - n, err := fp.prov.ReadAt(buff, offset) + go func(goffset int64, glength int32, gid uint32) { + buff := make([]byte, glength) + n, err := fp.prov.ReadAt(buff, goffset) rar := &protocol.ReadAtResponse{ Bytes: n, Error: err, Data: buff, } - _, err = fp.protocol.SendPacket(fp.dev, gid, protocol.EncodeReadAtResponse(rar)) - if err != nil { - // TODO: Report this to somewhere + fp.send_queue <- sendData{ + id: gid, + data: protocol.EncodeReadAtResponse(rar), } - }(data, id) + }(offset, length, id) } } @@ -56,21 +80,22 @@ func (fp *FromProtocol) HandleWriteAt() error { return err } + offset, write_data, err := protocol.DecodeWriteAt(data) + if err != nil { + return err + } + // Handle in a goroutine - go func(gdata []byte, gid uint32) { - offset, data, err := protocol.DecodeWriteAt(gdata) - if err != nil { - // TODO - } - n, err := fp.prov.WriteAt(data, offset) + go func(goffset int64, gdata []byte, gid uint32) { + n, err := fp.prov.WriteAt(gdata, goffset) war := &protocol.WriteAtResponse{ Bytes: n, Error: err, } - _, err = fp.protocol.SendPacket(fp.dev, gid, protocol.EncodeWriteAtResponse(war)) - if err != nil { - // TODO + fp.send_queue <- sendData{ + id: gid, + data: protocol.EncodeWriteAtResponse(war), } - }(data, id) + }(offset, write_data, id) } } diff --git a/pkg/storage/modules/metrics.go b/pkg/storage/modules/metrics.go index 5f2f7d78..0946805e 100644 --- a/pkg/storage/modules/metrics.go +++ b/pkg/storage/modules/metrics.go @@ -27,6 +27,20 @@ type Metrics struct { metric_flush_errors uint64 } +type MetricsSnapshot struct { + Read_ops uint64 + Read_bytes uint64 + Read_time uint64 + Read_errors uint64 + Write_ops uint64 + Write_bytes uint64 + Write_time uint64 + Write_errors uint64 + Flush_ops uint64 + Flush_time uint64 + Flush_errors uint64 +} + func NewMetrics(prov storage.StorageProvider) *Metrics { return &Metrics{ prov: prov, @@ -72,7 +86,22 @@ func (i *Metrics) ShowStats(prefix string) { flush_avg_time, atomic.LoadUint64(&i.metric_flush_errors), ) +} +func (i *Metrics) Snapshot() *MetricsSnapshot { + return &MetricsSnapshot{ + Read_ops: atomic.LoadUint64(&i.metric_read_ops), + Read_bytes: atomic.LoadUint64(&i.metric_read_bytes), + Read_time: atomic.LoadUint64(&i.metric_read_time), + Read_errors: atomic.LoadUint64(&i.metric_read_errors), + Write_ops: atomic.LoadUint64(&i.metric_write_ops), + Write_bytes: atomic.LoadUint64(&i.metric_write_bytes), + Write_time: atomic.LoadUint64(&i.metric_write_time), + Write_errors: atomic.LoadUint64(&i.metric_write_errors), + Flush_ops: atomic.LoadUint64(&i.metric_flush_ops), + Flush_time: atomic.LoadUint64(&i.metric_flush_time), + Flush_errors: atomic.LoadUint64(&i.metric_flush_errors), + } } func (i *Metrics) ResetMetrics() { diff --git a/pkg/storage/modules/volatility_monitor.go b/pkg/storage/modules/volatility_monitor.go index 7f583ada..fffe4a3b 100644 --- a/pkg/storage/modules/volatility_monitor.go +++ b/pkg/storage/modules/volatility_monitor.go @@ -20,6 +20,7 @@ func (bd *blockData) Add(expiry time.Duration) { n := time.Now().UnixNano() // Either add it on, or replace an expired one... + // TODO: Should probably periodically do a cleanup for i := 0; i < len(bd.log); i++ { if bd.log[i] < n-int64(expiry) { bd.log[i] = n @@ -53,6 +54,7 @@ type VolatilityMonitor struct { block_data map[uint]*blockData block_data_lock sync.Mutex available util.Bitfield + total_data *blockData } func NewVolatilityMonitor(prov storage.StorageProvider, block_size int, expiry time.Duration) *VolatilityMonitor { @@ -65,6 +67,7 @@ func NewVolatilityMonitor(prov storage.StorageProvider, block_size int, expiry t block_data: make(map[uint]*blockData), available: *util.NewBitfield(num_blocks), expiry: expiry, + total_data: &blockData{log: make([]int64, 0)}, } } @@ -136,6 +139,16 @@ func (i *VolatilityMonitor) GetVolatility(block int) int { return 0 } +/** + * Get a reading for a specific block + * + */ +func (i *VolatilityMonitor) GetTotalVolatility() int { + i.block_data_lock.Lock() + defer i.block_data_lock.Unlock() + return i.total_data.Count(i.expiry) +} + func (i *VolatilityMonitor) ReadAt(buffer []byte, offset int64) (int, error) { return i.prov.ReadAt(buffer, offset) } @@ -162,6 +175,8 @@ func (i *VolatilityMonitor) WriteAt(buffer []byte, offset int64) (int, error) { bd.Add(i.expiry) i.block_data_lock.Unlock() } + // Always measure this... + i.total_data.Add(i.expiry) // Add to the total volatility counter } return n, err diff --git a/pkg/storage/protocol/protocol_rw.go b/pkg/storage/protocol/protocol_rw.go index 4dd50454..f267d0f3 100644 --- a/pkg/storage/protocol/protocol_rw.go +++ b/pkg/storage/protocol/protocol_rw.go @@ -1,22 +1,27 @@ package protocol import ( + "context" "encoding/binary" + "fmt" "io" "sync" "sync/atomic" ) type ProtocolRW struct { + ctx context.Context r io.Reader w io.Writer + w_lock sync.Mutex tx_id uint32 waiters map[uint32]Waiters waiters_lock sync.Mutex } -func NewProtocolRW(r io.Reader, w io.Writer) *ProtocolRW { +func NewProtocolRW(ctx context.Context, r io.Reader, w io.Writer) *ProtocolRW { return &ProtocolRW{ + ctx: ctx, r: r, w: w, waiters: make(map[uint32]Waiters), @@ -34,7 +39,11 @@ func (p *ProtocolRW) SendPacket(dev uint32, id uint32, data []byte) (uint32, err binary.LittleEndian.PutUint32(buffer, dev) binary.LittleEndian.PutUint32(buffer[4:], id) binary.LittleEndian.PutUint32(buffer[8:], uint32(len(data))) + p.w_lock.Lock() + defer p.w_lock.Unlock() + _, err := p.w.Write(buffer) + if err != nil { return 0, err } @@ -45,15 +54,19 @@ func (p *ProtocolRW) SendPacket(dev uint32, id uint32, data []byte) (uint32, err // Read a packet func (p *ProtocolRW) readPacket() (uint32, uint32, []byte, error) { buffer := make([]byte, 4+4+4) - _, err := p.r.Read(buffer) + + _, err := io.ReadFull(p.r, buffer) if err != nil { return 0, 0, nil, err } dev := binary.LittleEndian.Uint32(buffer) id := binary.LittleEndian.Uint32(buffer[4:]) length := binary.LittleEndian.Uint32(buffer[8:]) + data := make([]byte, length) - _, err = p.r.Read(data) + + _, err = io.ReadFull(p.r, data) + if err != nil { return 0, 0, nil, err } @@ -66,7 +79,9 @@ func (p *ProtocolRW) Handle() error { if err != nil { return err } - // Now queue it up... + // Now queue it up in a channel + + fmt.Printf("DEV %d ID %d DATA %d\n", dev, id, len(data)) cmd := data[0] @@ -94,10 +109,6 @@ func (p *ProtocolRW) Handle() error { p.waiters_lock.Unlock() - // Send it to any listeners - // If this matches something being waited for, route it there. - // TODO: Don't always do this, expire, etc etc - if IsResponse(cmd) { wq_id <- packetinfo{ id: id, @@ -129,12 +140,13 @@ func (mp *ProtocolRW) WaitForPacket(dev uint32, id uint32) ([]byte, error) { } mp.waiters_lock.Unlock() - // Wait for the packet to appear on the channel - p := <-wq - - // TODO: Could remove the channel now idk... we'll see... - - return p.data, nil + select { + case p := <-wq: + // TODO: Remove the channel, as we only expect a SINGLE response with this ID. + return p.data, nil + case <-mp.ctx.Done(): + return nil, mp.ctx.Err() + } } func (mp *ProtocolRW) WaitForCommand(dev uint32, cmd byte) (uint32, []byte, error) { @@ -154,7 +166,11 @@ func (mp *ProtocolRW) WaitForCommand(dev uint32, cmd byte) (uint32, []byte, erro } mp.waiters_lock.Unlock() - // Wait for the packet to appear on the channel - p := <-wq - return p.id, p.data, nil + select { + case p := <-wq: + // TODO: Remove the channel, as we only expect a SINGLE response with this ID. + return p.id, p.data, nil + case <-mp.ctx.Done(): + return 0, nil, mp.ctx.Err() + } } diff --git a/testing/protocol_test.go b/testing/protocol_test.go index b7a14bf3..89d15acf 100644 --- a/testing/protocol_test.go +++ b/testing/protocol_test.go @@ -1,6 +1,7 @@ package testing import ( + "context" "crypto/rand" "io" "testing" @@ -26,6 +27,7 @@ func TestProtocolWriteAt(t *testing.T) { // Now do some things and make sure they happen... // TODO: Shutdown... + go destFromProtocol.HandleSend(context.TODO()) go destFromProtocol.HandleReadAt() go destFromProtocol.HandleWriteAt() @@ -67,6 +69,7 @@ func TestProtocolReadAt(t *testing.T) { // Now do some things and make sure they happen... // TODO: Shutdown... + go destFromProtocol.HandleSend(context.TODO()) go destFromProtocol.HandleReadAt() go destFromProtocol.HandleWriteAt() @@ -89,8 +92,8 @@ func TestProtocolRWWriteAt(t *testing.T) { r1, w1 := io.Pipe() r2, w2 := io.Pipe() - prSource := protocol.NewProtocolRW(r1, w2) - prDest := protocol.NewProtocolRW(r2, w1) + prSource := protocol.NewProtocolRW(context.TODO(), r1, w2) + prDest := protocol.NewProtocolRW(context.TODO(), r2, w1) sourceToProtocol := modules.NewToProtocol(uint64(size), 1, prSource) @@ -103,6 +106,7 @@ func TestProtocolRWWriteAt(t *testing.T) { // Now do some things and make sure they happen... // TODO: Shutdown... + go destFromProtocol.HandleSend(context.TODO()) go destFromProtocol.HandleReadAt() go destFromProtocol.HandleWriteAt() @@ -139,8 +143,8 @@ func TestProtocolRWReadAt(t *testing.T) { r1, w1 := io.Pipe() r2, w2 := io.Pipe() - prSource := protocol.NewProtocolRW(r1, w2) - prDest := protocol.NewProtocolRW(r2, w1) + prSource := protocol.NewProtocolRW(context.TODO(), r1, w2) + prDest := protocol.NewProtocolRW(context.TODO(), r2, w1) sourceToProtocol := modules.NewToProtocol(uint64(size), 1, prSource) @@ -153,6 +157,7 @@ func TestProtocolRWReadAt(t *testing.T) { // Now do some things and make sure they happen... // TODO: Shutdown... + go destFromProtocol.HandleSend(context.TODO()) go destFromProtocol.HandleReadAt() go destFromProtocol.HandleWriteAt()