Another version of send_bulk_index_request
felipeelia committed Feb 28, 2022
1 parent 7b38323 commit 1395043
Showing 1 changed file with 79 additions and 17 deletions: includes/classes/Indexable.php
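
For context on the new constants: WordPress defines MB_IN_BYTES as 1,048,576 bytes, so this commit raises the minimum buffer from MB_IN_BYTES / 8 = 131,072 bytes (128 KB) to MB_IN_BYTES / 2 = 524,288 bytes (512 KB), and the maximum from 1.5 * MB_IN_BYTES = 1,572,864 bytes (1.5 MB) to 150 * MB_IN_BYTES = 157,286,400 bytes (150 MB). A new $incremental_step of MB_IN_BYTES / 2 (512 KB) grows the buffer gradually after each successful request, where the old code jumped straight to the size of the last successful body.
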
@@ -401,10 +401,27 @@ public function bulk_index_dynamically( $object_ids ) {
 	 * @return array[WP_Error|array]
 	 */
 	protected function send_bulk_index_request( $documents ) {
-		static $min_buffer_size = MB_IN_BYTES / 8;
-		static $max_buffer_size = 1.5 * MB_IN_BYTES;
+		static $min_buffer_size = MB_IN_BYTES / 2;
+		static $max_buffer_size = 150 * MB_IN_BYTES;
 		static $current_buffer_size;
 
+		error_log( '' );
+		error_log( '' );
+		error_log( '' );
+		error_log( '==============================================' );
+		error_log( '============ STARTING A NEW BATCH ============' );
+		error_log( '==============================================' );
+		error_log( '' );
+
+		error_log( '' );
+		error_log( 'Documents count: ' . count( $documents ) );
+		error_log( 'Documents avg size: ' . ( mb_strlen( implode( '', $documents ) ) / count( $documents ) ) );
+		error_log( '' );
+		error_log( '==============================================' );
+		error_log( '' );
+
+		$incremental_step = MB_IN_BYTES / 2;
+
 		if ( ! $current_buffer_size ) {
 			$current_buffer_size = $min_buffer_size;
 		}
@@ -413,31 +430,43 @@ protected function send_bulk_index_request( $documents ) {
 
 		$body = [];
 
+		$requests = 0;
+
 		/*
 		 * This script will use two main arrays: $body and $documents, being $body the
 		 * documents to be sent in the next request and $documents the list of docs to be indexed.
 		 * The do-while loop will stop if all documents are sent or if a request fails even sending
 		 * a buffer as small as possible.
 		 */
 		do {
-			// Move the next document to the body array.
-			$body[] = array_shift( $documents );
-
-			// If $documents is empty, we do want to send the request with the final docs (already in $body at this point.)
-			// Body is still too small, continue.
-			if ( mb_strlen( implode( '', $body ) ) < $current_buffer_size && ! empty( $documents ) ) {
-				continue;
-			}
+			$next_document = array_shift( $documents );
 
-			if ( mb_strlen( implode( '', $body ) ) > $max_buffer_size ) {
-				// The last document added to body made it too big, so let's give it back.
-				array_unshift( $documents, array_pop( $body ) );
+			// If the next document alone takes the entire current buffer size,
+			// let's add it back to the pipe and send what we have first
+			if ( mb_strlen( $next_document ) > $current_buffer_size && count( $body ) > 0 ) {
+				array_unshift( $documents, $next_document );
+			} else {
+				if ( mb_strlen( $next_document ) > $max_buffer_size ) {
+					error_log( 'Indexable too big. Request not sent.' );
+					error_log( '==============================================' );
+					$results[] = new \WP_Error( 'ep_too_big_request_skipped', 'Indexable too big. Request not sent.' );
+					continue;
+				}
+				$body[] = $next_document;
+				if ( mb_strlen( implode( '', $body ) ) < $current_buffer_size && ! empty( $documents ) ) {
+					continue;
+				}
+				if ( mb_strlen( implode( '', $body ) ) > $max_buffer_size ) {
+					// The last document added to body made it too big, so let's give it back.
+					array_unshift( $documents, array_pop( $body ) );
+				}
 			}
 
 			// Try the request.
+			timer_start();
 			$result = Elasticsearch::factory()->bulk_index( $this->get_index_name(), $this->slug, implode( '', $body ) );
+			$request_time = timer_stop();
+			$requests++;
 
 			// @todo This needs to be removed before merge.
 			error_log( 'count( $body ): ' . count( $body ) );
@@ -452,29 +481,62 @@
 
 			// It failed, possibly adjust the buffer size and try again.
 			if ( is_wp_error( $result ) ) {
+				// Too many requests, wait.
+				if ( 429 === $result->get_error_code() ) {
+					sleep( 2 );
+				}
+
+				if ( 413 !== $result->get_error_code() ) {
+					$results[] = $result;
+					continue;
+				}
+
+				error_log( $result->get_error_code() . ' - ' . $result->get_error_message() );
+				error_log( '==============================================' );
+
+				if ( count( $body ) === 1 ) {
+					error_log( 'ONE POST TOO BIG' );
+					error_log( '==============================================' );
+					$max_buffer_size = min( $max_buffer_size, mb_strlen( implode( '', $body ) ) );
+					$results[] = $result;
+					$body = [];
+					continue;
+				}
+
 				// As the buffer is as small as possible, return the error.
-				if ( $current_buffer_size === $min_buffer_size ) {
+				if ( mb_strlen( implode( '', $body ) ) === $min_buffer_size ) {
+					error_log( 'THIS REALLY FAILED' );
+					error_log( '==============================================' );
+
 					$results[] = $result;
-					break;
+					continue;
 				}
 
 				// We have a too big buffer. Remove one doc from the body, and set both max and current as its size.
 				array_unshift( $documents, array_pop( $body ) );
-				$max_buffer_size = mb_strlen( implode( '', $body ) );
+
+				$max_buffer_size = count( $body ) ?
+					max( $min_buffer_size, mb_strlen( implode( '', $body ) ) ) :
+					$min_buffer_size;
+
 				$current_buffer_size = $max_buffer_size;
 				continue;
 			}
 
 			// Things worked so we can try to bump the buffer size.
 			if ( $current_buffer_size < $max_buffer_size && mb_strlen( implode( '', $body ) ) > $current_buffer_size ) {
-				$current_buffer_size = mb_strlen( implode( '', $body ) );
+				$current_buffer_size = min( ( $current_buffer_size + $incremental_step ), $max_buffer_size );
 			}
 
 			$results[] = $result;
 
 			$body = [];
 		} while ( ! empty( $documents ) );
 
+		error_log( '' );
+		error_log( '# of requests: ' . $requests );
+		error_log( '==============================================' );
+
 		return $results;
 	}
 
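
The inline comment above the do-while describes the two-array flow, but the control flow is hard to follow split across diff hunks. Below is a minimal, self-contained sketch of the same adaptive strategy. It assumes documents are plain strings and a non-empty input list, and it uses a generic $send callback in place of Elasticsearch::factory()->bulk_index(), returning true on success or an int HTTP status (413 = payload too large, 429 = too many requests) on failure. The function name adaptive_bulk_send and the callback contract are hypothetical, not part of the commit, and the error handling is simplified.

<?php
// Sketch of the adaptive buffer strategy; see assumptions in the lead-in above.
function adaptive_bulk_send( array $documents, callable $send ): array {
	$min_buffer = 512 * 1024;        // 512 KB floor, as in the commit.
	$max_buffer = 150 * 1024 * 1024; // 150 MB ceiling.
	$step       = 512 * 1024;        // Growth per successful request.
	$buffer     = $min_buffer;       // Current byte budget per request.

	$results = [];
	$body    = [];

	do {
		$next = array_shift( $documents );

		if ( strlen( $next ) > $buffer && count( $body ) > 0 ) {
			// The next document alone exceeds the budget: put it back
			// and send what we have accumulated first.
			array_unshift( $documents, $next );
		} else {
			if ( strlen( $next ) > $max_buffer ) {
				// No buffer size can ever fit this document: skip it.
				$results[] = 'skipped: document larger than max buffer';
				continue;
			}
			$body[] = $next;
			if ( strlen( implode( '', $body ) ) < $buffer && ! empty( $documents ) ) {
				continue; // Body still below budget: keep filling it.
			}
		}

		$status = $send( implode( '', $body ) );

		if ( true !== $status ) {
			if ( 429 === $status ) {
				sleep( 2 ); // Rate limited: back off before moving on.
			}
			if ( 413 !== $status || count( $body ) === 1 ) {
				// Not a size problem, or a single document already failed:
				// record the error and drop this batch (a simplification;
				// the committed code keeps non-413 bodies for the next try).
				$results[] = 'failed: ' . $status;
				$body      = [];
				continue;
			}
			// Payload too large: give one document back and clamp both the
			// ceiling and the current budget to the smaller body.
			array_unshift( $documents, array_pop( $body ) );
			$max_buffer = max( $min_buffer, strlen( implode( '', $body ) ) );
			$buffer     = $max_buffer;
			continue;
		}

		// Success: grow the budget by one step, never past the ceiling.
		$buffer    = min( $buffer + $step, $max_buffer );
		$results[] = 'sent ' . count( $body ) . ' documents';
		$body      = [];
	} while ( ! empty( $documents ) );

	return $results;
}

The asymmetry is the point of the design: a 413 clamps the ceiling immediately, while each success only raises the budget one $step at a time, so the buffer converges on the largest payload the cluster actually accepts.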
