Skip to content

Commit

Permalink
Use IntetnalProjectInitializer for tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
minwoox committed Feb 18, 2025
1 parent 7856455 commit 4407579
Show file tree
Hide file tree
Showing 28 changed files with 361 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -64,8 +65,8 @@ void updateWriteQuota() throws Exception {
CompletableFutures.allAsList(futures).join();

/// update write quota to 2qps
QuotaConfig writeQuota = new QuotaConfig(2, 1);
QuotaConfig updated = updateWriteQuota(webClient(), repositoryName, writeQuota);
final QuotaConfig writeQuota = new QuotaConfig(2, 1);
final QuotaConfig updated = updateWriteQuota(webClient(), repositoryName, writeQuota);
assertThat(updated).isEqualTo(writeQuota);

// Wait for releasing previously acquired locks
Expand All @@ -86,9 +87,9 @@ void updateWriteQuota() throws Exception {
Thread.sleep(1000);

// Increase write quota
writeQuota = new QuotaConfig(5, 1);
updated = updateWriteQuota(webClient(), repositoryName, writeQuota);
assertThat(updated).isEqualTo(writeQuota);
final QuotaConfig writeQuota1 = new QuotaConfig(5, 1);
await().untilAsserted(() -> assertThat(updateWriteQuota(webClient(), repositoryName, writeQuota1))
.isEqualTo(writeQuota1));

final List<CompletableFuture<PushResult>> futures4 = parallelPush(dogmaClient(), repositoryName, 4);
assertThat(CompletableFutures.allAsList(futures4).join()).hasSize(8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,8 @@ private Server startServer(ProjectManager pm, CommandExecutor executor,
cfg.gracefulShutdownTimeout().ifPresent(
t -> sb.gracefulShutdownTimeoutMillis(t.quietPeriodMillis(), t.timeoutMillis()));

final MetadataService mds = new MetadataService(pm, executor);
final MetadataService mds = new MetadataService(pm, executor, projectInitializer);
executor.setRepositoryMetadataSupplier(mds::getRepo);
final WatchService watchService = new WatchService(meterRegistry);
final AuthProvider authProvider = createAuthProvider(executor, sessionManager, mds);
final ProjectApiManager projectApiManager = new ProjectApiManager(pm, executor, mds);
Expand Down Expand Up @@ -796,7 +797,7 @@ private CommandExecutor newZooKeeperCommandExecutor(
new StandaloneCommandExecutor(pm, repositoryWorker, serverStatusManager, sessionManager,
/* onTakeLeadership */ null, /* onReleaseLeadership */ null,
/* onTakeZoneLeadership */ null, /* onReleaseZoneLeadership */ null),
meterRegistry, pm, config().writeQuotaPerRepository(), zone,
meterRegistry, config().writeQuotaPerRepository(), zone,
onTakeLeadership, onReleaseLeadership,
onTakeZoneLeadership, onReleaseZoneLeadership);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import javax.annotation.Nullable;
Expand All @@ -34,6 +35,7 @@
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.common.util.StartStopSupport;
import com.linecorp.centraldogma.common.ReadOnlyException;
import com.linecorp.centraldogma.server.metadata.RepositoryMetadata;

/**
* Helps to implement a concrete {@link CommandExecutor}.
Expand All @@ -57,6 +59,9 @@ public abstract class AbstractCommandExecutor implements CommandExecutor {
private final AtomicInteger numPendingStopRequests = new AtomicInteger();
private final CommandExecutorStatusManager statusManager;

@Nullable
private BiFunction<String, String, CompletableFuture<RepositoryMetadata>> repositoryMetadataSupplier;

/**
* Creates a new instance.
*
Expand Down Expand Up @@ -120,6 +125,23 @@ public final void setWritable(boolean writable) {
this.writable = writable;
}

@Override
public void setRepositoryMetadataSupplier(
BiFunction<String, String, CompletableFuture<RepositoryMetadata>> supplier) {
repositoryMetadataSupplier = requireNonNull(supplier, "supplier");
}

/**
* Returns the {@link RepositoryMetadata} of the specified repository.
*/
@Nullable
protected CompletableFuture<RepositoryMetadata> repositoryMetadata(String projectName, String repoName) {
if (repositoryMetadataSupplier == null) {
return null;
}
return repositoryMetadataSupplier.apply(projectName, repoName);
}

@Override
public final <T> CompletableFuture<T> execute(Command<T> command) {
requireNonNull(command, "command");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package com.linecorp.centraldogma.server.command;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

import javax.annotation.Nullable;

import com.linecorp.centraldogma.server.QuotaConfig;
import com.linecorp.centraldogma.server.metadata.RepositoryMetadata;

/**
* An executor interface which executes {@link Command}s.
Expand Down Expand Up @@ -66,6 +68,12 @@ public interface CommandExecutor {
*/
void setWriteQuota(String projectName, String repoName, @Nullable QuotaConfig writeQuota);

/**
* Sets the {@link RepositoryMetadata} supplier.
*/
void setRepositoryMetadataSupplier(
BiFunction<String, String, CompletableFuture<RepositoryMetadata>> supplier);

/**
* Executes the specified {@link Command}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import static java.util.Objects.requireNonNull;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

import com.linecorp.centraldogma.internal.Util;
import com.linecorp.centraldogma.server.QuotaConfig;
import com.linecorp.centraldogma.server.metadata.RepositoryMetadata;

/**
* A {@link CommandExecutor} which forwards all its method calls to another {@link CommandExecutor}.
Expand Down Expand Up @@ -73,6 +75,12 @@ public void setWriteQuota(String projectName, String repoName, QuotaConfig write
delegate().setWriteQuota(projectName, repoName, writeQuota);
}

@Override
public void setRepositoryMetadataSupplier(
BiFunction<String, String, CompletableFuture<RepositoryMetadata>> supplier) {
delegate().setRepositoryMetadataSupplier(supplier);
}

@Override
public <T> CompletableFuture<T> execute(Command<T> command) {
return delegate().execute(command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
import com.google.common.util.concurrent.RateLimiter;
import com.spotify.futures.CompletableFutures;

import com.linecorp.armeria.common.util.UnmodifiableFuture;
import com.linecorp.centraldogma.common.TooManyRequestsException;
import com.linecorp.centraldogma.server.QuotaConfig;
import com.linecorp.centraldogma.server.auth.Session;
import com.linecorp.centraldogma.server.auth.SessionManager;
import com.linecorp.centraldogma.server.management.ServerStatusManager;
import com.linecorp.centraldogma.server.metadata.MetadataService;
import com.linecorp.centraldogma.server.metadata.RepositoryMetadata;
import com.linecorp.centraldogma.server.storage.project.Project;
import com.linecorp.centraldogma.server.storage.project.ProjectManager;
import com.linecorp.centraldogma.server.storage.repository.Repository;
Expand All @@ -58,7 +59,6 @@ public class StandaloneCommandExecutor extends AbstractCommandExecutor {
private final SessionManager sessionManager;
// if permitsPerSecond is -1, a quota is checked by ZooKeeperCommandExecutor.
private final double permitsPerSecond;
private final MetadataService metadataService;
private final ServerStatusManager serverStatusManager;

@VisibleForTesting
Expand Down Expand Up @@ -130,7 +130,6 @@ private StandaloneCommandExecutor(ProjectManager projectManager,
this.sessionManager = sessionManager;
this.permitsPerSecond = permitsPerSecond;
writeRateLimiters = new ConcurrentHashMap<>();
metadataService = new MetadataService(projectManager, this);
}

@Override
Expand Down Expand Up @@ -334,7 +333,12 @@ private CompletableFuture<CommitResult> push0(RepositoryCommand<?> c, boolean no
}

private CompletableFuture<RateLimiter> getRateLimiter(String projectName, String repoName) {
return metadataService.getRepo(projectName, repoName).thenApply(meta -> {
final CompletableFuture<RepositoryMetadata> future = repositoryMetadata(projectName, repoName);
if (future == null) {
// metadata is not available yet.
return UnmodifiableFuture.completedFuture(RateLimiter.create(permitsPerSecond));
}
return future.thenApply(meta -> {
setWriteQuota(projectName, repoName, meta.writeQuota());
return writeRateLimiters.get(rateLimiterKey(projectName, repoName));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ public CompletableFuture<Revision> removeMember(@Param String projectName,
public CompletableFuture<Revision> addToken(@Param String projectName,
IdAndProjectRole request,
Author author) {
return mds.findTokenByAppId(request.id())
.thenCompose(token -> mds.addToken(author, projectName, token.appId(), request.role()));
final Token token = mds.findTokenByAppId(request.id());
return mds.addToken(author, projectName, token.appId(), request.role());
}

/**
Expand All @@ -147,8 +147,8 @@ public CompletableFuture<Revision> updateTokenRole(@Param String projectName,
Author author) {
final ReplaceOperation operation = ensureSingleReplaceOperation(jsonPatch, "/role");
final ProjectRole role = ProjectRole.of(operation.value());
return mds.findTokenByAppId(appId)
.thenCompose(token -> mds.updateTokenRole(author, projectName, token, role));
final Token token = mds.findTokenByAppId(appId);
return mds.updateTokenRole(author, projectName, token, role);
}

/**
Expand All @@ -160,8 +160,8 @@ public CompletableFuture<Revision> updateTokenRole(@Param String projectName,
public CompletableFuture<Revision> removeToken(@Param String projectName,
@Param String appId,
Author author) {
return mds.findTokenByAppId(appId)
.thenCompose(token -> mds.removeToken(author, projectName, token.appId()));
final Token token = mds.findTokenByAppId(appId);
return mds.removeToken(author, projectName, token.appId());
}

/**
Expand Down Expand Up @@ -256,9 +256,8 @@ public CompletableFuture<Revision> removeTokenRepositoryRole(@Param String proje
@Param String repoName,
@Param String appId,
Author author) {
return mds.findTokenByAppId(appId)
.thenCompose(token -> mds.removeTokenRepositoryRole(author,
projectName, repoName, appId));
final Token token = mds.findTokenByAppId(appId);
return mds.removeTokenRepositoryRole(author, projectName, repoName, appId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@

package com.linecorp.centraldogma.server.internal.api.auth;

import static com.linecorp.armeria.common.util.Functions.voidFunction;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

Expand All @@ -33,6 +31,7 @@
import com.linecorp.armeria.common.auth.OAuth2Token;
import com.linecorp.armeria.common.logging.LogLevel;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.common.util.UnmodifiableFuture;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.auth.AuthTokenExtractors;
import com.linecorp.armeria.server.auth.Authorizer;
Expand All @@ -51,9 +50,9 @@ public class ApplicationTokenAuthorizer implements Authorizer<HttpRequest> {
private static final Logger logger = LoggerFactory.getLogger(
ApplicationTokenAuthorizer.class);

private final Function<String, CompletionStage<Token>> tokenLookupFunc;
private final Function<String, Token> tokenLookupFunc;

public ApplicationTokenAuthorizer(Function<String, CompletionStage<Token>> tokenLookupFunc) {
public ApplicationTokenAuthorizer(Function<String, Token> tokenLookupFunc) {
this.tokenLookupFunc = requireNonNull(tokenLookupFunc, "tokenLookupFunc");
}

Expand All @@ -64,40 +63,34 @@ public CompletionStage<Boolean> authorize(ServiceRequestContext ctx, HttpRequest
return completedFuture(false);
}

final CompletableFuture<Boolean> res = new CompletableFuture<>();
tokenLookupFunc.apply(token.accessToken())
.thenAccept(appToken -> {
if (appToken != null && appToken.isActive()) {
final String appId = appToken.appId();
final StringBuilder login = new StringBuilder(appId);
final SocketAddress ra = ctx.remoteAddress();
if (ra instanceof InetSocketAddress) {
login.append('@').append(((InetSocketAddress) ra).getHostString());
}
ctx.logBuilder().authenticatedUser("app/" + appId);
final UserWithToken user = new UserWithToken(login.toString(), appToken);
AuthUtil.setCurrentUser(ctx, user);
HttpApiUtil.setVerboseResponses(ctx, user);
res.complete(true);
} else {
res.complete(false);
}
})
// Should be authorized by the next authorizer.
.exceptionally(voidFunction(cause -> {
cause = Exceptions.peel(cause);
final LogLevel level;
if (cause instanceof IllegalArgumentException ||
cause instanceof TokenNotFoundException) {
level = LogLevel.DEBUG;
} else {
level = LogLevel.WARN;
}
level.log(logger, "Failed to authorize an application token: token={}, addr={}",
token.accessToken(), ctx.clientAddress(), cause);
res.complete(false);
}));

return res;
try {
final Token appToken = tokenLookupFunc.apply(token.accessToken());
if (appToken != null && appToken.isActive()) {
final String appId = appToken.appId();
final StringBuilder login = new StringBuilder(appId);
final SocketAddress ra = ctx.remoteAddress();
if (ra instanceof InetSocketAddress) {
login.append('@').append(((InetSocketAddress) ra).getHostString());
}
ctx.logBuilder().authenticatedUser("app/" + appId);
final UserWithToken user = new UserWithToken(login.toString(), appToken);
AuthUtil.setCurrentUser(ctx, user);
HttpApiUtil.setVerboseResponses(ctx, user);
return UnmodifiableFuture.completedFuture(true);
}
return UnmodifiableFuture.completedFuture(false);
} catch (Throwable cause) {
cause = Exceptions.peel(cause);
final LogLevel level;
if (cause instanceof IllegalArgumentException ||
cause instanceof TokenNotFoundException) {
level = LogLevel.DEBUG;
} else {
level = LogLevel.WARN;
}
level.log(logger, "Failed to authorize an application token: token={}, addr={}",
token.accessToken(), ctx.clientAddress(), cause);
return UnmodifiableFuture.completedFuture(false);
}
}
}
Loading

0 comments on commit 4407579

Please sign in to comment.