Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow reads during snapshot #45

Merged
merged 1 commit into from
Dec 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ players := NewCollection(column.Options{
// Read the changes from the channel
go func(){
for commit := writer{
println("commit", commit.Type.String())
println("commit", commit.ID)
}
}()

Expand Down
19 changes: 0 additions & 19 deletions commit/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,25 +236,6 @@ func (r *Reader) Range(buf *Buffer, chunk Chunk, fn func(*Reader)) {
}
}

// MaxOffset returns the maximum offset for a chunk
func (r *Reader) MaxOffset(buf *Buffer, chunk Chunk) (max uint32) {
if buf == nil {
return
}

r.Range(buf, chunk, func(r *Reader) {
for r.Next() {
if max < r.Index() {
max = r.Index()
}
}
})

// Rewind after this, so we can re-use the reader after
r.Rewind()
return
}

// --------------------------- Next Iterator ----------------------------

// Next reads the current operation and returns false if there is no more
Expand Down
26 changes: 0 additions & 26 deletions commit/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,32 +234,6 @@ func TestReadFloatMixedSize(t *testing.T) {
})
}

func TestReaderMax(t *testing.T) {
buf := NewBuffer(0)
buf.Reset("test")
for i := uint32(0); i < 20000; i++ {
buf.PutUint64(Put, i, 2*uint64(i))
}

r := NewReader()
assert.Equal(t, 16383, int(r.MaxOffset(buf, 0)))
assert.Equal(t, 19999, int(r.MaxOffset(buf, 1)))
assert.Equal(t, 0, int(r.MaxOffset(nil, 0)))
}

func TestRewindAfterMax(t *testing.T) {
buf := NewBuffer(0)
buf.Reset("test")
for i := uint32(0); i < 10; i++ {
buf.PutUint64(Put, i, 2*uint64(i))
}

r := NewReader()
assert.Equal(t, 9, int(r.MaxOffset(buf, 0)))
assert.True(t, r.Next())
assert.Equal(t, 0, int(r.Index()))
}

func TestReadSize(t *testing.T) {
buf := NewBuffer(0)
buf.Reset("test")
Expand Down
6 changes: 2 additions & 4 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,11 @@ func (c *Collection) writeState(dst io.Writer) (int64, error) {

// Write each chunk
if err := writer.WriteRange(chunks, func(i int, w *iostream.Writer) error {
chunk := commit.Chunk(i)
return c.writeAtChunk(chunk, func(chunk commit.Chunk, fill bitmap.Bitmap) error {
return c.readChunk(commit.Chunk(i), func(lastCommit uint64, chunk commit.Chunk, fill bitmap.Bitmap) error {
offset := chunk.Min()

// Write the last written commit for this chunk
commitID := c.commits[chunk]
if err := writer.WriteUvarint(uint64(commitID)); err != nil {
if err := writer.WriteUvarint(lastCommit); err != nil {
return err
}

Expand Down
26 changes: 10 additions & 16 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,39 +430,38 @@ func (txn *Txn) commit() {

// Grow the size of the fill list
markers, changedRows := txn.findMarkers()
txn.commitCapacity()
if last, ok := txn.dirty.Max(); ok {
txn.commitCapacity(commit.Chunk(last))
}

// Commit chunk by chunk to reduce lock contentions
txn.rangeWrite(func(commitID uint64, chunk commit.Chunk, fill bitmap.Bitmap) error {
txn.rangeWrite(func(commitID uint64, chunk commit.Chunk, fill bitmap.Bitmap) {
if changedRows {
txn.commitMarkers(chunk, fill, markers)
}

// Attemp to update, if nothing was changed we're done
updated := txn.commitUpdates(chunk)
if !changedRows && !updated {
return nil
return
}

// If there is a pending snapshot, append commit into a temp log
if dst, ok := txn.owner.isSnapshotting(); ok {
if err := dst.Append(commit.Commit{
dst.Append(commit.Commit{
ID: commitID,
Chunk: chunk,
Updates: txn.updates,
}); err != nil {
return err
}
})
}

if txn.logger != nil {
return txn.logger.Append(commit.Commit{
txn.logger.Append(commit.Commit{
ID: commitID,
Chunk: chunk,
Updates: txn.updates,
})
}
return nil
})
}

Expand Down Expand Up @@ -532,12 +531,7 @@ func (txn *Txn) findMarkers() (*commit.Buffer, bool) {
}

// commitCapacity grows all columns until they reach the max index
func (txn *Txn) commitCapacity() {
last, ok := txn.dirty.Max()
if !ok { // Empty
return
}

func (txn *Txn) commitCapacity(last commit.Chunk) {
txn.owner.lock.Lock()
defer txn.owner.lock.Unlock()
if len(txn.owner.commits) >= int(last+1) {
Expand All @@ -550,7 +544,7 @@ func (txn *Txn) commitCapacity() {
}

// Grow the fill list and all of the owner's columns
max := commit.Chunk(last).Max()
max := last.Max()
txn.owner.fill.Grow(max)
txn.owner.cols.Range(func(column *column) {
column.Grow(max)
Expand Down
24 changes: 12 additions & 12 deletions txn_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,38 +52,38 @@ func (txn *Txn) rangeReadPair(column *column, f func(a, b bitmap.Bitmap)) {

// rangeWrite ranges over the dirty chunks and acquires exclusive latches along
// the way. This is used to commit a transaction.
func (txn *Txn) rangeWrite(fn func(commitID uint64, chunk commit.Chunk, fill bitmap.Bitmap) error) {
func (txn *Txn) rangeWrite(fn func(commitID uint64, chunk commit.Chunk, fill bitmap.Bitmap)) {
lock := txn.owner.slock
txn.dirty.Range(func(x uint32) {
chunk := commit.Chunk(x)
commitID := commit.Next()
lock.Lock(uint(chunk))

// Compute the fill and set the last commit ID
txn.owner.lock.Lock()
txn.owner.lock.RLock()
fill := chunk.OfBitmap(txn.owner.fill)
txn.owner.commits[chunk] = commitID
txn.owner.lock.Unlock()
txn.owner.commits[chunk] = commitID // OK, since we have a shard lock
txn.owner.lock.RUnlock()

// Call the delegate
fn(commitID, chunk, fill)
lock.Unlock(uint(chunk))
})
}

// writeAtChunk acquires appropriate locks for a chunk and executes a
// write callback
func (c *Collection) writeAtChunk(chunk commit.Chunk, fn func(chunk commit.Chunk, fill bitmap.Bitmap) error) (err error) {
// readChunk acquires appropriate locks for a chunk and executes a read callback
func (c *Collection) readChunk(chunk commit.Chunk, fn func(uint64, commit.Chunk, bitmap.Bitmap) error) (err error) {
lock := c.slock
lock.Lock(uint(chunk))
lock.RLock(uint(chunk))

// Compute the fill
c.lock.Lock()
c.lock.RLock()
fill := chunk.OfBitmap(c.fill)
c.lock.Unlock()
commitID := c.commits[chunk]
c.lock.RUnlock()

// Call the delegate
err = fn(chunk, fill)
lock.Unlock(uint(chunk))
err = fn(commitID, chunk, fill)
lock.RUnlock(uint(chunk))
return
}