Skip to content

Commit

Permalink
Merge branch 'trunk' into fix-retry-mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
pujagani authored Oct 3, 2023
2 parents be5b4e3 + 2ffb772 commit daf4260
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -379,8 +380,9 @@ private void updateNodeStatus(NodeStatus status, Runnable healthCheck) {
.build();

LOG.log(getDebugLogLevel(), "Running health check for Node " + status.getExternalUri());
Executors.newSingleThreadExecutor()
.submit(() -> Failsafe.with(initialHealthCheckPolicy).run(healthCheck::run));
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> Failsafe.with(initialHealthCheckPolicy).run(healthCheck::run));
executor.shutdown();
}
}

Expand Down
40 changes: 21 additions & 19 deletions java/src/org/openqa/selenium/grid/node/httpd/NodeServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import dev.failsafe.RetryPolicy;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
Expand Down Expand Up @@ -200,25 +201,26 @@ public NettyServer start() {
.build();

LOG.info("Starting registration process for Node " + node.getUri());
Executors.newSingleThreadExecutor()
.submit(
() -> {
Failsafe.with(registrationPolicy)
.run(
() -> {
if (nodeRegistered.get()) {
throw new InterruptedException("Stopping registration thread.");
}
HealthCheck.Result check = node.getHealthCheck().check();
if (DOWN.equals(check.getAvailability())) {
LOG.severe("Node is not alive: " + check.getMessage());
// Throw an exception to force another check sooner.
throw new UnsupportedOperationException("Node cannot be registered");
}
bus.fire(new NodeStatusEvent(node.getStatus()));
LOG.info("Sending registration event...");
});
});
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(
() -> {
Failsafe.with(registrationPolicy)
.run(
() -> {
if (nodeRegistered.get()) {
throw new InterruptedException("Stopping registration thread.");
}
HealthCheck.Result check = node.getHealthCheck().check();
if (DOWN.equals(check.getAvailability())) {
LOG.severe("Node is not alive: " + check.getMessage());
// Throw an exception to force another check sooner.
throw new UnsupportedOperationException("Node cannot be registered");
}
bus.fire(new NodeStatusEvent(node.getStatus()));
LOG.info("Sending registration event...");
});
});
executor.shutdown();

return this;
}
Expand Down
85 changes: 20 additions & 65 deletions java/src/org/openqa/selenium/grid/router/HandleSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,19 @@
import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST_EVENT;
import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.collect.ImmutableMap;
import java.net.URL;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openqa.selenium.NoSuchSessionException;
import org.openqa.selenium.concurrent.GuardedRunnable;
import org.openqa.selenium.grid.data.Session;
import org.openqa.selenium.grid.sessionmap.SessionMap;
import org.openqa.selenium.grid.web.ReverseProxyHandler;
import org.openqa.selenium.internal.Require;
Expand All @@ -60,60 +58,24 @@

class HandleSession implements HttpHandler {

private static final Logger LOG = Logger.getLogger(HandleSession.class.getName());

private static class CacheEntry {
private final SessionId sessionId;
private final HttpClient httpClient;
// volatile as the ConcurrentMap will not take care of synchronization
private volatile Instant lastUse;

public CacheEntry(SessionId sessionId, HttpClient httpClient) {
this.sessionId = sessionId;
this.httpClient = httpClient;
this.lastUse = Instant.now();
}
}

private final Tracer tracer;
private final HttpClient.Factory httpClientFactory;
private final SessionMap sessions;
private final ConcurrentMap<URL, CacheEntry> httpClients;
private final Cache<URL, HttpClient> httpClients;

HandleSession(Tracer tracer, HttpClient.Factory httpClientFactory, SessionMap sessions) {
this.tracer = Require.nonNull("Tracer", tracer);
this.httpClientFactory = Require.nonNull("HTTP client factory", httpClientFactory);
this.sessions = Require.nonNull("Sessions", sessions);

this.httpClients = new ConcurrentHashMap<>();

Runnable cleanUpHttpClients =
() -> {
Instant revalidateBefore = Instant.now().minus(1, ChronoUnit.MINUTES);
Iterator<CacheEntry> iterator = httpClients.values().iterator();

while (iterator.hasNext()) {
CacheEntry entry = iterator.next();

if (!entry.lastUse.isBefore(revalidateBefore)) {
// the session was recently used
return;
}

try {
sessions.get(entry.sessionId);
} catch (NoSuchSessionException e) {
// the session is dead, remove it from the cache
iterator.remove();

try {
entry.httpClient.close();
} catch (Exception ex) {
LOG.log(Level.WARNING, "failed to close a stale httpclient", ex);
}
}
}
};
this.httpClients =
CacheBuilder.newBuilder()
// this timeout must be bigger than default connection + read timeout, to ensure we do
// not close HttpClients which might have requests waiting for responses
.expireAfterAccess(Duration.ofMinutes(4))
.removalListener(
(RemovalListener<URL, HttpClient>) removal -> removal.getValue().close())
.build();

ScheduledExecutorService cleanUpHttpClientsCacheService =
Executors.newSingleThreadScheduledExecutor(
Expand All @@ -124,7 +86,7 @@ public CacheEntry(SessionId sessionId, HttpClient httpClient) {
return thread;
});
cleanUpHttpClientsCacheService.scheduleAtFixedRate(
GuardedRunnable.guard(cleanUpHttpClients), 1, 1, TimeUnit.MINUTES);
GuardedRunnable.guard(httpClients::cleanUp), 1, 1, TimeUnit.MINUTES);
}

@Override
Expand Down Expand Up @@ -203,18 +165,11 @@ public HttpResponse execute(HttpRequest req) {
private Callable<HttpHandler> loadSessionId(Tracer tracer, Span span, SessionId id) {
return span.wrap(
() -> {
CacheEntry cacheEntry =
httpClients.computeIfAbsent(
Urls.fromUri(sessions.getUri(id)),
(sessionUrl) -> {
ClientConfig config =
ClientConfig.defaultConfig().baseUrl(sessionUrl).withRetries();
HttpClient httpClient = httpClientFactory.createClient(config);

return new CacheEntry(id, httpClient);
});
cacheEntry.lastUse = Instant.now();
return new ReverseProxyHandler(tracer, cacheEntry.httpClient);
Session session = sessions.get(id);
URL url = Urls.fromUri(session.getUri());
ClientConfig config = ClientConfig.defaultConfig().baseUrl(url).withRetries();
HttpClient client = httpClients.get(url, () -> httpClientFactory.createClient(config));
return new ReverseProxyHandler(tracer, client);
});
}
}
30 changes: 16 additions & 14 deletions java/src/org/openqa/selenium/manager/SeleniumManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,23 @@ public class SeleniumManager {

/** Wrapper for the Selenium Manager binary. */
private SeleniumManager() {
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
if (binary != null && Files.exists(binary)) {
try {
Files.delete(binary);
} catch (IOException e) {
LOG.warning(
String.format(
"%s deleting temporal file: %s",
e.getClass().getSimpleName(), e.getMessage()));
if (managerPath == null) {
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
if (binary != null && Files.exists(binary)) {
try {
Files.delete(binary);
} catch (IOException e) {
LOG.warning(
String.format(
"%s deleting temporal file: %s",
e.getClass().getSimpleName(), e.getMessage()));
}
}
}
}));
}));
}
}

public static SeleniumManager getInstance() {
Expand Down
4 changes: 2 additions & 2 deletions py/selenium/webdriver/common/bidi/cdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def import_devtools(ver):
versions = tuple(f.name for f in devtools_path.iterdir() if f.is_dir())
latest = max(int(x[1:]) for x in versions)
selenium_logger = logging.getLogger(__name__)
selenium_logger.debug(f"Falling back to loading `devtools`: v{latest}")
selenium_logger.debug("Falling back to loading `devtools`: v%s", latest)
devtools = importlib.import_module(f"{base}{latest}")
return devtools

Expand Down Expand Up @@ -265,7 +265,7 @@ def _handle_cmd_response(self, data):
try:
cmd, event = self.inflight_cmd.pop(cmd_id)
except KeyError:
logger.warning(f"Got a message with a command ID that does not exist: {data}")
logger.warning("Got a message with a command ID that does not exist: %s", data)
return
if "error" in data:
# If the server reported an error, convert it to an exception and do
Expand Down
8 changes: 4 additions & 4 deletions py/selenium/webdriver/common/selenium_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ def get_binary() -> Path:
if not path.is_file() and os.environ["CONDA_PREFIX"]:
# conda has a separate package selenium-manager, installs in bin
path = Path(os.path.join(os.environ["CONDA_PREFIX"], "bin", file))
logger.debug(f"Conda environment detected, using `{path}`")
logger.debug("Conda environment detected, using `%s`", path)
if not path.is_file():
raise WebDriverException(f"Unable to obtain working Selenium Manager binary; {path}")

logger.debug(f"Selenium Manager binary found at: {path}")
logger.debug("Selenium Manager binary found at: %s", path)

return path

Expand Down Expand Up @@ -99,7 +99,7 @@ def driver_location(self, options: BaseOptions) -> str:

browser_path = output["browser_path"]
driver_path = output["driver_path"]
logger.debug(f"Using driver at: {driver_path}")
logger.debug("Using driver at: %s", driver_path)

if hasattr(options.__class__, "binary_location"):
options.binary_location = browser_path
Expand All @@ -121,7 +121,7 @@ def run(args: List[str]) -> dict:
args.append("json")

command = " ".join(args)
logger.debug(f"Executing process: {command}")
logger.debug("Executing process: %s", command)
try:
if sys.platform == "win32":
completed_proc = subprocess.run(args, capture_output=True, creationflags=subprocess.CREATE_NO_WINDOW)
Expand Down
2 changes: 1 addition & 1 deletion py/selenium/webdriver/common/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def _start_process(self, path: str) -> None:
startupinfo=start_info,
**self.popen_kw,
)
logger.debug(f"Started executable: `{self._path}` in a child process with pid: {self.process.pid}")
logger.debug("Started executable: `%s` in a child process with pid: %s", self._path, self.process.pid)
except TypeError:
raise
except OSError as err:
Expand Down
4 changes: 2 additions & 2 deletions py/selenium/webdriver/remote/remote_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def _request(self, method, url, body=None):
:Returns:
A dictionary with the server's parsed JSON response.
"""
LOGGER.debug(f"{method} {url} {body}")
LOGGER.debug("%s %s %s", method, url, body)
parsed_url = parse.urlparse(url)
headers = self.get_remote_connection_headers(parsed_url, self.keep_alive)
response = None
Expand All @@ -323,7 +323,7 @@ def _request(self, method, url, body=None):
response = http.request(method, url, body=body, headers=headers)
statuscode = response.status
data = response.data.decode("UTF-8")
LOGGER.debug(f"Remote response: status={response.status} | data={data} | headers={response.headers}")
LOGGER.debug("Remote response: status=%s | data=%s | headers=%s", response.status, data, response.headers)
try:
if 300 <= statuscode < 304:
return self._request("GET", response.headers.get("location", None))
Expand Down
4 changes: 2 additions & 2 deletions py/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ force_single_line = True
skip_install = true
deps =
isort==5.12.0
black==23.7.0
black==23.9.1
flake8==6.1.0
flake8-typing-imports==1.14.0
docformatter==1.7.5
Expand All @@ -55,7 +55,7 @@ commands =
skip_install = true
deps =
isort==5.12.0
black==23.7.0
black==23.9.1
flake8==6.1.0
flake8-typing-imports==1.14.0
docformatter==1.7.5
Expand Down
9 changes: 6 additions & 3 deletions rb/lib/selenium/webdriver/atoms.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@
module Selenium
module WebDriver
module Atoms
def atom_script(function_name)
format("/* #{function_name} */return (%<atom>s).apply(null, arguments)",
atom: read_atom(function_name))
end

private

def read_atom(function)
File.read(File.expand_path("../atoms/#{function}.js", __FILE__))
end

def execute_atom(function_name, *arguments)
script = format("/* #{function_name} */return (%<atom>s).apply(null, arguments)",
atom: read_atom(function_name))
execute_script(script, *arguments)
execute_script(atom_script(function_name), *arguments)
end
end # Atoms
end # WebDriver
Expand Down

0 comments on commit daf4260

Please sign in to comment.