fix(browse): Fix browse pagination and multi-browse path issue #2984

Merged · 11 commits · Jul 30, 2021
Changes from 1 commit
14 changes: 14 additions & 0 deletions gms/api/src/main/idl/com.linkedin.entity.entities.restspec.json
@@ -47,6 +47,13 @@
"type" : "int"
} ],
"returns" : "com.linkedin.metadata.query.AutoCompleteResult"
}, {
"name" : "batchGetNumEntities",
"parameters" : [ {
"name" : "entities",
"type" : "{ \"type\" : \"array\", \"items\" : \"string\" }"
} ],
"returns" : "{ \"type\" : \"map\", \"values\" : \"long\" }"
}, {
"name" : "batchIngest",
"parameters" : [ {
@@ -80,6 +87,13 @@
"type" : "com.linkedin.common.Urn"
} ],
"returns" : "{ \"type\" : \"array\", \"items\" : \"string\" }"
}, {
"name" : "getNumEntities",
"parameters" : [ {
"name" : "entity",
"type" : "string"
} ],
"returns" : "long"
}, {
"name" : "ingest",
"parameters" : [ {
@@ -4340,6 +4340,13 @@
"type" : "int"
} ],
"returns" : "com.linkedin.metadata.query.AutoCompleteResult"
}, {
"name" : "batchGetNumEntities",
"parameters" : [ {
"name" : "entities",
"type" : "{ \"type\" : \"array\", \"items\" : \"string\" }"
} ],
"returns" : "{ \"type\" : \"map\", \"values\" : \"long\" }"
}, {
"name" : "batchIngest",
"parameters" : [ {
@@ -4373,6 +4380,13 @@
"type" : "com.linkedin.common.Urn"
} ],
"returns" : "{ \"type\" : \"array\", \"items\" : \"string\" }"
}, {
"name" : "getNumEntities",
"parameters" : [ {
"name" : "entity",
"type" : "string"
} ],
"returns" : "long"
}, {
"name" : "ingest",
"parameters" : [ {
@@ -1,6 +1,8 @@
package com.linkedin.entity.client;

import com.linkedin.common.urn.Urn;
import com.linkedin.entity.EntitiesDoBatchGetNumEntitiesRequestBuilder;
import com.linkedin.entity.EntitiesDoGetNumEntitiesRequestBuilder;
import com.linkedin.entity.EntitiesDoSetWritableRequestBuilder;
import com.linkedin.restli.client.Response;
import org.slf4j.Logger;
@@ -266,4 +268,18 @@ public void setWritable(boolean canWrite) throws RemoteInvocationException {
ENTITIES_REQUEST_BUILDERS.actionSetWritable().valueParam(canWrite);
sendClientRequest(requestBuilder.build());
}

@Nonnull
public long getNumEntities(@Nonnull String entityName) throws RemoteInvocationException {
EntitiesDoGetNumEntitiesRequestBuilder requestBuilder =
ENTITIES_REQUEST_BUILDERS.actionGetNumEntities().entityParam(entityName);
return sendClientRequest(requestBuilder.build()).getEntity();
}

@Nonnull
public Map<String, Long> batchGetNumEntities(@Nonnull List<String> entityName) throws RemoteInvocationException {
EntitiesDoBatchGetNumEntitiesRequestBuilder requestBuilder =
ENTITIES_REQUEST_BUILDERS.actionBatchGetNumEntities().entitiesParam(new StringArray(entityName));
return sendClientRequest(requestBuilder.build()).getEntity();
}
}
@@ -2,6 +2,7 @@

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.LongMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringArray;
import com.linkedin.entity.Entity;
@@ -30,6 +31,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -101,8 +103,7 @@ public Task<Entity> get(@Nonnull String urnStr, @QueryParam(PARAM_ASPECTS) @Opti

@RestMethod.BatchGet
@Nonnull
public Task<Map<String, Entity>> batchGet(
@Nonnull Set<String> urnStrs,
public Task<Map<String, Entity>> batchGet(@Nonnull Set<String> urnStrs,
@QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) throws URISyntaxException {
log.info("BATCH GET {}", urnStrs.toString());
final Set<Urn> urns = new HashSet<>();
@@ -153,12 +154,9 @@ public Task<Void> batchIngest(@ActionParam(PARAM_ENTITIES) @Nonnull Entity[] ent

@Action(name = ACTION_SEARCH)
@Nonnull
public Task<SearchResult> search(
@ActionParam(PARAM_ENTITY) @Nonnull String entityName,
@ActionParam(PARAM_INPUT) @Nonnull String input,
@ActionParam(PARAM_FILTER) @Optional @Nullable Filter filter,
@ActionParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion,
@ActionParam(PARAM_START) int start,
public Task<SearchResult> search(@ActionParam(PARAM_ENTITY) @Nonnull String entityName,
@ActionParam(PARAM_INPUT) @Nonnull String input, @ActionParam(PARAM_FILTER) @Optional @Nullable Filter filter,
@ActionParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion, @ActionParam(PARAM_START) int start,
@ActionParam(PARAM_COUNT) int count) {

log.info("GET SEARCH RESULTS for {} with query {}", entityName, input);
@@ -167,24 +165,18 @@ public Task<SearchResult> search(

@Action(name = ACTION_AUTOCOMPLETE)
@Nonnull
public Task<AutoCompleteResult> autocomplete(
@ActionParam(PARAM_ENTITY) @Nonnull String entityName,
@ActionParam(PARAM_QUERY) @Nonnull String query,
@ActionParam(PARAM_FIELD) @Optional @Nullable String field,
@ActionParam(PARAM_FILTER) @Optional @Nullable Filter filter,
@ActionParam(PARAM_LIMIT) int limit) {
public Task<AutoCompleteResult> autocomplete(@ActionParam(PARAM_ENTITY) @Nonnull String entityName,
@ActionParam(PARAM_QUERY) @Nonnull String query, @ActionParam(PARAM_FIELD) @Optional @Nullable String field,
@ActionParam(PARAM_FILTER) @Optional @Nullable Filter filter, @ActionParam(PARAM_LIMIT) int limit) {

return RestliUtils.toTask(() -> _searchService.autoComplete(entityName, query, field, filter, limit));
}

@Action(name = ACTION_BROWSE)
@Nonnull
public Task<BrowseResult> browse(
@ActionParam(PARAM_ENTITY) @Nonnull String entityName,
@ActionParam(PARAM_PATH) @Nonnull String path,
@ActionParam(PARAM_FILTER) @Optional @Nullable Filter filter,
@ActionParam(PARAM_START) int start,
@ActionParam(PARAM_LIMIT) int limit) {
public Task<BrowseResult> browse(@ActionParam(PARAM_ENTITY) @Nonnull String entityName,
@ActionParam(PARAM_PATH) @Nonnull String path, @ActionParam(PARAM_FILTER) @Optional @Nullable Filter filter,
@ActionParam(PARAM_START) int start, @ActionParam(PARAM_LIMIT) int limit) {

log.info("GET BROWSE RESULTS for {} at path {}", entityName, path);
return RestliUtils.toTask(() -> _searchService.browse(entityName, path, filter, start, limit));
@@ -210,4 +202,17 @@ public Task<Void> setWriteable(@ActionParam(PARAM_VALUE) @Optional("true") @Nonn
return null;
});
}

@Action(name = "getNumEntities")
Collaborator: I don't want to disrupt this PR too much - but what do you think about naming this getTotalEntityCount? getNumEntities is a bit too vague IMO

Contributor Author: done!

@Nonnull
public Task<Long> getNumEntities(@ActionParam(PARAM_ENTITY) @Nonnull String entityName) {
return RestliUtils.toTask(() -> _searchService.docCount(entityName));
}

@Action(name = "batchGetNumEntities")
@Nonnull
public Task<LongMap> batchGetNumEntities(@ActionParam(PARAM_ENTITIES) @Nonnull String[] entityNames) {
return RestliUtils.toTask(() -> new LongMap(
Arrays.stream(entityNames).collect(Collectors.toMap(Function.identity(), _searchService::docCount))));
}
}
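
As a quick aside on the batchGetNumEntities action above: it fans a single call out over every requested entity type and collects the results into a LongMap keyed by entity name. Below is a minimal standalone sketch of just that stream-to-map step, with the search service stubbed out; the fake count and entity names are invented for illustration and are not part of this PR.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Standalone sketch of the toMap construction used in batchGetNumEntities above.
// fakeDocCount stands in for _searchService.docCount(entityName); it is not real DataHub code.
public class BatchCountSketch {
  static long fakeDocCount(String entityName) {
    return entityName.length(); // arbitrary stand-in value
  }

  public static void main(String[] args) {
    String[] entityNames = {"dataset", "chart", "dashboard"};
    Map<String, Long> counts = Arrays.stream(entityNames)
        .collect(Collectors.toMap(Function.identity(), BatchCountSketch::fakeDocCount));
    // Prints e.g. {dataset=7, chart=5, dashboard=9} (a HashMap, so order is not guaranteed)
    System.out.println(counts);
  }
}
```
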
@@ -20,6 +20,13 @@ public interface SearchService {
*/
void clear();

/**
* Get the number of documents corresponding to the entity
*
* @param entityName name of the entity
*/
long docCount(@Nonnull String entityName);
Collaborator: How does this hold up given Timeseries aspects? Can we call this getSnapshotDocCount or something of the type to illustrate this divergence?

Contributor Author: Timeseries service doesn't implement SearchService. This is specific to search backend (which is only used by versioned aspects)

Collaborator: Oh okay got it


/**
* Updates or inserts the given search document.
*
@@ -37,6 +37,11 @@ public void clear() {
esWriteDAO.clear();
}

@Override
public long docCount(@Nonnull String entityName) {
return esSearchDAO.docCount(entityName);
}

@Override
public void upsertDocument(@Nonnull String entityName, @Nonnull String document, @Nonnull String docId) {
log.debug(String.format("Upserting Search document entityName: %s, document: %s, docId: %s", entityName,
@@ -18,7 +18,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -37,7 +36,7 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -57,6 +56,9 @@ public class ESBrowseDAO {
private static final String URN = "urn";
private static final String REMOVED = "removed";

private static final String GROUP_AGG = "groups";
private static final String ALL_PATHS = "allPaths";

/**
* Gets a list of groups/entities that match given browse request.
*
@@ -74,14 +76,26 @@ public BrowseResult browse(@Nonnull String entityName, @Nonnull String path, @Nu

try {
final String indexName = indexConvention.getIndexName(entityRegistry.getEntitySpec(entityName));

final SearchResponse groupsResponse =
client.search(constructGroupsSearchRequest(indexName, path, requestMap, 0, 1000), RequestOptions.DEFAULT);
client.search(constructGroupsSearchRequest(indexName, path, requestMap), RequestOptions.DEFAULT);
final BrowseResultMetadata browseResultMetadata = extractGroupsResponse(groupsResponse, path, from, size);
final int numGroups = browseResultMetadata.getTotalNumEntities().intValue();

// Based on the number of groups returned, compute the from and size to query for entities
int entityFrom = Math.max(from - numGroups, 0);
int entitySize = Math.min(Math.max(from + size - numGroups, 0), size);
final SearchResponse entitiesResponse =
client.search(constructEntitiesSearchRequest(indexName, path, requestMap, from, size),
client.search(constructEntitiesSearchRequest(indexName, path, requestMap, entityFrom, entitySize),
RequestOptions.DEFAULT);
Comment on lines 99 to 101

Contributor: small optimization available here - if entitySize is 0 we don't have to do this query

Contributor: or do we need to for counts?

Contributor Author: Yeah. I was going to do that until you mentioned we need the whole counts

final BrowseResult result = extractQueryResult(groupsResponse, entitiesResponse, path, from);
result.getMetadata().setPath(path);
return result;
final int numEntities = (int) entitiesResponse.getHits().getTotalHits().value;
final List<BrowseResultEntity> browseResultEntityList = extractEntitiesResponse(entitiesResponse, path);

return new BrowseResult().setMetadata(browseResultMetadata)
.setEntities(new BrowseResultEntityArray(browseResultEntityList))
.setNumEntities(numGroups + numEntities)
Contributor: should this be numElements or total or some other name that indicates it is a sum of entities + groups? I would expect numEntities to be numEntities + group.map(group -> group.count).sum()

Collaborator: Yeah the naming here is confusing... I'd agree I'd think num entities would be either

a. The number of concrete entities matching the exact path OR
b. The total number of entities somewhere under this path

In fact, it may be useful to include both of these in the response separately. Can we nip this in the bud with this PR?

Contributor Author: Unfortunately BrowseResult PDL is in datahub-gms right now. We can make the above changes after moving them over here.

.setFrom(from)
.setPageSize(size);
} catch (Exception e) {
log.error("Browse query failed: " + e.getMessage());
throw new ESQueryException("Browse query failed: ", e);
@@ -99,11 +113,11 @@ private AggregationBuilder buildAggregations(@Nonnull String path) {
final String includeFilter = ESUtils.escapeReservedCharacters(path) + "/.*";
final String excludeFilter = ESUtils.escapeReservedCharacters(path) + "/.*/.*";

return AggregationBuilders.terms("groups")
return AggregationBuilders.terms(GROUP_AGG)
.field(BROWSE_PATH)
.size(Integer.MAX_VALUE)
.order(BucketOrder.count(true)) // Ascending order
.includeExclude(new IncludeExclude(includeFilter, excludeFilter));
.includeExclude(new IncludeExclude(includeFilter, excludeFilter))
.subAggregation(AggregationBuilders.terms(ALL_PATHS).field(BROWSE_PATH).size(Integer.MAX_VALUE));
}

/**
@@ -114,11 +128,10 @@ private AggregationBuilder buildAggregations(@Nonnull String path) {
*/
@Nonnull
protected SearchRequest constructGroupsSearchRequest(@Nonnull String indexName, @Nonnull String path,
@Nonnull Map<String, String> requestMap, int from, int size) {
@Nonnull Map<String, String> requestMap) {
final SearchRequest searchRequest = new SearchRequest(indexName);
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.from(from);
searchSourceBuilder.size(size);
searchSourceBuilder.size(0);
searchSourceBuilder.query(buildQueryString(path, requestMap, true));
searchSourceBuilder.aggregation(buildAggregations(path));
searchRequest.source(searchSourceBuilder);
@@ -180,29 +193,6 @@ SearchRequest constructEntitiesSearchRequest(@Nonnull String indexName, @Nonnull
return searchRequest;
}

/**
* Extracts search responses into browse result.
*
* @param groupsResponse groups search response
* @param entitiesResponse entity search response
* @param path the path which is being browsed
* @param from index of first entity
* @return {@link BrowseResult}
*/
@Nonnull
private BrowseResult extractQueryResult(@Nonnull SearchResponse groupsResponse,
@Nonnull SearchResponse entitiesResponse, @Nonnull String path, int from) {
final List<BrowseResultEntity> browseResultEntityList = extractEntitiesResponse(entitiesResponse, path);
final BrowseResultMetadata browseResultMetadata = extractGroupsResponse(groupsResponse, path);
browseResultMetadata.setTotalNumEntities(
browseResultMetadata.getTotalNumEntities() + entitiesResponse.getHits().getTotalHits().value);
return new BrowseResult().setEntities(new BrowseResultEntityArray(browseResultEntityList))
.setMetadata(browseResultMetadata)
.setFrom(from)
.setPageSize(browseResultEntityList.size())
.setNumEntities((int) entitiesResponse.getHits().getTotalHits().value);
}

/**
* Extracts group search response into browse result metadata.
*
@@ -211,21 +201,35 @@ private BrowseResult extractQueryResult(@Nonnull SearchResponse groupsResponse,
* @return {@link BrowseResultMetadata}
*/
@Nonnull
private BrowseResultMetadata extractGroupsResponse(@Nonnull SearchResponse groupsResponse, @Nonnull String path) {
final ParsedTerms groups = (ParsedTerms) groupsResponse.getAggregations().getAsMap().get("groups");
private BrowseResultMetadata extractGroupsResponse(@Nonnull SearchResponse groupsResponse, @Nonnull String path,
int from, int size) {
final ParsedTerms groups = groupsResponse.getAggregations().get(GROUP_AGG);
final List<BrowseResultGroup> groupsAgg = groups.getBuckets()
.stream()
.filter(this::validateBucket)
.map(group -> new BrowseResultGroup().setName(getSimpleName(group.getKeyAsString()))
.setCount(group.getDocCount()))
// Sort by document count desc and then by name
.sorted(Comparator.<BrowseResultGroup, Long>comparing(BrowseResultGroup::getCount).reversed()
.thenComparing(Comparator.comparing(BrowseResultGroup::getName)))
.collect(Collectors.toList());
return new BrowseResultMetadata().setGroups(new BrowseResultGroupArray(groupsAgg))
.setTotalNumEntities(groupsResponse.getHits().getTotalHits().value)
// Get the groups that are in the from to from + size range
final List<BrowseResultGroup> paginatedGroups =
groupsAgg.size() <= from ? Collections.emptyList() : groupsAgg.subList(from, Math.min(from + size, groupsAgg.size()));
return new BrowseResultMetadata().setGroups(new BrowseResultGroupArray(paginatedGroups))
.setTotalNumEntities(groupsAgg.size())
.setPath(path);
}

/**
* Check if there are any paths that extends the matchedPath signifying that the path does not point to an entity
*/
private boolean validateBucket(@Nonnull MultiBucketsAggregation.Bucket bucket) {
final ParsedTerms groups = bucket.getAggregations().get(ALL_PATHS);
final String matchedPath = bucket.getKeyAsString();
return groups.getBuckets()
.stream()
.map(MultiBucketsAggregation.Bucket::getKeyAsString)
.anyMatch(bucketPath -> (bucketPath.length() > matchedPath.length() && bucketPath.startsWith(matchedPath)));
}
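
For intuition on the leaf check above, a small standalone sketch with invented browse paths: a bucket is kept as a group only if some other indexed path strictly extends it; otherwise the path points directly at an entity and is left to the entity query.

```java
import java.util.Arrays;
import java.util.List;

// Standalone sketch of the validateBucket logic above, using invented browse paths.
public class ValidateBucketSketch {
  // True if any other path strictly extends matchedPath, i.e. matchedPath is a group, not a leaf.
  static boolean isGroup(String matchedPath, List<String> allPathsUnderBucket) {
    return allPathsUnderBucket.stream()
        .anyMatch(p -> p.length() > matchedPath.length() && p.startsWith(matchedPath));
  }

  public static void main(String[] args) {
    List<String> paths = Arrays.asList("/prod/kafka", "/prod/kafka/topic_a");
    System.out.println(isGroup("/prod/kafka", paths));         // true  -> shown as a browse group
    System.out.println(isGroup("/prod/kafka/topic_a", paths)); // false -> points at an entity
  }
}
```
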

/**
* Extracts entity search response into list of browse result entities.
*
@@ -10,6 +10,7 @@
import com.linkedin.metadata.search.elasticsearch.query.request.AutocompleteRequestHandler;
import com.linkedin.metadata.search.elasticsearch.query.request.SearchRequestHandler;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import java.io.IOException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
@@ -18,6 +19,8 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.index.query.QueryBuilders;


/**
@@ -31,6 +34,18 @@ public class ESSearchDAO {
private final RestHighLevelClient client;
private final IndexConvention indexConvention;

public long docCount(@Nonnull String entityName) {
EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName);
CountRequest countRequest =
new CountRequest(indexConvention.getIndexName(entitySpec)).query(QueryBuilders.matchAllQuery());
try {
return client.count(countRequest, RequestOptions.DEFAULT).getCount();
} catch (IOException e) {
log.error("Count query failed:" + e.getMessage());
throw new ESQueryException("Count query failed:", e);
}
}

@Nonnull
private SearchResult executeAndExtract(@Nonnull EntitySpec entitySpec, @Nonnull SearchRequest searchRequest, int from,
int size) {