Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/lighthorizon/index: Add ability to disable bits in index. #4601

Merged
merged 5 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions exp/lighthorizon/index/types/bitmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ func (i *BitmapIndex) SetActive(index uint32) error {
return i.setActive(index)
}

// SetInactive clears the bit for index. It holds the index's mutex for the
// duration of the call and returns whatever setInactive returns.
func (i *BitmapIndex) SetInactive(index uint32) error {
	i.mutex.Lock()
	defer i.mutex.Unlock()

	err := i.setInactive(index)
	return err
}

// bitShiftLeft returns a byte with the bit set corresponding to the index. In
// other words, it returns a mask in which only the bit at the index's
// "position" mod-8 is set.
func bitShiftLeft(index uint32) byte {
if index%8 == 0 {
return 1
Expand All @@ -55,10 +63,18 @@ func bitShiftLeft(index uint32) byte {
}
}

// rangeFirstBit reports the smallest index the bitmap is able to represent,
// whether or not that index is actually active. Indices are 1-based and packed
// eight to a byte, so after a lone SetActive(12) the single byte 0b0001_0000
// covers 9 through 16 and this returns 9.
func (i *BitmapIndex) rangeFirstBit() uint32 {
	// Round the zero-based position of firstBit down to a byte boundary, then
	// convert back to a 1-based index.
	zeroBased := i.firstBit - 1
	return zeroBased - zeroBased%8 + 1
}

// rangeLastBit reports the largest index the bitmap is able to represent,
// whether or not that index is actually active. After a lone SetActive(12) the
// single byte 0b0001_0000 covers 9 through 16, so this returns 16.
func (i *BitmapIndex) rangeLastBit() uint32 {
	capacity := 8 * uint32(len(i.bitmap)) // number of indices the bytes can hold
	return i.rangeFirstBit() + capacity - 1
}
Expand Down Expand Up @@ -113,6 +129,83 @@ func (i *BitmapIndex) setActive(index uint32) error {
return nil
}

func (i *BitmapIndex) setInactive(index uint32) error {
// Is this index even active in the first place?
if i.firstBit == 0 || index < i.rangeFirstBit() || index > i.rangeLastBit() {
return nil // not really an error
}

loc := (index - i.rangeFirstBit()) / 8 // which byte?
b := bitShiftLeft(index) // which bit w/in the byte?
i.bitmap[loc] &= ^b // unset only that bit

// If unsetting this bit made the first byte empty OR we unset the earliest
// set bit, we need to find the next "first" active bit.
if loc == 0 && i.firstBit == index {
// find the next active bit to set as the start
nextBit, err := i.nextActiveBit(index)
if err == io.EOF {
i.firstBit = 0
i.lastBit = 0
i.bitmap = []byte{}
} else if err != nil {
return err
} else {
i.firstBit = nextBit
}

for i.bitmap[0] == 0 {
tsachiherman marked this conversation as resolved.
Show resolved Hide resolved
i.bitmap = i.bitmap[1:] // trim first (now-empty) byte off
}
} else if int(loc) == len(i.bitmap)-1 {
if i.bitmap[loc] == 0 {
i.bitmap = i.bitmap[:loc-1] // trim last (now-empty) byte

// find the latest non-empty byte, to set as the new "end"
for j := len(i.bitmap) - 1; j >= 0; j-- {
if i.bitmap[j] == 0 {
continue
}

offset, _ := maxBitAfter(i.bitmap[j], 0)
byteOffset := uint32(len(i.bitmap)-1) * 8
i.lastBit = i.rangeFirstBit() + byteOffset + offset
i.bitmap = i.bitmap[:j+1]
break
}
} else {
// Otherwise, do we need to adjust the range?
tsachiherman marked this conversation as resolved.
Show resolved Hide resolved
var idx uint32 = 1
whichBit := index % 8
if whichBit != 0 {
idx = 8 - whichBit
}

_, ok := maxBitAfter(i.bitmap[loc], idx+1)

// Imagine we had 0b0011_0100 and we unset the last active bit.
// ^
// Then, we need to adjust our internal lastBit tracker to represent
// the ^ bit above. This means finding the first previous set bit.
if !ok { // no further bits are set
j := int(idx)
for ; j >= 0 && !ok; j-- {
_, ok = maxBitAfter(i.bitmap[loc], uint32(j))
}

// We know from the earlier conditional that *some* bit is set,
// so we know that j represents the index of the bit that's the
// new "last active" bit.
firstByte := i.rangeFirstBit()
byteOffset := uint32(len(i.bitmap)-1) * 8
i.lastBit = firstByte + byteOffset + uint32(j+1)
}
}
}

return nil
}

//lint:ignore U1000 Ignore unused function temporarily
func (i *BitmapIndex) isActive(index uint32) bool {
if index >= i.firstBit && index <= i.lastBit {
Expand Down
64 changes: 64 additions & 0 deletions exp/lighthorizon/index/types/bitmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,70 @@ func TestSetActive(t *testing.T) {
assert.Equal(t, uint32(26), index.lastBit)
}

// TestSetInactive ensures that you can flip active bits off and that the
// bitmap compresses in size accordingly. Every SetActive/SetInactive call is
// checked for an error so that a failure is reported where it happens instead
// of surfacing later as a confusing bitmap mismatch.
func TestSetInactive(t *testing.T) {
	// newIndex builds a fresh index with the given bits switched on.
	newIndex := func(bits ...uint32) *BitmapIndex {
		index := &BitmapIndex{}
		for _, bit := range bits {
			assert.NoError(t, index.SetActive(bit))
		}
		return index
	}

	index := newIndex(17, 17+9, 17+9+10)
	assert.Equal(t, []byte{0b1000_0000, 0b0100_0000, 0b0001_0000}, index.bitmap)

	// disabling bits should work
	assert.NoError(t, index.SetInactive(17))
	assert.False(t, index.isActive(17))

	// it should trim off the first byte now
	assert.Equal(t, []byte{0b0100_0000, 0b0001_0000}, index.bitmap)
	assert.EqualValues(t, 17+9, index.firstBit)
	assert.EqualValues(t, 17+9+10, index.lastBit)

	// it should compress empty bytes on shrink
	index = newIndex(1, 1+2, 1+9, 1+9+8+9)
	assert.Equal(t, []byte{0b1010_0000, 0b0100_0000, 0b0000_0000, 0b0010_0000}, index.bitmap)

	// ...from the left
	assert.NoError(t, index.SetInactive(1))
	assert.Equal(t, []byte{0b0010_0000, 0b0100_0000, 0b0000_0000, 0b0010_0000}, index.bitmap)
	assert.NoError(t, index.SetInactive(3))
	assert.Equal(t, []byte{0b0100_0000, 0b0000_0000, 0b0010_0000}, index.bitmap)
	assert.EqualValues(t, 1+9, index.firstBit)
	assert.EqualValues(t, 1+9+8+9, index.lastBit)

	// ...and the right
	assert.NoError(t, index.SetInactive(1+9+8+9))
	assert.Equal(t, []byte{0b0100_0000}, index.bitmap)
	assert.EqualValues(t, 1+9, index.firstBit)
	assert.EqualValues(t, 1+9, index.lastBit)

	// ensure right-hand compression works for multiple bytes, too
	index = newIndex(2, 2+2, 2+9, 2+9+8+9, 2+9+8+10)
	assert.Equal(t, []byte{0b0101_0000, 0b0010_0000, 0b0000_0000, 0b0001_1000}, index.bitmap)

	assert.NoError(t, index.SetInactive(2+9+8+10))
	assert.Equal(t, []byte{0b0101_0000, 0b0010_0000, 0b0000_0000, 0b0001_0000}, index.bitmap)
	assert.EqualValues(t, 2+9+8+9, index.lastBit)

	assert.NoError(t, index.SetInactive(2+9+8+9))
	assert.Equal(t, []byte{0b0101_0000, 0b0010_0000}, index.bitmap)
	assert.EqualValues(t, 2, index.firstBit)
	assert.EqualValues(t, 2+9, index.lastBit)

	assert.NoError(t, index.SetInactive(2+2))
	assert.Equal(t, []byte{0b0100_0000, 0b0010_0000}, index.bitmap)
	assert.EqualValues(t, 2, index.firstBit)
	assert.EqualValues(t, 2+9, index.lastBit)
}

func TestNextActive(t *testing.T) {
t.Run("empty", func(t *testing.T) {
index := &BitmapIndex{}
Expand Down
105 changes: 101 additions & 4 deletions exp/lighthorizon/tools/explorer.go → exp/lighthorizon/tools/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
Expand All @@ -27,7 +28,7 @@ var (
func AddIndexCommands(parent *cobra.Command) *cobra.Command {
cmd := &cobra.Command{
Use: "index",
Long: "Lets you view details about an index source.",
Long: "Lets you view details about an index source and modify it.",
Example: `
index view file:///tmp/indices
index view file:///tmp/indices GAGJZWQ5QT34VK3U6W6YKRYFIK6YSAXQC6BHIIYLG6X3CE5QW2KAYNJR
Expand Down Expand Up @@ -163,9 +164,36 @@ view gcs://indices --limit=10 GAXLQGKIUAIIUHAX4GJO3J7HFGLBCNF6ZCZSTLJE7EKO5IUHGL
},
}

purge := &cobra.Command{
Use: "purge <index path> <start ledger> <end ledger>",
Long: "Purges all indices for the given ledger range.",
Example: `purge s3://indices 10000 10005`,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 3 {
return cmd.Usage()
}

path := args[0]
start, err := strconv.ParseUint(args[1], 10, 32)
if err != nil {
return cmd.Usage()
}
end, err := strconv.ParseUint(args[2], 10, 32)
if err != nil {
return cmd.Usage()
}

r := historyarchive.Range{Low: uint32(start), High: uint32(end)}
log.Infof("Purging all indices from %s for ledger range: [%d, %d].",
path, r.Low, r.High)

return purgeIndex(path, r)
},
}

view.Flags().Uint("limit", 10, "a maximum number of accounts or checkpoints to show")
view.Flags().String("index-name", "", "filter for a particular index")
cmd.AddCommand(stats, view)
cmd.AddCommand(stats, view, purge)

if parent == nil {
return cmd
Expand All @@ -179,12 +207,15 @@ func getIndex(path, account, indexName string, limit uint) map[uint32][]string {

store, err := index.Connect(path)
if err != nil {
log.Fatal(err)
log.Fatalf("Failed to connect to index store at %s: %v", path, err)
return nil
}

indices, err := store.Read(account)
if err != nil {
log.Fatal(err)
log.Fatalf("Failed to read indices for %s from index store at %s: %v",
account, path, err)
return nil
}

// It's better to summarize activity and then group it by index rather than
Expand Down Expand Up @@ -257,3 +288,69 @@ func showAccounts(path string, limit uint) []string {

return accounts
}

// purgeIndex walks every account in the index store at path and, for each of
// the account's indices, deactivates every active checkpoint whose first
// ledger (checkpoint * checkpoint frequency) lies within [r.Low, r.High]. The
// modified indices are handed back to the store and flushed one account at a
// time.
//
// Failing to connect to the store or to list its accounts is logged and
// returned, so the caller decides how to exit. Failures confined to a single
// account or index are logged and that account or index is skipped; they do
// not abort the purge.
func purgeIndex(path string, r historyarchive.Range) error {
	freq := historyarchive.DefaultCheckpointFrequency
	store, err := index.Connect(path)
	if err != nil {
		log.Errorf("Failed to connect to index store at %s: %v", path, err)
		return err
	}

	accounts, err := store.ReadAccounts()
	if err != nil {
		log.Errorf("Failed read accounts: %v", err)
		return err
	}

	purged := 0
	for _, account := range accounts {
		accountLog := log.WithField("account", account)

		indices, err := store.Read(account)
		if err != nil {
			accountLog.Errorf("Failed to read indices: %v", err)
			continue
		}

		// The loop variable is named idx so it does not shadow the imported
		// index package.
		for name, idx := range indices {
			indexLog := accountLog.WithField("index", name)

			var err error
			active := uint32(0)
			for err == nil {
				// Zero is only the starting placeholder, never a real
				// checkpoint, so always move past it, along with anything
				// below the requested range.
				if active == 0 || active*freq < r.Low {
					active, err = idx.NextActiveBit(active + 1)
					continue
				}
				if active*freq > r.High { // too high, we're done
					break
				}

				// Only count and report the checkpoint once it has actually
				// been cleared.
				if err = idx.SetInactive(active); err != nil {
					break
				}
				purged++

				indexLog.Debugf("Purged checkpoint %d (ledgers %d through %d).",
					active, active*freq, (active+1)*freq-1)

				active, err = idx.NextActiveBit(active)
			}

			// io.EOF just means we ran out of active checkpoints.
			if err != nil && err != io.EOF {
				indexLog.Errorf("Iterating over index failed: %v", err)
			}
		}

		store.AddParticipantToIndexesNoBackend(account, indices)
		if err := store.Flush(); err != nil {
			accountLog.Errorf("Flushing index failed: %v", err)
		}
	}

	log.Infof("Purged %d values across %d accounts from all indices at %s.",
		purged, len(accounts), path)
	return nil
}
58 changes: 58 additions & 0 deletions exp/lighthorizon/tools/index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package tools

import (
"path/filepath"
"testing"

"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/keypair"
"github.com/stellar/go/support/log"
"github.com/stretchr/testify/require"
)

// freq is the default checkpoint frequency; the tests multiply checkpoint
// numbers by it to build ledger ranges.
const freq = historyarchive.DefaultCheckpointFrequency

// TestIndexPurge writes a handful of checkpoints for one account into a
// file-backed index store, purges a ledger range covering some of them, and
// then reopens the store to confirm that only the checkpoints outside the
// range are still active.
func TestIndexPurge(t *testing.T) {
	log.SetLevel(log.DebugLevel)

	tempFile := "file://" + filepath.Join(t.TempDir(), "index-store")
	accounts := []string{keypair.MustRandom().Address()}

	idx, err := index.Connect(tempFile)
	require.NoError(t, err)

	for _, chk := range []uint32{14, 15, 16, 17, 20, 25, 123} {
		require.NoError(t, idx.AddParticipantsToIndexes(chk, "test", accounts))
	}

	// Saves to disk. If this fails, the purge below would have nothing to read
	// and the later assertions would fail for the wrong reason.
	require.NoError(t, idx.Flush())

	// Try purging the index
	err = purgeIndex(tempFile, historyarchive.Range{Low: 15 * freq, High: 22 * freq})
	require.NoError(t, err)

	// Check to make sure it worked.
	idx, err = index.Connect(tempFile)
	require.NoError(t, err)

	// Ensure that the index is in the expected state.
	indices, err := idx.Read(accounts[0])
	require.NoError(t, err)
	require.Contains(t, indices, "test")

	// Named testIndex so it does not shadow the imported index package.
	testIndex := indices["test"]
	i, err := testIndex.NextActiveBit(0)
	require.NoError(t, err)
	require.EqualValues(t, 14, i)

	// 15, 16, 17 and 20 were inside the purged range, so 25 comes next.
	i, err = testIndex.NextActiveBit(15)
	require.NoError(t, err)
	require.EqualValues(t, 25, i)

	i, err = testIndex.NextActiveBit(i + 1)
	require.NoError(t, err)
	require.EqualValues(t, 123, i)
}