diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ResolverUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ResolverUtils.java index 3959b325e9bf7..7ecb8548519c1 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ResolverUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ResolverUtils.java @@ -2,6 +2,7 @@ import com.datahub.authentication.Authentication; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; import com.linkedin.datahub.graphql.QueryContext; import com.linkedin.datahub.graphql.exception.ValidationException; import com.linkedin.datahub.graphql.generated.FacetFilterInput; @@ -27,6 +28,9 @@ public class ResolverUtils { + private static final Set KEYWORD_EXCLUDED_FILTERS = ImmutableSet.of( + "runId" + ); private static final ObjectMapper MAPPER = new ObjectMapper(); private static final Logger _logger = LoggerFactory.getLogger(ResolverUtils.class.getName()); @@ -89,7 +93,14 @@ public static Filter buildFilter(@Nullable List facetFilterInp return null; } return new Filter().setOr(new ConjunctiveCriterionArray(new ConjunctiveCriterion().setAnd(new CriterionArray(facetFilterInputs.stream() - .map(filter -> new Criterion().setField(filter.getField() + ESUtils.KEYWORD_SUFFIX).setValue(filter.getValue())) + .map(filter -> new Criterion().setField(getFilterField(filter.getField())).setValue(filter.getValue())) .collect(Collectors.toList()))))); } + + private static String getFilterField(final String originalField) { + if (KEYWORD_EXCLUDED_FILTERS.contains(originalField)) { + return originalField; + } + return originalField + ESUtils.KEYWORD_SUFFIX; + } } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/IngestionResolverUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/IngestionResolverUtils.java index 5ab59f03df666..4ae16d850d8db 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/IngestionResolverUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/IngestionResolverUtils.java @@ -41,6 +41,7 @@ public static ExecutionRequest mapExecutionRequest(final EntityResponse entityRe final ExecutionRequest result = new ExecutionRequest(); result.setUrn(entityUrn.toString()); + result.setId(entityUrn.getId()); // Map input aspect. Must be present. final EnvelopedAspect envelopedInput = aspects.get(Constants.EXECUTION_REQUEST_INPUT_ASPECT_NAME); diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java index 5716c831e3135..4c5a34cf07449 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java @@ -20,6 +20,7 @@ import com.linkedin.metadata.Constants; import com.linkedin.metadata.config.IngestionConfiguration; import com.linkedin.metadata.key.ExecutionRequestKey; +import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetcher; @@ -28,6 +29,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import org.json.JSONException; +import org.json.JSONObject; import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*; @@ -68,6 +71,7 @@ public CompletableFuture get(final DataFetchingEnvironment environment) final UUID uuid = UUID.randomUUID(); final String uuidStr = uuid.toString(); key.setId(uuidStr); + final Urn executionRequestUrn = EntityKeyUtils.convertEntityKeyToUrn(key, Constants.EXECUTION_REQUEST_ENTITY_NAME); proposal.setEntityKeyAspect(GenericRecordUtils.serializeAspect(key)); // Fetch the original ingestion source @@ -100,7 +104,7 @@ public CompletableFuture get(final DataFetchingEnvironment environment) execInput.setRequestedAt(System.currentTimeMillis()); Map arguments = new HashMap<>(); - arguments.put(RECIPE_ARG_NAME, ingestionSourceInfo.getConfig().getRecipe()); + arguments.put(RECIPE_ARG_NAME, injectRunId(ingestionSourceInfo.getConfig().getRecipe(), executionRequestUrn.toString())); arguments.put(VERSION_ARG_NAME, ingestionSourceInfo.getConfig().hasVersion() ? ingestionSourceInfo.getConfig().getVersion() : _ingestionConfiguration.getDefaultCliVersion() @@ -123,4 +127,23 @@ public CompletableFuture get(final DataFetchingEnvironment environment) throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator."); }); } + + /** + * Injects an override run id into a recipe for tracking purposes. Any existing run id will be overwritten. + * + * TODO: Determine if this should be handled in the executor itself. + * + * @param runId the run id to place into the recipe + * @return a modified recipe JSON string + */ + private String injectRunId(final String originalJson, final String runId) { + try { + JSONObject obj = new JSONObject(originalJson); + obj.put("run_id", runId); + return obj.toString(); + } catch (JSONException e) { + // This should ideally never be hit. + throw new IllegalArgumentException("Failed to create execution request: Invalid recipe json provided."); + } + } } diff --git a/datahub-graphql-core/src/main/resources/ingestion.graphql b/datahub-graphql-core/src/main/resources/ingestion.graphql index c279d497081de..3064a3b27165b 100644 --- a/datahub-graphql-core/src/main/resources/ingestion.graphql +++ b/datahub-graphql-core/src/main/resources/ingestion.graphql @@ -167,6 +167,11 @@ type ExecutionRequest { """ urn: String! + """ + Unique id for the execution request + """ + id: String! + """ Input provided when creating the Execution Request """ diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/IngestTestUtils.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/IngestTestUtils.java index 38b3b310c1302..822d353218ab3 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/IngestTestUtils.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/IngestTestUtils.java @@ -64,7 +64,7 @@ public static DataHubIngestionSourceInfo getTestIngestionSourceInfo() { info.setName("My Test Source"); info.setType("mysql"); info.setSchedule(new DataHubIngestionSourceSchedule().setTimezone("UTC").setInterval("* * * * *")); - info.setConfig(new DataHubIngestionSourceConfig().setVersion("0.8.18").setRecipe("my recipe").setExecutorId("executor id")); + info.setConfig(new DataHubIngestionSourceConfig().setVersion("0.8.18").setRecipe("{}").setExecutorId("executor id")); return info; } diff --git a/datahub-web-react/src/app/entity/container/ContainerEntitiesTab.tsx b/datahub-web-react/src/app/entity/container/ContainerEntitiesTab.tsx index 45161a99a1067..b4fd67806edfe 100644 --- a/datahub-web-react/src/app/entity/container/ContainerEntitiesTab.tsx +++ b/datahub-web-react/src/app/entity/container/ContainerEntitiesTab.tsx @@ -1,6 +1,6 @@ import React from 'react'; import { useEntityData } from '../shared/EntityContext'; -import { EmbeddedListSearch } from '../shared/components/styled/search/EmbeddedListSearch'; +import { EmbeddedListSearchSection } from '../shared/components/styled/search/EmbeddedListSearchSection'; export const ContainerEntitiesTab = () => { const { urn } = useEntityData(); @@ -11,7 +11,7 @@ export const ContainerEntitiesTab = () => { }; return ( - { const { urn, entityType } = useEntityData(); @@ -16,7 +16,7 @@ export const DomainEntitiesTab = () => { } return ( - - { return ( - void; + onChangeFilters: (filters) => void; + onChangePage: (page) => void; emptySearchQuery?: string | null; fixedFilter?: FacetFilterInput | null; fixedQuery?: string | null; placeholderText?: string | null; defaultShowFilters?: boolean; defaultFilters?: Array; + searchBarStyle?: any; + searchBarInputStyle?: any; useGetSearchResults?: (params: GetSearchResultsParams) => { data: SearchResultsInterface | undefined | null; loading: boolean; @@ -71,24 +69,26 @@ type Props = { }; export const EmbeddedListSearch = ({ + query, + filters, + page, + onChangeQuery, + onChangeFilters, + onChangePage, emptySearchQuery, fixedFilter, fixedQuery, placeholderText, defaultShowFilters, defaultFilters, + searchBarStyle, + searchBarInputStyle, useGetSearchResults = useWrappedSearchResults, }: Props) => { - const history = useHistory(); - const location = useLocation(); - const entityRegistry = useEntityRegistry(); - const baseParams = useEntityQueryParams(); - - const params = QueryString.parse(location.search, { arrayFormat: 'comma' }); - const query: string = addFixedQuery(params?.query as string, fixedQuery as string, emptySearchQuery as string); - const activeType = entityRegistry.getTypeOrDefaultFromPathName(useParams().type || '', undefined); - const page: number = params.page && Number(params.page as string) > 0 ? Number(params.page as string) : 1; - const filters: Array = useFilters(params); + // Adjust query based on props + const finalQuery: string = addFixedQuery(query as string, fixedQuery as string, emptySearchQuery as string); + + // Adjust filters based on props const filtersWithoutEntities: Array = filters.filter( (filter) => filter.field !== ENTITY_FILTER_NAME, ); @@ -106,7 +106,7 @@ export const EmbeddedListSearch = ({ variables: { input: { types: entityFilters, - query, + query: finalQuery, start: (page - 1) * SearchCfg.RESULTS_PER_PAGE, count: SearchCfg.RESULTS_PER_PAGE, filters: finalFilters, @@ -123,7 +123,7 @@ export const EmbeddedListSearch = ({ variables: { input: { types: entityFilters, - query, + query: finalQuery, start: (page - 1) * numResultsPerPage, count: numResultsPerPage, filters: finalFilters, @@ -136,42 +136,6 @@ export const EmbeddedListSearch = ({ const searchResultUrns = searchResultEntities.map((entity) => entity.urn); const selectedEntityUrns = selectedEntities.map((entity) => entity.urn); - const onSearch = (q: string) => { - const finalQuery = addFixedQuery(q as string, fixedQuery as string, emptySearchQuery as string); - navigateToEntitySearchUrl({ - baseUrl: location.pathname, - baseParams, - type: activeType, - query: finalQuery, - page: 1, - history, - }); - }; - - const onChangeFilters = (newFilters: Array) => { - navigateToEntitySearchUrl({ - baseUrl: location.pathname, - baseParams, - type: activeType, - query, - page: 1, - filters: newFilters, - history, - }); - }; - - const onChangePage = (newPage: number) => { - navigateToEntitySearchUrl({ - baseUrl: location.pathname, - baseParams, - type: activeType, - query, - page: newPage, - filters, - history, - }); - }; - const onToggleFilters = () => { setShowFilters(!showFilters); }; @@ -198,6 +162,12 @@ export const EmbeddedListSearch = ({ } }; + useEffect(() => { + if (!isSelectMode) { + setSelectedEntities([]); + } + }, [isSelectMode]); + useEffect(() => { if (defaultFilters) { onChangeFilters(defaultFilters); @@ -206,12 +176,6 @@ export const EmbeddedListSearch = ({ // eslint-disable-next-line react-hooks/exhaustive-deps }, []); - useEffect(() => { - if (!isSelectMode) { - setSelectedEntities([]); - } - }, [isSelectMode]); - // Filter out the persistent filter values const filteredFilters = data?.facets?.filter((facet) => facet.field !== fixedFilter?.field) || []; @@ -219,19 +183,21 @@ export const EmbeddedListSearch = ({ {error && message.error(`Failed to complete search: ${error && error.message}`)} onChangeQuery(addFixedQuery(q, fixedQuery as string, emptySearchQuery as string))} placeholderText={placeholderText} onToggleFilters={onToggleFilters} callSearchOnVariables={callSearchOnVariables} entityFilters={entityFilters} filters={finalFilters} - query={query} + query={finalQuery} isSelectMode={isSelectMode} isSelectAll={selectedEntities.length > 0 && isListSubset(searchResultUrns, selectedEntityUrns)} setIsSelectMode={setIsSelectMode} selectedEntities={selectedEntities} onChangeSelectAll={onChangeSelectAll} refetch={refetch as any} + searchBarStyle={searchBarStyle} + searchBarInputStyle={searchBarInputStyle} /> any; onChangeSelectAll: (selected: boolean) => void; refetch?: () => void; + searchBarStyle?: any; + searchBarInputStyle?: any; }; export default function EmbeddedListSearchHeader({ @@ -60,6 +62,8 @@ export default function EmbeddedListSearchHeader({ setIsSelectMode, onChangeSelectAll, refetch, + searchBarStyle, + searchBarInputStyle, }: Props) { const entityRegistry = useEntityRegistry(); @@ -76,14 +80,18 @@ export default function EmbeddedListSearchHeader({ initialQuery="" placeholderText={placeholderText || 'Search entities...'} suggestions={[]} - style={{ - maxWidth: 220, - padding: 0, - }} - inputStyle={{ - height: 32, - fontSize: 12, - }} + style={ + searchBarStyle || { + maxWidth: 220, + padding: 0, + } + } + inputStyle={ + searchBarInputStyle || { + height: 32, + fontSize: 12, + } + } onSearch={onSearch} onQueryChange={onSearch} entityRegistry={entityRegistry} diff --git a/datahub-web-react/src/app/entity/shared/components/styled/search/EmbeddedListSearchModal.tsx b/datahub-web-react/src/app/entity/shared/components/styled/search/EmbeddedListSearchModal.tsx new file mode 100644 index 0000000000000..4293789fcdd0b --- /dev/null +++ b/datahub-web-react/src/app/entity/shared/components/styled/search/EmbeddedListSearchModal.tsx @@ -0,0 +1,88 @@ +import React, { useState } from 'react'; +import { Button, Modal } from 'antd'; +import styled from 'styled-components'; +import { FacetFilterInput } from '../../../../../../types.generated'; +import { EmbeddedListSearch } from './EmbeddedListSearch'; + +const SearchContainer = styled.div` + height: 500px; +`; +const modalStyle = { + top: 40, +}; + +const modalBodyStyle = { + padding: 0, +}; + +type Props = { + emptySearchQuery?: string | null; + fixedFilter?: FacetFilterInput | null; + fixedQuery?: string | null; + placeholderText?: string | null; + defaultShowFilters?: boolean; + defaultFilters?: Array; + onClose?: () => void; + searchBarStyle?: any; + searchBarInputStyle?: any; +}; + +export const EmbeddedListSearchModal = ({ + emptySearchQuery, + fixedFilter, + fixedQuery, + placeholderText, + defaultShowFilters, + defaultFilters, + onClose, + searchBarStyle, + searchBarInputStyle, +}: Props) => { + // Component state + const [query, setQuery] = useState(''); + const [page, setPage] = useState(1); + const [filters, setFilters] = useState>([]); + + const onChangeQuery = (q: string) => { + setQuery(q); + }; + + const onChangeFilters = (newFilters: Array) => { + setFilters(newFilters); + }; + + const onChangePage = (newPage: number) => { + setPage(newPage); + }; + + return ( + Close} + > + + + + + ); +}; diff --git a/datahub-web-react/src/app/entity/shared/components/styled/search/EmbeddedListSearchSection.tsx b/datahub-web-react/src/app/entity/shared/components/styled/search/EmbeddedListSearchSection.tsx new file mode 100644 index 0000000000000..c121775c6e9aa --- /dev/null +++ b/datahub-web-react/src/app/entity/shared/components/styled/search/EmbeddedListSearchSection.tsx @@ -0,0 +1,100 @@ +import React from 'react'; +import * as QueryString from 'query-string'; +import { useHistory, useLocation } from 'react-router'; +import { ApolloError } from '@apollo/client'; +import { FacetFilterInput } from '../../../../../../types.generated'; +import useFilters from '../../../../../search/utils/useFilters'; +import { navigateToEntitySearchUrl } from './navigateToEntitySearchUrl'; +import { GetSearchResultsParams, SearchResultsInterface } from './types'; +import { useEntityQueryParams } from '../../../containers/profile/utils'; +import { EmbeddedListSearch } from './EmbeddedListSearch'; + +type Props = { + emptySearchQuery?: string | null; + fixedFilter?: FacetFilterInput | null; + fixedQuery?: string | null; + placeholderText?: string | null; + defaultShowFilters?: boolean; + defaultFilters?: Array; + searchBarStyle?: any; + searchBarInputStyle?: any; + useGetSearchResults?: (params: GetSearchResultsParams) => { + data: SearchResultsInterface | undefined | null; + loading: boolean; + error: ApolloError | undefined; + refetch: (variables: GetSearchResultsParams['variables']) => Promise; + }; +}; + +export const EmbeddedListSearchSection = ({ + emptySearchQuery, + fixedFilter, + fixedQuery, + placeholderText, + defaultShowFilters, + defaultFilters, + searchBarStyle, + searchBarInputStyle, + useGetSearchResults, +}: Props) => { + const history = useHistory(); + const location = useLocation(); + const baseParams = useEntityQueryParams(); + + const params = QueryString.parse(location.search, { arrayFormat: 'comma' }); + const query: string = params?.query as string; + const page: number = params.page && Number(params.page as string) > 0 ? Number(params.page as string) : 1; + const filters: Array = useFilters(params); + + const onSearch = (q: string) => { + navigateToEntitySearchUrl({ + baseUrl: location.pathname, + baseParams, + query: q, + page: 1, + history, + }); + }; + + const onChangeFilters = (newFilters: Array) => { + navigateToEntitySearchUrl({ + baseUrl: location.pathname, + baseParams, + query, + page: 1, + filters: newFilters, + history, + }); + }; + + const onChangePage = (newPage: number) => { + navigateToEntitySearchUrl({ + baseUrl: location.pathname, + baseParams, + query, + page: newPage, + filters, + history, + }); + }; + + return ( + + ); +}; diff --git a/datahub-web-react/src/app/entity/shared/tabs/Lineage/ImpactAnalysis.tsx b/datahub-web-react/src/app/entity/shared/tabs/Lineage/ImpactAnalysis.tsx index 085687d9f94db..5993df25bbef6 100644 --- a/datahub-web-react/src/app/entity/shared/tabs/Lineage/ImpactAnalysis.tsx +++ b/datahub-web-react/src/app/entity/shared/tabs/Lineage/ImpactAnalysis.tsx @@ -9,8 +9,8 @@ import { ENTITY_FILTER_NAME } from '../../../../search/utils/constants'; import useFilters from '../../../../search/utils/useFilters'; import { SearchCfg } from '../../../../../conf'; import analytics, { EventType } from '../../../../analytics'; -import { EmbeddedListSearch } from '../../components/styled/search/EmbeddedListSearch'; import generateUseSearchResultsViaRelationshipHook from './generateUseSearchResultsViaRelationshipHook'; +import { EmbeddedListSearchSection } from '../../components/styled/search/EmbeddedListSearchSection'; const ImpactAnalysisWrapper = styled.div` flex: 1; @@ -61,7 +61,7 @@ export const ImpactAnalysis = ({ urn, direction }: Props) => { return ( - { return ( - { + const [showExpandedLogs, setShowExpandedLogs] = useState(true); const { data, loading, error } = useGetIngestionExecutionRequestQuery({ variables: { urn } }); const output = data?.executionRequest?.result?.report || 'No output found.'; + useEffect(() => { + if (output.length > 100) { + setShowExpandedLogs(false); + } + }, [output, setShowExpandedLogs]); + + const downloadLogs = () => { + downloadFile(output, `exec-${urn}.log`); + }; + + const logs = (showExpandedLogs && output) || output.slice(0, 100); + const result = data?.executionRequest?.result?.status; + + const ResultIcon = result && getExecutionRequestStatusIcon(result); + const resultColor = result && getExecutionRequestStatusDisplayColor(result); + const resultText = result && ( + + {ResultIcon && } + {getExecutionRequestStatusDisplayText(result)} + + ); + const resultSummaryText = + (result && ( + + Ingestion {result === SUCCESS ? 'successfully completed' : 'completed with errors'}. + + )) || + undefined; + return ( Execution Details} + width={800} + footer={} + style={modalStyle} + bodyStyle={modalBodyStyle} + title={ + + Ingestion Run Details + + } visible={visible} onCancel={onClose} > {!data && loading && } {error && message.error('Failed to load execution details :(')}
- Captured Output - -
{`${output}`}
-
+ + Status + {resultText} + {resultSummaryText} + + {result === SUCCESS && ( + + {data?.executionRequest?.id && } + + )} + + Logs + + + View logs that were collected during the ingestion run. + + + + +
{`${logs}${!showExpandedLogs && '...'}`}
+ {!showExpandedLogs && ( + setShowExpandedLogs(true)}> + Show More + + )} +
+
); diff --git a/datahub-web-react/src/app/ingest/source/IngestedAssets.tsx b/datahub-web-react/src/app/ingest/source/IngestedAssets.tsx new file mode 100644 index 0000000000000..7514e37c1c8bf --- /dev/null +++ b/datahub-web-react/src/app/ingest/source/IngestedAssets.tsx @@ -0,0 +1,135 @@ +import { Button, Typography } from 'antd'; +import React, { useState } from 'react'; +import styled from 'styled-components'; +import { useGetSearchResultsForMultipleQuery } from '../../../graphql/search.generated'; +import { EmbeddedListSearchModal } from '../../entity/shared/components/styled/search/EmbeddedListSearchModal'; +import { ANTD_GRAY } from '../../entity/shared/constants'; +import { formatNumber } from '../../shared/formatNumber'; +import { Message } from '../../shared/Message'; +import { extractEntityTypeCountsFromFacets } from './utils'; + +const HeaderContainer = styled.div` + display: flex; + justify-content: space-between; +`; + +const TitleContainer = styled.div``; + +const TotalContainer = styled.div` + display: flex; + flex-direction: column; + justify-content: right; + align-items: end; +`; + +const TotalText = styled(Typography.Text)` + font-size: 16px; + color: ${ANTD_GRAY[8]}; +`; + +const EntityCountsContainer = styled.div` + display: flex; + justify-content: left; + align-items: center; +`; + +const EntityCount = styled.div` + margin-right: 40px; + display: flex; + flex-direction: column; + align-items: flex-start; +`; + +const ViewAllButton = styled(Button)` + padding: 0px; + margin-top: 4px; +`; + +type Props = { + id: string; +}; + +export default function IngestedAssets({ id }: Props) { + // First thing to do is to search for all assets with the id as the run id! + const [showAssetSearch, setShowAssetSearch] = useState(false); + + // Execute search + const { data, loading, error } = useGetSearchResultsForMultipleQuery({ + variables: { + input: { + query: '*', + start: 0, + count: 1, + filters: [ + { + field: 'runId', + value: id, + }, + ], + }, + }, + }); + + // Parse filter values to get results. + const facets = data?.searchAcrossEntities?.facets; + + // Extract facets to construct the per-entity type breakdown stats + const hasEntityTypeFacet = (facets?.findIndex((facet) => facet.field === 'entity') || -1) >= 0; + const entityTypeFacets = + (hasEntityTypeFacet && facets?.filter((facet) => facet.field === 'entity')[0]) || undefined; + const hasSubTypeFacet = (facets?.findIndex((facet) => facet.field === 'typeNames') || -1) >= 0; + const subTypeFacets = (hasSubTypeFacet && facets?.filter((facet) => facet.field === 'typeNames')[0]) || undefined; + const countsByEntityType = + (entityTypeFacets && extractEntityTypeCountsFromFacets(entityTypeFacets, subTypeFacets)) || []; + + // The total number of assets ingested + const total = data?.searchAcrossEntities?.total || 0; + + return ( + <> + {error && } + + + Ingested Assets + {(loading && Loading...) || ( + <> + {(total > 0 && ( + + The following asset types were ingested during this run. + + )) || No assets were ingested.} + + )} + + {!loading && ( + + Total + + {formatNumber(total)} assets + + + )} + + + {countsByEntityType.map((entityCount) => ( + + + {formatNumber(entityCount.count)} + + {entityCount.displayName} + + ))} + + setShowAssetSearch(true)}> + View All + + {showAssetSearch && ( + setShowAssetSearch(false)} + /> + )} + + ); +} diff --git a/datahub-web-react/src/app/ingest/source/IngestionSourceExecutionList.tsx b/datahub-web-react/src/app/ingest/source/IngestionSourceExecutionList.tsx index 4fa839c95cc9d..dc4854a510060 100644 --- a/datahub-web-react/src/app/ingest/source/IngestionSourceExecutionList.tsx +++ b/datahub-web-react/src/app/ingest/source/IngestionSourceExecutionList.tsx @@ -19,6 +19,17 @@ const ListContainer = styled.div` margin-left: 28px; `; +const StatusContainer = styled.div` + display: flex; + justify-content: left; + align-items: center; +`; + +const StatusButton = styled(Button)` + padding: 0px; + margin: 0px; +`; + type Props = { urn: string; lastRefresh: number; @@ -127,19 +138,19 @@ export const IngestionSourceExecutionList = ({ urn, lastRefresh, onRefresh }: Pr title: 'Status', dataIndex: 'status', key: 'status', - render: (status: any) => { + render: (status: any, record) => { const Icon = getExecutionRequestStatusIcon(status); const text = getExecutionRequestStatusDisplayText(status); const color = getExecutionRequestStatusDisplayColor(status); return ( - <> -
- {Icon && } + + {Icon && } + setFocusExecutionUrn(record.urn)}> {text || 'N/A'} -
- + + ); }, }, diff --git a/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx b/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx index 45b2e80b9479b..a13e40aabe2e8 100644 --- a/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx +++ b/datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx @@ -28,6 +28,7 @@ import { UpdateIngestionSourceInput } from '../../../types.generated'; import { capitalizeFirstLetter } from '../../shared/textUtil'; import { SearchBar } from '../../search/SearchBar'; import { useEntityRegistry } from '../../useEntityRegistry'; +import { ExecutionDetailsModal } from './ExecutionRequestDetailsModal'; const SourceContainer = styled.div``; @@ -50,6 +51,11 @@ const StatusContainer = styled.div` align-items: center; `; +const StatusButton = styled(Button)` + padding: 0px; + margin: 0px; +`; + const ActionButtonContainer = styled.div` display: flex; justify-content: right; @@ -84,6 +90,7 @@ export const IngestionSourceList = () => { const [isBuildingSource, setIsBuildingSource] = useState(false); const [focusSourceUrn, setFocusSourceUrn] = useState(undefined); + const [focusExecutionUrn, setFocusExecutionUrn] = useState(undefined); const [lastRefresh, setLastRefresh] = useState(0); // Set of removed urns used to account for eventual consistency const [removedUrns, setRemovedUrns] = useState([]); @@ -348,16 +355,18 @@ export const IngestionSourceList = () => { title: 'Last Status', dataIndex: 'lastExecStatus', key: 'lastExecStatus', - render: (status: any) => { + render: (status: any, record) => { const Icon = getExecutionRequestStatusIcon(status); const text = getExecutionRequestStatusDisplayText(status); const color = getExecutionRequestStatusDisplayColor(status); return ( {Icon && } - - {text || 'N/A'} - + setFocusExecutionUrn(record.lastExecUrn)}> + + {text || 'N/A'} + + ); }, @@ -405,6 +414,8 @@ export const IngestionSourceList = () => { schedule: source.schedule?.interval, timezone: source.schedule?.timezone, execCount: source.executions?.total || 0, + lastExecUrn: + source.executions?.total && source.executions?.total > 0 && source.executions?.executionRequests[0].urn, lastExecTime: source.executions?.total && source.executions?.total > 0 && @@ -490,6 +501,13 @@ export const IngestionSourceList = () => { onSubmit={onSubmit} onCancel={onCancel} /> + {focusExecutionUrn && ( + setFocusExecutionUrn(undefined)} + /> + )} ); }; diff --git a/datahub-web-react/src/app/ingest/source/utils.ts b/datahub-web-react/src/app/ingest/source/utils.ts index ec6cff577180e..7c50a0d2a2111 100644 --- a/datahub-web-react/src/app/ingest/source/utils.ts +++ b/datahub-web-react/src/app/ingest/source/utils.ts @@ -2,6 +2,8 @@ import YAML from 'yamljs'; import { CheckCircleOutlined, CloseCircleOutlined, LoadingOutlined } from '@ant-design/icons'; import { ANTD_GRAY, REDESIGN_COLORS } from '../../entity/shared/constants'; import { SOURCE_TEMPLATE_CONFIGS } from './conf/sources'; +import { EntityType, FacetMetadata } from '../../../types.generated'; +import { capitalizeFirstLetterOnly, pluralize } from '../../shared/textUtil'; export const sourceTypeToIconUrl = (type: string) => { return SOURCE_TEMPLATE_CONFIGS.find((config) => config.type === type)?.logoUrl; @@ -61,3 +63,54 @@ export const getExecutionRequestStatusDisplayColor = (status: string) => { ANTD_GRAY[7] ); }; + +const ENTITIES_WITH_SUBTYPES = new Set([ + EntityType.Dataset.toLowerCase(), + EntityType.Container.toLowerCase(), + EntityType.Notebook.toLowerCase(), +]); + +type EntityTypeCount = { + count: number; + displayName: string; +}; + +/** + * Extract entity type counts to display in the ingestion summary. + * + * @param entityTypeFacets the filter facets for entity type. + * @param subTypeFacets the filter facets for sub types. + */ +export const extractEntityTypeCountsFromFacets = ( + entityTypeFacets: FacetMetadata, + subTypeFacets?: FacetMetadata | null, +): EntityTypeCount[] => { + const finalCounts: EntityTypeCount[] = []; + + if (subTypeFacets) { + subTypeFacets.aggregations.forEach((agg) => + finalCounts.push({ + count: agg.count, + displayName: pluralize(agg.count, capitalizeFirstLetterOnly(agg.value) || ''), + }), + ); + entityTypeFacets.aggregations + .filter((agg) => !ENTITIES_WITH_SUBTYPES.has(agg.value.toLowerCase())) + .forEach((agg) => + finalCounts.push({ + count: agg.count, + displayName: pluralize(agg.count, capitalizeFirstLetterOnly(agg.value) || ''), + }), + ); + } else { + // Only use Entity Types- no subtypes. + entityTypeFacets.aggregations.forEach((agg) => + finalCounts.push({ + count: agg.count, + displayName: pluralize(agg.count, capitalizeFirstLetterOnly(agg.value) || ''), + }), + ); + } + + return finalCounts; +}; diff --git a/datahub-web-react/src/app/search/utils/csvUtils.ts b/datahub-web-react/src/app/search/utils/csvUtils.ts index 7ed845a3dbe91..f701b90ab78e4 100644 --- a/datahub-web-react/src/app/search/utils/csvUtils.ts +++ b/datahub-web-react/src/app/search/utils/csvUtils.ts @@ -1,4 +1,4 @@ -function downloadFile(data: string, title: string) { +export function downloadFile(data: string, title: string) { const blobx = new Blob([data], { type: 'text/plain' }); // ! Blob const elemx = window.document.createElement('a'); elemx.href = window.URL.createObjectURL(blobx); // ! createObjectURL diff --git a/datahub-web-react/src/app/shared/textUtil.ts b/datahub-web-react/src/app/shared/textUtil.ts index 2ce838998b92f..5a258aff36406 100644 --- a/datahub-web-react/src/app/shared/textUtil.ts +++ b/datahub-web-react/src/app/shared/textUtil.ts @@ -27,3 +27,7 @@ export function groupIdTextValidation(str: string) { if (str.indexOf(':') > 0) return false; return true; } + +export function pluralize(count: number, noun: string, suffix = 's') { + return `${noun}${count !== 1 ? suffix : ''}`; +} diff --git a/datahub-web-react/src/graphql/ingestion.graphql b/datahub-web-react/src/graphql/ingestion.graphql index 65449bcda84c9..f8529bc356c40 100644 --- a/datahub-web-react/src/graphql/ingestion.graphql +++ b/datahub-web-react/src/graphql/ingestion.graphql @@ -22,6 +22,7 @@ query listIngestionSources($input: ListIngestionSourcesInput!) { total executionRequests { urn + id input { requestedAt } @@ -56,6 +57,7 @@ query getIngestionSource($urn: String!, $runStart: Int, $runCount: Int) { total executionRequests { urn + id input { requestedAt source { @@ -75,6 +77,7 @@ query getIngestionSource($urn: String!, $runStart: Int, $runCount: Int) { query getIngestionExecutionRequest($urn: String!) { executionRequest(urn: $urn) { urn + id input { source { type diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java index 46bc682f7a8ac..f7915f46c7fd9 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java @@ -45,6 +45,15 @@ public interface EntitySearchService { */ void deleteDocument(@Nonnull String entityName, @Nonnull String docId); + /** + * Appends a run id to the list for a certain document + * + * @param entityName name of the entity + * @param urn the urn of the user + * @param runId the ID of the run + */ + void appendRunId(@Nonnull String entityName, @Nonnull Urn urn, @Nullable String runId); + /** * Gets a list of documents that match given search request. The results are aggregated and filters are applied to the * search hits and not the aggregation results. diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java index ae40f9af23e4d..4e0eb2489dbee 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java @@ -12,8 +12,10 @@ import com.linkedin.metadata.search.elasticsearch.query.ESSearchDAO; import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO; import com.linkedin.metadata.search.utils.ESUtils; +import com.linkedin.metadata.search.utils.SearchUtils; import java.util.List; import java.util.Map; +import java.util.Optional; import javax.annotation.Nonnull; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; @@ -24,6 +26,7 @@ @RequiredArgsConstructor public class ElasticSearchService implements EntitySearchService { + private static final int MAX_RUN_IDS_INDEXED = 25; // Save the previous 25 run ids in the index. private final EntityIndexBuilders indexBuilders; private final ESSearchDAO esSearchDAO; private final ESBrowseDAO esBrowseDAO; @@ -57,6 +60,33 @@ public void deleteDocument(@Nonnull String entityName, @Nonnull String docId) { esWriteDAO.deleteDocument(entityName, docId); } + @Override + public void appendRunId(@Nonnull String entityName, @Nonnull Urn urn, @Nullable String runId) { + final Optional maybeDocId = SearchUtils.getDocId(urn); + if (!maybeDocId.isPresent()) { + log.warn(String.format("Failed to append run id, could not generate a doc id for urn %s", urn)); + return; + } + final String docId = maybeDocId.get(); + log.debug(String.format("Appending run id for entityName: %s, docId: %s", entityName, docId)); + esWriteDAO.applyScriptUpdate(entityName, docId, + /* + Script used to apply updates to the runId field of the index. + This script saves the past N run ids which touched a particular URN in the search index. + It only adds a new run id if it is not already stored inside the list. (List is unique AND ordered) + */ + String.format( + "if (ctx._source.containsKey('runId')) { " + + "if (!ctx._source.runId.contains('%s')) { " + + "ctx._source.runId.add('%s'); " + + "if (ctx._source.runId.length > %s) { ctx._source.runId.remove(0) } } " + + "} else { ctx._source.runId = ['%s'] }", + runId, + runId, + MAX_RUN_IDS_INDEXED, + runId)); + } + @Nonnull @Override public SearchResult search(@Nonnull String entityName, @Nonnull String input, @Nullable Filter postFilters, diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java index 9ad3374855ee0..b8c07ce7bb5f6 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java @@ -19,7 +19,11 @@ private MappingsBuilder() { public static Map getMappings(@Nonnull final EntitySpec entitySpec) { Map mappings = new HashMap<>(); + + // Fixed fields mappings.put("urn", getMappingsForUrn()); + mappings.put("runId", getMappingsForRunId()); + entitySpec.getSearchableFieldSpecs() .forEach(searchableFieldSpec -> mappings.putAll(getMappingsForField(searchableFieldSpec))); entitySpec.getSearchScoreFieldSpecs() @@ -31,6 +35,10 @@ private static Map getMappingsForUrn() { return ImmutableMap.builder().put("type", "keyword").build(); } + private static Map getMappingsForRunId() { + return ImmutableMap.builder().put("type", "keyword").build(); + } + private static Map getMappingsForField(@Nonnull final SearchableFieldSpec searchableFieldSpec) { FieldType fieldType = searchableFieldSpec.getSearchableAnnotation().getFieldType(); boolean addToFilters = searchableFieldSpec.getSearchableAnnotation().isAddToFilters(); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java index d557f3227e58d..cda77412baeba 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.script.Script; @Slf4j @@ -54,6 +55,14 @@ public void deleteDocument(@Nonnull String entityName, @Nonnull String docId) { bulkProcessor.add(new DeleteRequest(indexName).id(docId)); } + /** + * Applies a script to a particular document + */ + public void applyScriptUpdate(@Nonnull String entityName, @Nonnull String docId, @Nonnull String script) { + final String indexName = indexConvention.getIndexName(entityRegistry.getEntitySpec(entityName)); + bulkProcessor.add(new UpdateRequest(indexName, docId).script(new Script(script))); + } + /** * Clear all documents in all the indices */ diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java b/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java index 5a26131b45594..4c527b7cef7f2 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java @@ -47,7 +47,10 @@ public Optional transformSnapshot(final RecordTemplate snapshot, final E return Optional.of(searchDocument.toString()); } - public Optional transformAspect(final Urn urn, final RecordTemplate aspect, final AspectSpec aspectSpec, + public Optional transformAspect( + final Urn urn, + final RecordTemplate aspect, + final AspectSpec aspectSpec, final Boolean forDelete) { final Map> extractedSearchableFields = FieldExtractor.extractFields(aspect, aspectSpec.getSearchableFieldSpecs()); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilderTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilderTest.java index 9ed1fc337e29f..8038c5561a5d7 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilderTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilderTest.java @@ -16,8 +16,9 @@ public void testMappingsBuilder() { Map result = MappingsBuilder.getMappings(TestEntitySpecBuilder.getSpec()); assertEquals(result.size(), 1); Map properties = (Map) result.get("properties"); - assertEquals(properties.size(), 14); + assertEquals(properties.size(), 15); assertEquals(properties.get("urn"), ImmutableMap.of("type", "keyword")); + assertEquals(properties.get("runId"), ImmutableMap.of("type", "keyword")); assertTrue(properties.containsKey("browsePaths")); // KEYWORD Map keyPart3Field = (Map) properties.get("keyPart3"); diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java index 6e162af48fa57..0ab79c2007ef4 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java @@ -44,6 +44,7 @@ import java.util.Optional; import java.util.Set; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Import; @@ -115,7 +116,7 @@ public void invoke(@Nonnull MetadataChangeLog event) { updateTimeseriesFields(event.getEntityType(), event.getAspectName(), urn, aspect, aspectSpec, event.getSystemMetadata()); } else { - updateSearchService(entitySpec.getName(), urn, aspectSpec, aspect); + updateSearchService(entitySpec.getName(), urn, aspectSpec, aspect, event.hasSystemMetadata() ? event.getSystemMetadata().getRunId() : null); updateGraphService(urn, aspectSpec, aspect); updateSystemMetadata(event.getSystemMetadata(), urn, aspectSpec, aspect); } @@ -185,7 +186,7 @@ private void updateGraphService(Urn urn, AspectSpec aspectSpec, RecordTemplate a /** * Process snapshot and update search index */ - private void updateSearchService(String entityName, Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) { + private void updateSearchService(String entityName, Urn urn, AspectSpec aspectSpec, RecordTemplate aspect, @Nullable String runId) { Optional searchDocument; try { searchDocument = _searchDocumentTransformer.transformAspect(urn, aspect, aspectSpec, false); @@ -277,7 +278,7 @@ private void deleteSearchData(Urn urn, String entityName, AspectSpec aspectSpec, Optional searchDocument; try { - searchDocument = _searchDocumentTransformer.transformAspect(urn, aspect, aspectSpec, true); + searchDocument = _searchDocumentTransformer.transformAspect(urn, aspect, aspectSpec, true); // TODO } catch (Exception e) { log.error("Error in getting documents from aspect: {} for aspect {}", e, aspectSpec.getName()); return; diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/JavaEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/JavaEntityClient.java index 2579a736d5686..f7913f0feaf09 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/JavaEntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/JavaEntityClient.java @@ -182,6 +182,7 @@ public BrowseResult browse( } @SneakyThrows + @Deprecated public void update(@Nonnull final Entity entity, @Nonnull final Authentication authentication) throws RemoteInvocationException { Objects.requireNonNull(authentication, "authentication must not be null"); @@ -192,6 +193,7 @@ public void update(@Nonnull final Entity entity, @Nonnull final Authentication a } @SneakyThrows + @Deprecated public void updateWithSystemMetadata( @Nonnull final Entity entity, @Nullable final SystemMetadata systemMetadata, @@ -206,15 +208,17 @@ public void updateWithSystemMetadata( auditStamp.setTime(Clock.systemUTC().millis()); _entityService.ingestEntity(entity, auditStamp, systemMetadata); + tryIndexRunId(com.datahub.util.ModelUtils.getUrnFromSnapshotUnion(entity.getValue()), systemMetadata); } @SneakyThrows + @Deprecated public void batchUpdate(@Nonnull final Set entities, @Nonnull final Authentication authentication) throws RemoteInvocationException { AuditStamp auditStamp = new AuditStamp(); auditStamp.setActor(Urn.createFromString(authentication.getActor().toUrnStr())); auditStamp.setTime(Clock.systemUTC().millis()); - _entityService.ingestEntities(entities.stream().collect(Collectors.toList()), auditStamp, ImmutableList.of()); + _entityService.ingestEntities(entities.stream().collect(Collectors.toList()), auditStamp, ImmutableList.of()); } /** @@ -433,6 +437,7 @@ public String ingestProposal(@Nonnull MetadataChangeProposal metadataChangePropo Urn urn = _entityService.ingestProposal(metadataChangeProposal, auditStamp).getUrn(); additionalChanges.forEach(proposal -> _entityService.ingestProposal(proposal, auditStamp)); + tryIndexRunId(urn, metadataChangeProposal.getSystemMetadata()); return urn.toString(); } @@ -475,4 +480,10 @@ public void producePlatformEvent( @Nonnull Authentication authentication) throws Exception { _eventProducer.producePlatformEvent(name, key, event); } + + private void tryIndexRunId(Urn entityUrn, @Nullable SystemMetadata systemMetadata) { + if (systemMetadata != null && systemMetadata.hasRunId()) { + _entitySearchService.appendRunId(entityUrn.getEntityType(), entityUrn, systemMetadata.getRunId()); + } + } } diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java index e65269d244eb2..1e6537da6234a 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java @@ -12,6 +12,7 @@ import com.linkedin.metadata.entity.ValidationException; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.restli.RestliUtil; +import com.linkedin.metadata.search.EntitySearchService; import com.linkedin.metadata.timeseries.TimeseriesAspectService; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.parseq.Task; @@ -35,6 +36,7 @@ import javax.inject.Named; import lombok.extern.slf4j.Slf4j; +import static com.linkedin.metadata.resources.entity.ResourceUtils.*; import static com.linkedin.metadata.resources.restli.RestliConstants.*; @@ -61,6 +63,10 @@ public class AspectResource extends CollectionResourceTaskTemplate ingestProposal( try { Urn urn = _entityService.ingestProposal(metadataChangeProposal, auditStamp).getUrn(); additionalChanges.forEach(proposal -> _entityService.ingestProposal(proposal, auditStamp)); + tryIndexRunId(urn, metadataChangeProposal.getSystemMetadata(), _entitySearchService); return urn.toString(); } catch (ValidationException e) { throw new RestLiServiceException(HttpStatus.S_422_UNPROCESSABLE_ENTITY, e.getMessage()); } }, MetricRegistry.name(this.getClass(), "ingestProposal")); } - } diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java index bacd97676290a..c5dc43a8ee6e1 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java @@ -67,6 +67,7 @@ import org.apache.maven.artifact.versioning.ComparableVersion; import static com.linkedin.metadata.entity.ValidationUtils.*; +import static com.linkedin.metadata.resources.entity.ResourceUtils.*; import static com.linkedin.metadata.resources.restli.RestliConstants.*; import static com.linkedin.metadata.utils.PegasusUtils.*; @@ -205,6 +206,7 @@ public Task ingest(@ActionParam(PARAM_ENTITY) @Nonnull Entity entity, final SystemMetadata finalSystemMetadata = systemMetadata; return RestliUtil.toTask(() -> { _entityService.ingestEntity(entity, auditStamp, finalSystemMetadata); + tryIndexRunId(com.datahub.util.ModelUtils.getUrnFromSnapshotUnion(entity.getValue()), systemMetadata, _entitySearchService); return null; }, MetricRegistry.name(this.getClass(), "ingest")); } @@ -239,8 +241,14 @@ public Task batchIngest(@ActionParam(PARAM_ENTITIES) @Nonnull Entity[] ent .map(systemMetadata -> populateDefaultFieldsIfEmpty(systemMetadata)) .collect(Collectors.toList()); + SystemMetadata[] finalSystemMetadataList1 = systemMetadataList; return RestliUtil.toTask(() -> { _entityService.ingestEntities(Arrays.asList(entities), auditStamp, finalSystemMetadataList); + for (int i = 0; i < entities.length; i++) { + SystemMetadata systemMetadata = finalSystemMetadataList1[i]; + Entity entity = entities[i]; + tryIndexRunId(com.datahub.util.ModelUtils.getUrnFromSnapshotUnion(entity.getValue()), systemMetadata, _entitySearchService); + } return null; }, MetricRegistry.name(this.getClass(), "batchIngest")); } diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/ResourceUtils.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/ResourceUtils.java index 679cb421bc5fe..5918f453c6d88 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/ResourceUtils.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/ResourceUtils.java @@ -1,7 +1,11 @@ package com.linkedin.metadata.resources.entity; +import com.linkedin.common.urn.Urn; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.search.EntitySearchService; +import com.linkedin.mxe.SystemMetadata; import java.util.Set; +import javax.annotation.Nullable; public class ResourceUtils { @@ -12,4 +16,13 @@ private ResourceUtils() { public static Set getAllAspectNames(final EntityService entityService, final String entityName) { return entityService.getEntityAspectNames(entityName); } + + public static void tryIndexRunId( + final Urn urn, + final @Nullable SystemMetadata systemMetadata, + final EntitySearchService entitySearchService) { + if (systemMetadata != null && systemMetadata.hasRunId()) { + entitySearchService.appendRunId(urn.getEntityType(), urn, systemMetadata.getRunId()); + } + } }