From 997f68b8c3e4352d817fdff372194e06702c5f7f Mon Sep 17 00:00:00 2001 From: dark-angel <70754989+inferno-umar@users.noreply.github.com> Date: Tue, 6 Feb 2024 01:10:10 +0530 Subject: [PATCH 1/8] fix: Elasticsearch: Request Entity Too Large #28117 Fix for gitea putting everything into one request and send it to elasticsearch as issued in #28117 --- .../code/elasticsearch/elasticsearch.go | 41 ++++++++++++++++--- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/modules/indexer/code/elasticsearch/elasticsearch.go b/modules/indexer/code/elasticsearch/elasticsearch.go index 2fadbfeb064e..0bb17fe39d6a 100644 --- a/modules/indexer/code/elasticsearch/elasticsearch.go +++ b/modules/indexer/code/elasticsearch/elasticsearch.go @@ -179,13 +179,44 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st reqs = append(reqs, b.addDelete(filename, repo)) } - if len(reqs) > 0 { - _, err := b.inner.Client.Bulk(). - Index(b.inner.VersionedIndexName()). - Add(reqs...). - Do(ctx) + queue_settings, err := setting.GetQueueSettings(setting.CfgProvider, "code_indexer") + if err != nil { + log.Error("Could not fetch queue code_indexer") return err } + + // Helper function + min := func(a, b int) int { + if a <= b { + return a + } + return b + } + + if len(reqs) > 0 { + var batch_head int + max_per_batch_req_count := len(reqs)/queue_settings.BatchLength + (len(reqs) % queue_settings.BatchLength) + + for i := 0; i < queue_settings.BatchLength; i++ { + + // Taking in another variable because (*elastic.BulkService).Do(ctx context.Context) clears out the requests slice upon successful batch + bulk_req := reqs[batch_head:min(batch_head+max_per_batch_req_count, len(reqs))] + + if len(bulk_req) > 0 { + bulk_service := b.inner.Client.Bulk(). + Index(b.inner.VersionedIndexName()). + Add(bulk_req...) + + _, err := bulk_service.Do(ctx) + + batch_head += max_per_batch_req_count + + if err != nil { + return err + } + } + } + } return nil } From 3725ce5c231e8bb9ba0c89d6f0bbedfc45f84cf1 Mon Sep 17 00:00:00 2001 From: dark-angel <70754989+inferno-umar@users.noreply.github.com> Date: Tue, 6 Feb 2024 01:40:02 +0530 Subject: [PATCH 2/8] refac: linting --- .../code/elasticsearch/elasticsearch.go | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/modules/indexer/code/elasticsearch/elasticsearch.go b/modules/indexer/code/elasticsearch/elasticsearch.go index 0bb17fe39d6a..2bd85c2179cb 100644 --- a/modules/indexer/code/elasticsearch/elasticsearch.go +++ b/modules/indexer/code/elasticsearch/elasticsearch.go @@ -179,7 +179,7 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st reqs = append(reqs, b.addDelete(filename, repo)) } - queue_settings, err := setting.GetQueueSettings(setting.CfgProvider, "code_indexer") + queueSettings, err := setting.GetQueueSettings(setting.CfgProvider, "code_indexer") if err != nil { log.Error("Could not fetch queue code_indexer") return err @@ -194,22 +194,22 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st } if len(reqs) > 0 { - var batch_head int - max_per_batch_req_count := len(reqs)/queue_settings.BatchLength + (len(reqs) % queue_settings.BatchLength) + var batchHead int + maxPerBatchReqCount := len(reqs)/queueSettings.BatchLength + (len(reqs) % queueSettings.BatchLength) - for i := 0; i < queue_settings.BatchLength; i++ { + for i := 0; i < queueSettings.BatchLength; i++ { // Taking in another variable because (*elastic.BulkService).Do(ctx context.Context) clears out the requests slice upon successful batch - bulk_req := reqs[batch_head:min(batch_head+max_per_batch_req_count, len(reqs))] + bulkReq := reqs[batchHead:min(batchHead+maxPerBatchReqCount, len(reqs))] - if len(bulk_req) > 0 { - bulk_service := b.inner.Client.Bulk(). + if len(bulkReq) > 0 { + bulkService := b.inner.Client.Bulk(). Index(b.inner.VersionedIndexName()). - Add(bulk_req...) + Add(bulkReq...) - _, err := bulk_service.Do(ctx) + _, err := bulkService.Do(ctx) - batch_head += max_per_batch_req_count + batchHead += maxPerBatchReqCount if err != nil { return err From 32d53620091882975b5deaac06d58f5f5c19bbee Mon Sep 17 00:00:00 2001 From: umar Date: Tue, 6 Feb 2024 20:07:02 +0530 Subject: [PATCH 3/8] fix: Batching index size = 12 --- .../indexer/code/elasticsearch/elasticsearch.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/modules/indexer/code/elasticsearch/elasticsearch.go b/modules/indexer/code/elasticsearch/elasticsearch.go index 2bd85c2179cb..4653ca838e8a 100644 --- a/modules/indexer/code/elasticsearch/elasticsearch.go +++ b/modules/indexer/code/elasticsearch/elasticsearch.go @@ -179,13 +179,7 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st reqs = append(reqs, b.addDelete(filename, repo)) } - queueSettings, err := setting.GetQueueSettings(setting.CfgProvider, "code_indexer") - if err != nil { - log.Error("Could not fetch queue code_indexer") - return err - } - - // Helper function + // Helper function to support Go v1.2 and below min := func(a, b int) int { if a <= b { return a @@ -194,11 +188,12 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st } if len(reqs) > 0 { + esBatchSize := 12 // Hardcoded batch size for ElasticSearch index update var batchHead int - maxPerBatchReqCount := len(reqs)/queueSettings.BatchLength + (len(reqs) % queueSettings.BatchLength) - for i := 0; i < queueSettings.BatchLength; i++ { + maxPerBatchReqCount := len(reqs)/esBatchSize + (len(reqs) % esBatchSize) + for i := 0; i < esBatchSize; i++ { // Taking in another variable because (*elastic.BulkService).Do(ctx context.Context) clears out the requests slice upon successful batch bulkReq := reqs[batchHead:min(batchHead+maxPerBatchReqCount, len(reqs))] @@ -209,12 +204,12 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st _, err := bulkService.Do(ctx) - batchHead += maxPerBatchReqCount - if err != nil { return err } } + + batchHead += maxPerBatchReqCount } } return nil From 9a3a4a67680ffcecee69f3950bc017131ccbb480 Mon Sep 17 00:00:00 2001 From: umar Date: Tue, 6 Feb 2024 20:33:56 +0530 Subject: [PATCH 4/8] refac: gofumpt err fix --- modules/indexer/code/elasticsearch/elasticsearch.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/indexer/code/elasticsearch/elasticsearch.go b/modules/indexer/code/elasticsearch/elasticsearch.go index 4653ca838e8a..d6706806a77a 100644 --- a/modules/indexer/code/elasticsearch/elasticsearch.go +++ b/modules/indexer/code/elasticsearch/elasticsearch.go @@ -202,7 +202,8 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st Index(b.inner.VersionedIndexName()). Add(bulkReq...) - _, err := bulkService.Do(ctx) + _, err := bulkService. + Do(ctx) if err != nil { return err From 7f09c7b7e24eb754706af83b6893dcf17fbc600a Mon Sep 17 00:00:00 2001 From: umar Date: Tue, 6 Feb 2024 20:38:26 +0530 Subject: [PATCH 5/8] refac: gofumpt err re-fix --- modules/indexer/code/elasticsearch/elasticsearch.go | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/indexer/code/elasticsearch/elasticsearch.go b/modules/indexer/code/elasticsearch/elasticsearch.go index d6706806a77a..6cd044cc7c25 100644 --- a/modules/indexer/code/elasticsearch/elasticsearch.go +++ b/modules/indexer/code/elasticsearch/elasticsearch.go @@ -204,7 +204,6 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st _, err := bulkService. Do(ctx) - if err != nil { return err } From d9ec60ecdffc8eb03fa21bb822eaf9cee665a435 Mon Sep 17 00:00:00 2001 From: umar Date: Wed, 7 Feb 2024 08:52:51 +0530 Subject: [PATCH 6/8] fix: Batch ES requests in chunks of 50 or less --- .../code/elasticsearch/elasticsearch.go | 39 +++++-------------- 1 file changed, 10 insertions(+), 29 deletions(-) diff --git a/modules/indexer/code/elasticsearch/elasticsearch.go b/modules/indexer/code/elasticsearch/elasticsearch.go index 6cd044cc7c25..20d9a8647e23 100644 --- a/modules/indexer/code/elasticsearch/elasticsearch.go +++ b/modules/indexer/code/elasticsearch/elasticsearch.go @@ -179,37 +179,18 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st reqs = append(reqs, b.addDelete(filename, repo)) } - // Helper function to support Go v1.2 and below - min := func(a, b int) int { - if a <= b { - return a - } - return b - } - if len(reqs) > 0 { - esBatchSize := 12 // Hardcoded batch size for ElasticSearch index update - var batchHead int - - maxPerBatchReqCount := len(reqs)/esBatchSize + (len(reqs) % esBatchSize) - - for i := 0; i < esBatchSize; i++ { - // Taking in another variable because (*elastic.BulkService).Do(ctx context.Context) clears out the requests slice upon successful batch - bulkReq := reqs[batchHead:min(batchHead+maxPerBatchReqCount, len(reqs))] - - if len(bulkReq) > 0 { - bulkService := b.inner.Client.Bulk(). - Index(b.inner.VersionedIndexName()). - Add(bulkReq...) - - _, err := bulkService. - Do(ctx) - if err != nil { - return err - } + esBatchSize := 50 + bulkService := b.inner.Client.Bulk(). + Index(b.inner.VersionedIndexName()) + + for i := 0; i < len(reqs); i = i + esBatchSize { + _, err := bulkService. + Add(reqs[i:min(i+esBatchSize, len(reqs))]...). + Do(ctx) + if err != nil { + return err } - - batchHead += maxPerBatchReqCount } } return nil From 055cb831c834ec33b486f6935eec2a591dbae72c Mon Sep 17 00:00:00 2001 From: umar Date: Wed, 7 Feb 2024 08:59:30 +0530 Subject: [PATCH 7/8] refac: lint --- modules/indexer/code/elasticsearch/elasticsearch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/indexer/code/elasticsearch/elasticsearch.go b/modules/indexer/code/elasticsearch/elasticsearch.go index 20d9a8647e23..01371d6caaf7 100644 --- a/modules/indexer/code/elasticsearch/elasticsearch.go +++ b/modules/indexer/code/elasticsearch/elasticsearch.go @@ -184,7 +184,7 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st bulkService := b.inner.Client.Bulk(). Index(b.inner.VersionedIndexName()) - for i := 0; i < len(reqs); i = i + esBatchSize { + for i := 0; i < len(reqs); i += esBatchSize { _, err := bulkService. Add(reqs[i:min(i+esBatchSize, len(reqs))]...). Do(ctx) From 46c1c217aeadd7eba88d259125a3f20080f7e8e0 Mon Sep 17 00:00:00 2001 From: umar Date: Wed, 7 Feb 2024 10:17:10 +0530 Subject: [PATCH 8/8] fix: Moved BulkService inside loop --- modules/indexer/code/elasticsearch/elasticsearch.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/modules/indexer/code/elasticsearch/elasticsearch.go b/modules/indexer/code/elasticsearch/elasticsearch.go index 01371d6caaf7..0f70f1348552 100644 --- a/modules/indexer/code/elasticsearch/elasticsearch.go +++ b/modules/indexer/code/elasticsearch/elasticsearch.go @@ -181,11 +181,10 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st if len(reqs) > 0 { esBatchSize := 50 - bulkService := b.inner.Client.Bulk(). - Index(b.inner.VersionedIndexName()) for i := 0; i < len(reqs); i += esBatchSize { - _, err := bulkService. + _, err := b.inner.Client.Bulk(). + Index(b.inner.VersionedIndexName()). Add(reqs[i:min(i+esBatchSize, len(reqs))]...). Do(ctx) if err != nil {