Skip to content

Commit

Permalink
Allow reads during snapshot (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar authored Dec 25, 2021
1 parent b772938 commit e00421d
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 78 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ players := NewCollection(column.Options{
// Read the changes from the channel
go func(){
for commit := writer{
println("commit", commit.Type.String())
println("commit", commit.ID)
}
}()

Expand Down
19 changes: 0 additions & 19 deletions commit/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,25 +236,6 @@ func (r *Reader) Range(buf *Buffer, chunk Chunk, fn func(*Reader)) {
}
}

// MaxOffset returns the maximum offset for a chunk
func (r *Reader) MaxOffset(buf *Buffer, chunk Chunk) (max uint32) {
if buf == nil {
return
}

r.Range(buf, chunk, func(r *Reader) {
for r.Next() {
if max < r.Index() {
max = r.Index()
}
}
})

// Rewind after this, so we can re-use the reader after
r.Rewind()
return
}

// --------------------------- Next Iterator ----------------------------

// Next reads the current operation and returns false if there is no more
Expand Down
26 changes: 0 additions & 26 deletions commit/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,32 +234,6 @@ func TestReadFloatMixedSize(t *testing.T) {
})
}

func TestReaderMax(t *testing.T) {
buf := NewBuffer(0)
buf.Reset("test")
for i := uint32(0); i < 20000; i++ {
buf.PutUint64(Put, i, 2*uint64(i))
}

r := NewReader()
assert.Equal(t, 16383, int(r.MaxOffset(buf, 0)))
assert.Equal(t, 19999, int(r.MaxOffset(buf, 1)))
assert.Equal(t, 0, int(r.MaxOffset(nil, 0)))
}

func TestRewindAfterMax(t *testing.T) {
buf := NewBuffer(0)
buf.Reset("test")
for i := uint32(0); i < 10; i++ {
buf.PutUint64(Put, i, 2*uint64(i))
}

r := NewReader()
assert.Equal(t, 9, int(r.MaxOffset(buf, 0)))
assert.True(t, r.Next())
assert.Equal(t, 0, int(r.Index()))
}

func TestReadSize(t *testing.T) {
buf := NewBuffer(0)
buf.Reset("test")
Expand Down
6 changes: 2 additions & 4 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,11 @@ func (c *Collection) writeState(dst io.Writer) (int64, error) {

// Write each chunk
if err := writer.WriteRange(chunks, func(i int, w *iostream.Writer) error {
chunk := commit.Chunk(i)
return c.writeAtChunk(chunk, func(chunk commit.Chunk, fill bitmap.Bitmap) error {
return c.readChunk(commit.Chunk(i), func(lastCommit uint64, chunk commit.Chunk, fill bitmap.Bitmap) error {
offset := chunk.Min()

// Write the last written commit for this chunk
commitID := c.commits[chunk]
if err := writer.WriteUvarint(uint64(commitID)); err != nil {
if err := writer.WriteUvarint(lastCommit); err != nil {
return err
}

Expand Down
26 changes: 10 additions & 16 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,39 +430,38 @@ func (txn *Txn) commit() {

// Grow the size of the fill list
markers, changedRows := txn.findMarkers()
txn.commitCapacity()
if last, ok := txn.dirty.Max(); ok {
txn.commitCapacity(commit.Chunk(last))
}

// Commit chunk by chunk to reduce lock contentions
txn.rangeWrite(func(commitID uint64, chunk commit.Chunk, fill bitmap.Bitmap) error {
txn.rangeWrite(func(commitID uint64, chunk commit.Chunk, fill bitmap.Bitmap) {
if changedRows {
txn.commitMarkers(chunk, fill, markers)
}

// Attemp to update, if nothing was changed we're done
updated := txn.commitUpdates(chunk)
if !changedRows && !updated {
return nil
return
}

// If there is a pending snapshot, append commit into a temp log
if dst, ok := txn.owner.isSnapshotting(); ok {
if err := dst.Append(commit.Commit{
dst.Append(commit.Commit{
ID: commitID,
Chunk: chunk,
Updates: txn.updates,
}); err != nil {
return err
}
})
}

if txn.logger != nil {
return txn.logger.Append(commit.Commit{
txn.logger.Append(commit.Commit{
ID: commitID,
Chunk: chunk,
Updates: txn.updates,
})
}
return nil
})
}

Expand Down Expand Up @@ -532,12 +531,7 @@ func (txn *Txn) findMarkers() (*commit.Buffer, bool) {
}

// commitCapacity grows all columns until they reach the max index
func (txn *Txn) commitCapacity() {
last, ok := txn.dirty.Max()
if !ok { // Empty
return
}

func (txn *Txn) commitCapacity(last commit.Chunk) {
txn.owner.lock.Lock()
defer txn.owner.lock.Unlock()
if len(txn.owner.commits) >= int(last+1) {
Expand All @@ -550,7 +544,7 @@ func (txn *Txn) commitCapacity() {
}

// Grow the fill list and all of the owner's columns
max := commit.Chunk(last).Max()
max := last.Max()
txn.owner.fill.Grow(max)
txn.owner.cols.Range(func(column *column) {
column.Grow(max)
Expand Down
24 changes: 12 additions & 12 deletions txn_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,38 +52,38 @@ func (txn *Txn) rangeReadPair(column *column, f func(a, b bitmap.Bitmap)) {

// rangeWrite ranges over the dirty chunks and acquires exclusive latches along
// the way. This is used to commit a transaction.
func (txn *Txn) rangeWrite(fn func(commitID uint64, chunk commit.Chunk, fill bitmap.Bitmap) error) {
func (txn *Txn) rangeWrite(fn func(commitID uint64, chunk commit.Chunk, fill bitmap.Bitmap)) {
lock := txn.owner.slock
txn.dirty.Range(func(x uint32) {
chunk := commit.Chunk(x)
commitID := commit.Next()
lock.Lock(uint(chunk))

// Compute the fill and set the last commit ID
txn.owner.lock.Lock()
txn.owner.lock.RLock()
fill := chunk.OfBitmap(txn.owner.fill)
txn.owner.commits[chunk] = commitID
txn.owner.lock.Unlock()
txn.owner.commits[chunk] = commitID // OK, since we have a shard lock
txn.owner.lock.RUnlock()

// Call the delegate
fn(commitID, chunk, fill)
lock.Unlock(uint(chunk))
})
}

// writeAtChunk acquires appropriate locks for a chunk and executes a
// write callback
func (c *Collection) writeAtChunk(chunk commit.Chunk, fn func(chunk commit.Chunk, fill bitmap.Bitmap) error) (err error) {
// readChunk acquires appropriate locks for a chunk and executes a read callback
func (c *Collection) readChunk(chunk commit.Chunk, fn func(uint64, commit.Chunk, bitmap.Bitmap) error) (err error) {
lock := c.slock
lock.Lock(uint(chunk))
lock.RLock(uint(chunk))

// Compute the fill
c.lock.Lock()
c.lock.RLock()
fill := chunk.OfBitmap(c.fill)
c.lock.Unlock()
commitID := c.commits[chunk]
c.lock.RUnlock()

// Call the delegate
err = fn(chunk, fill)
lock.Unlock(uint(chunk))
err = fn(commitID, chunk, fill)
lock.RUnlock(uint(chunk))
return
}

0 comments on commit e00421d

Please sign in to comment.