Point in time snapshots #1471
base: master
@@ -0,0 +1,84 @@
# Point in time Snapshots

A point-in-time snapshot is a copy of the existing data that represents the data in memory at that specific time.

## Goals
- Don't affect the throughput of the current request-processing layer.
- Ability to take multiple snapshot instances simultaneously.
- Ability to snapshot and restore on systems with different shard counts.
- Shouldn't depend on existing data files, only on the in-memory data structures.

## Design

### Recap
DiceDB runs multiple `ShardThread`s based on the number of CPUs on the machine, and there is one `Store` object for every thread. Since DiceDB follows a shared-nothing architecture, it should be possible to snapshot and restore data with varying CPU counts.

The `Store` object keeps references to `Object` instances in a map. The `Object` is where the data is actually stored.

### Implementing copy-on-write
The snapshotting technique is similar to the copy-on-write mechanism, i.e., no additional data has to be stored until the existing data is modified. This means additional memory is only required if there are changes to the underlying data while a snapshot is running. A toy sketch of the idea follows.

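The sketch below illustrates copy-on-write with a plain string map rather than the PR's `Store`/`Snapshotter` types (all names here are hypothetical): only keys overwritten while a snapshot is active get their pre-image copied, and reads on behalf of the snapshot prefer that pre-image.

```go
package main

import "fmt"

// cowStore is a toy copy-on-write store: while a snapshot is active, the
// pre-image of every overwritten key is saved once into the snapshot map;
// untouched keys cost no extra memory.
type cowStore struct {
	data     map[string]string
	snapshot map[string]string // nil when no snapshot is running
}

func (s *cowStore) Set(key, value string) {
	if s.snapshot != nil {
		if _, saved := s.snapshot[key]; !saved {
			s.snapshot[key] = s.data[key] // keep the point-in-time value; first write wins
		}
	}
	s.data[key] = value
}

// snapshotValue returns the value as of the moment the snapshot started.
func (s *cowStore) snapshotValue(key string) string {
	if v, ok := s.snapshot[key]; ok {
		return v
	}
	return s.data[key]
}

func main() {
	s := &cowStore{data: map[string]string{"k": "v0"}, snapshot: map[string]string{}}
	s.Set("k", "v1")
	fmt.Println(s.data["k"], s.snapshotValue("k")) // prints: v1 v0
}
```
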
### Impact on current latency benchmarks
- For reads, there should be minimal latency change since the `get` path is not modified even while a snapshot is running. One thing that may impact read latency is that the snapshot has to iterate through all the keys, so an implicit lock inside the data structure may be required.
- For writes, if a snapshot is in progress, each write has to write to two places and perform an additional read from a map.

### Flow

The initiation flow:
```
ShardThread::CallSnapshotter -> Snapshotter::Start -> Store::StartSnapshot -> SnapshotMap::Buffer -> PITFlusher::Flush
```

When the iteration is over:
```
Store::StopSnapshot -> SnapshotMap::FlushAllData -> PITFlusher::FlushAllData -> Snapshotter::Close
```

### Changes for ShardThread and Store
The snapshot would start on every `ShardThread` and fetch the `Store` object. Every `Store` object needs to implement the `SnapshotStore` interface, which contains the `StartSnapshot` and `StopSnapshot` methods. The `StartSnapshot` and `StopSnapshot` methods are called on the store by the snapshotter object. A sketch of the interfaces involved follows.

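The `SnapshotStore` shape below mirrors the interface defined later in this PR; the `Snapshotter` interface is reconstructed from how `DummyStore` calls it, so its exact signatures are assumptions (its real definition lives in `internal/store`, and value types are simplified to strings here).

```go
package pitsnapshot

// SnapshotStore mirrors the interface defined in this PR, with the
// Snapshotter parameter spelled out locally for readability.
type SnapshotStore interface {
	StartSnapshot(snapshotID uint64, snapshot Snapshotter) error
	StopSnapshot(snapshotID uint64) error
}

// Snapshotter is reconstructed from how DummyStore uses it; the real
// definition is in internal/store and presumably carries object values
// rather than plain strings.
type Snapshotter interface {
	TempAdd(key, value string) error         // save the pre-image of a key overwritten mid-snapshot
	TempGet(key string) (interface{}, error) // fetch a saved pre-image; nil if the key was untouched
	Store(key, value string) error           // buffer one point-in-time key/value pair
	Close() error                            // flush whatever is pending and finish the snapshot
}
```
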
#### StartSnapshot
When the `StartSnapshot` method is called, the `Store` should record the `SnapshotID` in a map; there can be multiple snapshot instances per store. For every read or write operation, the `Store` object should check whether a snapshot is running at that instant. If no snapshot is running, it continues as usual. If a snapshot is running, then for any subsequent write operation, the previous data is stored in the snapshot's own object, e.g. a map; let's call this the `SnapshotMap`. If there are multiple write operations to the same object and the data already exists in the `SnapshotMap`, nothing more needs to be done for the snapshot. Similarly, for reads while a snapshot is running: if the incoming request is from the snapshot layer, check whether the `SnapshotMap` has an entry for the key; if not, return the current value from the `Store`.

The `Store` should then fetch the list of keys it holds and iterate through them, handing each point-in-time value to the snapshot.

#### StopSnapshot
When the `Store` object has finished iterating through all the keys, it calls the `StopSnapshot` method. `StopSnapshot` lets the `SnapshotMap` know that no more updates are coming. The `SnapshotMap` then talks to the `PITFlusher` to finish syncing all the chunks to disk and closes the main snapshot process. A sketch of what the `SnapshotMap` could look like follows.

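The `SnapshotMap` implementation itself is not part of this diff. The sketch below is a plausible version consistent with how the PR uses it, assuming string values and a `StoreMapUpdate` with `Key`/`Value` string fields: it keeps pre-images of overwritten keys and hands point-in-time pairs to the `PITFlusher` in `BufferSize` batches.

```go
package pitsnapshot

import (
	"context"
	"sync"
)

// SnapshotMap (sketch only, not the actual implementation): buffers the
// iterated key/value pairs and flushes them in BufferSize batches, while
// keeping pre-images of keys overwritten mid-snapshot.
type SnapshotMap struct {
	ctx       context.Context
	flusher   *PITFlusher
	temp      map[string]string // pre-images of keys overwritten mid-snapshot
	buffer    []StoreMapUpdate
	totalKeys uint64
	mu        sync.Mutex
}

func NewSnapshotMap(ctx context.Context, flusher *PITFlusher) (*SnapshotMap, error) {
	return &SnapshotMap{ctx: ctx, flusher: flusher, temp: map[string]string{}}, nil
}

func (sm *SnapshotMap) TempAdd(key, value string) error {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	if _, saved := sm.temp[key]; !saved { // first write wins: keep the point-in-time value
		sm.temp[key] = value
	}
	return nil
}

func (sm *SnapshotMap) TempGet(key string) (interface{}, error) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	if v, ok := sm.temp[key]; ok {
		return v, nil
	}
	return nil, nil
}

// Store buffers one point-in-time pair and pushes a full batch to the flusher.
func (sm *SnapshotMap) Store(key, value string) error {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.buffer = append(sm.buffer, StoreMapUpdate{Key: key, Value: value})
	sm.totalKeys++
	if len(sm.buffer) >= BufferSize {
		sm.flusher.updatesCh <- sm.buffer
		sm.buffer = nil
	}
	return nil
}

// Close pushes whatever is still buffered and shuts the flusher down.
func (sm *SnapshotMap) Close() error {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	if len(sm.buffer) > 0 {
		sm.flusher.updatesCh <- sm.buffer
		sm.buffer = nil
	}
	return sm.flusher.Close()
}
```
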
### Point-in-time Flusher
The `PITFlusher` serializes the store updates it receives from the `SnapshotMap` into a binary format, currently `gob`, and appends each serialized batch to a file. A sketch of the serialization is shown below.

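The `StoreMapUpdate.Serialize` method referenced by the flusher is not part of this diff; a minimal sketch using `encoding/gob`, assuming `StoreMapUpdate` carries plain `Key`/`Value` strings, could look like this:

```go
package pitsnapshot

import (
	"bytes"
	"encoding/gob"
)

// StoreMapUpdate is assumed here to be a simple key/value pair; the actual
// definition in the PR may differ.
type StoreMapUpdate struct {
	Key   string
	Value string
}

// Serialize encodes a whole batch of updates into a single gob blob, which
// the PITFlusher then appends to the snapshot file followed by a delimiter.
func (StoreMapUpdate) Serialize(updates []StoreMapUpdate) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(updates); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
```

One thing to note about the current file format: gob output is binary and can itself contain newline bytes, so a length-prefixed framing may be more robust than splitting the file on the `\n` delimiter when reading it back.
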
## Test cases and benchmarks
- Snapshot data less than the buffer size without any subsequent writes
- Snapshot data less than the buffer size with localized subsequent writes
- Snapshot data less than the buffer size with spread-out subsequent writes
- Snapshot data more than the buffer size without any subsequent writes
- Snapshot data more than the buffer size with localized subsequent writes
- Snapshot data more than the buffer size with spread-out subsequent writes
- Ensure the current `get` path is not affected

A minimal end-to-end usage sketch of the pieces above follows.
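This is a hypothetical wiring of the types added in this PR, written as a `main` inside the dice repository (the real call site in the shard layer is not part of this diff): populate a `DummyStore` with more keys than `BufferSize`, take one snapshot, and let the flusher write `/tmp/flusher-<snapshotID>.ddb`.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/dicedb/dice/internal/pitsnapshot"
)

func main() {
	ds := pitsnapshot.NewDummyStore()
	for i := 0; i < 5000; i++ {
		ds.Set(fmt.Sprintf("key-%d", i), "v0")
	}

	pit, err := pitsnapshot.NewPointInTimeSnapshot(context.Background(), ds)
	if err != nil {
		log.Fatal(err)
	}
	// Run starts the flusher, registers the snapshot with the store, and
	// iterates the store's keys, streaming BufferSize batches to the flusher.
	if err := pit.Run(); err != nil {
		log.Fatal(err)
	}
}
```

Since `DummyStore.StartSnapshot` iterates synchronously and ends by calling `StopSnapshot`, `Run` returns only after the snapshot data has been handed off to the flusher; a real per-shard integration would presumably trigger `StartSnapshot` on each `ShardThread`.
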
internal/pitsnapshot/dummy_store.go
@@ -0,0 +1,94 @@
package pitsnapshot

import (
	"log"
	"sync"

	"github.com/dicedb/dice/internal/store"
)

type (
	// DummyStore is a minimal string store used to exercise the snapshotting
	// flow without depending on the real sharded Store.
	DummyStore struct {
		store            map[string]string
		ongoingSnapshots map[uint64]store.Snapshotter
		snapLock         *sync.RWMutex
		stLock           *sync.RWMutex
	}
)

func NewDummyStore() *DummyStore {
	return &DummyStore{
		store:            make(map[string]string),
		ongoingSnapshots: make(map[uint64]store.Snapshotter),
		snapLock:         &sync.RWMutex{},
		stLock:           &sync.RWMutex{},
	}
}

func (ds *DummyStore) Set(key, value string) {
	// Take a snapshot of the existing data since it's going to be overridden.
	ds.stLock.RLock()
	prevValue := ds.store[key]
	ds.stLock.RUnlock()

	ds.snapLock.RLock()
	for _, snapshot := range ds.ongoingSnapshots {
		snapshot.TempAdd(key, prevValue)
	}
	ds.snapLock.RUnlock()

	ds.stLock.Lock()
	defer ds.stLock.Unlock()

	ds.store[key] = value
}

func (ds *DummyStore) Get(key string) string {
	ds.stLock.RLock()
	defer ds.stLock.RUnlock()

	return ds.store[key]
}

func (ds *DummyStore) StartSnapshot(snapshotID uint64, snapshot store.Snapshotter) (err error) {
	log.Println("Starting snapshot for snapshotID", snapshotID)

	ds.snapLock.Lock()
	ds.ongoingSnapshots[snapshotID] = snapshot
	ds.snapLock.Unlock()

	// Capture the key list under a read lock, then release it so writes can proceed.
	ds.stLock.RLock()
	keys := make([]string, 0, len(ds.store))
	for k := range ds.store {
		keys = append(keys, k)
	}
	ds.stLock.RUnlock()

	for _, k := range keys {
		ds.stLock.RLock()
		v := ds.store[k]
		ds.stLock.RUnlock()

		// Check if the data was overridden while the snapshot was running;
		// if so, use the saved point-in-time value instead.
		tempVal, _ := snapshot.TempGet(k)
		if tempVal != nil {
			v = tempVal.(string)
		}
		err = snapshot.Store(k, v)
		if err != nil {
			log.Println("Error storing data in snapshot", "error", err)
		}
	}
	ds.StopSnapshot(snapshotID)
	return
}

func (ds *DummyStore) StopSnapshot(snapshotID uint64) (err error) {
	ds.snapLock.Lock()
	if snapshot, isPresent := ds.ongoingSnapshots[snapshotID]; isPresent {
		if err = snapshot.Close(); err != nil {
			log.Println("Error closing snapshot", "error", err)
		}
		delete(ds.ongoingSnapshots, snapshotID)
	}
	ds.snapLock.Unlock()
	return
}
@@ -0,0 +1,128 @@
package pitsnapshot

import (
	"context"
	"fmt"
	"log"
	"os"
)

const (
	FlushFilePath = "/tmp/flusher-%d.ddb"
	Delimiter     = "\n"
)

type (
	// PITFlusher drains batches of StoreMapUpdate from the SnapshotMap and
	// appends their serialized form to the snapshot file on disk.
	PITFlusher struct {
		ctx        context.Context
		snapshotID uint64
		updatesCh  chan []StoreMapUpdate
		exitCh     chan bool
		dlq        [][]StoreMapUpdate

		totalKeys uint64
		flushFile *os.File
	}
)

func NewPITFlusher(ctx context.Context, snapshotID uint64, exitCh chan bool) (pf *PITFlusher, err error) {
	pf = &PITFlusher{
		ctx:        ctx,
		snapshotID: snapshotID,
		exitCh:     exitCh,
		updatesCh:  make(chan []StoreMapUpdate, 10*BufferSize),
	}
	if err = pf.setup(); err != nil {
		return
	}
	return
}

func (pf *PITFlusher) setup() (err error) {
	filePath := fmt.Sprintf(FlushFilePath, pf.snapshotID)
	if _, err = os.Stat(filePath); os.IsExist(err) {
		return
	}
	if pf.flushFile, err = os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644); err != nil {
		return
	}
	return
}

// Start consumes update batches until the context is cancelled, parking any
// batches that fail to flush in the DLQ so they can be retried on Close.
func (pf *PITFlusher) Start() (err error) {
	for {
		select {
		case <-pf.ctx.Done():
			return
		case updates := <-pf.updatesCh:
			// TODO: Store the failed updates somewhere
			if err = pf.Flush(updates); err != nil {
				//log.Println("error in flushing updates, pushing to DLQ", err)
				pf.dlq = append(pf.dlq, updates)
			}
		}
	}
}

// Flush serializes one batch of updates and appends it to the snapshot file.
func (pf *PITFlusher) Flush(updates []StoreMapUpdate) (err error) {
	var (
		serializedUpdates []byte
		storeUpdate       StoreMapUpdate
	)
	pf.totalKeys += uint64(len(updates))
	if serializedUpdates, err = storeUpdate.Serialize(updates); err != nil {
		return
	}
	serializedUpdates = append(serializedUpdates, Delimiter...)
	if _, err = pf.flushFile.Write(serializedUpdates); err != nil {
		return
	}
	return
}

// clearDLQ drains anything still sitting in the updates channel and then
// retries the batches that previously failed to flush.
func (pf *PITFlusher) clearDLQ() (err error) {
	for {
		select {
		case updates := <-pf.updatesCh:
			if err := pf.Flush(updates); err != nil {
				log.Println("Error in flushing updates while draining", err)
				pf.dlq = append(pf.dlq, updates)
			}
		default:
			// Channel is empty
			goto Done
		}
	}
Done:
	totalKeysInDLQ := 0
	for _, updates := range pf.dlq {
		totalKeysInDLQ += len(updates)
		if err = pf.Flush(updates); err != nil {
			return
		}
	}
	if totalKeysInDLQ > 0 {
		log.Println("Total keys in DLQ", totalKeysInDLQ, len(pf.updatesCh))
	}
	return
}

// Close is called when the overlying SnapshotMap's channel is closed.
// After the SnapshotMap is closed, there is one final push to flush all the
// pending data to disk.
func (pf *PITFlusher) Close() (err error) {
	if err = pf.clearDLQ(); err != nil {
		log.Println("error in clearing DLQ", err)
	}
	//log.Println("Closing the flusher for snapshot", pf.snapshotID, ", total keys flushed", pf.totalKeys)
	pf.flushFile.Close()
	pf.exitCh <- true
	return
}
@@ -0,0 +1,88 @@
package pitsnapshot

import (
	"context"
	"log"
	"time"

	"github.com/dicedb/dice/internal/store"
)

const (
	BufferSize = 1000
)

type (
	// SnapshotStore is the interface every Store must implement to be snapshottable.
	SnapshotStore interface {
		StartSnapshot(uint64, store.Snapshotter) error
		StopSnapshot(uint64) error
	}
	PointInTimeSnapshot struct {
		ctx        context.Context
		cancelFunc context.CancelFunc

		ID uint64

		store          SnapshotStore
		totalStoreShot uint8

		SnapshotMap *SnapshotMap

		flusher *PITFlusher

		StartedAt time.Time
		EndedAt   time.Time

		exitCh chan bool
	}
)

func NewPointInTimeSnapshot(ctx context.Context, store SnapshotStore) (pit *PointInTimeSnapshot, err error) {
	pit = &PointInTimeSnapshot{
		ctx:       ctx,
		ID:        uint64(time.Now().Nanosecond()),
		StartedAt: time.Now(),

		store:  store,
		exitCh: make(chan bool, 5),
	}
	pit.ctx, pit.cancelFunc = context.WithCancel(ctx)
	if pit.flusher, err = NewPITFlusher(pit.ctx, pit.ID, pit.exitCh); err != nil {
		return
	}
	if pit.SnapshotMap, err = NewSnapshotMap(pit.ctx, pit.flusher); err != nil {
		return
	}
	return
}

Review comment (on `exitCh: make(chan bool, 5)`): For multiple store instances there'd be a snapshot run triggered at one point in time; in that case why'd we need multiple? Also, my assumption is that each store will create its own snapshotter and it won't be shared among stores, is that correct?

Reply: Yes, the size of the exit channel can be 1 as well.

// processStoreUpdates blocks until either the context is cancelled or the
// flusher signals completion on exitCh, then closes the snapshot.
func (pit *PointInTimeSnapshot) processStoreUpdates() (err error) {
	for {
		select {
		case <-pit.ctx.Done():
			pit.Close()
			return
		case <-pit.exitCh:
			pit.Close()
			return
		}
	}
}

Review comment (on lines +60 to +70, `processStoreUpdates`): nit: Shall we update this to?

func (pit *PointInTimeSnapshot) Run() (err error) {
	go pit.flusher.Start()
	if err = pit.store.StartSnapshot(pit.ID, pit.SnapshotMap); err != nil {
		return
	}
	go pit.processStoreUpdates()
	return
}

func (pit *PointInTimeSnapshot) Close() (err error) {
	pit.EndedAt = time.Now()
	pit.cancelFunc()
	log.Println("Closing snapshot", pit.ID, ". Total time taken",
		pit.EndedAt.Sub(pit.StartedAt), "for total keys", pit.SnapshotMap.totalKeys)
	return
}
Review comment: What are we using this variable for?

Reply: I think this isn't required now, will remove it.