Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HelixAccountService: limit the number of versions in the list #1252

Merged
merged 7 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,15 @@ public class HelixAccountService implements AccountService {
this.backupFileManager = new BackupFileManager(this.accountServiceMetrics, config);
AccountMetadataStore backFillStore = null;
if (config.useNewZNodePath) {
accountMetadataStore = new RouterStore(this.accountServiceMetrics, backupFileManager, helixStore, router, false);
accountMetadataStore = new RouterStore(this.accountServiceMetrics, backupFileManager, helixStore, router, false,
config.totalNumberOfVersionToKeep);
// postpone initializeFetchAndSchedule to setupRouter function.
} else {
accountMetadataStore = new LegacyMetadataStore(this.accountServiceMetrics, backupFileManager, helixStore);
initialFetchAndSchedule();
if (config.backFillAccountsToNewZNode) {
backFillStore = new RouterStore(this.accountServiceMetrics, backupFileManager, helixStore, router, true);
backFillStore = new RouterStore(this.accountServiceMetrics, backupFileManager, helixStore, router, true,
config.totalNumberOfVersionToKeep);
}
}
this.backFillStore = backFillStore;
Expand Down
158 changes: 99 additions & 59 deletions ambry-account/src/main/java/com/github/ambry/account/RouterStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.HelixPropertyStore;
import org.json.JSONException;
Expand Down Expand Up @@ -61,8 +65,8 @@ class RouterStore extends AccountMetadataStore {
private static final Short CONTAINER_ID = Container.HELIX_ACCOUNT_SERVICE_CONTAINER_ID;
private static final String SERVICE_ID = "helixAccountService";

private final int totalNumberOfVersionToKeep;
private final AtomicReference<Router> router;

// If forBackFill is true, then when updating the account metadata, we don't create backup files and we don't merge
// accounts from ambry-server with the provided accounts set.
private final boolean forBackFill;
Expand All @@ -74,12 +78,15 @@ class RouterStore extends AccountMetadataStore {
* @param helixStore The {@link HelixPropertyStore} to fetch and update data.
* @param router The {@link Router} instance to retrieve and put blobs.
* @param forBackFill True if this {@link RouterStore} is created for backfill accounts to new zookeeper node.
* @param totalNumberOfVersionToKeep The total number of previous versions of account metadata to keep.
*/
RouterStore(AccountServiceMetrics accountServiceMetrics, BackupFileManager backupFileManager,
HelixPropertyStore<ZNRecord> helixStore, AtomicReference<Router> router, boolean forBackFill) {
HelixPropertyStore<ZNRecord> helixStore, AtomicReference<Router> router, boolean forBackFill,
int totalNumberOfVersionToKeep) {
super(accountServiceMetrics, backupFileManager, helixStore, ACCOUNT_METADATA_BLOB_IDS_PATH);
this.router = router;
this.forBackFill = forBackFill;
this.totalNumberOfVersionToKeep = totalNumberOfVersionToKeep;
}

@Override
Expand All @@ -88,20 +95,18 @@ Map<String, String> fetchAccountMetadataFromZNRecord(ZNRecord record) {
logger.error("Router is not yet initialized");
return null;
}
List<String> accountBlobIDs = record.getListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
if (accountBlobIDs == null || accountBlobIDs.size() == 0) {
List<String> blobIDAndVersionsJson = record.getListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
if (blobIDAndVersionsJson == null || blobIDAndVersionsJson.size() == 0) {
logger.info("ZNRecord={} to read on path={} does not have a simple list with key={}", record,
ACCOUNT_METADATA_BLOB_IDS_PATH, ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
return null;
} else {
// parse the json string list and get the blob id with the latest version
BlobIDAndVersion blobIDAndVersion = null;
for (String accountBlobIDInJson : accountBlobIDs) {
BlobIDAndVersion current = BlobIDAndVersion.fromJson(accountBlobIDInJson);
if (blobIDAndVersion == null || blobIDAndVersion.version < current.version) {
blobIDAndVersion = current;
}
}
// Parse the json string list and get the blob id with the latest version
// Since blobIDAndVersionsJson.size() is greater than 0, max will return a non-empty Optional.
BlobIDAndVersion blobIDAndVersion = blobIDAndVersionsJson.stream()
.map(json -> BlobIDAndVersion.fromJson(json))
.max(Comparator.comparing(BlobIDAndVersion::getVersion))
.get();

logger.trace("Start reading remote account data from blob {} and versioned at {}.", blobIDAndVersion.blobID,
blobIDAndVersion.version);
Expand Down Expand Up @@ -163,6 +168,16 @@ static String writeAccountMapToRouter(Map<String, String> accountMap, Router rou
return router.putBlob(properties, null, channel, PutBlobOptions.DEFAULT).get();
}

/**
 * Logs the given error message (with its cause, if any) and then fails fast by
 * throwing an {@link IllegalStateException} carrying the same message and cause.
 * @param errorMessage The message to log and to attach to the thrown exception.
 * @param cause The underlying exception that triggered this failure; may be {@code null}.
 */
private void logAndThrowIllegalStateException(String errorMessage, Exception cause) {
  IllegalStateException toThrow = new IllegalStateException(errorMessage, cause);
  logger.error(errorMessage, cause);
  throw toThrow;
}

/**
* A {@link DataUpdater} to be used for updating {@link #ACCOUNT_METADATA_BLOB_IDS_PATH} inside of
* {@link #updateAccounts(Collection)}
Expand All @@ -180,12 +195,15 @@ private class ZKUpdater implements AccountMetadataStore.ZKUpdater {

@Override
public ZNRecord update(ZNRecord znRecord) {
// There are several steps to finish an update
// 1. Fetch the list from the ZNRecord
// 2. Fetch the AccountMetadata from the blob id if the list exist in the ZNRecord
// 3. Construct a new AccountMetadata
// 4. save it as a blob in the ambry server
// 5. Add the new blob id back to the list.
/**
* There are several steps to finish an update
* 1. Fetch the list from the ZNRecord
* 2. Fetch the AccountMetadata from the blob id if the list exists in the ZNRecord
* 3. Construct a new AccountMetadata
* 4. Save it as a blob in the ambry server
* 5. Remove oldest version if number of version exceeds the maximum value.
* 6. Add the new blob id back to the list.
*/

// Start step 1:
ZNRecord recordToUpdate;
Expand All @@ -197,76 +215,68 @@ public ZNRecord update(ZNRecord znRecord) {
} else {
recordToUpdate = znRecord;
}

String errorMessage = null;
List<String> accountBlobIDs = recordToUpdate.getListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
// This is the version number for the new blob id.
int newVersion = 1;
List<String> blobIDAndVersionsJson = recordToUpdate.getListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
List<BlobIDAndVersion> blobIDAndVersions = new ArrayList<>();
Map<String, String> accountMap = null;
if (accountBlobIDs != null && accountBlobIDs.size() != 0) {
// parse the json string list and get the blob id with the latest version
if (blobIDAndVersionsJson != null && blobIDAndVersionsJson.size() != 0) {
try {
BlobIDAndVersion blobIDAndVersion = null;
for (String accountBlobIDInJson : accountBlobIDs) {
BlobIDAndVersion current = BlobIDAndVersion.fromJson(accountBlobIDInJson);
if (blobIDAndVersion == null || blobIDAndVersion.version < current.version) {
blobIDAndVersion = current;
}
}
// Parse the json string list and get the BlobIDAndVersion with the latest version number.
blobIDAndVersionsJson.stream()
.forEach(accountBlobIDInJson -> blobIDAndVersions.add(BlobIDAndVersion.fromJson(accountBlobIDInJson)));
Collections.sort(blobIDAndVersions, Comparator.comparing(BlobIDAndVersion::getVersion));
BlobIDAndVersion blobIDAndVersion = blobIDAndVersions.get(blobIDAndVersions.size() - 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this piece of code has the same logic (correct me if I am wrong) as lines 106-109. If yes, can we make them consistent?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the difference is that one need to sort the list and the other only has to get the latest blobIDAndVersion. I figure it's too small to create a function for.

newVersion = blobIDAndVersion.version + 1;

// Start Step 2:
// if this is not for backfill, then just read account metadata from blob
// If this is not for backfill, then just read account metadata from blob, otherwise, initialize it with
// an empty map and fill it up with the accountMap passed to constructor.
accountMap = (!forBackFill) ? readAccountMetadataFromBlobID(blobIDAndVersion.blobID) : new HashMap<>();
// make this list mutable
accountBlobIDs = new ArrayList<>(accountBlobIDs);
// Make this list mutable
blobIDAndVersionsJson = new ArrayList<>(blobIDAndVersionsJson);
} catch (JSONException e) {
accountServiceMetrics.remoteDataCorruptionErrorCount.inc();
errorMessage = "Exception occurred when parsing the blob id list from " + accountBlobIDs;
logger.error(errorMessage);
throw new IllegalStateException(errorMessage, e);
logAndThrowIllegalStateException(
"Exception occurred when parsing the blob id list from " + blobIDAndVersionsJson, e);
} catch (Exception e) {
errorMessage = "Unexpected exception occurred when parsing the blob id list from " + accountBlobIDs;
logger.error(errorMessage, e);
throw new IllegalStateException(errorMessage, e);
logAndThrowIllegalStateException(
"Unexpected exception occurred when parsing the blob id list from " + blobIDAndVersionsJson, e);
}
}
// This ZNRecord doesn't exist when first time we update this ZNRecord, thus accountMap will be null.
if (accountMap == null) {
accountMap = new HashMap<>();
accountBlobIDs = new ArrayList<>();
blobIDAndVersionsJson = new ArrayList<>();
}

if (!forBackFill) {
// Start step 3:
AccountInfoMap localAccountInfoMap;
AccountInfoMap localAccountInfoMap = null;
try {
localAccountInfoMap = new AccountInfoMap(accountServiceMetrics, accountMap);
} catch (JSONException e) {
accountServiceMetrics.remoteDataCorruptionErrorCount.inc();
errorMessage = "Exception occurred when building AccountInfoMap from accountMap " + accountMap;
logger.error(errorMessage, e);
throw new IllegalStateException(errorMessage, e);
logAndThrowIllegalStateException(
"Exception occurred when building AccountInfoMap from accountMap " + accountMap, e);
}

// if there is any conflict with the existing record, fail the update. Exception thrown in this updater will
// If there is any conflict with the existing record, fail the update. Exception thrown in this updater will
// be caught by Helix and helixStore#update will return false.
if (localAccountInfoMap.hasConflictingAccount(this.accounts)) {
// Throw exception, so that helixStore can capture and terminate the update operation
errorMessage = "Updating accounts failed because one account to update conflicts with existing accounts";
logger.error(errorMessage);
throw new IllegalStateException(errorMessage);
logAndThrowIllegalStateException(
"Updating accounts failed because one account to update conflicts with existing accounts", null);
}
}

for (Account account : this.accounts) {
try {
accountMap.put(String.valueOf(account.getId()), account.toJson(true).toString());
} catch (Exception e) {
errorMessage = "Updating accounts failed because unexpected exception occurred when updating accountId="
+ account.getId() + " accountName=" + account.getName();
// Do not depend on Helix to log, so log the error message here.
logger.error(errorMessage, e);
throw new IllegalStateException(errorMessage, e);
logAndThrowIllegalStateException(
"Updating accounts failed because unexpected exception occurred when updating accountId="
+ account.getId() + " accountName=" + account.getName(), e);
}
}

Expand All @@ -277,15 +287,33 @@ public ZNRecord update(ZNRecord znRecord) {
accountServiceMetrics.accountUpdateToAmbryTimeInMs.update(System.currentTimeMillis() - startTimeMs);
} catch (Exception e) {
accountServiceMetrics.accountUpdatesToAmbryServerErrorCount.inc();
errorMessage =
"Updating accounts failed because unexpected error occurred when uploading AccountMetadata to ambry";
logger.error(errorMessage, e);
throw new IllegalStateException(errorMessage, e);
logAndThrowIllegalStateException(
"Updating accounts failed because unexpected error occurred when uploading AccountMetadata to ambry", e);
}

// Start step 5:
accountBlobIDs.add(new BlobIDAndVersion(this.newBlobID, newVersion).toJson());
recordToUpdate.setListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY, accountBlobIDs);
if (blobIDAndVersions.size() + 1 > totalNumberOfVersionToKeep) {
Iterator<BlobIDAndVersion> iter = blobIDAndVersions.iterator();
while (blobIDAndVersions.size() + 1 > totalNumberOfVersionToKeep) {
BlobIDAndVersion blobIDAndVersion = iter.next();
iter.remove();
logger.info("Removing blob " + blobIDAndVersion.getBlobID() + " at version " + blobIDAndVersion.getVersion());
try {
router.get().deleteBlob(blobIDAndVersion.getBlobID(), SERVICE_ID).get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any edge cases where we would not want to delete the oldest blob from within the update logic.
The main case to consider is probably where the update to zookeeper does not succeed. In this case we would have deleted a blob in ambry that could still exist in the current znode.

However, this may not be a problem if totalNumberOfVersionsToKeep is sufficiently large that the chance of someone else still interested in reading that blob is very small.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete the blobs after successfully updates zookeeper node.

} catch (Exception e) {
logger.error("Failed to delete blob={} from older version because of {}", blobIDAndVersion.getBlobID(), e);
accountServiceMetrics.accountDeletesToAmbryServerErrorCount.inc();
}
}
// Clear json string list since it's not sorted.
blobIDAndVersionsJson.clear();
blobIDAndVersionsJson.addAll(
blobIDAndVersions.stream().map(BlobIDAndVersion::toJson).collect(Collectors.toList()));
}

// Start step 6:
blobIDAndVersionsJson.add(new BlobIDAndVersion(this.newBlobID, newVersion).toJson());
recordToUpdate.setListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY, blobIDAndVersionsJson);
return recordToUpdate;
}

Expand Down Expand Up @@ -352,6 +380,18 @@ public String getBlobID() {
return blobID;
}

@Override
public boolean equals(Object o) {
  if (o == this) {
    return true;
  }
  if (!(o instanceof BlobIDAndVersion)) {
    return false;
  }
  BlobIDAndVersion other = (BlobIDAndVersion) o;
  // Objects.equals is null-safe, unlike calling blobID.equals directly.
  return Objects.equals(blobID, other.blobID) && version == other.version;
}

// equals and hashCode must be overridden together so instances behave correctly
// in hash-based collections (equal objects must produce equal hash codes).
@Override
public int hashCode() {
  return Objects.hash(blobID, version);
}

/**
* Deserialize a string that carries a json object to an {@link BlobIDAndVersion}.
* @param json The string that carries a json object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class HelixAccountServiceTest {
private static final String BAD_ACCOUNT_METADATA_STRING = "badAccountMetadataString";
private static final int NUM_REF_ACCOUNT = 10;
private static final int NUM_CONTAINER_PER_ACCOUNT = 4;
private static final int TOTAL_NUMBER_OF_VERSION_TO_KEEP = 100;
private static final Map<Short, Account> idToRefAccountMap = new HashMap<>();
private static final Map<Short, Map<Short, Container>> idToRefContainerMap = new HashMap<>();
private final Properties helixConfigProps = new Properties();
Expand Down Expand Up @@ -880,7 +881,7 @@ public void testFillAccountsToNewZNode() throws Exception {
HelixAccountService helixAccountService = (HelixAccountService) accountService;
RouterStore routerStore =
new RouterStore(helixAccountService.getAccountServiceMetrics(), helixAccountService.getBackupFileManager(),
helixStore, new AtomicReference<>(mockRouter), false);
helixStore, new AtomicReference<>(mockRouter), false, TOTAL_NUMBER_OF_VERSION_TO_KEEP);
Map<String, String> accountMap = routerStore.fetchAccountMetadata();
assertNotNull("Accounts should be backfilled to new znode", accountMap);
assertAccountMapEquals(accountService.getAllAccounts(), accountMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -77,7 +76,7 @@ public Future<GetBlobResult> getBlob(String blobId, GetBlobOptions options, Call
BlobInfoAndData blob = allBlobs.get(blobId);
FutureResult<GetBlobResult> future = new FutureResult<>();
if (blob == null) {
Exception e = new ExecutionException(new RouterException("NotFound", RouterErrorCode.BlobDoesNotExist));
Exception e = new RouterException("NotFound", RouterErrorCode.BlobDoesNotExist);
future.done(null, e);
if (callback != null) {
callback.onCompletion(null, e);
Expand Down Expand Up @@ -146,7 +145,7 @@ public Future<Void> deleteBlob(String blobId, String serviceId, Callback<Void> c
FutureResult<Void> future = new FutureResult<>();
BlobInfoAndData blob = allBlobs.get(blobId);
if (blob == null) {
Exception e = new ExecutionException(new RouterException("NotFound", RouterErrorCode.BlobDoesNotExist));
Exception e = new RouterException("NotFound", RouterErrorCode.BlobDoesNotExist);
future.done(null, e);
if (callback != null) {
callback.onCompletion(null, e);
Expand All @@ -171,6 +170,12 @@ public Future<Void> updateBlobTtl(String blobId, String serviceId, long expiresA

@Override
public void close() throws IOException {
return;
// close will remove all the blobs
lock.lock();
try {
allBlobs.clear();
} finally {
lock.unlock();
}
}
}
Loading