Skip to content

Commit

Permalink
feat(clients): allow batch size on objects helper [skip-bc] (#4172)
Browse files Browse the repository at this point in the history
  • Loading branch information
shortcuts authored Nov 27, 2024
1 parent 3b61cf7 commit 2325c61
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,24 +167,26 @@ public partial interface ISearchClient
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to store in the given Algolia `indexName`.</param>
/// <param name="waitForTasks">Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable..</param>
/// <param name="batchSize">The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
/// <typeparam name="T"></typeparam>
Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
/// <inheritdoc cref="SaveObjectsAsync{T}(string, IEnumerable{T}, RequestOptions, CancellationToken)"/>
List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;

/// <summary>
/// Helper: Deletes every records for the given objectIDs. The `chunkedBatch` helper is used under the hood, which creates a `batch` requests with at most 1000 objectIDs in it.
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objectIDs">The list of `objectIDs` to remove from the given Algolia `indexName`.</param>
/// <param name="waitForTasks">Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable..</param>
/// <param name="batchSize">The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
Task<List<BatchResponse>> DeleteObjectsAsync(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default);
Task<List<BatchResponse>> DeleteObjectsAsync(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default);
/// <inheritdoc cref="DeleteObjectsAsync(string, IEnumerable{String}, RequestOptions, CancellationToken)"/>
List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default);
List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default);

/// <summary>
/// Helper: Replaces object content of all the given objects according to their respective `objectID` field. The `chunkedBatch` helper is used under the hood, which creates a `batch` requests with at most 1000 objects in it.
Expand All @@ -193,11 +195,12 @@ public partial interface ISearchClient
/// <param name="objects">The list of `objects` to update in the given Algolia `indexName`.</param>
/// <param name="createIfNotExists">To be provided if non-existing objects are passed, otherwise, the call will fail.</param>
/// <param name="waitForTasks">Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable..</param>
/// <param name="batchSize">The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
/// <inheritdoc cref="PartialUpdateObjectsAsync{T}(string, IEnumerable{T}, bool, RequestOptions, CancellationToken)"/>
List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;

/// <summary>
/// Helper: Check if an index exists.
Expand Down Expand Up @@ -568,43 +571,45 @@ public List<BatchResponse> ChunkedBatch<T>(string indexName, IEnumerable<T> obje
/// <inheritdoc/>
public async Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEnumerable<T> objects,
bool waitForTasks = false,
int batchSize = 1000,
RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
return await ChunkedBatchAsync(indexName, objects, Action.AddObject, waitForTasks, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objects, Action.AddObject, waitForTasks, batchSize, options, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, RequestOptions options = null,
public List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class =>
AsyncHelper.RunSync(() => SaveObjectsAsync(indexName, objects, waitForTasks, options, cancellationToken));
AsyncHelper.RunSync(() => SaveObjectsAsync(indexName, objects, waitForTasks, batchSize, options, cancellationToken));

/// <inheritdoc/>
public async Task<List<BatchResponse>> DeleteObjectsAsync(string indexName, IEnumerable<String> objectIDs,
bool waitForTasks = false,
int batchSize = 1000,
RequestOptions options = null,
CancellationToken cancellationToken = default)
{
return await ChunkedBatchAsync(indexName, objectIDs.Select(id => new { objectID = id }), Action.DeleteObject, waitForTasks, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objectIDs.Select(id => new { objectID = id }), Action.DeleteObject, waitForTasks, batchSize, options, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, RequestOptions options = null,
public List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null,
CancellationToken cancellationToken = default) =>
AsyncHelper.RunSync(() => DeleteObjectsAsync(indexName, objectIDs, waitForTasks, options, cancellationToken));
AsyncHelper.RunSync(() => DeleteObjectsAsync(indexName, objectIDs, waitForTasks, batchSize, options, cancellationToken));

/// <inheritdoc/>
public async Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false,
public async Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, int batchSize = 1000,
RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
return await ChunkedBatchAsync(indexName, objects, createIfNotExists ? Action.PartialUpdateObject : Action.PartialUpdateObjectNoCreate, waitForTasks, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objects, createIfNotExists ? Action.PartialUpdateObject : Action.PartialUpdateObjectNoCreate, waitForTasks, batchSize, options, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false,
public List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, int batchSize = 1000,
RequestOptions options = null, CancellationToken cancellationToken = default) where T : class =>
AsyncHelper.RunSync(() => PartialUpdateObjectsAsync(indexName, objects, createIfNotExists, waitForTasks, options, cancellationToken));
AsyncHelper.RunSync(() => PartialUpdateObjectsAsync(indexName, objects, createIfNotExists, waitForTasks, batchSize, options, cancellationToken));

private static async Task<List<TU>> CreateIterable<TU>(Func<TU, Task<TU>> executeQuery,
Func<TU, bool> stopCondition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ public suspend fun SearchClient.chunkedBatch(
* @param indexName The index in which to perform the request.
* @param objects The list of objects to index.
* @param waitForTask If true, wait for the task to complete.
* @param batchSize The size of the batch. Default is 1000.
* @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
* @return The list of responses from the batch requests.
*
Expand All @@ -379,14 +380,15 @@ public suspend fun SearchClient.saveObjects(
indexName: String,
objects: List<JsonObject>,
waitForTask: Boolean = false,
batchSize: Int = 1000,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
return this.chunkedBatch(
indexName = indexName,
objects = objects,
action = Action.AddObject,
waitForTask = waitForTask,
batchSize = 1000,
batchSize = batchSize,
requestOptions = requestOptions,
)
}
Expand All @@ -397,6 +399,7 @@ public suspend fun SearchClient.saveObjects(
* @param indexName The index in which to perform the request.
* @param objectIDs The list of objectIDs to delete from the index.
* @param waitForTask If true, wait for the task to complete.
* @param batchSize The size of the batch. Default is 1000.
* @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
* @return The list of responses from the batch requests.
*
Expand All @@ -405,14 +408,15 @@ public suspend fun SearchClient.deleteObjects(
indexName: String,
objectIDs: List<String>,
waitForTask: Boolean = false,
batchSize: Int = 1000,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
return this.chunkedBatch(
indexName = indexName,
objects = objectIDs.map { id -> JsonObject(mapOf("objectID" to Json.encodeToJsonElement(id))) },
action = Action.DeleteObject,
waitForTask = waitForTask,
batchSize = 1000,
batchSize = batchSize,
requestOptions = requestOptions,
)
}
Expand All @@ -424,6 +428,7 @@ public suspend fun SearchClient.deleteObjects(
* @param objects The list of objects to update in the index.
* @param createIfNotExists To be provided if non-existing objects are passed, otherwise, the call will fail..
* @param waitForTask If true, wait for the task to complete.
* @param batchSize The size of the batch. Default is 1000.
* @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
* @return The list of responses from the batch requests.
*
Expand All @@ -433,14 +438,15 @@ public suspend fun SearchClient.partialUpdateObjects(
objects: List<JsonObject>,
createIfNotExists: Boolean,
waitForTask: Boolean = false,
batchSize: Int = 1000,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
return this.chunkedBatch(
indexName = indexName,
objects = objects,
action = if (createIfNotExists) Action.PartialUpdateObject else Action.PartialUpdateObjectNoCreate,
waitForTask = waitForTask,
batchSize = 1000,
batchSize = batchSize,
requestOptions = requestOptions,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ package object extension {
* The list of objects to save.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param batchSize
* The size of the batch. Default is 1000.
* @param requestOptions
* Additional request configuration.
* @return
Expand All @@ -263,9 +265,10 @@ package object extension {
indexName: String,
objects: Seq[Any],
waitForTasks: Boolean = false,
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
chunkedBatch(indexName, objects, Action.AddObject, waitForTasks, 1000, requestOptions)
chunkedBatch(indexName, objects, Action.AddObject, waitForTasks, batchSize, requestOptions)
}

/** Helper: Deletes every objects for the given objectIDs. The `chunkedBatch` helper is used under the hood, which
Expand All @@ -277,6 +280,8 @@ package object extension {
* The list of objectIDs to delete.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param batchSize
* The size of the batch. Default is 1000.
* @param requestOptions
* Additional request configuration.
* @return
Expand All @@ -286,14 +291,15 @@ package object extension {
indexName: String,
objectIDs: Seq[String],
waitForTasks: Boolean = false,
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
chunkedBatch(
indexName,
objectIDs.map(id => new { val objectID: String = id }),
Action.DeleteObject,
waitForTasks,
1000,
batchSize,
requestOptions
)
}
Expand All @@ -309,6 +315,8 @@ package object extension {
* To be provided if non-existing objects are passed, otherwise, the call will fail.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param batchSize
* The size of the batch. Default is 1000.
* @param requestOptions
* Additional request configuration.
* @return
Expand All @@ -319,14 +327,15 @@ package object extension {
objects: Seq[Any],
createIfNotExists: Boolean = false,
waitForTasks: Boolean = false,
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
chunkedBatch(
indexName,
objects,
if (createIfNotExists) Action.PartialUpdateObject else Action.PartialUpdateObjectNoCreate,
waitForTasks,
1000,
batchSize,
requestOptions
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,20 +468,22 @@ public extension SearchClient {
/// - parameter indexName: The name of the index where to save the objects
/// - parameter objects: The new objects
/// - parameter waitForTasks: If we should wait for the batch task to be finished before processing the next one
/// - parameter batchSize: The maximum number of objects to include in a batch
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func saveObjects(
indexName: String,
objects: [some Encodable],
waitForTasks: Bool = false,
batchSize: Int = 1000,
requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
try await self.chunkedBatch(
indexName: indexName,
objects: objects,
action: .addObject,
waitForTasks: waitForTasks,
batchSize: 1000,
batchSize: batchSize,
requestOptions: requestOptions
)
}
Expand All @@ -491,20 +493,22 @@ public extension SearchClient {
/// - parameter indexName: The name of the index to delete objectIDs from
/// - parameter objectIDs: The objectIDs to delete
/// - parameter waitForTasks: If we should wait for the batch task to be finished before processing the next one
/// - parameter batchSize: The maximum number of objects to include in a batch
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func deleteObjects(
indexName: String,
objectIDs: [String],
waitForTasks: Bool = false,
batchSize: Int = 1000,
requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
try await self.chunkedBatch(
indexName: indexName,
objects: objectIDs.map { AnyCodable(["objectID": $0]) },
action: .deleteObject,
waitForTasks: waitForTasks,
batchSize: 1000,
batchSize: batchSize,
requestOptions: requestOptions
)
}
Expand All @@ -516,21 +520,23 @@ public extension SearchClient {
/// - parameter createIfNotExists: To be provided if non-existing objects are passed, otherwise, the call will
/// fail..
/// - parameter waitForTasks: If we should wait for the batch task to be finished before processing the next one
/// - parameter batchSize: The maximum number of objects to include in a batch
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func partialUpdateObjects(
indexName: String,
objects: [some Encodable],
createIfNotExists: Bool = false,
waitForTasks: Bool = false,
batchSize: Int = 1000,
requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
try await self.chunkedBatch(
indexName: indexName,
objects: objects,
action: createIfNotExists ? .partialUpdateObject : .partialUpdateObjectNoCreate,
waitForTasks: waitForTasks,
batchSize: 1000,
batchSize: batchSize,
requestOptions: requestOptions
)
}
Expand Down
6 changes: 6 additions & 0 deletions specs/search/helpers/deleteObjects.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ method:
required: false
schema:
type: boolean
- in: query
name: batchSize
description: The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.
required: false
schema:
type: integer
- in: query
name: requestOptions
description: The request options to pass to the `batch` method.
Expand Down
Loading

0 comments on commit 2325c61

Please sign in to comment.