Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
hihihuhu committed Feb 2, 2023
1 parent 611f7e7 commit 4da7ca7
Showing 1 changed file with 37 additions and 40 deletions.
77 changes: 37 additions & 40 deletions infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,20 @@ type InfoCache struct {
mu sync.RWMutex
// cache is sorted by SchemaVersion in descending order
cache []InfoSchema
// schema versions sorted by tiemstamp
sortedTS []versionAndTimestamp
}

// keep ts to version mapping
tsToVersion map[uint64]int64
versionToTs map[int64]uint64
// maintain the order everytime the cahce is updated
descSortedTs []uint64
type versionAndTimestamp struct {
version int64
timestamp int64
}

// NewCache creates a new InfoCache.
func NewCache(capacity int) *InfoCache {
return &InfoCache{
cache: make([]InfoSchema, 0, capacity),
tsToVersion: make(map[uint64]int64),
versionToTs: make(map[int64]uint64),
descSortedTs: make([]uint64, 0, capacity),
cache: make([]InfoSchema, 0, capacity),
sortedTS: make([]versionAndTimestamp, 0, capacity),
}
}

Expand All @@ -62,9 +61,7 @@ func (h *InfoCache) Reset(capacity int) {
h.mu.Lock()
defer h.mu.Unlock()
h.cache = make([]InfoSchema, 0, capacity)
h.tsToVersion = make(map[uint64]int64)
h.versionToTs = make(map[int64]uint64)
h.descSortedTs = make([]uint64, 0, capacity)
h.sortedTS = make([]versionAndTimestamp, 0, capacity)
}

// GetLatest gets the newest information schema.
Expand All @@ -87,11 +84,11 @@ func (h *InfoCache) GetVersionByTimestamp(ts uint64) (int64, error) {
}

func (h *InfoCache) getVersionByTimestampNoLock(ts uint64) (int64, error) {
i := sort.Search(len(h.descSortedTs), func(i int) bool {
return h.descSortedTs[i] <= ts
i := sort.Search(len(h.sortedTS), func(i int) bool {
return uint64(h.sortedTS[i].timestamp) <= ts
})
if i < len(h.descSortedTs) {
return h.tsToVersion[h.descSortedTs[i]], nil
if i < len(h.sortedTS) {
return h.sortedTS[i].version, nil
}

return 0, fmt.Errorf("no schema cached for timestamp %d", ts)
Expand Down Expand Up @@ -158,34 +155,34 @@ func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool {
defer h.mu.Unlock()

version := is.SchemaMetaVersion()
i := sort.Search(len(h.cache), func(i int) bool {
return h.cache[i].SchemaMetaVersion() <= version
})

// maintain ts to version mapping
ts, ok := h.versionToTs[version]
if ok {
// version exists, only update ts if the new one is smaller
if snapshotTS < ts {
h.versionToTs[version] = snapshotTS
delete(h.tsToVersion, ts)
h.tsToVersion[snapshotTS] = version
pos := sort.Search(len(h.descSortedTs), func(i int) bool {
return h.descSortedTs[i] < ts
})
if pos > 0 {
h.descSortedTs[pos-1] = snapshotTS
}
i := sort.Search(len(h.sortedTS), func(i int) bool {
return h.sortedTS[i].version <= version
})
if i < len(h.sortedTS) && h.sortedTS[i].version == version {
// find the version, update the schema timestamp if the new timestamp is earlier
if snapshotTS < uint64(h.sortedTS[i].timestamp) {
h.sortedTS[i].timestamp = int64(snapshotTS)
}
} else {
// add the version
h.versionToTs[version] = snapshotTS
h.tsToVersion[snapshotTS] = version
h.descSortedTs = append(h.descSortedTs, snapshotTS)
if len(h.sortedTS) < cap(h.sortedTS) {
h.sortedTS = h.sortedTS[:len(h.sortedTS)+1]
copy(h.sortedTS[i+1:], h.sortedTS[i:])
h.sortedTS[i] = versionAndTimestamp{
version: version,
timestamp: int64(snapshotTS),
}
} else if i < len(h.sortedTS) {
copy(h.sortedTS[i+1:], h.sortedTS[i:])
h.sortedTS[i] = versionAndTimestamp{
version: version,
timestamp: int64(snapshotTS),
}
}
}
sort.Slice(h.descSortedTs, func(i, j int) bool {
// reverse the order
return h.descSortedTs[i] > h.descSortedTs[j]

i = sort.Search(len(h.cache), func(i int) bool {
return h.cache[i].SchemaMetaVersion() <= version
})

// cached entry
Expand Down

0 comments on commit 4da7ca7

Please sign in to comment.