diff --git a/docs/application-settings.md b/docs/application-settings.md index c51febaea3e..39fd52c7e5a 100644 --- a/docs/application-settings.md +++ b/docs/application-settings.md @@ -259,6 +259,51 @@ Here's an example YAML file containing account-specific settings: default: true ``` +## Setting Account Configuration in S3 + +This is identical to the account configuration in a file system, with the main difference that your file system is +[AWS S3](https://aws.amazon.com/de/s3/) or any S3 compatible storage, such as [MinIO](https://min.io/). + + +The general idea is that you'll place all the account-specific settings in a separate YAML file and point to that file. + +```yaml +settings: + s3: + accessKeyId: + secretAccessKey: + endpoint: # http://s3.storage.com + bucket: # prebid-application-settings + region: # if not provided AWS_GLOBAL will be used. Example value: 'eu-central-1' + accounts-dir: accounts + stored-imps-dir: stored-impressions + stored-requests-dir: stored-requests + stored-responses-dir: stored-responses + + # recommended to configure an in memory cache, but this is optional + in-memory-cache: + # example settings, tailor to your needs + cache-size: 100000 + ttl-seconds: 1200 # 20 minutes + # recommended to configure + s3-update: + refresh-rate: 900000 # Refresh every 15 minutes + timeout: 5000 +``` + +### File format + +We recommend using the `json` format for your account configuration. A minimal configuration may look like this. + +```json +{ + "id" : "979c7116-1f5a-43d4-9a87-5da3ccc4f52c", + "status" : "active" +} +``` + +This pairs nicely if you have a default configuration defined in your prebid server config under `settings.default-account-config`. + ## Setting Account Configuration in the Database In database approach account properties are stored in database table(s). diff --git a/extra/pom.xml b/extra/pom.xml index 5e561df05a0..52f145ad28a 100644 --- a/extra/pom.xml +++ b/extra/pom.xml @@ -51,6 +51,7 @@ 3.21.7 3.17.3 1.0.7 + 2.26.24 3.5.4 @@ -206,6 +207,11 @@ geoip2 ${maxmind-client.version} + + software.amazon.awssdk + s3 + ${aws.awssdk.version} + com.google.protobuf protobuf-java diff --git a/pom.xml b/pom.xml index 2e335a8e4ee..b61807a8ebd 100644 --- a/pom.xml +++ b/pom.xml @@ -173,6 +173,10 @@ org.postgresql postgresql + + software.amazon.awssdk + s3 + com.github.ben-manes.caffeine caffeine diff --git a/sample/configs/prebid-config-s3.yaml b/sample/configs/prebid-config-s3.yaml new file mode 100644 index 00000000000..277ad94613c --- /dev/null +++ b/sample/configs/prebid-config-s3.yaml @@ -0,0 +1,60 @@ +status-response: "ok" + +server: + enable-quickack: true + enable-reuseport: true + +adapters: + appnexus: + enabled: true + ix: + enabled: true + openx: + enabled: true + pubmatic: + enabled: true + rubicon: + enabled: true +metrics: + prefix: prebid +cache: + scheme: http + host: localhost + path: /cache + query: uuid= +settings: + enforce-valid-account: false + generate-storedrequest-bidrequest-id: true + s3: + accessKeyId: prebid-server-test + secretAccessKey: nq9h6whXQURNL2NnWg3rcMlLMtGGDJeWrdl8hC9g + endpoint: http://localhost:9000 + bucket: prebid-server-configs.example.com # prebid-application-settings + force-path-style: true # virtual bucketing + # region: # if not provided AWS_GLOBAL will be used. Example value: 'eu-central-1' + accounts-dir: accounts + stored-imps-dir: stored-impressions + stored-requests-dir: stored-requests + stored-responses-dir: stored-responses + + in-memory-cache: + cache-size: 10000 + ttl-seconds: 1200 # 20 minutes + s3-update: + refresh-rate: 900000 # Refresh every 15 minutes + timeout: 5000 + +gdpr: + default-value: 1 + vendorlist: + v2: + cache-dir: /var/tmp/vendor2 + v3: + cache-dir: /var/tmp/vendor3 + +admin-endpoints: + logging-changelevel: + enabled: true + path: /logging/changelevel + on-application-port: true + protected: false diff --git a/src/main/java/org/prebid/server/settings/S3ApplicationSettings.java b/src/main/java/org/prebid/server/settings/S3ApplicationSettings.java new file mode 100644 index 00000000000..6ae0ec7f887 --- /dev/null +++ b/src/main/java/org/prebid/server/settings/S3ApplicationSettings.java @@ -0,0 +1,210 @@ +package org.prebid.server.settings; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import org.apache.commons.collections4.SetUtils; +import org.prebid.server.auction.model.Tuple2; +import org.prebid.server.exception.PreBidException; +import org.prebid.server.execution.Timeout; +import org.prebid.server.json.DecodeException; +import org.prebid.server.json.JacksonMapper; +import org.prebid.server.settings.model.Account; +import org.prebid.server.settings.model.StoredDataResult; +import org.prebid.server.settings.model.StoredResponseDataResult; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Implementation of {@link ApplicationSettings}. + *

+ * Reads an application settings from JSON file in a s3 bucket, stores and serves them in and from the memory. + *

+ * Immediately loads stored request data from local files. These are stored in memory for low-latency reads. + * This expects each file in the directory to be named "{config_id}.json". + */ +public class S3ApplicationSettings implements ApplicationSettings { + + private static final String JSON_SUFFIX = ".json"; + + final S3AsyncClient asyncClient; + final String bucket; + final String accountsDirectory; + final String storedImpressionsDirectory; + final String storedRequestsDirectory; + final String storedResponsesDirectory; + final JacksonMapper jacksonMapper; + final Vertx vertx; + + public S3ApplicationSettings( + S3AsyncClient asyncClient, + String bucket, + String accountsDirectory, + String storedImpressionsDirectory, + String storedRequestsDirectory, + String storedResponsesDirectory, + JacksonMapper jacksonMapper, + Vertx vertx) { + + this.asyncClient = Objects.requireNonNull(asyncClient); + this.bucket = Objects.requireNonNull(bucket); + this.accountsDirectory = Objects.requireNonNull(accountsDirectory); + this.storedImpressionsDirectory = Objects.requireNonNull(storedImpressionsDirectory); + this.storedRequestsDirectory = Objects.requireNonNull(storedRequestsDirectory); + this.storedResponsesDirectory = Objects.requireNonNull(storedResponsesDirectory); + this.jacksonMapper = Objects.requireNonNull(jacksonMapper); + this.vertx = Objects.requireNonNull(vertx); + } + + @Override + public Future getAccountById(String accountId, Timeout timeout) { + return downloadFile(accountsDirectory + "/" + accountId + JSON_SUFFIX) + .flatMap(fileContentOpt -> fileContentOpt.map(Future::succeededFuture) + .orElseGet(() -> Future.failedFuture( + new PreBidException("Account with id %s not found".formatted(accountId))) + ) + ) + .map(fileContent -> jacksonMapper.decodeValue(fileContent, Account.class)) + .flatMap(account -> { + if (!Objects.equals(account.getId(), accountId)) { + return Future.failedFuture(new PreBidException( + "Account with id %s does not match id %s in file".formatted(accountId, account.getId())) + ); + } + return Future.succeededFuture(account); + }) + .recover(ex -> { + if (ex instanceof DecodeException) { + return Future + .failedFuture( + new PreBidException( + "Invalid json for account with id %s".formatted(accountId))); + } + // if a previous validation already yielded a PreBidException, just return it + if (ex instanceof PreBidException) { + return Future.failedFuture(ex); + } + return Future + .failedFuture(new PreBidException("Account with id %s not found".formatted(accountId))); + }); + } + + @Override + public Future getStoredData( + String accountId, + Set requestIds, + Set impIds, + Timeout timeout) { + + return getFileContents(storedRequestsDirectory, requestIds) + .compose(storedIdToRequest -> getFileContents(storedImpressionsDirectory, impIds) + .map(storedIdToImp -> + buildStoredDataResult(storedIdToRequest, storedIdToImp, requestIds, impIds)) + ); + } + + private StoredDataResult buildStoredDataResult( + Map storedIdToRequest, + Map storedIdToImp, + Set requestIds, + Set impIds + ) { + + final Stream missingStoredRequestIds = + getMissingStoredDataIds(storedIdToRequest, requestIds).stream() + .map("No stored request found for id: %s"::formatted); + final Stream missingStoredImpressionIds = + getMissingStoredDataIds(storedIdToImp, impIds).stream() + .map("No stored impression found for id: %s"::formatted); + + return StoredDataResult.of( + storedIdToRequest, + storedIdToImp, + Stream.concat( + missingStoredImpressionIds, + missingStoredRequestIds).toList()); + } + + private Set getMissingStoredDataIds(Map fileContents, Set responseIds) { + return SetUtils.difference(responseIds, fileContents.keySet()); + } + + @Override + public Future getAmpStoredData( + String accountId, + Set requestIds, + Set impIds, + Timeout timeout) { + + return getStoredData(accountId, requestIds, Collections.emptySet(), timeout); + } + + @Override + public Future getVideoStoredData( + String accountId, + Set requestIds, + Set impIds, + Timeout timeout) { + + return getStoredData(accountId, requestIds, impIds, timeout); + } + + @Override + public Future getStoredResponses(Set responseIds, Timeout timeout) { + return getFileContents(storedResponsesDirectory, responseIds).map(storedIdToResponse -> { + final List missingStoredResponseIds = + getMissingStoredDataIds(storedIdToResponse, responseIds).stream() + .map("No stored response found for id: %s"::formatted).toList(); + + return StoredResponseDataResult.of(storedIdToResponse, missingStoredResponseIds); + }); + } + + @Override + public Future> getCategories(String primaryAdServer, String publisher, Timeout timeout) { + return Future.succeededFuture(Collections.emptyMap()); + } + + private Future> getFileContents(String directory, Set ids) { + return CompositeFuture.join(ids.stream() + .map(impId -> downloadFile(directory + withInitialSlash(impId) + JSON_SUFFIX) + .map(fileContentOpt -> fileContentOpt + .map(fileContent -> Tuple2.of(impId, fileContent)))) + .toList()) + .map(CompositeFuture::>>list) + .map(impIdToFileContent -> impIdToFileContent.stream() + .flatMap(Optional::stream) + .collect(Collectors.toMap(Tuple2::getLeft, Tuple2::getRight))); + } + + /** + * When the impression id is the ad unit path it may already start with a slash and there's no need to add + * another one. + * + * @param impressionId from the bid request + * @return impression id with only a single slash at the beginning + */ + private static String withInitialSlash(String impressionId) { + return impressionId.startsWith("/") ? impressionId : "/" + impressionId; + } + + private Future> downloadFile(String key) { + final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build(); + + return Future.fromCompletionStage( + asyncClient.getObject(request, AsyncResponseTransformer.toBytes()), + vertx.getOrCreateContext()) + .map(test -> Optional.of(test.asUtf8String())).recover(ex -> Future.succeededFuture(Optional.empty())); + } + +} diff --git a/src/main/java/org/prebid/server/settings/service/S3PeriodicRefreshService.java b/src/main/java/org/prebid/server/settings/service/S3PeriodicRefreshService.java new file mode 100644 index 00000000000..8b31811cdd3 --- /dev/null +++ b/src/main/java/org/prebid/server/settings/service/S3PeriodicRefreshService.java @@ -0,0 +1,162 @@ +package org.prebid.server.settings.service; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.prebid.server.auction.model.Tuple2; +import org.prebid.server.log.Logger; +import org.prebid.server.log.LoggerFactory; +import org.prebid.server.metric.MetricName; +import org.prebid.server.metric.Metrics; +import org.prebid.server.settings.CacheNotificationListener; +import org.prebid.server.settings.model.StoredDataResult; +import org.prebid.server.vertx.Initializable; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.S3Object; + +import java.util.concurrent.atomic.AtomicReference; +import java.time.Clock; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + *

+ * Service that periodically calls s3 for stored request updates. + * If refreshRate is negative, then the data will never be refreshed. + *

+ * Fetches all files from the specified folders/prefixes in s3 and downloads all files. + */ +public class S3PeriodicRefreshService implements Initializable { + + private static final String JSON_SUFFIX = ".json"; + + private static final Logger logger = LoggerFactory.getLogger(S3PeriodicRefreshService.class); + + private final S3AsyncClient asyncClient; + private final String bucket; + private final String storedImpressionsDirectory; + private final String storedRequestsDirectory; + private final long refreshPeriod; + private final MetricName cacheType; + private final CacheNotificationListener cacheNotificationListener; + private final Vertx vertx; + private final Metrics metrics; + private final Clock clock; + private final AtomicReference lastResult; + + public S3PeriodicRefreshService(S3AsyncClient asyncClient, + String bucket, + String storedRequestsDirectory, + String storedImpressionsDirectory, + long refreshPeriod, + MetricName cacheType, + CacheNotificationListener cacheNotificationListener, + Vertx vertx, + Metrics metrics, + Clock clock) { + + this.asyncClient = Objects.requireNonNull(asyncClient); + this.bucket = Objects.requireNonNull(bucket); + this.storedRequestsDirectory = Objects.requireNonNull(storedRequestsDirectory); + this.storedImpressionsDirectory = Objects.requireNonNull(storedImpressionsDirectory); + this.refreshPeriod = refreshPeriod; + this.cacheType = Objects.requireNonNull(cacheType); + this.cacheNotificationListener = Objects.requireNonNull(cacheNotificationListener); + this.vertx = Objects.requireNonNull(vertx); + this.metrics = Objects.requireNonNull(metrics); + this.clock = Objects.requireNonNull(clock); + this.lastResult = new AtomicReference<>(); + } + + @Override + public void initialize(Promise initializePromise) { + final long startTime = clock.millis(); + + fetchStoredDataResult() + .onSuccess(storedDataResult -> handleResult(storedDataResult, startTime, MetricName.initialize)) + .onFailure(exception -> handleFailure(exception, startTime, MetricName.initialize)) + .mapEmpty() + .onComplete(initializePromise); + + if (refreshPeriod > 0) { + logger.info("Starting s3 periodic refresh for " + cacheType + " every " + refreshPeriod + " s"); + vertx.setPeriodic(refreshPeriod, ignored -> refresh()); + } + } + + private Future fetchStoredDataResult() { + return Future.all( + getFileContentsForDirectory(storedRequestsDirectory), + getFileContentsForDirectory(storedImpressionsDirectory)) + .map(CompositeFuture::>list) + .map(results -> StoredDataResult.of(results.getFirst(), results.get(1), Collections.emptyList())); + } + + private void refresh() { + final long startTime = clock.millis(); + + fetchStoredDataResult() + .onSuccess(storedDataResult -> handleResult(storedDataResult, startTime, MetricName.update)) + .onFailure(exception -> handleFailure(exception, startTime, MetricName.update)); + } + + private void handleResult(StoredDataResult storedDataResult, long startTime, MetricName refreshType) { + lastResult.set(storedDataResult); + + cacheNotificationListener.save(storedDataResult.getStoredIdToRequest(), storedDataResult.getStoredIdToImp()); + + metrics.updateSettingsCacheRefreshTime(cacheType, refreshType, clock.millis() - startTime); + } + + private void handleFailure(Throwable exception, long startTime, MetricName refreshType) { + logger.warn("Error occurred while request to s3 refresh service", exception); + + metrics.updateSettingsCacheRefreshTime(cacheType, refreshType, clock.millis() - startTime); + metrics.updateSettingsCacheRefreshErrorMetric(cacheType, refreshType); + } + + private Future> getFileContentsForDirectory(String directory) { + return listFiles(directory) + .map(files -> files.stream().map(this::downloadFile).toList()) + .compose(Future::all) + .map(CompositeFuture::>list) + .map(fileNameToContent -> fileNameToContent.stream() + .collect(Collectors.toMap( + entry -> stripFileName(directory, entry.getLeft()), + Tuple2::getRight))); + } + + private Future> listFiles(String prefix) { + final ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() + .bucket(bucket) + .prefix(prefix) + .build(); + + return Future.fromCompletionStage(asyncClient.listObjects(listObjectsRequest), vertx.getOrCreateContext()) + .map(response -> response.contents().stream() + .map(S3Object::key) + .collect(Collectors.toList())); + } + + private Future> downloadFile(String key) { + final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build(); + + return Future.fromCompletionStage( + asyncClient.getObject(request, AsyncResponseTransformer.toBytes()), + vertx.getOrCreateContext()) + .map(content -> Tuple2.of(key, content.asUtf8String())); + } + + private String stripFileName(String directory, String name) { + return name + .replace(directory + "/", "") + .replace(JSON_SUFFIX, ""); + } +} diff --git a/src/main/java/org/prebid/server/spring/config/SettingsConfiguration.java b/src/main/java/org/prebid/server/spring/config/SettingsConfiguration.java index 1006403c9c4..90f56672b00 100644 --- a/src/main/java/org/prebid/server/spring/config/SettingsConfiguration.java +++ b/src/main/java/org/prebid/server/spring/config/SettingsConfiguration.java @@ -20,10 +20,12 @@ import org.prebid.server.settings.EnrichingApplicationSettings; import org.prebid.server.settings.FileApplicationSettings; import org.prebid.server.settings.HttpApplicationSettings; +import org.prebid.server.settings.S3ApplicationSettings; import org.prebid.server.settings.SettingsCache; import org.prebid.server.settings.helper.ParametrizedQueryHelper; import org.prebid.server.settings.service.DatabasePeriodicRefreshService; import org.prebid.server.settings.service.HttpPeriodicRefreshService; +import org.prebid.server.settings.service.S3PeriodicRefreshService; import org.prebid.server.spring.config.database.DatabaseConfiguration; import org.prebid.server.vertx.database.DatabaseClient; import org.prebid.server.vertx.httpclient.HttpClient; @@ -37,9 +39,16 @@ import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import org.springframework.validation.annotation.Validated; - -import jakarta.validation.constraints.Min; -import jakarta.validation.constraints.NotNull; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; +import java.net.URI; +import java.net.URISyntaxException; import java.time.Clock; import java.util.List; import java.util.Objects; @@ -217,6 +226,106 @@ public DatabasePeriodicRefreshService ampDatabasePeriodicRefreshService( } } + @Configuration + @ConditionalOnProperty(prefix = "settings.s3", name = {"accounts-dir", "stored-imps-dir", "stored-requests-dir"}) + static class S3SettingsConfiguration { + + @Component + @ConfigurationProperties(prefix = "settings.s3") + @ConditionalOnProperty(prefix = "settings.s3", name = {"accessKeyId", "secretAccessKey"}) + @Validated + @Data + @NoArgsConstructor + protected static class S3ConfigurationProperties { + @NotBlank + private String accessKeyId; + @NotBlank + private String secretAccessKey; + /** + * If not provided AWS_GLOBAL will be used as a region + */ + private String region; + @NotBlank + private String endpoint; + @NotBlank + private String bucket; + @NotBlank + private Boolean forcePathStyle; + @NotBlank + private String accountsDir; + @NotBlank + private String storedImpsDir; + @NotBlank + private String storedRequestsDir; + @NotBlank + private String storedResponsesDir; + } + + @Bean + S3AsyncClient s3AsyncClient(S3ConfigurationProperties s3ConfigurationProperties) throws URISyntaxException { + final AwsBasicCredentials credentials = AwsBasicCredentials.create( + s3ConfigurationProperties.getAccessKeyId(), + s3ConfigurationProperties.getSecretAccessKey()); + final String awsRegionName = s3ConfigurationProperties.getRegion(); + final Region awsRegion = awsRegionName != null + ? Region.of(s3ConfigurationProperties.getRegion()) + : Region.AWS_GLOBAL; + return S3AsyncClient + .builder() + .credentialsProvider(StaticCredentialsProvider.create(credentials)) + .endpointOverride(new URI(s3ConfigurationProperties.getEndpoint())) + .forcePathStyle(s3ConfigurationProperties.getForcePathStyle()) + .region(awsRegion) + .build(); + } + + @Bean + S3ApplicationSettings s3ApplicationSettings( + JacksonMapper mapper, + S3ConfigurationProperties s3ConfigurationProperties, + S3AsyncClient s3AsyncClient, + Vertx vertx) { + + return new S3ApplicationSettings( + s3AsyncClient, + s3ConfigurationProperties.getBucket(), + s3ConfigurationProperties.getAccountsDir(), + s3ConfigurationProperties.getStoredImpsDir(), + s3ConfigurationProperties.getStoredRequestsDir(), + s3ConfigurationProperties.getStoredResponsesDir(), + mapper, + vertx); + } + } + + @Configuration + @ConditionalOnProperty(prefix = "settings.in-memory-cache.s3-update", name = {"refresh-rate", "timeout"}) + static class S3PeriodicRefreshServiceConfiguration { + + @Bean + public S3PeriodicRefreshService s3PeriodicRefreshService( + S3AsyncClient s3AsyncClient, + S3SettingsConfiguration.S3ConfigurationProperties s3ConfigurationProperties, + @Value("${settings.in-memory-cache.s3-update.refresh-rate}") long refreshPeriod, + SettingsCache settingsCache, + Vertx vertx, + Metrics metrics, + Clock clock) { + + return new S3PeriodicRefreshService( + s3AsyncClient, + s3ConfigurationProperties.getBucket(), + s3ConfigurationProperties.getStoredRequestsDir(), + s3ConfigurationProperties.getStoredImpsDir(), + refreshPeriod, + MetricName.stored_request, + settingsCache, + vertx, + metrics, + clock); + } + } + /** * This configuration defines a collection of application settings fetchers and its ordering. */ @@ -227,10 +336,12 @@ static class CompositeSettingsConfiguration { CompositeApplicationSettings compositeApplicationSettings( @Autowired(required = false) FileApplicationSettings fileApplicationSettings, @Autowired(required = false) DatabaseApplicationSettings databaseApplicationSettings, - @Autowired(required = false) HttpApplicationSettings httpApplicationSettings) { + @Autowired(required = false) HttpApplicationSettings httpApplicationSettings, + @Autowired(required = false) S3ApplicationSettings s3ApplicationSettings) { final List applicationSettingsList = - Stream.of(fileApplicationSettings, + Stream.of(s3ApplicationSettings, + fileApplicationSettings, databaseApplicationSettings, httpApplicationSettings) .filter(Objects::nonNull) @@ -338,7 +449,7 @@ SettingsCache videoSettingCache(ApplicationSettingsCacheProperties cacheProperti @Validated @Data @NoArgsConstructor - private static class ApplicationSettingsCacheProperties { + protected static class ApplicationSettingsCacheProperties { @NotNull @Min(1) diff --git a/src/test/java/org/prebid/server/settings/S3ApplicationSettingsTest.java b/src/test/java/org/prebid/server/settings/S3ApplicationSettingsTest.java new file mode 100644 index 00000000000..1747b6f3a4f --- /dev/null +++ b/src/test/java/org/prebid/server/settings/S3ApplicationSettingsTest.java @@ -0,0 +1,360 @@ +package org.prebid.server.settings; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.Mock; +import org.prebid.server.VertxTest; +import org.prebid.server.exception.PreBidException; +import org.prebid.server.execution.Timeout; +import org.prebid.server.execution.TimeoutFactory; +import org.prebid.server.settings.model.Account; +import org.prebid.server.settings.model.AccountAuctionConfig; +import org.prebid.server.settings.model.AccountPrivacyConfig; +import org.prebid.server.settings.model.StoredDataResult; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.verify; + +@ExtendWith({MockitoExtension.class, VertxExtension.class}) +public class S3ApplicationSettingsTest extends VertxTest { + + private static final String BUCKET = "bucket"; + private static final String ACCOUNTS_DIR = "accounts"; + private static final String STORED_IMPS_DIR = "stored-imps"; + private static final String STORED_REQUESTS_DIR = "stored-requests"; + private static final String STORED_RESPONSES_DIR = "stored-responses"; + private Timeout timeout; + + @Mock + private S3AsyncClient s3AsyncClient; + private Vertx vertx; + + private S3ApplicationSettings target; + + @BeforeEach + public void setUp() { + vertx = Vertx.vertx(); + target = new S3ApplicationSettings(s3AsyncClient, BUCKET, ACCOUNTS_DIR, + STORED_IMPS_DIR, STORED_REQUESTS_DIR, STORED_RESPONSES_DIR, jacksonMapper, vertx); + + final Clock clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()); + final TimeoutFactory timeoutFactory = new TimeoutFactory(clock); + timeout = timeoutFactory.create(500L); + } + + @AfterEach + public void tearDown(VertxTestContext context) { + vertx.close(context.succeedingThenComplete()); + } + + @Test + public void getAccountByIdShouldReturnFetchedAccount(VertxTestContext context) throws JsonProcessingException { + // given + final Account account = Account.builder() + .id("someId") + .auction(AccountAuctionConfig.builder() + .priceGranularity("testPriceGranularity") + .build()) + .privacy(AccountPrivacyConfig.builder().build()) + .build(); + + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + mapper.writeValueAsString(account).getBytes()))); + + // when + final Future future = target.getAccountById("someId", timeout); + + // then + + future.onComplete(context.succeeding(returnedAccount -> { + assertThat(returnedAccount.getId()).isEqualTo("someId"); + assertThat(returnedAccount.getAuction().getPriceGranularity()).isEqualTo("testPriceGranularity"); + + verify(s3AsyncClient).getObject( + eq(GetObjectRequest.builder().bucket(BUCKET).key(ACCOUNTS_DIR + "/someId.json").build()), + any(AsyncResponseTransformer.class)); + context.completeNow(); + })); + } + + @Test + public void getAccountByIdNoSuchKey(VertxTestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.failedFuture( + NoSuchKeyException.create( + "The specified key does not exist.", + new IllegalStateException("")))); + + // when + final Future future = target.getAccountById("notFoundId", timeout); + + // then + + future.onComplete(context.failing(cause -> { + assertThat(cause) + .isInstanceOf(PreBidException.class) + .hasMessage("Account with id notFoundId not found"); + + context.completeNow(); + })); + } + + @Test + public void getAccountByIdInvalidJson(VertxTestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "invalidJson".getBytes()))); + + // when + final Future future = target.getAccountById("invalidJsonId", timeout); + + // then + + future.onComplete(context.failing(cause -> { + assertThat(cause) + .isInstanceOf(PreBidException.class) + .hasMessage("Invalid json for account with id invalidJsonId"); + context.completeNow(); + })); + } + + @Test + public void getAccountByIdWithAccountIdMismatch(VertxTestContext context) throws JsonProcessingException { + // given + final Account account = Account.builder() + .id("wrong-id") + .auction(AccountAuctionConfig.builder().build()) + .privacy(AccountPrivacyConfig.builder().build()) + .build(); + + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + mapper.writeValueAsString(account).getBytes()))); + + // when + final Future future = target.getAccountById("another-id", timeout); + + // then + future.onComplete(context.failing(cause -> { + assertThat(cause) + .isInstanceOf(PreBidException.class) + .hasMessage("Account with id another-id does not match id wrong-id in file"); + + verify(s3AsyncClient).getObject( + eq(GetObjectRequest.builder().bucket(BUCKET).key(ACCOUNTS_DIR + "/another-id.json").build()), + any(AsyncResponseTransformer.class)); + context.completeNow(); + })); + } + + @Test + public void getStoredDataShouldReturnFetchedStoredRequest(VertxTestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "req1Result".getBytes()))); + + // when + final Future future = target + .getStoredData("someId", Set.of("req1"), Collections.emptySet(), timeout); + + // then + future.onComplete(context.succeeding(account -> { + assertThat(account.getStoredIdToRequest().size()).isEqualTo(1); + assertThat(account.getStoredIdToImp().size()).isEqualTo(0); + assertThat(account.getStoredIdToRequest()).isEqualTo(Map.of("req1", "req1Result")); + assertThat(account.getErrors()).isEmpty(); + + verify(s3AsyncClient).getObject( + eq(GetObjectRequest.builder().bucket(BUCKET).key(STORED_REQUESTS_DIR + "/req1.json").build()), + any(AsyncResponseTransformer.class)); + context.completeNow(); + })); + } + + @Test + public void getStoredDataShouldReturnFetchedStoredImpression(VertxTestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "imp1Result".getBytes()))); + + // when + final Future future = target + .getStoredData("someId", Collections.emptySet(), Set.of("imp1"), timeout); + + // then + future.onComplete(context.succeeding(account -> { + assertThat(account.getStoredIdToRequest().size()).isEqualTo(0); + assertThat(account.getStoredIdToImp().size()).isEqualTo(1); + assertThat(account.getStoredIdToImp()).isEqualTo(Map.of("imp1", "imp1Result")); + assertThat(account.getErrors()).isEmpty(); + + verify(s3AsyncClient).getObject( + eq(GetObjectRequest.builder().bucket(BUCKET).key(STORED_IMPS_DIR + "/imp1.json").build()), + any(AsyncResponseTransformer.class)); + + context.completeNow(); + })); + } + + @Test + public void getStoredDataShouldReturnFetchedStoredImpressionWithAdUnitPathStoredId(VertxTestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "imp1Result".getBytes()))); + + // when + final Future future = target + .getStoredData("/123/root/position-1", Collections.emptySet(), Set.of("imp1"), timeout); + + // then + future.onComplete(context.succeeding(account -> { + assertThat(account.getStoredIdToRequest().size()).isEqualTo(0); + assertThat(account.getStoredIdToImp().size()).isEqualTo(1); + assertThat(account.getStoredIdToImp()).isEqualTo(Map.of("imp1", "imp1Result")); + assertThat(account.getErrors()).isEmpty(); + + verify(s3AsyncClient).getObject( + eq(GetObjectRequest.builder().bucket(BUCKET).key(STORED_IMPS_DIR + "/imp1.json").build()), + any(AsyncResponseTransformer.class)); + + context.completeNow(); + })); + } + + @Test + public void getStoredDataShouldReturnFetchedStoredImpressionAndStoredRequest(VertxTestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn( + CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "req1Result".getBytes())), + CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "imp1Result".getBytes()))); + + // when + final Future future = target + .getStoredData("someId", Set.of("req1"), Set.of("imp1"), timeout); + + // then + future.onComplete(context.succeeding(account -> { + assertThat(account.getStoredIdToRequest().size()).isEqualTo(1); + assertThat(account.getStoredIdToRequest()).isEqualTo(Map.of("req1", "req1Result")); + assertThat(account.getStoredIdToImp().size()).isEqualTo(1); + assertThat(account.getStoredIdToImp()).isEqualTo(Map.of("imp1", "imp1Result")); + assertThat(account.getErrors()).isEmpty(); + + verify(s3AsyncClient).getObject( + eq(GetObjectRequest.builder().bucket(BUCKET).key(STORED_IMPS_DIR + "/imp1.json").build()), + any(AsyncResponseTransformer.class)); + verify(s3AsyncClient).getObject( + eq(GetObjectRequest.builder().bucket(BUCKET).key(STORED_REQUESTS_DIR + "/req1.json").build()), + any(AsyncResponseTransformer.class)); + + context.completeNow(); + })); + } + + @Test + public void getStoredDataReturnsErrorsForNotFoundRequests(VertxTestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.failedFuture( + NoSuchKeyException.create( + "The specified key does not exist.", + new IllegalStateException("")))); + + // when + final Future future = target + .getStoredData("someId", Set.of("req1"), Collections.emptySet(), timeout); + + // then + future.onComplete(context.succeeding(account -> { + assertThat(account.getStoredIdToImp()).isEmpty(); + assertThat(account.getStoredIdToRequest()).isEmpty(); + assertThat(account.getErrors().size()).isEqualTo(1); + assertThat(account.getErrors()) + .isNotNull() + .hasSize(1) + .isEqualTo(singletonList("No stored request found for id: req1")); + + context.completeNow(); + })); + } + + @Test + public void getStoredDataReturnsErrorsForNotFoundImpressions(VertxTestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn( + CompletableFuture.failedFuture( + NoSuchKeyException.create( + "The specified key does not exist.", + new IllegalStateException("")))); + + // when + final Future future = target + .getStoredData("someId", Collections.emptySet(), Set.of("imp1"), timeout); + + // then + future.onComplete(context.succeeding(account -> { + assertThat(account.getStoredIdToImp()).isEmpty(); + assertThat(account.getStoredIdToRequest()).isEmpty(); + assertThat(account.getErrors().size()).isEqualTo(1); + assertThat(account.getErrors()) + .isNotNull() + .hasSize(1) + .isEqualTo(singletonList("No stored impression found for id: imp1")); + + context.completeNow(); + })); + } + +} diff --git a/src/test/java/org/prebid/server/settings/service/S3PeriodicRefreshServiceTest.java b/src/test/java/org/prebid/server/settings/service/S3PeriodicRefreshServiceTest.java new file mode 100644 index 00000000000..92619c0f320 --- /dev/null +++ b/src/test/java/org/prebid/server/settings/service/S3PeriodicRefreshServiceTest.java @@ -0,0 +1,212 @@ +package org.prebid.server.settings.service; + +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; +import org.prebid.server.VertxTest; +import org.prebid.server.metric.MetricName; +import org.prebid.server.metric.Metrics; +import org.prebid.server.settings.CacheNotificationListener; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.S3Object; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static java.util.Collections.singletonMap; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mock.Strictness.LENIENT; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +@ExtendWith(VertxExtension.class) +public class S3PeriodicRefreshServiceTest extends VertxTest { + + private static final String BUCKET = "bucket"; + private static final String STORED_REQ_DIR = "stored-req"; + private static final String STORED_IMP_DIR = "stored-imp"; + + @Mock + private CacheNotificationListener cacheNotificationListener; + + @Mock + private Vertx vertx; + private Vertx vertxImpl; + + @Mock(strictness = LENIENT) + private S3AsyncClient s3AsyncClient; + private final Clock clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()); + @Mock + private Metrics metrics; + + private final Map expectedRequests = singletonMap("id1", "value1"); + private final Map expectedImps = singletonMap("id2", "value2"); + + @BeforeEach + public void setUp() { + vertxImpl = Vertx.vertx(); + given(s3AsyncClient.listObjects(any(ListObjectsRequest.class))) + .willReturn(listObjectResponse(STORED_REQ_DIR + "/id1.json"), + listObjectResponse(STORED_IMP_DIR + "/id2.json")); + + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "value1".getBytes())), + CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "value2".getBytes()))); + + given(vertx.getOrCreateContext()).willReturn(vertxImpl.getOrCreateContext()); + } + + @AfterEach + public void tearDown(VertxTestContext context) { + vertxImpl.close(context.succeedingThenComplete()); + } + + @Test + public void shouldCallSaveWithExpectedParameters(VertxTestContext context) { + // when + createAndInitService(1000) + .onSuccess(unused -> { + verify(cacheNotificationListener).save(expectedRequests, expectedImps); + }) + .onComplete(context.succeedingThenComplete()); + + } + + @Test + public void initializeShouldMakeOneInitialRequestAndTwoScheduledRequestsWithParam(VertxTestContext context) { + // given + given(vertx.setPeriodic(anyLong(), any())) + .willAnswer(withSelfAndPassObjectToHandler(1L, 2L)); + + // when + createAndInitService(1000) + .onSuccess(unused -> { + // then + verify(s3AsyncClient, times(6)).listObjects(any(ListObjectsRequest.class)); + verify(s3AsyncClient, times(6)).getObject( + any(GetObjectRequest.class), any(AsyncResponseTransformer.class) + ); + }) + .onComplete(context.succeedingThenComplete()); + + } + + @Test + public void initializeShouldMakeOnlyOneInitialRequestIfRefreshPeriodIsNegative(VertxTestContext context) { + // when + createAndInitService(-1) + .onSuccess(unused -> { + // then + verify(vertx, never()).setPeriodic(anyLong(), any()); + verify(s3AsyncClient, times(2)).listObjects(any(ListObjectsRequest.class)); + }) + .onComplete(context.succeedingThenComplete()); + + } + + @Test + public void shouldUpdateTimerMetric(VertxTestContext context) { + // when + createAndInitService(1000) + .onSuccess(unused -> { + // then + verify(metrics).updateSettingsCacheRefreshTime( + eq(MetricName.stored_request), eq(MetricName.initialize), anyLong()); + }) + .onComplete(context.succeedingThenComplete()); + } + + @Test + public void shouldUpdateTimerAndErrorMetric(VertxTestContext context) { + // given + given(s3AsyncClient.listObjects(any(ListObjectsRequest.class))) + .willReturn(CompletableFuture.failedFuture(new IllegalStateException("Failed"))); + + // when + createAndInitService(1000) + .onFailure(unused -> { + // then + verify(metrics).updateSettingsCacheRefreshTime( + eq(MetricName.stored_request), eq(MetricName.initialize), anyLong()); + verify(metrics).updateSettingsCacheRefreshErrorMetric( + eq(MetricName.stored_request), eq(MetricName.initialize)); + }) + .onComplete(context.failingThenComplete()); + + } + + private CompletableFuture listObjectResponse(String... keys) { + return CompletableFuture.completedFuture( + ListObjectsResponse + .builder() + .contents(Arrays.stream(keys).map(key -> S3Object.builder().key(key).build()).toList()) + .build()); + } + + private CompletableFuture> getObjectResponse(String value) { + return CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + value.getBytes())); + } + + private Future createAndInitService(long refreshPeriod) { + final S3PeriodicRefreshService s3PeriodicRefreshService = new S3PeriodicRefreshService( + s3AsyncClient, + BUCKET, + STORED_REQ_DIR, + STORED_IMP_DIR, + refreshPeriod, + MetricName.stored_request, + cacheNotificationListener, + vertx, + metrics, + clock); + final Promise init = Promise.promise(); + s3PeriodicRefreshService.initialize(init); + return init.future(); + } + + @SuppressWarnings("unchecked") + private static Answer withSelfAndPassObjectToHandler(T... objects) { + return inv -> { + // invoking handler right away passing mock to it + for (T obj : objects) { + ((Handler) inv.getArgument(1)).handle(obj); + } + return 0L; + }; + } + +}