Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix computing diff on large resources during updates (#4684) #4726

Merged
merged 5 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions delta/app/src/main/resources/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ app {
event-log = ${app.defaults.event-log}
# Reject payloads which contain nexus metadata fields (any field beginning with _)
decoding-option = "strict"
# Do not create a new revision of a resource when the update does not introduce a change
skip-update-no-change = true
}

# Schemas configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverResolution.ResourceRe
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.{ResolverContextResolution, Resolvers, ResourceResolution}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection.ProjectContextRejection
import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.{Resource, ResourceEvent}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{Resources, ResourcesConfig, ResourcesImpl, ValidateResource}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{DetectChange, Resources, ResourcesConfig, ResourcesImpl, ValidateResource}
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.Schemas
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.Schema
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
Expand All @@ -41,9 +41,12 @@ object ResourcesModule extends ModuleDef {

make[ResourcesConfig].from { (config: AppConfig) => config.resources }

make[DetectChange].from { (config: ResourcesConfig) => DetectChange(config.skipUpdateNoChange) }

make[Resources].from {
(
validate: ValidateResource,
detectChange: DetectChange,
fetchContext: FetchContext[ContextRejection],
config: ResourcesConfig,
resolverContextResolution: ResolverContextResolution,
Expand All @@ -54,6 +57,7 @@ object ResourcesModule extends ModuleDef {
) =>
ResourcesImpl(
validate,
detectChange,
fetchContext.mapRejection(ProjectContextRejection),
resolverContextResolution,
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverResolution.FetchResource
import ch.epfl.bluebrain.nexus.delta.sdk.resources.NexusSource.DecodingOption
import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection.ProjectContextRejection
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{Resources, ResourcesConfig, ResourcesImpl, ValidateResource}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{DetectChange, Resources, ResourcesConfig, ResourcesImpl, ValidateResource}
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.Schema
import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authenticated, Group, Subject, User}
Expand Down Expand Up @@ -106,9 +106,10 @@ class ResourcesRoutesSpec extends BaseRouteSpec with IOFromMap with CatsIOValues
private def routesWithDecodingOption(implicit decodingOption: DecodingOption): (Route, Resources) = {
val resources = ResourcesImpl(
validator,
DetectChange(enabled = true),
fetchContext,
resolverContextResolution,
ResourcesConfig(eventLogConfig, decodingOption),
ResourcesConfig(eventLogConfig, decodingOption, skipUpdateNoChange = true),
xas,
clock
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package ch.epfl.bluebrain.nexus.delta.sdk.jsonld

import cats.Eq
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.RdfError
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdRejection._
import ch.epfl.bluebrain.nexus.delta.rdf.graph.Graph
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContext, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.{CompactedJsonLd, ExpandedJsonLd}
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdRejection._
import ch.epfl.bluebrain.nexus.delta.sdk.model.jsonld.RemoteContextRef
import io.circe.Json

Expand All @@ -27,7 +26,7 @@ import io.circe.Json
* @param graph
* its graph representation
* @param remoteContexts
* it
* the resolved remote contexts
*/
final case class JsonLdAssembly(
id: Iri,
Expand All @@ -46,20 +45,6 @@ final case class JsonLdAssembly(

object JsonLdAssembly {

/**
* Defines the equality between two instances
*
* - If the remote contexts and the local context are the same, then the compacted form will be the same
* - If the graph forms are isomorphic then, the expanded form will be the same
*/
implicit val jsonLdAssemblyEq: Eq[JsonLdAssembly] = Eq.instance { (jsonld1, jsonld2) =>
jsonld1.id == jsonld2.id &&
jsonld1.remoteContexts == jsonld2.remoteContexts &&
jsonld1.compacted.ctx == jsonld2.compacted.ctx &&
jsonld1.graph.isIsomorphic(jsonld2.graph) &&
jsonld1.source == jsonld2.source
}

def apply(
iri: Iri,
source: Json,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,22 @@ sealed abstract class JsonLdSourceProcessor(implicit api: JsonLdApi) {
protected def getOrGenerateId(iri: Option[Iri], context: ProjectContext): IO[Iri] =
iri.fold(uuidF().map(uuid => context.base.iri / uuid.toString))(IO.pure)

protected def expandSource(
context: ProjectContext,
source: Json
)(implicit rcr: RemoteContextResolution): IO[(ContextValue, ExplainResult[ExpandedJsonLd])] = {
implicit val opts: JsonLdOptions = JsonLdOptions(base = Some(context.base.iri))
ExpandedJsonLd
.explain(source)
.flatMap {
case result if result.value.isEmpty && source.topContextValueOrEmpty.isEmpty =>
val ctx = defaultCtx(context)
ExpandedJsonLd.explain(source.addContext(ctx.contextObj)).map(ctx -> _)
case result =>
IO.pure(source.topContextValueOrEmpty -> result)
}
.adaptError { case err: RdfError => InvalidJsonLdFormat(None, err) }
/**
* Expand the source document using the provided project context and remote context resolution.
*
* If the source does not provide a context, one will be injected from the project base and vocab.
*/
protected def expandSource(projectContext: ProjectContext, source: Json)(implicit
rcr: RemoteContextResolution
): IO[(ContextValue, ExplainResult[ExpandedJsonLd])] = {
implicit val opts: JsonLdOptions = JsonLdOptions(base = Some(projectContext.base.iri))
val sourceContext = source.topContextValueOrEmpty
if (sourceContext.isEmpty) {
val defaultContext = defaultCtx(projectContext)
ExpandedJsonLd.explain(source.addContext(defaultContext.contextObj)).map(defaultContext -> _)
} else {
ExpandedJsonLd.explain(source).map(sourceContext -> _)
}.adaptError { case err: RdfError => InvalidJsonLdFormat(None, err) }
}

protected def checkAndSetSameId(iri: Iri, expanded: ExpandedJsonLd): IO[ExpandedJsonLd] =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package ch.epfl.bluebrain.nexus.delta.sdk.resources

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.CompactedJsonLd
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdAssembly
import ch.epfl.bluebrain.nexus.delta.sdk.model.jsonld.RemoteContextRef
import ch.epfl.bluebrain.nexus.delta.sdk.resources.DetectChange.Current
import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceState
import io.circe.Json

/**
* Detect if the new json-ld state introduces changes compared to the current state
*/
trait DetectChange {
def apply(newValue: JsonLdAssembly, currentState: ResourceState): IO[Boolean] =
apply(
newValue,
Current(currentState.types, currentState.source, currentState.compacted, currentState.remoteContexts)
)

def apply(newValue: JsonLdAssembly, current: Current): IO[Boolean]
}

object DetectChange {

final case class Current(
types: Set[Iri],
source: Json,
compacted: CompactedJsonLd,
remoteContexts: Set[RemoteContextRef]
)

private val Disabled = new DetectChange {

override def apply(newValue: JsonLdAssembly, current: Current): IO[Boolean] = IO.pure(true)
}

/**
* Default implementation
*
* There will be a change if:
* - If there is a change in the resource types
* - If there is a change in one of the remote JSON-LD contexts
* - If there is a change in the local JSON-LD context
* - If there is a change in the rest of the payload
*
* The implementation uses `IO.cede` as comparing source can induce expensive work in the case of large payloads.
*/
private val Impl = new DetectChange {

override def apply(newValue: JsonLdAssembly, current: Current): IO[Boolean] =
IO.cede
.as(
newValue.types != current.types ||
newValue.remoteContexts != current.remoteContexts ||
newValue.compacted.ctx != current.compacted.ctx ||
newValue.source != current.source
)
.guarantee(IO.cede)
}

def apply(enabled: Boolean): DetectChange = if (enabled) Impl else Disabled

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.sdk.resources

import io.circe.Decoder.Result
import io.circe.{Decoder, DecodingFailure, HCursor, Json}
import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure}
import pureconfig.{ConfigCursor, ConfigReader}
import pureconfig.ConfigReader
import pureconfig.error.CannotConvert

final case class NexusSource(value: Json) extends AnyVal

Expand All @@ -16,30 +16,20 @@ object NexusSource {

final case object Lenient extends DecodingOption

implicit val decodingOptionConfigReader: ConfigReader[DecodingOption] = {
new ConfigReader[DecodingOption] {
private val stringReader = implicitly[ConfigReader[String]]
override def from(cur: ConfigCursor): ConfigReader.Result[DecodingOption] = {
stringReader.from(cur).flatMap {
case "strict" => Right(Strict)
case "lenient" => Right(Lenient)
case other =>
Left(
ConfigReaderFailures(
ConvertFailure(
CannotConvert(
other,
"DecodingOption",
s"values can only be 'strict' or 'lenient'"
),
cur
)
)
)
}
}
implicit val decodingOptionConfigReader: ConfigReader[DecodingOption] =
ConfigReader.fromString {
case "strict" => Right(Strict)
case "lenient" => Right(Lenient)
case other =>
Left(
CannotConvert(
other,
"DecodingOption",
s"values can only be 'strict' or 'lenient'"
)
)

}
}
}

private val strictDecoder = new Decoder[NexusSource] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ object Resources {
@SuppressWarnings(Array("OptionGet"))
private[delta] def evaluate(
validateResource: ValidateResource,
detectChange: DetectChange,
clock: Clock[IO]
)(state: Option[ResourceState], cmd: ResourceCommand): IO[ResourceEvent] = {

Expand Down Expand Up @@ -432,10 +433,9 @@ object Resources {
}

for {
state <- stateWhereResourceIsEditable(u)
stateJsonLd <- IO.fromEither(state.toAssembly)
changeDetected = sys.env.get("DISABLE_CHANGE_DETECTION").contains("true") || stateJsonLd =!= u.jsonld
event <- if (u.schemaOpt.isDefined || changeDetected) onChange(state) else fallbackToTag(state)
state <- stateWhereResourceIsEditable(u)
changeDetected <- detectChange(u.jsonld, state)
event <- if (u.schemaOpt.isDefined || changeDetected) onChange(state) else fallbackToTag(state)
} yield event
}

Expand All @@ -452,10 +452,10 @@ object Resources {
def refresh(r: RefreshResource) = {
for {
state <- stateWhereResourceIsEditable(r)
stateJsonLd <- IO.fromEither(state.toAssembly)
_ <- raiseWhenDifferentSchema(r, state)
(schemaRev, schemaProject) <- validate(r.jsonld, r.schemaOpt.getOrElse(state.schema), state.project, r.caller)
_ <- IO.raiseWhen(stateJsonLd === r.jsonld)(NoChangeDetected(state))
changeDetected <- detectChange(r.jsonld, state)
_ <- IO.raiseUnless(changeDetected)(NoChangeDetected(state))
time <- clock.realTimeInstant
} yield ResourceRefreshed(r.project, schemaRev, schemaProject, r.jsonld, state.rev + 1, time, r.subject)
}
Expand Down Expand Up @@ -511,11 +511,12 @@ object Resources {
*/
def definition(
validateResource: ValidateResource,
detectChange: DetectChange,
clock: Clock[IO]
): ScopedEntityDefinition[Iri, ResourceState, ResourceCommand, ResourceEvent, ResourceRejection] =
ScopedEntityDefinition(
entityType,
StateMachine(None, evaluate(validateResource, clock)(_, _), next),
StateMachine(None, evaluate(validateResource, detectChange, clock)(_, _), next),
ResourceEvent.serializer,
ResourceState.serializer,
Tagger[ResourceEvent](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ import pureconfig.generic.semiauto.deriveReader
*
* @param eventLog
* configuration of the event log
* @param decodingOption
* strict/lenient decoding of resources
* @param skipUpdateNoChange
* do not create a new revision when the update does not introduce a change in the current resource state
*/
final case class ResourcesConfig(eventLog: EventLogConfig, decodingOption: DecodingOption)
final case class ResourcesConfig(eventLog: EventLogConfig, decodingOption: DecodingOption, skipUpdateNoChange: Boolean)

object ResourcesConfig {
implicit final val resourcesConfigReader: ConfigReader[ResourcesConfig] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ object ResourcesImpl {
*/
final def apply(
validateResource: ValidateResource,
detectChange: DetectChange,
fetchContext: FetchContext[ProjectContextRejection],
contextResolution: ResolverContextResolution,
config: ResourcesConfig,
Expand All @@ -233,7 +234,7 @@ object ResourcesImpl {
uuidF: UUIDF = UUIDF.random
): Resources =
new ResourcesImpl(
ScopedEventLog(Resources.definition(validateResource, clock), config.eventLog, xas),
ScopedEventLog(Resources.definition(validateResource, detectChange, clock), config.eventLog, xas),
fetchContext,
JsonLdSourceResolvingParser[ResourceRejection](contextResolution, uuidF)
)
Expand Down
Loading