Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactoring] Ec2 and GCS plugins build client lazily #31250

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ class HostType {
* client.
*
* @param clientSettings the new refreshed settings
* @return the old stale settings
*/
Ec2ClientSettings refreshAndClearCache(Ec2ClientSettings clientSettings);
void refreshAndClearCache(Ec2ClientSettings clientSettings);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.elasticsearch.discovery.ec2;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
Expand All @@ -32,16 +32,19 @@
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.LazyInitializable;

class AwsEc2ServiceImpl extends AbstractComponent implements AwsEc2Service {

public static final String EC2_METADATA_URL = "http://169.254.169.254/latest/meta-data/";

private volatile AmazonEc2Reference clientReference;
private volatile Ec2ClientSettings clientSettings;
private final AtomicReference<LazyInitializable<AmazonEc2Reference, ElasticsearchException>> lazyClientReference =
new AtomicReference<>();

AwsEc2ServiceImpl(Settings settings) {
super(settings);
Expand Down Expand Up @@ -108,52 +111,35 @@ static AWSCredentialsProvider buildCredentials(Logger logger, Ec2ClientSettings

@Override
public AmazonEc2Reference client() {
if ((clientReference != null) && clientReference.tryIncRef()) {
return clientReference;
}
synchronized (this) {
if ((clientReference != null) && clientReference.tryIncRef()) {
return clientReference;
}
if (clientSettings == null) {
throw new IllegalArgumentException("Missing ec2 client configs.");
}
final AmazonEc2Reference clientReference = new AmazonEc2Reference(buildClient(clientSettings));
clientReference.incRef();
this.clientReference = clientReference;
return clientReference;
final LazyInitializable<AmazonEc2Reference, ElasticsearchException> clientReference = this.lazyClientReference.get();
if (clientReference == null) {
throw new IllegalStateException("Missing ec2 client configs");
}
return clientReference.getOrCompute();
}


/**
* Refreshes the settings for the AmazonEC2 client. New clients will be build
* using these new settings. Old client is usable until released. On release it
* Refreshes the settings for the AmazonEC2 client. The new client will be build
* using these new settings. The old client is usable until released. On release it
* will be destroyed instead of being returned to the cache.
*/
@Override
public synchronized Ec2ClientSettings refreshAndClearCache(Ec2ClientSettings clientSettings) {
// shutdown all unused clients
// others will shutdown on their respective release
releaseCachedClient();
final Ec2ClientSettings prevSettings = this.clientSettings;
this.clientSettings = clientSettings;
return prevSettings;
public void refreshAndClearCache(Ec2ClientSettings clientSettings) {
final LazyInitializable<AmazonEc2Reference, ElasticsearchException> newClient = new LazyInitializable<>(
() -> new AmazonEc2Reference(buildClient(clientSettings)), clientReference -> clientReference.incRef(),
clientReference -> clientReference.decRef());
final LazyInitializable<AmazonEc2Reference, ElasticsearchException> oldClient = this.lazyClientReference.getAndSet(newClient);
if (oldClient != null) {
oldClient.reset();
}
}

@Override
public void close() {
releaseCachedClient();
}

private synchronized void releaseCachedClient() {
if (this.clientReference == null) {
return;
final LazyInitializable<AmazonEc2Reference, ElasticsearchException> clientReference = this.lazyClientReference.getAndSet(null);
if (clientReference != null) {
clientReference.reset();
}
// the client will shutdown when it will not be used anymore
this.clientReference.decRef();
// clear the cached client, it will be build lazily
this.clientReference = null;
// shutdown IdleConnectionReaper background thread
// it will be restarted on new client usage
IdleConnectionReaper.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai

@Override
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
throws URISyntaxException, StorageException, FileAlreadyExistsExceeption {
throws URISyntaxException, StorageException, FileAlreadyExistsException {
if (blobs.containsKey(blobName)) {
throw new FileAlreadyExistsException(blobName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,25 @@
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.LazyInitializable;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.emptyMap;

public class GoogleCloudStorageService extends AbstractComponent {

/** Clients settings identified by client name. */
private volatile Map<String, GoogleCloudStorageClientSettings> clientsSettings = emptyMap();
/** Cache of client instances. Client instances are built once for each setting change. */
private volatile Map<String, Storage> clientsCache = emptyMap();
/**
* Dictionary of client instances. Client instances are built lazily from the
* latest settings.
*/
private final AtomicReference<Map<String, LazyInitializable<Storage, IOException>>> clientsCache = new AtomicReference<>(emptyMap());

public GoogleCloudStorageService(final Settings settings) {
super(settings);
Expand All @@ -57,18 +61,21 @@ public GoogleCloudStorageService(final Settings settings) {
/**
* Refreshes the client settings and clears the client cache. Subsequent calls to
* {@code GoogleCloudStorageService#client} will return new clients constructed
* using these passed settings.
* using the parameter settings.
*
* @param clientsSettings the new settings used for building clients for subsequent requests
* @return previous settings which have been substituted
*/
public synchronized Map<String, GoogleCloudStorageClientSettings>
refreshAndClearCache(Map<String, GoogleCloudStorageClientSettings> clientsSettings) {
final Map<String, GoogleCloudStorageClientSettings> prevSettings = this.clientsSettings;
this.clientsSettings = MapBuilder.newMapBuilder(clientsSettings).immutableMap();
this.clientsCache = emptyMap();
// clients are built lazily by {@link client(String)}
return prevSettings;
public synchronized void refreshAndClearCache(Map<String, GoogleCloudStorageClientSettings> clientsSettings) {
// build the new lazy clients
final MapBuilder<String, LazyInitializable<Storage, IOException>> newClientsCache = MapBuilder.newMapBuilder();
for (final Map.Entry<String, GoogleCloudStorageClientSettings> entry : clientsSettings.entrySet()) {
newClientsCache.put(entry.getKey(),
new LazyInitializable<Storage, IOException>(() -> createClient(entry.getKey(), entry.getValue())));
}
// make the new clients available
final Map<String, LazyInitializable<Storage, IOException>> oldClientCache = clientsCache.getAndSet(newClientsCache.immutableMap());
// release old clients
oldClientCache.values().forEach(LazyInitializable::reset);
}

/**
Expand All @@ -83,37 +90,26 @@ public GoogleCloudStorageService(final Settings settings) {
* (blobs)
*/
public Storage client(final String clientName) throws IOException {
Storage storage = clientsCache.get(clientName);
if (storage != null) {
return storage;
}
synchronized (this) {
storage = clientsCache.get(clientName);
if (storage != null) {
return storage;
}
storage = SocketAccess.doPrivilegedIOException(() -> createClient(clientName));
clientsCache = MapBuilder.newMapBuilder(clientsCache).put(clientName, storage).immutableMap();
return storage;
final LazyInitializable<Storage, IOException> lazyClient = clientsCache.get().get(clientName);
if (lazyClient == null) {
throw new IllegalArgumentException("Unknown client name [" + clientName + "]. Existing client configs: "
+ Strings.collectionToDelimitedString(clientsCache.get().keySet(), ","));
}
return lazyClient.getOrCompute();
}

/**
* Creates a client that can be used to manage Google Cloud Storage objects. The client is thread-safe.
*
* @param clientName name of client settings to use, including secure settings
* @param clientSettings name of client settings to use, including secure settings
* @return a new client storage instance that can be used to manage objects
* (blobs)
*/
private Storage createClient(final String clientName) throws Exception {
final GoogleCloudStorageClientSettings clientSettings = clientsSettings.get(clientName);
if (clientSettings == null) {
throw new IllegalArgumentException("Unknown client name [" + clientName + "]. Existing client configs: "
+ Strings.collectionToDelimitedString(clientsSettings.keySet(), ","));
}
private Storage createClient(final String clientName, final GoogleCloudStorageClientSettings clientSettings) throws IOException {
logger.debug(() -> new ParameterizedMessage("creating GCS client with client_name [{}], endpoint [{}]", clientName,
clientSettings.getHost()));
final HttpTransport httpTransport = createHttpTransport(clientSettings.getHost());
final HttpTransport httpTransport = SocketAccess.doPrivilegedIOException(() -> createHttpTransport(clientSettings.getHost()));
final HttpTransportOptions httpTransportOptions = HttpTransportOptions.newBuilder()
.setConnectTimeout(toTimeout(clientSettings.getConnectTimeout()))
.setReadTimeout(toTimeout(clientSettings.getReadTimeout()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util;

import org.elasticsearch.common.CheckedSupplier;

import java.util.Objects;
import java.util.function.Consumer;

/**
* Encapsulates a {@link CheckedSupplier} which is lazily invoked once on the
* first call to {@code #getOrCompute()}. The value which the
* <code>supplier</code> returns is memorized and will be served until
* {@code #reset()} is called. Each value returned by {@code #getOrCompute()},
* newly minted or cached, will be passed to the <code>primer</code>
* {@link Consumer}. On {@code #reset()} the value will be passed to the
* <code>finalizer</code> {@code Consumer} and the next {@code #getOrCompute()}
* will regenerate the value.
*/
public final class LazyInitializable<T, E extends Exception> {

private final CheckedSupplier<T, E> supplier;
private final Consumer<T> primer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about onGet as a name instead of primer

private final Consumer<T> finalizer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about onReset instead of finalizer? finalizer makes me think of garbage collection

private volatile T value;

/**
* Creates the simple LazyInitializable instance.
*
* @param supplier
* The {@code CheckedSupplier} to generate values which will be
* served on {@code #getOrCompute()} invocations.
*/
public LazyInitializable(CheckedSupplier<T, E> supplier) {
this(supplier, v -> {}, v -> {});
}

/**
* Creates the complete LazyInitializable instance.
*
* @param supplier
* The {@code CheckedSupplier} to generate values which will be
* served on {@code #getOrCompute()} invocations.
* @param primer
* A {@code Consumer} which is called on each value, newly forged or
* stale, that is returned by {@code #getOrCompute()}
* @param finalizer
* A {@code Consumer} which is invoked on the value that will be
* erased when calling {@code #reset()}
*/
public LazyInitializable(CheckedSupplier<T, E> supplier, Consumer<T> primer, Consumer<T> finalizer) {
this.supplier = supplier;
this.primer = primer;
this.finalizer = finalizer;
}

/**
* Returns a value that was created by <code>supplier</code>. The value might
* have been previously created, if not it will be created now, thread safe of
* course.
*/
public T getOrCompute() throws E {
final T readOnce = value; // Read volatile just once...
final T result = readOnce == null ? maybeCompute(supplier) : readOnce;
primer.accept(result);
return result;
}

/**
* Clears the value, if it has been previously created by calling
* {@code #getOrCompute()}. The <code>finalizer</code> will be called on this
* value. The next call to {@code #getOrCompute()} will recreate the value.
*/
public synchronized void reset() {
if (value != null) {
finalizer.accept(value);
value = null;
}
}

/**
* Creates a new value thread safely.
*/
private synchronized T maybeCompute(CheckedSupplier<T, E> supplier) throws E {
if (value == null) {
value = Objects.requireNonNull(supplier.get());
}
return value;
}

}