Skip to content

Commit

Permalink
Merge pull request apache#26694: [Java] Track pipeline options revision for idempotent initialization of file systems
Browse files Browse the repository at this point in the history
  • Loading branch information
aromanenko-dev committed Aug 2, 2023
2 parents e0b6d83 + f3bdc8d commit 663bd52
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
Expand All @@ -74,6 +75,9 @@ public class FileSystems {
Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):/.*");
private static final Pattern GLOB_PATTERN = Pattern.compile("[*?{}]");

// Identifies the pipeline options state the file systems were last initialized from, stored as
// an (options id, options revision) pair. Holds null until the first initialization.
private static final AtomicReference<KV<Long, Integer>> FILESYSTEM_REVISION =
new AtomicReference<>();

// Registered file systems keyed by scheme. Starts with only the local file system, registered
// under the default scheme.
private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM =
new AtomicReference<>(ImmutableMap.of(DEFAULT_SCHEME, new LocalFileSystem()));

Expand Down Expand Up @@ -529,13 +533,27 @@ static FileSystem getFileSystemInternal(String scheme) {
@Internal
public static void setDefaultPipelineOptions(PipelineOptions options) {
  checkNotNull(options, "options");
  // The (options id, revision) pair captures the state of the options at the time of this call.
  // It is compared against the pair recorded by the last initialization so that repeated calls
  // with unchanged options do not reload and re-register the file systems.
  long id = options.getOptionsId();
  int nextRevision = options.revision();

  while (true) {
    KV<Long, Integer> revision = FILESYSTEM_REVISION.get();
    // only update file systems if the pipeline changed or the options revision increased
    if (revision != null && revision.getKey().equals(id) && revision.getValue() >= nextRevision) {
      return;
    }

    // Only the caller whose compare-and-set succeeds performs the (re-)initialization; a caller
    // that loses the race loops, re-reads the recorded pair and re-evaluates the check above.
    if (FILESYSTEM_REVISION.compareAndSet(revision, KV.of(id, nextRevision))) {
      // Registrars are discovered only once an update is actually required, and are kept in a
      // set ordered by ReflectHelpers.ObjectsClassComparator.
      Set<FileSystemRegistrar> registrars =
          Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
      registrars.addAll(
          Lists.newArrayList(
              ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader())));

      // NOTE(review): the revision is published before the scheme map is replaced, so a
      // concurrent caller may return early while this update is still in flight - confirm
      // this is acceptable for callers.
      SCHEME_TO_FILESYSTEM.set(verifySchemesAreUnique(options, registrars));
      return;
    }
  }
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@ public String create(PipelineOptions options) {
*/
Map<String, Map<String, Object>> outputRuntimeOptions();

/**
 * A monotonically increasing revision number of this {@link PipelineOptions} object that can be
 * used to detect changes.
 *
 * <p>A caller can remember the revision it last observed and compare it with the current one to
 * decide whether state derived from these options has to be refreshed.
 *
 * @return the current revision of this options object
 */
int revision();

/**
* Provides a process wide unique ID for this {@link PipelineOptions} object, assigned at graph
* construction time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,7 @@ private static void validateMethodsAreEitherBeanMethodOrKnownMethod(
try {
knownMethods.add(iface.getMethod("as", Class.class));
knownMethods.add(iface.getMethod("outputRuntimeOptions"));
knownMethods.add(iface.getMethod("revision"));
knownMethods.add(iface.getMethod("populateDisplayData", DisplayData.Builder.class));
} catch (NoSuchMethodException | SecurityException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.options.PipelineOptionsFactory.AnnotationPredicates;
import org.apache.beam.sdk.options.PipelineOptionsFactory.Registration;
Expand Down Expand Up @@ -98,6 +100,8 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
*/
private final int hashCode = ThreadLocalRandom.current().nextInt();

// Counter returned by PipelineOptions#revision(). It is incremented when a setter changes the
// stored value of an option, and its value is written to and restored from the JSON form.
private final AtomicInteger revision;

private static final class ComputedProperties {
final ImmutableClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
final ImmutableMap<String, String> gettersToPropertyNames;
Expand Down Expand Up @@ -158,7 +162,7 @@ <T extends PipelineOptions> ComputedProperties updated(
private final ImmutableMap<String, JsonNode> jsonOptions;

/**
 * Creates a handler backed by the given explicitly provided option values, with no JSON-backed
 * options and an initial revision of {@code 0}.
 *
 * @param options the option values to bind, keyed by property name
 */
ProxyInvocationHandler(Map<String, Object> options) {
  // Delegate exactly once, to the three-argument constructor that also takes the revision.
  this(bindOptions(options), Maps.newHashMap(), 0);
}

private static Map<String, BoundValue> bindOptions(Map<String, Object> inputOptions) {
Expand All @@ -171,9 +175,10 @@ private static Map<String, BoundValue> bindOptions(Map<String, Object> inputOpti
}

private ProxyInvocationHandler(
Map<String, BoundValue> options, Map<String, JsonNode> jsonOptions) {
Map<String, BoundValue> options, Map<String, JsonNode> jsonOptions, int revision) {
this.options = new ConcurrentHashMap<>(options);
this.jsonOptions = ImmutableMap.copyOf(jsonOptions);
this.revision = new AtomicInteger(revision);
this.computedProperties =
new ComputedProperties(
ImmutableClassToInstanceMap.of(),
Expand All @@ -193,6 +198,8 @@ public Object invoke(Object proxy, Method method, Object[] args) {
return hashCode();
} else if (args == null && "outputRuntimeOptions".equals(method.getName())) {
return outputRuntimeOptions((PipelineOptions) proxy);
} else if (args == null && "revision".equals(method.getName())) {
return revision.get();
} else if (args != null && "as".equals(method.getName()) && args[0] instanceof Class) {
@SuppressWarnings("unchecked")
Class<? extends PipelineOptions> clazz = (Class<? extends PipelineOptions>) args[0];
Expand Down Expand Up @@ -222,9 +229,13 @@ public Object invoke(Object proxy, Method method, Object[] args) {
}
return options.get(propertyName).getValue();
} else if (properties.settersToPropertyNames.containsKey(methodName)) {
options.put(
properties.settersToPropertyNames.get(methodName),
BoundValue.fromExplicitOption(args[0]));
BoundValue prev =
options.put(
properties.settersToPropertyNames.get(methodName),
BoundValue.fromExplicitOption(args[0]));
if (prev == null ? args[0] != null : !Objects.equals(args[0], prev.getValue())) {
revision.incrementAndGet();
}
return Void.TYPE;
}
throw new RuntimeException(
Expand Down Expand Up @@ -781,6 +792,8 @@ public void serialize(PipelineOptions value, JsonGenerator jgen, SerializerProvi

jgen.writeFieldName("display_data");
jgen.writeObject(serializedDisplayData);

jgen.writeNumberField("revision", handler.revision.get());
jgen.writeEndObject();
}

Expand Down Expand Up @@ -879,9 +892,9 @@ public PipelineOptions deserialize(JsonParser jp, DeserializationContext ctxt)
fields.put(field.getKey(), field.getValue());
}
}

int revision = objectNode.hasNonNull("revision") ? objectNode.get("revision").asInt() : 0;
PipelineOptions options =
new ProxyInvocationHandler(Maps.newHashMap(), fields).as(PipelineOptions.class);
new ProxyInvocationHandler(Maps.newHashMap(), fields, revision).as(PipelineOptions.class);
ValueProvider.RuntimeValueProvider.setRuntimeOptions(options);
return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ public class PipelineOptionsFactoryTest {
@Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
@Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(PipelineOptionsFactory.class);

/** Checks that the options revision advances only when a setter actually changes a value. */
@Test
public void testRevision() {
  PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
  assertEquals(1, pipelineOptions.revision());

  int round = 0;
  while (round < 10) {
    String jobName = "other" + round;
    pipelineOptions.setJobName(jobName);
    // setting the very same value again must leave the revision untouched
    pipelineOptions.setJobName(jobName);
    round++;
  }

  assertEquals(11, pipelineOptions.revision());
}

@Test
public void testAutomaticRegistrationOfPipelineOptions() {
assertTrue(PipelineOptionsFactory.getRegisteredOptions().contains(RegisteredTestOptions.class));
Expand Down

0 comments on commit 663bd52

Please sign in to comment.