Point in time snapshots #1471

Open · wants to merge 4 commits into master
84 changes: 84 additions & 0 deletions internal/pitsnapshot/README.md
@@ -0,0 +1,84 @@
# Point in time Snapshots

A point-in-time snapshot is a copy of the existing data that represents the in-memory state at
that specific moment in time.

## Goals
- Don't affect the throughput of the current request-processing layer.
- Ability to take multiple snapshot instances simultaneously.
- Ability to snapshot and restore on systems with different shard counts.
- Shouldn't depend on existing data files, only on the in-memory data structures.

## Design

### Recap
DiceDB runs multiple `ShardThreads` based on the number of CPUs on the machine, and there is
one `Store` object for every thread. Since DiceDB follows a shared-nothing architecture, it
should be possible to snapshot and restore data across varying CPU counts.

The `Store` object keeps references to `Object` values in a map; the `Object` is where the data
is actually stored.
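
A simplified picture of that layout, shown below with illustrative types rather than the actual
DiceDB `Store` and `Object` definitions:

```go
// Illustrative only; the real DiceDB types carry more metadata.
type Object struct {
	Value interface{} // the stored data
}

type Store struct {
	data map[string]*Object // one such map per ShardThread
}
```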

### Implementing Copy on write
The snapshotting technique is similar to a copy-on-write mechanism, i.e., no additional data
has to be stored until the data is modified. This means additional memory is only required if
there are changes to the underlying data.
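
A minimal sketch of that write path, using hypothetical names (`snapshotBuffer`, `cowStore`)
rather than the actual interfaces in this PR: the pre-write value is copied into the snapshot's
buffer only the first time a key is modified while a snapshot is running.

```go
// Illustrative copy-on-write write path; locking is omitted for brevity.
type snapshotBuffer interface {
	TempGet(key string) (prev string, ok bool) // value already preserved for this key, if any
	TempAdd(key, prev string)                  // preserve the pre-write value
}

type cowStore struct {
	data             map[string]string
	ongoingSnapshots []snapshotBuffer
}

func (s *cowStore) Set(key, value string) {
	for _, snap := range s.ongoingSnapshots {
		// Copy the previous value into the snapshot only on the first write to this key.
		if _, saved := snap.TempGet(key); !saved {
			snap.TempAdd(key, s.data[key])
		}
	}
	s.data[key] = value
}
```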

### Impact on current latency benchmarks
- For reads, there should be minimal latency change, since the `get` path is untouched even
while a snapshot is running. One thing that may affect read latency is that the snapshot has to
iterate through all the keys, so an implicit lock inside the data structure may be required.
- For writes, while a snapshot is in progress, each write has to go to two places and perform an
additional read from a map.

### Flow

The initiation flow:
```bash
ShardThread::CallSnapshotter -> Snapshotter::Start -> Store::StartSnapshot -> SnapshotMap::Buffer
-> PITFlusher::Flush
```

When the iteration is over:
```bash
Store::StopSnapshot -> SnapshotMap::FlushAllData -> PITFlusher::FlushAllData -> Snapshotter::Close
```

### Changes for ShardThread and Store
The snapshot would start on every `ShardThread` and fetch the `Store` object. Every `Store` object
needs to implement the `SnapshotStore` interface, which contains the `StartSnapshot` and `StopSnapshot`
methods. These methods are called on the store by the snapshotter object.
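
For reference, the interface as it is defined in `snapshot.go` in this change; the
`store.Snapshotter` argument is the per-snapshot buffer (the `SnapshotMap`) that receives the
preserved values and the iterated key/value pairs:

```go
// SnapshotStore is implemented by every Store that can be snapshotted.
type SnapshotStore interface {
	StartSnapshot(uint64, store.Snapshotter) error
	StopSnapshot(uint64) error
}
```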

#### StartSnapshot
When the `StartSnapshot` method is called, the `Store` should record the `SnapshotID` in a map;
there can be multiple snapshot instances in flight for every store.
For every read or write operation, the `Store` object should check whether a snapshot is running at
that instant. If no snapshot is running, it continues as usual.
If a snapshot is running, then for any subsequent write operation the store saves the previous data in
the snapshot's own object, a map referred to here as the `SnapshotMap`. If there are multiple write
operations to the same object and the data already exists in the `SnapshotMap`, nothing further needs
to be done for the snapshot.
Similarly for reads: if a snapshot is running and the incoming request comes from the snapshot layer,
check whether the `SnapshotMap` has an entry for the key; if not, return the current value from the
`Store`.

The `Store` should also fetch the list of keys in its store attribute and iterate through them,
handing each key/value pair to the snapshot.
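
A sketch of that snapshot-side read, reusing the illustrative `cowStore` and `snapshotBuffer`
types from above (the actual code routes this through the `SnapshotMap`):

```go
// Illustrative read path for the snapshot layer: prefer the preserved
// pre-write value and fall back to the live value in the store.
func (s *cowStore) snapshotGet(key string, snap snapshotBuffer) string {
	if prev, saved := snap.TempGet(key); saved {
		return prev
	}
	return s.data[key]
}
```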

#### StopSnapshot
When the `Store` object has finished iterating through all the keys, it calls the `StopSnapshot`
method. `StopSnapshot` lets the `SnapshotMap` know that no more updates are coming. The `SnapshotMap`
then tells the `PITFlusher` to finish syncing all the chunks to disk, after which the main snapshot
process is closed.

### Point-in-time Flusher
The `PITFlusher` serializes the store updates coming from the `SnapshotMap` into a binary format,
currently `gob`, and appends them to a file.
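
A minimal sketch of that serialization step. The `storeUpdate` shape and `appendBatch` helper are
hypothetical stand-ins, not the PR's `StoreMapUpdate.Serialize`; the snippet only illustrates
gob-encoding one batch and appending it with a delimiter.

```go
// Illustrative only; requires "bytes", "encoding/gob", and "os".
type storeUpdate struct {
	Key   string
	Value string
}

// appendBatch gob-encodes one batch of updates and appends it, delimiter-terminated,
// to the snapshot file.
func appendBatch(f *os.File, batch []storeUpdate) error {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(batch); err != nil {
		return err
	}
	buf.WriteByte('\n') // delimiter between flushed batches
	_, err := f.Write(buf.Bytes())
	return err
}
```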

## Test cases and benchmarks
- Snapshot data less than the buffer size without any subsequent writes
- Snapshot data less than the buffer size with localized subsequent writes
- Snapshot data less than the buffer size with spread out subsequent writes
- Snapshot data more than the buffer size without any subsequent writes
- Snapshot data more than the buffer size with localized subsequent writes
- Snapshot data more than the buffer size with spread out subsequent writes
- Ensure current `get` path is not affected
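
For the first case above, a test outline along these lines could exercise the `DummyStore` and
`PointInTimeSnapshot` added in this PR; since the `SnapshotMap` buffering and the on-disk format
are not covered here, treat it as a sketch rather than a finished test.

```go
package pitsnapshot

import (
	"context"
	"fmt"
	"testing"
)

// Outline for "snapshot data less than the buffer size without any subsequent writes".
func TestSnapshotSmallDatasetNoWrites(t *testing.T) {
	ds := NewDummyStore()
	for i := 0; i < 100; i++ { // well below BufferSize
		ds.Set(fmt.Sprintf("key-%d", i), fmt.Sprintf("value-%d", i))
	}

	pit, err := NewPointInTimeSnapshot(context.Background(), ds)
	if err != nil {
		t.Fatalf("creating snapshot: %v", err)
	}
	if err := pit.Run(); err != nil {
		t.Fatalf("running snapshot: %v", err)
	}
	// Decoding /tmp/flusher-<snapshotID>.ddb and comparing it against ds
	// would complete the assertion.
}
```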
94 changes: 94 additions & 0 deletions internal/pitsnapshot/dummy_store.go
@@ -0,0 +1,94 @@
package pitsnapshot

import (
	"log"
	"sync"

	"github.com/dicedb/dice/internal/store"
)

type (
	DummyStore struct {
		store            map[string]string
		ongoingSnapshots map[uint64]store.Snapshotter
		snapLock         *sync.RWMutex
		stLock           *sync.RWMutex
	}
)

func NewDummyStore() *DummyStore {
	return &DummyStore{
		store:            make(map[string]string),
		ongoingSnapshots: make(map[uint64]store.Snapshotter),
		snapLock:         &sync.RWMutex{},
		stLock:           &sync.RWMutex{},
	}
}

func (ds *DummyStore) Set(key, value string) {
	// Preserve the existing value in every ongoing snapshot since it's going to be overridden
	ds.stLock.RLock()
	previous := ds.store[key]
	ds.stLock.RUnlock()

	ds.snapLock.RLock()
	for _, snapshot := range ds.ongoingSnapshots {
		if err := snapshot.TempAdd(key, previous); err != nil {
			log.Println("Error buffering previous value in snapshot", "error", err)
		}
	}
	ds.snapLock.RUnlock()

	ds.stLock.Lock()
	defer ds.stLock.Unlock()

	ds.store[key] = value
}

func (ds *DummyStore) Get(key string) string {
	ds.stLock.RLock()
	defer ds.stLock.RUnlock()

	return ds.store[key]
}

func (ds *DummyStore) StartSnapshot(snapshotID uint64, snapshot store.Snapshotter) error {
	log.Println("Starting snapshot for snapshotID", snapshotID)

	ds.snapLock.Lock()
	ds.ongoingSnapshots[snapshotID] = snapshot
	ds.snapLock.Unlock()

	ds.stLock.RLock()
	keys := make([]string, 0, len(ds.store))
	for k := range ds.store {
		keys = append(keys, k)
	}
	ds.stLock.RUnlock()

	for _, k := range keys {
		ds.stLock.RLock()
		v := ds.store[k]
		ds.stLock.RUnlock()

		// Check if the data was overridden while the snapshot was running
		tempVal, _ := snapshot.TempGet(k)
		if tempVal != nil {
			v = tempVal.(string)
		}
		if err := snapshot.Store(k, v); err != nil {
			log.Println("Error storing data in snapshot", "error", err)
		}
	}
	return ds.StopSnapshot(snapshotID)
}

func (ds *DummyStore) StopSnapshot(snapshotID uint64) (err error) {
	ds.snapLock.Lock()
	if snapshot, isPresent := ds.ongoingSnapshots[snapshotID]; isPresent {
		if err = snapshot.Close(); err != nil {
			log.Println("Error closing snapshot", "error", err)
		}
		delete(ds.ongoingSnapshots, snapshotID)
	}
	ds.snapLock.Unlock()
	return
}
128 changes: 128 additions & 0 deletions internal/pitsnapshot/flusher.go
@@ -0,0 +1,128 @@
package pitsnapshot

import (
	"context"
	"fmt"
	"log"
	"os"
)

const (
	FlushFilePath = "/tmp/flusher-%d.ddb"
	Delimiter     = "\n"
)

type (
	PITFlusher struct {
		ctx        context.Context
		snapshotID uint64
		updatesCh  chan []StoreMapUpdate
		exitCh     chan bool
		dlq        [][]StoreMapUpdate

		totalKeys uint64
		flushFile *os.File
	}
)

func NewPITFlusher(ctx context.Context, snapshotID uint64, exitCh chan bool) (pf *PITFlusher, err error) {
	pf = &PITFlusher{
		ctx:        ctx,
		snapshotID: snapshotID,
		exitCh:     exitCh,
		updatesCh:  make(chan []StoreMapUpdate, 10*BufferSize),
	}
	if err = pf.setup(); err != nil {
		return
	}
	return
}

func (pf *PITFlusher) setup() (err error) {
	filePath := fmt.Sprintf(FlushFilePath, pf.snapshotID)
	if _, err = os.Stat(filePath); os.IsExist(err) {
		return
	}
	if pf.flushFile, err = os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644); err != nil {
		return
	}
	return
}

func (pf *PITFlusher) Start() (err error) {
	for {
		select {
		case <-pf.ctx.Done():
			return
		case updates := <-pf.updatesCh:
			// TODO: Store the failed updates somewhere
			if err = pf.Flush(updates); err != nil {
				log.Println("error in flushing updates, pushing to DLQ", err)
				pf.dlq = append(pf.dlq, updates)
			}
		}
	}
}

func (pf *PITFlusher) Flush(updates []StoreMapUpdate) (err error) {
	var (
		serializedUpdates []byte
		storeUpdate       StoreMapUpdate
	)
	pf.totalKeys += uint64(len(updates))
	if serializedUpdates, err = storeUpdate.Serialize(updates); err != nil {
		return
	}
	serializedUpdates = append(serializedUpdates, Delimiter...)
	if _, err = pf.flushFile.Write(serializedUpdates); err != nil {
		return
	}
	return
}

func (pf *PITFlusher) clearDLQ() (err error) {
	// Drain whatever is still buffered in the updates channel
drain:
	for {
		select {
		case updates := <-pf.updatesCh:
			if err := pf.Flush(updates); err != nil {
				log.Println("Error in flushing updates while draining", err)
				pf.dlq = append(pf.dlq, updates)
			}
		default:
			// Channel is empty
			break drain
		}
	}
	totalKeysInDLQ := 0
	for _, updates := range pf.dlq {
		totalKeysInDLQ += len(updates)
		if err = pf.Flush(updates); err != nil {
			return
		}
	}
	if totalKeysInDLQ > 0 {
		log.Println("Total keys in DLQ", totalKeysInDLQ, len(pf.updatesCh))
	}
	return
}

// Close is called when the overlying SnapshotMap's channel is closed.
// After the SnapshotMap is closed, there is one final push to flush all the pending
// data to disk.
func (pf *PITFlusher) Close() (err error) {
	if err = pf.clearDLQ(); err != nil {
		log.Println("error in clearing DLQ", err)
	}
	log.Println("Closing the flusher for snapshot", pf.snapshotID, ", total keys flushed", pf.totalKeys)
	if closeErr := pf.flushFile.Close(); closeErr != nil {
		log.Println("error closing the flush file", closeErr)
	}
	pf.exitCh <- true
	return
}
88 changes: 88 additions & 0 deletions internal/pitsnapshot/snapshot.go
@@ -0,0 +1,88 @@
package pitsnapshot

import (
	"context"
	"log"
	"time"

	"github.com/dicedb/dice/internal/store"
)

const (
	BufferSize = 1000
)

type (
	SnapshotStore interface {
		StartSnapshot(uint64, store.Snapshotter) error
		StopSnapshot(uint64) error
	}
	PointInTimeSnapshot struct {
		ctx        context.Context
		cancelFunc context.CancelFunc

		ID uint64

		store          SnapshotStore
		totalStoreShot uint8
Contributor: What are we using this variable for?

Contributor Author: I think this isn't required now, will remove it.

		SnapshotMap *SnapshotMap

		flusher *PITFlusher

		StartedAt time.Time
		EndedAt   time.Time

		exitCh chan bool
	}
)

func NewPointInTimeSnapshot(ctx context.Context, store SnapshotStore) (pit *PointInTimeSnapshot, err error) {
	pit = &PointInTimeSnapshot{
		ctx:       ctx,
		ID:        uint64(time.Now().Nanosecond()),
		StartedAt: time.Now(),

		store:  store,
		exitCh: make(chan bool, 5),

Contributor: For multiple store instances there'd be a snapshot run triggered at one point in time, so why would we need multiple exitCh channels? From my understanding the flow is:

- Each Store has one snapshot
- Each snapshot has one flusher
- Each flusher sends an exit signal after the flush is completed for all objects in that store

My assumption is also that each store will create its own snapshotter and it won't be shared among stores. Is that correct? Please correct me if I'm wrong.

Contributor Author: Yes, the size of the exit channel can be 1 as well. And yes, this is right; however, this PR will not cover the SnapshotManager that will spawn a snapshotter process for each of the stores.

	}
	pit.ctx, pit.cancelFunc = context.WithCancel(ctx)
	if pit.flusher, err = NewPITFlusher(pit.ctx, pit.ID, pit.exitCh); err != nil {
		return
	}
	if pit.SnapshotMap, err = NewSnapshotMap(pit.ctx, pit.flusher); err != nil {
		return
	}
	return
}

func (pit *PointInTimeSnapshot) processStoreUpdates() (err error) {
	for {
		select {
		case <-pit.ctx.Done():
			pit.Close()
			return
		case <-pit.exitCh:
			pit.Close()
			return
		}
	}
	return
Comment on lines +60 to +70

Contributor: nit: Shall we update this to?

func (pit *PointInTimeSnapshot) processStoreUpdates() error {
	select {
	case <-pit.ctx.Done():
	case <-pit.exitCh:
	}
	pit.Close()
	return nil
}

}

func (pit *PointInTimeSnapshot) Run() (err error) {
	go pit.flusher.Start()
	if err = pit.store.StartSnapshot(pit.ID, pit.SnapshotMap); err != nil {
		return
	}
	go pit.processStoreUpdates()
	return
}

func (pit *PointInTimeSnapshot) Close() (err error) {
	pit.EndedAt = time.Now()
	pit.cancelFunc()
	log.Println("Closing snapshot", pit.ID, ". Total time taken",
		pit.EndedAt.Sub(pit.StartedAt), "for total keys", pit.SnapshotMap.totalKeys)
	return
}