feat(scala): browse helpers (#4062)

Fluf22 authored Oct 31, 2024
1 parent 5a8605e commit 5c13605

Showing 2 changed files with 176 additions and 3 deletions.
@@ -0,0 +1,46 @@
package algoliasearch.extension.internal

import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.concurrent.duration.Duration

private[algoliasearch] object Iterable {
  case class Error[T](
      validate: T => Boolean,
      message: Option[T => String] = None
  )

  def createIterable[T](
      execute: Option[T] => Future[T],
      validate: T => Boolean,
      aggregator: Option[T => Unit] = None,
      timeout: () => Duration = () => Duration.Zero,
      error: Option[Iterable.Error[T]] = None
  )(implicit ec: ExecutionContext): Future[T] = {
    def executor(previousResponse: Option[T] = None): Future[T] = {
      execute(previousResponse).flatMap { response =>
        // Call aggregator if defined
        aggregator.foreach(agg => agg(response))

        // Validate the response
        if (validate(response)) {
          Future.successful(response)
        } else {
          // Check for error validation
          error match {
            case Some(err) if err.validate(response) =>
              err.message match {
                case Some(errMsg) => Future.failed(new Exception(errMsg(response)))
                case None         => Future.failed(new Exception("An error occurred"))
              }
            case _ =>
              // Sleep for timeout duration, then retry
              blocking(Thread.sleep(timeout().toMillis))
              executor(Some(response))
          }
        }
      }
    }

    executor()
  }
}
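The helper above implements a generic asynchronous polling/pagination loop: `execute` receives the previous response (`None` on the first call), every response is handed to the optional `aggregator`, and iteration stops when `validate` returns true or when the optional `error` check matches, in which case the future fails. Below is a minimal, hypothetical sketch of how a caller inside the `algoliasearch` package (the object is package-private) might drive it; `Page`, `fetchPage`, and `collectAll` are illustrative names, not part of this commit.

  import scala.concurrent.{ExecutionContext, Future}
  import scala.concurrent.duration._
  import algoliasearch.extension.internal.Iterable.createIterable

  final case class Page(items: Seq[String], cursor: Option[String])

  // Hypothetical two-page source: the first call returns a cursor, the second does not.
  def fetchPage(previous: Option[Page]): Future[Page] =
    Future.successful(previous match {
      case None    => Page(Seq("a", "b"), cursor = Some("next"))
      case Some(_) => Page(Seq("c"), cursor = None)
    })

  def collectAll()(implicit ec: ExecutionContext): Future[Page] = {
    val buffer = scala.collection.mutable.ListBuffer.empty[String]
    createIterable[Page](
      execute = previous => fetchPage(previous),
      validate = page => page.cursor.isEmpty,            // stop once no cursor is returned
      aggregator = Some(page => buffer ++= page.items),  // collect items from every page
      timeout = () => 200.millis                         // pause between iterations
    )
  }

The `error` parameter covers the failure path: an `Iterable.Error(validate, message)` turns a matching response into a failed future instead of another retry.
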
@@ -3,6 +3,7 @@ package algoliasearch
import algoliasearch.api.SearchClient
import algoliasearch.config.RequestOptions
import algoliasearch.exception.AlgoliaApiException
import algoliasearch.extension.internal.Iterable.createIterable
import algoliasearch.extension.internal.RetryUntil.{DEFAULT_DELAY, retryUntil}
import algoliasearch.search._

@@ -354,9 +355,6 @@ package object extension {
        batchSize: Int = 1000,
        requestOptions: Option[RequestOptions] = None
    )(implicit ec: ExecutionContext): Future[ReplaceAllObjectsResponse] = {
      val requests = objects.map { record =>
        BatchRequest(action = Action.AddObject, body = record)
      }
      val tmpIndexName = s"${indexName}_tmp_${scala.util.Random.nextInt(100)}"

      for {
@@ -405,6 +403,12 @@
      )
    }

    /** Check if an index exists.
      * @param indexName
      *   The index name to check.
      * @return
      *   A future containing a boolean indicating if the index exists.
      */
    def indexExists(indexName: String)(implicit ec: ExecutionContext): Future[Boolean] = {
      try {
        client.getSettings(indexName)
@@ -415,5 +419,128 @@

      Future.successful(true)
    }
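    // Illustrative usage, not part of this commit: assuming a `client: SearchClient` with this
    // extension in scope, the returned future can be mapped to branch on the result.
    //   client.indexExists("movies").map { exists =>
    //     if (exists) println("index exists") else println("index is missing")
    //   }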

    /** Browse objects in an index.
      * @param indexName
      *   The index name to browse.
      * @param browseParams
      *   The browse parameters.
      * @param validate
      *   The validation function. Default is to check if the cursor is not defined, meaning there are no more pages to browse.
      * @param aggregator
      *   The aggregation function. This is where you can aggregate the results.
      * @param requestOptions
      *   Additional request configuration.
      * @return
      *   A future containing the last browse response.
      */
    def browseObjects(
        indexName: String,
        browseParams: BrowseParamsObject,
        validate: BrowseResponse => Boolean = response => response.cursor.isEmpty,
        aggregator: BrowseResponse => Unit,
        requestOptions: Option[RequestOptions] = None
    )(implicit ec: ExecutionContext): Future[BrowseResponse] = {
      createIterable(
        execute = (previousResponse: Option[BrowseResponse]) =>
          client.browse(
            indexName,
            Some(
              browseParams.copy(
                hitsPerPage = previousResponse.flatMap(_.hitsPerPage.orElse(Some(1000))),
                cursor = previousResponse.flatMap(_.cursor)
              )
            ),
            requestOptions
          ),
        validate = validate,
        aggregator = Some(aggregator)
      )
    }
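    // Illustrative usage, not part of this commit: assuming a `client: SearchClient` with this
    // extension in scope and that BrowseResponse exposes its `hits` like the other responses,
    // aggregate each page while browseObjects follows the cursor; the default `validate` stops
    // once no cursor is returned.
    //   var objectCount = 0
    //   client.browseObjects(
    //     indexName = "movies",                // hypothetical index name
    //     browseParams = BrowseParamsObject(), // assumes all params have defaults
    //     aggregator = response => objectCount += response.hits.length
    //   )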

    /** Browse rules in an index.
      * @param indexName
      *   The index name to browse.
      * @param searchRulesParams
      *   The search rules parameters.
      * @param validate
      *   The validation function. Default is to check if the number of hits is less than the hits per page.
      * @param aggregator
      *   The aggregation function. This is where you can aggregate the results.
      * @param requestOptions
      *   Additional request configuration.
      * @return
      *   A future containing the last search rules response.
      */
    def browseRules(
        indexName: String,
        searchRulesParams: SearchRulesParams,
        validate: Option[SearchRulesResponse => Boolean] = None,
        aggregator: SearchRulesResponse => Unit,
        requestOptions: Option[RequestOptions] = None
    )(implicit ec: ExecutionContext): Future[SearchRulesResponse] = {
      val hitsPerPage = 1000

      createIterable(
        execute = (previousResponse: Option[SearchRulesResponse]) =>
          client.searchRules(
            indexName,
            Some(
              searchRulesParams.copy(
                page = previousResponse.map(_.page + 1).orElse(Some(0)),
                hitsPerPage = Some(hitsPerPage)
              )
            ),
            requestOptions
          ),
        validate = validate.getOrElse((response: SearchRulesResponse) => response.hits.length < hitsPerPage),
        aggregator = Some(aggregator)
      )
    }
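    // Illustrative usage, not part of this commit: pages through rules 1000 at a time, stopping
    // when a page returns fewer than `hitsPerPage` hits (the default `validate`).
    //   var ruleCount = 0
    //   client.browseRules(
    //     indexName = "movies",                    // hypothetical index name
    //     searchRulesParams = SearchRulesParams(), // assumes all params have defaults
    //     aggregator = response => ruleCount += response.hits.length
    //   )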

    /** Browse synonyms in an index.
      * @param indexName
      *   The index name to browse.
      * @param searchSynonymsParams
      *   The search synonyms parameters.
      * @param validate
      *   The validation function. Default is to check if the number of hits is less than the hits per page.
      * @param aggregator
      *   The aggregation function. This is where you can aggregate the results.
      * @param requestOptions
      *   Additional request configuration.
      * @return
      *   A future containing the last search synonyms response.
      */
    def browseSynonyms(
        indexName: String,
        searchSynonymsParams: SearchSynonymsParams,
        validate: Option[SearchSynonymsResponse => Boolean] = None,
        aggregator: SearchSynonymsResponse => Unit,
        requestOptions: Option[RequestOptions] = None
    )(implicit ec: ExecutionContext): Future[SearchSynonymsResponse] = {
      val hitsPerPage = 1000
      var page = searchSynonymsParams.page.getOrElse(0)

      createIterable(
        execute = (_: Option[SearchSynonymsResponse]) =>
          try {
            client.searchSynonyms(
              indexName,
              Some(
                searchSynonymsParams.copy(
                  page = Some(page),
                  hitsPerPage = Some(hitsPerPage)
                )
              ),
              requestOptions
            )
          } finally {
            page += 1
          },
        validate = validate.getOrElse((response: SearchSynonymsResponse) => response.hits.length < hitsPerPage),
        aggregator = Some(aggregator)
      )
    }
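    // Illustrative usage, not part of this commit: starts from the page set in the params (or 0)
    // and stops when a page returns fewer than 1000 hits (the default `validate`).
    //   var synonymCount = 0
    //   client.browseSynonyms(
    //     indexName = "movies",                          // hypothetical index name
    //     searchSynonymsParams = SearchSynonymsParams(), // assumes all params have defaults
    //     aggregator = response => synonymCount += response.hits.length
    //   )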
  }
}
