Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MB-61985: Add a Scorch event for Index() #2054

Merged
merged 6 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (b *Batch) Index(id string, data interface{}) error {
if id == "" {
return ErrorEmptyID
}
if eventIndex, ok := b.index.(index.EventIndex); ok {
eventIndex.FireIndexEvent()
}
doc := document.NewDocument(id)
err := b.index.Mapping().MapDocument(doc, data)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions index/scorch/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,7 @@ var EventKindMergeTaskIntroduction = EventKind(8)
// EventKindPreMergeCheck is fired before the merge begins to check if
// the caller should proceed with the merge.
var EventKindPreMergeCheck = EventKind(9)

// EventKindIndexStart is fired when Index() is invoked which
// creates a new Document object from an interface using the index mapping.
var EventKindIndexStart = EventKind(10)
12 changes: 8 additions & 4 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Scorch struct {

unsafeBatch bool

rootLock sync.RWMutex
rootLock sync.RWMutex

root *IndexSnapshot // holds 1 ref-count on the root
rootPersisted []chan error // closed when root is persisted
Expand Down Expand Up @@ -376,6 +376,8 @@ func (s *Scorch) Delete(id string) error {
func (s *Scorch) Batch(batch *index.Batch) (err error) {
start := time.Now()

// notify handlers that we're about to index a batch of data
s.fireEvent(EventKindBatchIntroductionStart, 0)
defer func() {
s.fireEvent(EventKindBatchIntroduction, time.Since(start))
}()
Expand Down Expand Up @@ -434,9 +436,6 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {

indexStart := time.Now()

// notify handlers that we're about to introduce a segment
s.fireEvent(EventKindBatchIntroductionStart, 0)

var newSegment segment.Segment
var bufBytes uint64
stats := newFieldStats()
Expand Down Expand Up @@ -878,3 +877,8 @@ func (s *Scorch) CopyReader() index.CopyReader {
s.rootLock.Unlock()
return rv
}

// external API to fire a scorch event (EventKindIndexStart) externally from bleve
func (s *Scorch) FireIndexEvent() {
s.fireEvent(EventKindIndexStart, 0)
}
15 changes: 15 additions & 0 deletions index_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ func (i *indexImpl) Index(id string, data interface{}) (err error) {
return ErrorIndexClosed
}

i.FireIndexEvent()

doc := document.NewDocument(id)
err = i.m.MapDocument(doc, data)
if err != nil {
Expand Down Expand Up @@ -1112,3 +1114,16 @@ func (f FileSystemDirectory) GetWriter(filePath string) (io.WriteCloser,
return os.OpenFile(filepath.Join(string(f), dir, file),
os.O_RDWR|os.O_CREATE, 0600)
}

func (i *indexImpl) FireIndexEvent() {
// get the internal index implementation
internalIndex, err := i.Advanced()
if err != nil {
return
}
// check if the internal index implementation supports events
if internalEventIndex, ok := internalIndex.(index.EventIndex); ok {
// fire the Index() event
internalEventIndex.FireIndexEvent()
}
}
Loading