Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API module # use OpenLineage model in OL resource #1593

Merged
merged 4 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies {
compile "org.postgresql:postgresql:${postgresqlVersion}"
compile 'com.graphql-java:graphql-java:17.1'
compile 'com.graphql-java-kickstart:graphql-java-servlet:11.1.1'
implementation 'io.openlineage:openlineage-java:0.2.0'

testCompile "io.dropwizard:dropwizard-testing:${dropwizardVersion}"
testCompile "org.jdbi:jdbi3-testing:${jdbi3Version}"
Expand Down
4 changes: 2 additions & 2 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.openlineage.client.OpenLineage;
import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import javax.validation.Valid;
Expand All @@ -38,7 +39,6 @@
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.service.ServiceFactory;
import marquez.service.models.LineageEvent;
import marquez.service.models.NodeId;

@Slf4j
Expand All @@ -57,7 +57,7 @@ public OpenLineageResource(@NonNull final ServiceFactory serviceFactory) {
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
public void create(
@Valid @NotNull LineageEvent event, @Suspended final AsyncResponse asyncResponse)
@Valid @NotNull OpenLineage.RunEvent event, @Suspended final AsyncResponse asyncResponse)
throws JsonProcessingException, SQLException {
openLineageService
.createAsync(event)
Expand Down
9 changes: 5 additions & 4 deletions api/src/main/java/marquez/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.Hashing;
import io.dropwizard.jackson.Jackson;
import io.openlineage.client.OpenLineage;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
Expand All @@ -58,7 +59,6 @@
import marquez.common.models.Version;
import marquez.service.models.DatasetMeta;
import marquez.service.models.DbTableMeta;
import marquez.service.models.LineageEvent;
import marquez.service.models.StreamMeta;
import org.apache.commons.lang3.tuple.Triple;

Expand Down Expand Up @@ -207,7 +207,7 @@ public static Version newDatasetVersionFor(
String sourceName,
String physicalName,
String datasetName,
List<LineageEvent.SchemaField> fields,
List<OpenLineage.SchemaDatasetFacetFields> fields,
UUID runId) {
DatasetVersionData data =
DatasetVersionData.builder()
Expand Down Expand Up @@ -274,7 +274,8 @@ private static class DatasetVersionData {
private UUID runId;

public static class DatasetVersionDataBuilder {
private static final Function<LineageEvent.SchemaField, Triple<String, String, String>>
private static final Function<
OpenLineage.SchemaDatasetFacetFields, Triple<String, String, String>>
schemaFieldToTripleFunction =
f ->
Triple.of(
Expand All @@ -295,7 +296,7 @@ public static class DatasetVersionDataBuilder {
private UUID runId;

DatasetVersionData.DatasetVersionDataBuilder schemaFields(
List<LineageEvent.SchemaField> schemaFields) {
List<OpenLineage.SchemaDatasetFacetFields> schemaFields) {
if (schemaFields == null) return this;
setFields(schemaFields, schemaFieldToTripleFunction);
return this;
Expand Down
7 changes: 3 additions & 4 deletions api/src/main/java/marquez/db/DatasetVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package marquez.db;

import com.google.common.collect.ImmutableSet;
import io.openlineage.client.OpenLineage;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -40,7 +41,6 @@
import marquez.service.DatasetService;
import marquez.service.models.DatasetMeta;
import marquez.service.models.DatasetVersion;
import marquez.service.models.LineageEvent.SchemaField;
import marquez.service.models.Run;
import marquez.service.models.StreamMeta;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
Expand Down Expand Up @@ -131,15 +131,14 @@ default PGobject toPgObjectFields(List<Field> fields) {
}
}

default PGobject toPgObjectSchemaFields(List<SchemaField> fields) {
default PGobject toPgObjectSchemaFields(List<OpenLineage.SchemaDatasetFacetFields> fields) {
return toPgObjectFields(toFields(fields));
}

default List<Field> toFields(List<SchemaField> fields) {
default List<Field> toFields(List<OpenLineage.SchemaDatasetFacetFields> fields) {
if (fields == null) {
return null;
}
OpenLineageDao openLineageDao = createOpenLineageDao();
return fields.stream()
.map(
f ->
Expand Down
95 changes: 47 additions & 48 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
package marquez.db;

import static io.openlineage.client.OpenLineage.Dataset;
import static io.openlineage.client.OpenLineage.DatasetFacets;
import static io.openlineage.client.OpenLineage.InputDataset;
import static io.openlineage.client.OpenLineage.Job;
import static io.openlineage.client.OpenLineage.JobFacets;
import static io.openlineage.client.OpenLineage.OutputDataset;
import static io.openlineage.client.OpenLineage.RunEvent;
import static io.openlineage.client.OpenLineage.SchemaDatasetFacetFields;
import static io.openlineage.client.OpenLineage.SourceCodeLocationJobFacet;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.openlineage.client.OpenLineage;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
Expand Down Expand Up @@ -35,12 +46,6 @@
import marquez.db.models.SourceRow;
import marquez.db.models.UpdateLineageRow;
import marquez.db.models.UpdateLineageRow.DatasetRecord;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.Dataset;
import marquez.service.models.LineageEvent.DatasetFacets;
import marquez.service.models.LineageEvent.Job;
import marquez.service.models.LineageEvent.SchemaDatasetFacet;
import marquez.service.models.LineageEvent.SchemaField;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.postgresql.util.PGobject;
Expand Down Expand Up @@ -71,7 +76,7 @@ void createLineageEvent(
String producer);

@Transaction
default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) {
default UpdateLineageRow updateMarquezModel(RunEvent event, ObjectMapper mapper) {
UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper);
RunState runState = getRunState(event.getEventType());
if (event.getEventType() != null && runState.isDone()) {
Expand All @@ -80,7 +85,7 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map
return updateLineageRow;
}

default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) {
default UpdateLineageRow updateBaseMarquezModel(RunEvent event, ObjectMapper mapper) {
NamespaceDao namespaceDao = createNamespaceDao();
DatasetDao datasetDao = createDatasetDao();
SourceDao sourceDao = createSourceDao();
Expand Down Expand Up @@ -115,11 +120,13 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
UUID.randomUUID(), now, Utils.toJson(context), Utils.checksumFor(context));
bag.setJobContext(jobContext);

String location = null;
if (event.getJob().getFacets() != null
&& event.getJob().getFacets().getSourceCodeLocation() != null) {
location = getUrlOrPlaceholder(event.getJob().getFacets().getSourceCodeLocation().getUrl());
}
String location =
Optional.of(event.getJob())
.map(Job::getFacets)
.map(JobFacets::getSourceCodeLocation)
.map(SourceCodeLocationJobFacet::getUrl)
.map(URI::toString)
.orElse("");

JobRow job =
jobDao.upsertJob(
Expand Down Expand Up @@ -164,14 +171,12 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
}
}

UUID runUuid = runToUuid(event.getRun().getRunId());

RunRow run;
if (event.getEventType() != null) {
RunState runStateType = getRunState(event.getEventType());
run =
runDao.upsert(
runUuid,
event.getRun().getRunId(),
event.getRun().getRunId(),
now,
null,
Expand All @@ -187,7 +192,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
} else {
run =
runDao.upsert(
runUuid,
event.getRun().getRunId(),
event.getRun().getRunId(),
now,
null,
Expand Down Expand Up @@ -219,12 +224,12 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
List<DatasetRecord> datasetInputs = null;
if (event.getInputs() != null) {
datasetInputs = new ArrayList<>();
for (Dataset ds : event.getInputs()) {
for (InputDataset ds : event.getInputs()) {
DatasetRecord record =
upsertLineageDataset(
ds,
now,
runUuid,
event.getRun().getRunId(),
true,
namespaceDao,
sourceDao,
Expand All @@ -240,12 +245,12 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
List<DatasetRecord> datasetOutputs = null;
if (event.getOutputs() != null) {
datasetOutputs = new ArrayList<>();
for (Dataset ds : event.getOutputs()) {
for (OutputDataset ds : event.getOutputs()) {
DatasetRecord record =
upsertLineageDataset(
ds,
now,
runUuid,
event.getRun().getRunId(),
false,
namespaceDao,
sourceDao,
Expand All @@ -261,7 +266,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
return bag;
}

default Set<DatasetId> toDatasetId(List<Dataset> datasets) {
default Set<DatasetId> toDatasetId(List<InputDataset> datasets) {
Set<DatasetId> set = new HashSet<>();
if (datasets == null) {
return set;
Expand All @@ -275,7 +280,7 @@ default Set<DatasetId> toDatasetId(List<Dataset> datasets) {
}

default void updateMarquezOnComplete(
LineageEvent event, UpdateLineageRow updateLineageRow, RunState runState) {
RunEvent event, UpdateLineageRow updateLineageRow, RunState runState) {
BagOfJobVersionInfo bagOfJobVersionInfo =
createJobVersionDao()
.upsertJobVersionOnRunTransition(
Expand Down Expand Up @@ -331,7 +336,7 @@ default DatasetRecord upsertLineageDataset(
getSourceType(ds),
now,
ds.getFacets().getDataSource().getName(),
getUrlOrPlaceholder(ds.getFacets().getDataSource().getUri()));
getUrlOrPlaceholder(ds.getFacets().getDataSource().getUri().toString()));
} else {
source =
sourceDao.upsertOrDefault(
Expand Down Expand Up @@ -363,10 +368,10 @@ default DatasetRecord upsertLineageDataset(
ds.getName(),
dsDescription);

List<SchemaField> fields =
List<SchemaDatasetFacetFields> fields =
Optional.ofNullable(ds.getFacets())
.map(DatasetFacets::getSchema)
.map(SchemaDatasetFacet::getFields)
.map(OpenLineage.SchemaDatasetFacet::getFields)
.orElse(null);

final DatasetRow dsRow = datasetRow;
Expand Down Expand Up @@ -403,7 +408,7 @@ default DatasetRecord upsertLineageDataset(
});
List<DatasetFieldMapping> datasetFieldMappings = new ArrayList<>();
if (fields != null) {
for (SchemaField field : fields) {
for (SchemaDatasetFacetFields field : fields) {
DatasetFieldRow datasetFieldRow =
datasetFieldDao.upsert(
UUID.randomUUID(),
Expand Down Expand Up @@ -463,7 +468,7 @@ default RunState getRunState(String eventType) {
}
}

default Map<String, String> createRunArgs(LineageEvent event) {
default Map<String, String> createRunArgs(RunEvent event) {
Map<String, String> args = new LinkedHashMap<>();
if (event.getRun().getFacets() != null) {
if (event.getRun().getFacets().getNominalTime() != null) {
Expand All @@ -477,33 +482,27 @@ default Map<String, String> createRunArgs(LineageEvent event) {
}
}
if (event.getRun().getFacets().getParent() != null) {
args.put("run_id", event.getRun().getFacets().getParent().getRun().getRunId());
args.put("run_id", event.getRun().getFacets().getParent().getRun().getRunId().toString());
args.put("name", event.getRun().getFacets().getParent().getJob().getName());
args.put("namespace", event.getRun().getFacets().getParent().getJob().getNamespace());
}
}
return args;
}

default Map<String, String> buildJobContext(LineageEvent event) {
default Map<String, String> buildJobContext(RunEvent event) {
Map<String, String> args = new LinkedHashMap<>();
if (event.getJob().getFacets() != null) {
if (event.getJob().getFacets().getSourceCodeLocation() != null) {
if (event.getJob().getFacets().getSourceCodeLocation().getType() != null) {
args.put(
"job.facets.sourceCodeLocation.type",
event.getJob().getFacets().getSourceCodeLocation().getType());
}
if (event.getJob().getFacets().getSourceCodeLocation().getUrl() != null) {
args.put(
"job.facets.sourceCodeLocation.url",
event.getJob().getFacets().getSourceCodeLocation().getUrl());
}
}
if (event.getJob().getFacets().getSql() != null) {
args.put("sql", event.getJob().getFacets().getSql().getQuery());
}
}
Optional<JobFacets> jobFacets = Optional.of(event.getJob()).map(Job::getFacets);
jobFacets.map(f -> f.getSql()).ifPresent(sql -> args.put("sql", sql.getQuery()));

Optional<SourceCodeLocationJobFacet> sourceCode = jobFacets.map(f -> f.getSourceCodeLocation());
sourceCode
.map(sc -> sc.getType())
.ifPresent(type -> args.put("job.facets.sourceCodeLocation.type", type));
sourceCode
.map(sc -> sc.getUrl())
.ifPresent(url -> args.put("job.facets.sourceCodeLocation.url", url.toString()));
sourceCode.map(sc -> sc.getUrl()).ifPresent(url -> args.put("sql", url.toString()));

return args;
}
Expand All @@ -517,7 +516,7 @@ default UUID runToUuid(String runId) {
}
}

default PGobject createJsonArray(LineageEvent event, ObjectMapper mapper) {
default PGobject createJsonArray(RunEvent event, ObjectMapper mapper) {
try {
PGobject jsonObject = new PGobject();
jsonObject.setType("json");
Expand Down
10 changes: 5 additions & 5 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static marquez.db.OpenLineageDao.DEFAULT_NAMESPACE_OWNER;

import com.google.common.collect.ImmutableSet;
import io.openlineage.client.OpenLineage;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
Expand All @@ -41,7 +42,6 @@
import marquez.db.models.RunRow;
import marquez.service.models.Dataset;
import marquez.service.models.JobMeta;
import marquez.service.models.LineageEvent.SchemaField;
import marquez.service.models.Run;
import marquez.service.models.RunMeta;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
Expand Down Expand Up @@ -193,7 +193,7 @@ public interface RunDao extends BaseDao {
+ "RETURNING *")
ExtendedRunRow upsert(
UUID runUuid,
String externalId,
UUID externalId,
Instant now,
UUID jobVersionUuid,
UUID runArgsUuid,
Expand Down Expand Up @@ -242,7 +242,7 @@ ExtendedRunRow upsert(
+ "RETURNING *")
ExtendedRunRow upsert(
UUID runUuid,
String externalId,
UUID externalId,
Instant now,
UUID jobVersionUuid,
UUID runArgsUuid,
Expand Down Expand Up @@ -306,14 +306,14 @@ default void upsertOutputDatasetsFor(UUID runUuid, ImmutableSet<DatasetId> runOu
}
}

default List<SchemaField> toSchemaFields(List<Field> fields) {
default List<OpenLineage.SchemaDatasetFacetFields> toSchemaFields(List<Field> fields) {
if (fields == null) {
return null;
}
return fields.stream()
.map(
f ->
SchemaField.builder()
new OpenLineage.SchemaDatasetFacetFieldsBuilder()
.name(f.getName().getValue())
.type(f.getType())
.description(f.getDescription().orElse(null))
Expand Down
Loading