diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index cb899ce92625b..40659ca251d78 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.common.ReflectHelpers; +import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; @@ -74,6 +75,9 @@ public class FileSystems { Pattern.compile("(?[a-zA-Z][-a-zA-Z0-9+.]*):/.*"); private static final Pattern GLOB_PATTERN = Pattern.compile("[*?{}]"); + private static final AtomicReference> FILESYSTEM_REVISION = + new AtomicReference<>(); + private static final AtomicReference> SCHEME_TO_FILESYSTEM = new AtomicReference<>(ImmutableMap.of(DEFAULT_SCHEME, new LocalFileSystem())); @@ -529,13 +533,27 @@ static FileSystem getFileSystemInternal(String scheme) { @Internal public static void setDefaultPipelineOptions(PipelineOptions options) { checkNotNull(options, "options"); - Set registrars = - Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); - registrars.addAll( - Lists.newArrayList( - ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader()))); + long id = options.getOptionsId(); + int nextRevision = options.revision(); + + while (true) { + KV revision = FILESYSTEM_REVISION.get(); + // only update file systems if the pipeline changed or the options revision increased + if (revision != null && revision.getKey().equals(id) && revision.getValue() >= nextRevision) { + return; + } + + if (FILESYSTEM_REVISION.compareAndSet(revision, KV.of(id, nextRevision))) { + Set registrars = + Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); + registrars.addAll( + Lists.newArrayList( + ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader()))); - SCHEME_TO_FILESYSTEM.set(verifySchemesAreUnique(options, registrars)); + SCHEME_TO_FILESYSTEM.set(verifySchemesAreUnique(options, registrars)); + return; + } + } } @VisibleForTesting diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 0cefaba81e11f..64d4c3517c485 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -357,6 +357,12 @@ public String create(PipelineOptions options) { */ Map> outputRuntimeOptions(); + /** + * A monotonically increasing revision number of this {@link PipelineOptions} object that can be + * used to detect changes. + */ + int revision(); + /** * Provides a process wide unique ID for this {@link PipelineOptions} object, assigned at graph * construction time. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index 4d3f993257aad..01a8f37e706fa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -1269,6 +1269,7 @@ private static void validateMethodsAreEitherBeanMethodOrKnownMethod( try { knownMethods.add(iface.getMethod("as", Class.class)); knownMethods.add(iface.getMethod("outputRuntimeOptions")); + knownMethods.add(iface.getMethod("revision")); knownMethods.add(iface.getMethod("populateDisplayData", DisplayData.Builder.class)); } catch (NoSuchMethodException | SecurityException e) { throw new RuntimeException(e); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java index 77324eb7f18f1..cd7e7a2ae5ddd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java @@ -49,11 +49,13 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.sdk.options.PipelineOptionsFactory.AnnotationPredicates; import org.apache.beam.sdk.options.PipelineOptionsFactory.Registration; @@ -98,6 +100,8 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable { */ private final int hashCode = ThreadLocalRandom.current().nextInt(); + private final AtomicInteger revision; + private static final class ComputedProperties { final ImmutableClassToInstanceMap interfaceToProxyCache; final ImmutableMap gettersToPropertyNames; @@ -158,7 +162,7 @@ ComputedProperties updated( private final ImmutableMap jsonOptions; ProxyInvocationHandler(Map options) { - this(bindOptions(options), Maps.newHashMap()); + this(bindOptions(options), Maps.newHashMap(), 0); } private static Map bindOptions(Map inputOptions) { @@ -171,9 +175,10 @@ private static Map bindOptions(Map inputOpti } private ProxyInvocationHandler( - Map options, Map jsonOptions) { + Map options, Map jsonOptions, int revision) { this.options = new ConcurrentHashMap<>(options); this.jsonOptions = ImmutableMap.copyOf(jsonOptions); + this.revision = new AtomicInteger(revision); this.computedProperties = new ComputedProperties( ImmutableClassToInstanceMap.of(), @@ -193,6 +198,8 @@ public Object invoke(Object proxy, Method method, Object[] args) { return hashCode(); } else if (args == null && "outputRuntimeOptions".equals(method.getName())) { return outputRuntimeOptions((PipelineOptions) proxy); + } else if (args == null && "revision".equals(method.getName())) { + return revision.get(); } else if (args != null && "as".equals(method.getName()) && args[0] instanceof Class) { @SuppressWarnings("unchecked") Class clazz = (Class) args[0]; @@ -222,9 +229,13 @@ public Object invoke(Object proxy, Method method, Object[] args) { } return options.get(propertyName).getValue(); } else if (properties.settersToPropertyNames.containsKey(methodName)) { - options.put( - properties.settersToPropertyNames.get(methodName), - BoundValue.fromExplicitOption(args[0])); + BoundValue prev = + options.put( + properties.settersToPropertyNames.get(methodName), + BoundValue.fromExplicitOption(args[0])); + if (prev == null ? args[0] != null : !Objects.equals(args[0], prev.getValue())) { + revision.incrementAndGet(); + } return Void.TYPE; } throw new RuntimeException( @@ -781,6 +792,8 @@ public void serialize(PipelineOptions value, JsonGenerator jgen, SerializerProvi jgen.writeFieldName("display_data"); jgen.writeObject(serializedDisplayData); + + jgen.writeNumberField("revision", handler.revision.get()); jgen.writeEndObject(); } @@ -879,9 +892,9 @@ public PipelineOptions deserialize(JsonParser jp, DeserializationContext ctxt) fields.put(field.getKey(), field.getValue()); } } - + int revision = objectNode.hasNonNull("revision") ? objectNode.get("revision").asInt() : 0; PipelineOptions options = - new ProxyInvocationHandler(Maps.newHashMap(), fields).as(PipelineOptions.class); + new ProxyInvocationHandler(Maps.newHashMap(), fields, revision).as(PipelineOptions.class); ValueProvider.RuntimeValueProvider.setRuntimeOptions(options); return options; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index 1970b240bbfb8..fcc40c904f721 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -103,6 +103,18 @@ public class PipelineOptionsFactoryTest { @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(PipelineOptionsFactory.class); + @Test + public void testRevision() { + PipelineOptions options = PipelineOptionsFactory.create(); + assertEquals(1, options.revision()); + for (int i = 0; i < 10; i++) { + options.setJobName("other" + i); + // updates are idempotent, the 2nd call won't increment the revision + options.setJobName("other" + i); + } + assertEquals(11, options.revision()); + } + @Test public void testAutomaticRegistrationOfPipelineOptions() { assertTrue(PipelineOptionsFactory.getRegisteredOptions().contains(RegisteredTestOptions.class));