Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add S3/MinIO support for application settings #2733

Closed
wants to merge 57 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
6f82ee8
Add S3/MinIO support for application settings
rmattis Sep 19, 2022
b433a07
Fix checkstyle warning
muuki88 Oct 27, 2023
017c543
Use property for version in pom.xml
muuki88 Dec 21, 2023
b93a281
Specify version for awssdk s3
muuki88 Dec 21, 2023
c163511
Adding requrireNonNull checks in S3ApplicationSettings
muuki88 Dec 21, 2023
a17952c
Recfactor lambda into method for less nesting
muuki88 Dec 21, 2023
d1c6770
Remove Option<String> as map value
muuki88 Dec 21, 2023
c2be492
Add empty lines and make withInitialSlash static
muuki88 Dec 22, 2023
58f47cb
Fix checkstyle issues
muuki88 Dec 22, 2023
5520b54
Add requireNonNull checks
muuki88 Jan 3, 2024
6ab79c9
Check that accountId and file path match
muuki88 Jan 3, 2024
183107a
Fix linting issues
muuki88 Jan 22, 2024
9ad80b5
Fix linting issue #2
muuki88 Jan 22, 2024
c9dd737
Merge branch 'master' into highfivve-github-master
muuki88 Feb 12, 2024
f02c424
Fix pom.xml
muuki88 Mar 4, 2024
79c2f94
Use Set instead of List
muuki88 Mar 4, 2024
75c7ba7
Merge remote-tracking branch 'origin/master' into highfivve-github-ma…
muuki88 Mar 4, 2024
dc0f985
Add empty lines after multi-line parameter function
muuki88 Mar 4, 2024
a18a9b7
Fix compile error in S3ApplicationSettingsTest
muuki88 Mar 4, 2024
67f047c
Remove optional
muuki88 Mar 4, 2024
5ec3976
Remove unused import
muuki88 Mar 9, 2024
c958771
Merge remote-tracking branch 'origin' into highfivve-github-master
muuki88 Mar 9, 2024
a3e350f
Use AccountPrivacyConfig.builder
muuki88 Mar 9, 2024
a0aa64a
GD-7732 handle non existing stored impressions gracefully
muuki88 Mar 25, 2024
53ce17b
GD-7732 Use SetUtils for calculating missing stored impressions
muuki88 Mar 25, 2024
5939670
GD-7732 Use atomic reference and remove timeout
muuki88 Mar 26, 2024
a085ae6
GD-7732 Use SetUtils.difference
muuki88 Mar 26, 2024
62f5096
GD-7732 Use onSuccess/onFailure instead of map/recover
muuki88 Mar 26, 2024
8132c26
GD-7732 Remove redundant Set/Stream/List conversions
muuki88 Mar 26, 2024
754e318
GD-7732 Rename aLong var to ignored
muuki88 Mar 26, 2024
1e40c25
GD-7732 getFileContents runs in parallel
muuki88 Mar 26, 2024
20cdf45
GD-7732 Use CompositeFutura.join instead of all
muuki88 Mar 26, 2024
99f499c
Merge master
muuki88 May 25, 2024
d089b27
Fix compile error in SettingsConfiguration
muuki88 May 25, 2024
f55beff
Remove unused imports
muuki88 May 26, 2024
bdc37f2
Proper initialize implementation
muuki88 May 26, 2024
7702b2c
Merge remote-tracking branch 'origin' into highfivve-github-master
muuki88 Jul 16, 2024
a4469e2
Adding region property
muuki88 Jul 16, 2024
87466a7
Migrate to Junit5 - one test case still broken
muuki88 Jul 18, 2024
9c2b22c
Use prebid logger implementation, mark vars as final and return void …
muuki88 Jul 25, 2024
6a5d1fa
Use proposed refactoring
muuki88 Jul 25, 2024
6a61e58
Use vertx.getOrCreateContext()
muuki88 Jul 25, 2024
70be3ca
Add proposed refactoring
muuki88 Jul 25, 2024
c97e0bf
Add sample config with s3
muuki88 Jul 25, 2024
7ea3745
Update aws depdendency
muuki88 Jul 25, 2024
8655f55
Remove unnecessary beans
muuki88 Jul 26, 2024
5b21d38
Remove invalidate cache logic
muuki88 Jul 26, 2024
f898648
Change private to protected to satisfy IDEA
muuki88 Jul 26, 2024
e89c0f0
Formatting
muuki88 Jul 26, 2024
1229372
remove unused imports
muuki88 Jul 29, 2024
31febb4
Remove vertx.getOrCreateContext() call
muuki88 Aug 19, 2024
cc29721
Revert "Remove vertx.getOrCreateContext() call"
muuki88 Aug 20, 2024
ea36f03
Reintroduce vertx.getOrCreateContext
muuki88 Aug 20, 2024
8f6cddd
hello checkstyle my old friend ...
muuki88 Aug 20, 2024
cda0e5a
Add force-path-style value
muuki88 Aug 22, 2024
332d5d4
Log refresh period
muuki88 Aug 22, 2024
68f2975
Remove println debugging
muuki88 Aug 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions docs/application-settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,50 @@ Here's an example YAML file containing account-specific settings:
default-coop-sync: true
```

## Setting Account Configuration in S3

This is identical to the account configuration in a file system, with the main difference that your file system is
[AWS S3](https://aws.amazon.com/de/s3/) or any S3 compatible storage, such as [MinIO](https://min.io/).


The general idea is that you'll place all the account-specific settings in a separate YAML file and point to that file.

```yaml
settings:
s3:
accessKeyId: <S3 access key>
secretAccessKey: <S3 access key>
endpoint: <endpoint> # http://s3.storage.com
bucket: <bucket name> # prebid-application-settings
accounts-dir: accounts
stored-imps-dir: stored-impressions
stored-requests-dir: stored-requests
stored-responses-dir: stored-responses

# recommended to configure an in memory cache, but this is optional
in-memory-cache:
# example settings, tailor to your needs
cache-size: 100000
ttl-seconds: 1200 # 20 minutes
# recommended to configure
s3-update:
refresh-rate: 900000 # Refresh every 15 minutes
timeout: 5000
```

### File format

We recommend using the `json` format for your account configuration. A minimal configuration may look like this.

```json
{
"id" : "979c7116-1f5a-43d4-9a87-5da3ccc4f52c",
"status" : "active"
}
```

This pairs nicely if you have a default configuration defined in your prebid server config under `settings.default-account-config`.

## Setting Account Configuration in the Database

In database approach account properties are stored in database table(s).
Expand Down
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.17.274</version>
muuki88 marked this conversation as resolved.
Show resolved Hide resolved
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -275,6 +282,10 @@
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
muuki88 marked this conversation as resolved.
Show resolved Hide resolved
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
Expand Down
205 changes: 205 additions & 0 deletions src/main/java/org/prebid/server/settings/S3ApplicationSettings.java
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi. There are some things that need to be addressed:

  • support passed Timeout object
  • reconsider getStoredData implementation (currently it works sequentially: storedReq THEN storedImp, but you need to download it simultaneously)
  • reimplement getMissingStoredDataIds method (in terms of performance)
  • refactor some parts

Below you can find general idea:

  • Timeout: timer created with vertx.setTimer is reused for multiple Futures, which is preferable due to less jvm pollution.
  • getStoredData: storedReqs & storedImps are now downloaded simultaneously.
  • getMissingStoredDataIds: used much faster SetView implementation
  • Optionals removed to make code easier to read ( At least it seems to me that it has become “cleaner” :) )
    @Override
    public Future<Account> getAccountById(String accountId, Timeout timeout) {
        return withTimeout(() -> downloadFile(accountsDirectory + "/" + accountId + JSON_SUFFIX), timeout)
                .map(fileContent -> decodeAccount(fileContent, accountId));
    }

    private Account decodeAccount(String fileContent, String requestedAccountId) {
        if (fileContent == null) {
            throw new PreBidException("Account with id %s not found".formatted(requestedAccountId));
        }

        final Account account;
        try {
            account = jacksonMapper.decodeValue(fileContent, Account.class);
        } catch (DecodeException e) {
            throw new PreBidException("Invalid json for account with id %s".formatted(requestedAccountId));
        }

        validateAccount(account, requestedAccountId);
        return account;
    }

    private static void validateAccount(Account account, String requestedAccountId) {
        final String receivedAccountId = account != null ? account.getId() : null;
        if (!StringUtils.equals(receivedAccountId, requestedAccountId)) {
            throw new PreBidException(
                    "Account with id %s does not match id %s in file".formatted(requestedAccountId, receivedAccountId));
        }
    }

    @Override
    public Future<StoredDataResult> getStoredData(String accountId,
                                                  Set<String> requestIds,
                                                  Set<String> impIds,
                                                  Timeout timeout) {

        return withTimeout(
                () -> CompositeFuture.all(
                        getFileContents(storedRequestsDirectory, requestIds),
                        getFileContents(storedImpressionsDirectory, impIds)),
                timeout)
                .map(results -> buildStoredDataResult(
                        results.resultAt(0),
                        results.resultAt(1),
                        requestIds,
                        impIds));
    }

    private StoredDataResult buildStoredDataResult(Map<String, String> storedIdToRequest,
                                                   Map<String, String> storedIdToImp,
                                                   Set<String> requestIds,
                                                   Set<String> impIds) {

        final List<String> errors = Stream
                .concat(
                        missingStoredDataIds(storedIdToImp, impIds).stream()
                                .map("No stored impression found for id: %s"::formatted),
                        missingStoredDataIds(storedIdToRequest, requestIds).stream()
                                .map("No stored request found for id: %s"::formatted))
                .toList();

        return StoredDataResult.of(
                storedIdToRequest,
                storedIdToImp,
                errors);
    }

    private Set<String> missingStoredDataIds(Map<String, String> fileContents, Set<String> responseIds) {
        return SetUtils.difference(responseIds, fileContents.keySet());
    }

    @Override
    public Future<StoredDataResult> getAmpStoredData(String accountId,
                                                     Set<String> requestIds,
                                                     Set<String> impIds,
                                                     Timeout timeout) {

        return getStoredData(accountId, requestIds, Collections.emptySet(), timeout);
    }

    @Override
    public Future<StoredDataResult> getVideoStoredData(String accountId,
                                                       Set<String> requestIds,
                                                       Set<String> impIds,
                                                       Timeout timeout) {

        return getStoredData(accountId, requestIds, impIds, timeout);
    }

    @Override
    public Future<StoredResponseDataResult> getStoredResponses(Set<String> responseIds, Timeout timeout) {
        return withTimeout(() -> getFileContents(storedResponsesDirectory, responseIds), timeout)
                .map(storedIdToResponse -> StoredResponseDataResult.of(
                        storedIdToResponse,
                        missingStoredDataIds(storedIdToResponse, responseIds).stream()
                                .map("No stored response found for id: %s"::formatted)
                                .toList()));
    }

    @Override
    public Future<Map<String, String>> getCategories(String primaryAdServer, String publisher, Timeout timeout) {
        return Future.succeededFuture(Collections.emptyMap());
    }

    private Future<Map<String, String>> getFileContents(String directory, Set<String> ids) {
        return CompositeFuture.all(ids.stream()
                        .<Future>map(impId -> downloadFile(directory + withInitialSlash(impId) + JSON_SUFFIX)
                                .map(fileContent -> Tuple2.of(impId, fileContent)))
                        .toList())
                .map(CompositeFuture::<Tuple2<String, String>>list)
                .map(impIdToFileContent -> impIdToFileContent.stream()
                        .filter(tuple -> tuple.getRight() != null)
                        .collect(Collectors.toMap(Tuple2::getLeft, Tuple2::getRight)));
    }

    /**
     * When the impression id is the ad unit path it may already start with a slash and there's no need to add
     * another one.
     *
     * @param impressionId from the bid request
     * @return impression id with only a single slash at the beginning
     */
    private static String withInitialSlash(String impressionId) {
        return impressionId.startsWith("/") ? impressionId : "/" + impressionId;
    }

    private Future<String> downloadFile(String key) {
        final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build();

        return Future.fromCompletionStage(
                        asyncClient.getObject(request, AsyncResponseTransformer.toBytes()),
                        vertx.getOrCreateContext())
                .map(BytesWrapper::asUtf8String)
                .otherwiseEmpty();
    }

    private <T> Future<T> withTimeout(Supplier<Future<T>> futureFactory, Timeout timeout) {
        final long remainingTime = timeout.remaining();
        if (remainingTime <= 0L) {
            return Future.failedFuture(new TimeoutException("Timeout has been exceeded"));
        }

        final Promise<T> promise = Promise.promise();
        final Future<T> future = futureFactory.get();

        final long timerId = vertx.setTimer(
                remainingTime, id -> promise.tryFail(new TimeoutException("Timeout has been exceeded")));

        future.onComplete(result -> {
            vertx.cancelTimer(timerId);
            if (result.succeeded()) {
                promise.tryComplete(result.result());
            } else {
                promise.tryFail(result.cause());
            }
        });

        return promise.future();
    }

Copy link
Contributor Author

@muuki88 muuki88 Mar 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @CTMBNara

Thanks for the detailed feedback and the given code example 😃

support passed Timeout object

I would highly argue against this for a couple of reasons

  1. The JdbcApplicationSettings do not have this either, so adding it here would open too paths on how timeouts should be handled
  2. Everything is put inside an in memory cache anyways and without the stored impressions a request usually doesn't make any sense.
  3. Timeouts should be configured in the AWS client itself IMHO so that everything from connection reset and request aborting is handled properly. https://github.com/aws/aws-sdk-java-v2/blob/master/docs/BestPractices.md#utilize-timeout-configurations

reconsider getStoredData implementation (currently it works sequentially: storedReq THEN storedImp, but you need to download it simultaneously)

That's probably a good thing to do, but the effect will probably be minimal as everything is cached and there are merely any requests be sent out.

Still I'm trying to implement this.

reimplement getMissingStoredDataIds method (in terms of performance)

Just added this. Thanks for the code 👍

Optionals removed to make code easier to read ( At least it seems to me that it has become “cleaner” :) )

In the end I comply with the standards you set for this project, but I never looked back at null after switching from Java to Scala 10 years ago. My first contributions to prebid-server-java where bug fixes for null pointer exceptions. I understand that Optional in java is a bit more cumbersome to write, but in my experience it's a lot easier to maintain.

If you want null you get null 😂

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the first one: In JdbcApplicationSettings timeout is passed to BasicJdbcClient which actually utilizes it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the second one: why do sequentially things that can be done in parallel? Its fairly easy change and can give us result much faster. Thus reduce amount of live requests and their objects that live in ram and put pressure on gc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the third one: this is not applicable for prebid server, we aim to support high thoughput and low latency, so we need to adjust timeouts on per-request basis to ensure that we fulfill SLA.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the first one: In JdbcApplicationSettings timeout is passed to BasicJdbcClient which actually utilizes it.

Exactly my point. The JdbcClient is configured. The same should here be the case. We configure the S3Client. Not the application settings implementation.

Regarding the second one: why do sequentially things that can be done in parallel? Its fairly easy change and can give us result much faster. Thus reduce amount of live requests and their objects that live in ram and put pressure on gc.

From my experience parallel doesn't necessary equals faster. Number of cores, available RAM and number of Futures on the stack determine the performance. Still I copied your code example and things should now run in parallel.

Regarding the third one: this is not applicable for prebid server, we aim to support high thoughput and low latency, so we need to adjust timeouts on per-request basis to ensure that we fulfill SLA.

That's totally fine. Then we should do it the same way as the JdbcSettings do it. Build a wrapper around the S3Client and use this instead. I don't feel competent enough to do this, though. I have no Vert.x experience and no idea on how to test this.

Is this something that can be added in a follow up PR?

muuki88 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

@CTMBNara CTMBNara Mar 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is one more important point to explore. :)

In the above implementation, we don't tell the CompletableFutures (from the S3AsyncClient) that the timeout has expired and their result is no longer needed. Perhaps we should notify them, but to do that we need to take a deep dive into the AWS library and see if notifying the futures makes more than just conceptual sense.

Guys, what do you think? @muuki88 @SerhiiNahornyi @Net-burst @And1sS @AntoxaAntoxic

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code example:

    private Future<List<String>> downloadFiles(List<String> keys, Timeout timeout) {
        final long remainingTime = timeout.remaining();
        if (remainingTime <= 0L) {
            return Future.failedFuture(new TimeoutException("Timeout has been exceeded"));
        }

        final List<CompletableFuture<ResponseBytes<GetObjectResponse>>> completableFutures = new ArrayList<>();
        for (String key : keys) {
            final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build();
            completableFutures.add(asyncClient.getObject(request, AsyncResponseTransformer.toBytes()));
        }

        final Promise<CompositeFuture> promise = Promise.promise();
        final Context context = vertx.getOrCreateContext();
        final CompositeFuture compositeFuture = CompositeFuture.all(completableFutures.stream()
                .<Future>map(completableFuture -> Future
                        .fromCompletionStage(completableFuture, context)
                        .map(BytesWrapper::asUtf8String)
                        .otherwiseEmpty())
                .toList());

        final long timerId = vertx.setTimer(remainingTime, id -> {
            final TimeoutException exception = new TimeoutException("Timeout has been exceeded");

            promise.tryFail(exception);
            completableFutures.forEach(completableFuture -> completableFuture.completeExceptionally(exception));
        });

        compositeFuture.onComplete(result -> {
            vertx.cancelTimer(timerId);
            if (result.succeeded()) {
                promise.tryComplete(result.result());
            } else {
                promise.tryFail(result.cause());
            }
        });

        return promise.future().map(CompositeFuture::list);
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AWS SDK 2.0 seems to have a way to plugin your own HTTP implementation.
https://github.com/aws/aws-sdk-java-v2/#using-the-sdk

There are a bunch of pre defined integrations: https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/http-configuration.html

This is something that we probably should flesh out - how to configure the http client

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found this library, but it's rather small and unofficial: https://github.com/reactiverse/aws-sdk

It provides a vertx http client implementation for the aws sdk. What do you guys think?

Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package org.prebid.server.settings;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import org.prebid.server.auction.model.Tuple2;
import org.prebid.server.exception.PreBidException;
import org.prebid.server.execution.Timeout;
import org.prebid.server.json.DecodeException;
import org.prebid.server.json.JacksonMapper;
import org.prebid.server.settings.model.Account;
import org.prebid.server.settings.model.StoredDataResult;
import org.prebid.server.settings.model.StoredResponseDataResult;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Implementation of {@link ApplicationSettings}.
* <p>
* Reads an application settings from JSON file in an s3 bucket, stores and serves them in and from the memory.
* <p>
* Immediately loads stored request data from local files. These are stored in memory for low-latency reads.
* This expects each file in the directory to be named "{config_id}.json".
*/
public class S3ApplicationSettings implements ApplicationSettings {

private static final String JSON_SUFFIX = ".json";

final S3AsyncClient asyncClient;
final String bucket;
final String accountsDirectory;
final String storedImpressionsDirectory;
final String storedRequestsDirectory;
final String storedResponsesDirectory;
final JacksonMapper jacksonMapper;
final Vertx vertx;

public S3ApplicationSettings(
S3AsyncClient asyncClient,
String bucket,
String accountsDirectory,
String storedImpressionsDirectory,
String storedRequestsDirectory,
String storedResponsesDirectory,
JacksonMapper jacksonMapper,
Vertx vertx) {
this.asyncClient = asyncClient;
muuki88 marked this conversation as resolved.
Show resolved Hide resolved
this.bucket = bucket;
this.accountsDirectory = accountsDirectory;
this.storedImpressionsDirectory = storedImpressionsDirectory;
this.storedRequestsDirectory = storedRequestsDirectory;
this.storedResponsesDirectory = storedResponsesDirectory;
this.jacksonMapper = jacksonMapper;
this.vertx = vertx;
muuki88 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public Future<Account> getAccountById(String accountId, Timeout timeout) {
return downloadFile(accountsDirectory + "/" + accountId + JSON_SUFFIX)
.map(fileContentOpt ->
fileContentOpt.map(fileContent -> jacksonMapper.decodeValue(fileContent, Account.class)))
.compose(accountOpt -> {
if (accountOpt.isPresent()) {
return Future.succeededFuture(accountOpt.get());
} else {
return Future
.failedFuture(new PreBidException("Account with id %s not found".formatted(accountId)));
}
})
.recover(ex -> {
if (ex instanceof DecodeException) {
return Future
.failedFuture(
new PreBidException(
"Invalid json for account with id %s".formatted(accountId)));
}
return Future
.failedFuture(new PreBidException("Account with id %s not found".formatted(accountId)));
});
}

@Override
public Future<StoredDataResult> getStoredData(
String accountId,
Set<String> requestIds,
Set<String> impIds,
Timeout timeout) {

return getFileContents(storedRequestsDirectory, requestIds).compose(storedIdToRequest ->
getFileContents(storedImpressionsDirectory, impIds)
.map(storedIdToImp -> {
final List<String> missingStoredRequestIds =
getMissingStoredDataIds(storedIdToRequest).stream()
.map("No stored request found for id: %s"::formatted).toList();
final List<String> missingStoredImpressionIds =
getMissingStoredDataIds(storedIdToImp).stream()
.map("No stored impression found for id: %s"::formatted).toList();

return StoredDataResult.of(
filterOptionalFileContent(storedIdToRequest),
filterOptionalFileContent(storedIdToImp),
Stream.concat(
missingStoredImpressionIds.stream(),
missingStoredRequestIds.stream()).toList());
}
));
muuki88 marked this conversation as resolved.
Show resolved Hide resolved
}

private Map<String, String> filterOptionalFileContent(Map<String, Optional<String>> fileContents) {
return fileContents
.entrySet()
.stream()
.filter(e -> e.getValue().isPresent())
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
}
muuki88 marked this conversation as resolved.
Show resolved Hide resolved

private List<String> getMissingStoredDataIds(Map<String, Optional<String>> fileContents) {
return fileContents.entrySet().stream().filter(e -> e.getValue().isEmpty()).map(Map.Entry::getKey).toList();
}
muuki88 marked this conversation as resolved.
Show resolved Hide resolved

@Override
public Future<StoredDataResult> getAmpStoredData(
String accountId,
Set<String> requestIds,
Set<String> impIds,
Timeout timeout) {
return getStoredData(accountId, requestIds, Collections.emptySet(), timeout);
muuki88 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public Future<StoredDataResult> getVideoStoredData(
String accountId,
Set<String> requestIds,
Set<String> impIds,
Timeout timeout) {
return getStoredData(accountId, requestIds, impIds, timeout);
muuki88 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public Future<StoredResponseDataResult> getStoredResponses(Set<String> responseIds, Timeout timeout) {
return getFileContents(storedResponsesDirectory, responseIds).map(storedIdToResponse -> {
final List<String> missingStoredResponseIds =
getMissingStoredDataIds(storedIdToResponse).stream()
.map("No stored response found for id: %s"::formatted).toList();

return StoredResponseDataResult.of(
filterOptionalFileContent(storedIdToResponse),
missingStoredResponseIds
);
});
}

@Override
public Future<Map<String, String>> getCategories(String primaryAdServer, String publisher, Timeout timeout) {
return Future.succeededFuture(Collections.emptyMap());
}

private Future<Map<String, Optional<String>>> getFileContents(String directory, Set<String> ids) {
final List<Future<Tuple2<String, Optional<String>>>> futureListContents = ids.stream()
.map(impressionId ->
downloadFile(directory + withInitialSlash(impressionId) + JSON_SUFFIX)
.map(fileContent -> Tuple2.of(impressionId, fileContent)))
.collect(Collectors.toCollection(ArrayList::new));

final Future<List<Tuple2<String, Optional<String>>>> composedFutures = CompositeFuture
.all(new ArrayList<>(futureListContents))
.map(CompositeFuture::list);

return composedFutures.map(one -> one.stream().collect(Collectors.toMap(Tuple2::getLeft, Tuple2::getRight)));
}

/**
* When the impression id is the ad unit path it may already start with a slash and there's no need to add
* another one.
*
* @param impressionId from the bid request
* @return impression id with only a single slash at the beginning
*/
private String withInitialSlash(String impressionId) {
muuki88 marked this conversation as resolved.
Show resolved Hide resolved
if (impressionId.startsWith("/")) {
return impressionId;
}
return "/" + impressionId;
}

private Future<Optional<String>> downloadFile(String key) {
final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build();

return Future.fromCompletionStage(
asyncClient.getObject(request, AsyncResponseTransformer.toBytes()),
vertx.getOrCreateContext())
.map(test -> Optional.of(test.asUtf8String())).recover(ex -> Future.succeededFuture(Optional.empty()));
}
muuki88 marked this conversation as resolved.
Show resolved Hide resolved

}
Loading
Loading