Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HelixAccountService: limit the number of versions in the list #1252

Merged
merged 7 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,15 @@ public class HelixAccountService implements AccountService {
this.backupFileManager = new BackupFileManager(this.accountServiceMetrics, config);
AccountMetadataStore backFillStore = null;
if (config.useNewZNodePath) {
accountMetadataStore = new RouterStore(this.accountServiceMetrics, backupFileManager, helixStore, router, false);
accountMetadataStore = new RouterStore(this.accountServiceMetrics, backupFileManager, helixStore, router, false,
config.totalNumberOfVersionToKeep);
// postpone initializeFetchAndSchedule to setupRouter function.
} else {
accountMetadataStore = new LegacyMetadataStore(this.accountServiceMetrics, backupFileManager, helixStore);
initialFetchAndSchedule();
if (config.backFillAccountsToNewZNode) {
backFillStore = new RouterStore(this.accountServiceMetrics, backupFileManager, helixStore, router, true);
backFillStore = new RouterStore(this.accountServiceMetrics, backupFileManager, helixStore, router, true,
config.totalNumberOfVersionToKeep);
}
}
this.backFillStore = backFillStore;
Expand Down
178 changes: 112 additions & 66 deletions ambry-account/src/main/java/com/github/ambry/account/RouterStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,17 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.HelixPropertyStore;
import org.json.JSONException;
Expand Down Expand Up @@ -61,8 +66,8 @@ class RouterStore extends AccountMetadataStore {
private static final Short CONTAINER_ID = Container.HELIX_ACCOUNT_SERVICE_CONTAINER_ID;
private static final String SERVICE_ID = "helixAccountService";

private final int totalNumberOfVersionToKeep;
private final AtomicReference<Router> router;

// If forBackFill is true, then when updating the account metadata, we don't create backup files and we don't merge
// accounts from ambry-server with the provided accounts set.
private final boolean forBackFill;
Expand All @@ -74,12 +79,15 @@ class RouterStore extends AccountMetadataStore {
* @param helixStore The {@link HelixPropertyStore} to fetch and update data.
* @param router The {@link Router} instance to retrieve and put blobs.
* @param forBackFill True if this {@link RouterStore} is created for backfill accounts to new zookeeper node.
* @param totalNumberOfVersionToKeep The total number of previous versions of account metadata to keep.
*/
RouterStore(AccountServiceMetrics accountServiceMetrics, BackupFileManager backupFileManager,
HelixPropertyStore<ZNRecord> helixStore, AtomicReference<Router> router, boolean forBackFill) {
HelixPropertyStore<ZNRecord> helixStore, AtomicReference<Router> router, boolean forBackFill,
int totalNumberOfVersionToKeep) {
super(accountServiceMetrics, backupFileManager, helixStore, ACCOUNT_METADATA_BLOB_IDS_PATH);
this.router = router;
this.forBackFill = forBackFill;
this.totalNumberOfVersionToKeep = totalNumberOfVersionToKeep;
}

@Override
Expand All @@ -88,20 +96,18 @@ Map<String, String> fetchAccountMetadataFromZNRecord(ZNRecord record) {
logger.error("Router is not yet initialized");
return null;
}
List<String> accountBlobIDs = record.getListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
if (accountBlobIDs == null || accountBlobIDs.size() == 0) {
List<String> blobIDAndVersionsJson = record.getListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
if (blobIDAndVersionsJson == null || blobIDAndVersionsJson.size() == 0) {
logger.info("ZNRecord={} to read on path={} does not have a simple list with key={}", record,
ACCOUNT_METADATA_BLOB_IDS_PATH, ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
return null;
} else {
// parse the json string list and get the blob id with the latest version
BlobIDAndVersion blobIDAndVersion = null;
for (String accountBlobIDInJson : accountBlobIDs) {
BlobIDAndVersion current = BlobIDAndVersion.fromJson(accountBlobIDInJson);
if (blobIDAndVersion == null || blobIDAndVersion.version < current.version) {
blobIDAndVersion = current;
}
}
// Parse the json string list and get the blob id with the latest version
// Since blobIDAndVersionsJson.size() is greater than 0, max() is guaranteed to return a non-empty Optional.
BlobIDAndVersion blobIDAndVersion = blobIDAndVersionsJson.stream()
.map(BlobIDAndVersion::fromJson)
.max(Comparator.comparing(BlobIDAndVersion::getVersion))
.get();

logger.trace("Start reading remote account data from blob {} and versioned at {}.", blobIDAndVersion.blobID,
blobIDAndVersion.version);
Expand All @@ -127,7 +133,7 @@ Map<String, String> readAccountMetadataFromBlobID(String blobID) {

JSONObject object = new JSONObject(new String(bytes, Charsets.UTF_8));
Map<String, String> map = new HashMap<>();
object.keySet().stream().forEach(key -> map.put(key, object.getString(key)));
object.keySet().forEach(key -> map.put(key, object.getString(key)));
return map;
} catch (Exception e) {
logger.error("Failed to read account metadata from blob id={}", blobID, e);
Expand Down Expand Up @@ -163,13 +169,24 @@ static String writeAccountMapToRouter(Map<String, String> accountMap, Router rou
return router.putBlob(properties, null, channel, PutBlobOptions.DEFAULT).get();
}

/**
 * Logs {@code errorMessage} (with its cause, when present) and then fails the current operation
 * by raising an {@link IllegalStateException} carrying the same message and cause.
 * @param errorMessage The message to log and to attach to the thrown exception.
 * @param cause The underlying exception that triggered the failure; may be {@code null}.
 */
private void logAndThrowIllegalStateException(String errorMessage, Exception cause) {
  IllegalStateException failure = new IllegalStateException(errorMessage, cause);
  logger.error(errorMessage, cause);
  throw failure;
}

/**
* A {@link DataUpdater} to be used for updating {@link #ACCOUNT_METADATA_BLOB_IDS_PATH} inside of
* {@link #updateAccounts(Collection)}
*/
private class ZKUpdater implements AccountMetadataStore.ZKUpdater {
private final Collection<Account> accounts;
private String newBlobID = null;
private List<String> oldBlobIDsToDelete = null;

/**
* @param accounts The {@link Account}s to update.
Expand All @@ -180,12 +197,14 @@ private class ZKUpdater implements AccountMetadataStore.ZKUpdater {

@Override
public ZNRecord update(ZNRecord znRecord) {
// There are several steps to finish an update
// 1. Fetch the list from the ZNRecord
// 2. Fetch the AccountMetadata from the blob id if the list exist in the ZNRecord
// 3. Construct a new AccountMetadata
// 4. save it as a blob in the ambry server
// 5. Add the new blob id back to the list.
/* There are several steps to finish an update
* 1. Fetch the list from the ZNRecord
 * 2. Fetch the AccountMetadata from the blob id if the list exists in the ZNRecord
* 3. Construct a new AccountMetadata
* 4. Save it as a blob in the ambry server
* 5. Remove oldest version if number of version exceeds the maximum value.
* 6. Add the new blob id to the list.
*/

// Start step 1:
ZNRecord recordToUpdate;
Expand All @@ -197,76 +216,64 @@ public ZNRecord update(ZNRecord znRecord) {
} else {
recordToUpdate = znRecord;
}

String errorMessage = null;
List<String> accountBlobIDs = recordToUpdate.getListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
// This is the version number for the new blob id.
int newVersion = 1;
Map<String, String> accountMap = null;
if (accountBlobIDs != null && accountBlobIDs.size() != 0) {
// parse the json string list and get the blob id with the latest version
List<BlobIDAndVersion> blobIDAndVersions = new ArrayList<>();
List<String> blobIDAndVersionsJson = recordToUpdate.getListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
blobIDAndVersionsJson =
blobIDAndVersionsJson == null ? new LinkedList<>() : new LinkedList<>(blobIDAndVersionsJson);
Map<String, String> accountMap = new HashMap<>();

if (blobIDAndVersionsJson.size() != 0) {
try {
BlobIDAndVersion blobIDAndVersion = null;
for (String accountBlobIDInJson : accountBlobIDs) {
BlobIDAndVersion current = BlobIDAndVersion.fromJson(accountBlobIDInJson);
if (blobIDAndVersion == null || blobIDAndVersion.version < current.version) {
blobIDAndVersion = current;
}
}
// Parse the json string list and get the BlobIDAndVersion with the latest version number.
blobIDAndVersionsJson.forEach(
accountBlobIDInJson -> blobIDAndVersions.add(BlobIDAndVersion.fromJson(accountBlobIDInJson)));
Collections.sort(blobIDAndVersions, Comparator.comparing(BlobIDAndVersion::getVersion));
BlobIDAndVersion blobIDAndVersion = blobIDAndVersions.get(blobIDAndVersions.size() - 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this piece of code has the same logic (correct me if I am wrong) as lines 106-109. If so, can we make them consistent?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference is that one needs to sort the list and the other only has to get the latest blobIDAndVersion. I figure it's too small to create a function for.

newVersion = blobIDAndVersion.version + 1;

// Start Step 2:
// if this is not for backfill, then just read account metadata from blob
// If this is not for backfill, then just read account metadata from blob, otherwise, initialize it with
// an empty map and fill it up with the accountMap passed to constructor.
accountMap = (!forBackFill) ? readAccountMetadataFromBlobID(blobIDAndVersion.blobID) : new HashMap<>();
// make this list mutable
accountBlobIDs = new ArrayList<>(accountBlobIDs);
} catch (JSONException e) {
accountServiceMetrics.remoteDataCorruptionErrorCount.inc();
errorMessage = "Exception occurred when parsing the blob id list from " + accountBlobIDs;
logger.error(errorMessage);
throw new IllegalStateException(errorMessage, e);
logAndThrowIllegalStateException(
"Exception occurred when parsing the blob id list from " + blobIDAndVersionsJson, e);
} catch (Exception e) {
errorMessage = "Unexpected exception occurred when parsing the blob id list from " + accountBlobIDs;
logger.error(errorMessage, e);
throw new IllegalStateException(errorMessage, e);
logAndThrowIllegalStateException(
"Unexpected exception occurred when parsing the blob id list from " + blobIDAndVersionsJson, e);
}
}
// This ZNRecord doesn't exist when first time we update this ZNRecord, thus accountMap will be null.
if (accountMap == null) {
accountMap = new HashMap<>();
accountBlobIDs = new ArrayList<>();
}

if (!forBackFill) {
// Start step 3:
AccountInfoMap localAccountInfoMap;
AccountInfoMap localAccountInfoMap = null;
try {
localAccountInfoMap = new AccountInfoMap(accountServiceMetrics, accountMap);
} catch (JSONException e) {
accountServiceMetrics.remoteDataCorruptionErrorCount.inc();
errorMessage = "Exception occurred when building AccountInfoMap from accountMap " + accountMap;
logger.error(errorMessage, e);
throw new IllegalStateException(errorMessage, e);
logAndThrowIllegalStateException(
"Exception occurred when building AccountInfoMap from accountMap " + accountMap, e);
}

// if there is any conflict with the existing record, fail the update. Exception thrown in this updater will
// If there is any conflict with the existing record, fail the update. Exception thrown in this updater will
// be caught by Helix and helixStore#update will return false.
if (localAccountInfoMap.hasConflictingAccount(this.accounts)) {
// Throw exception, so that helixStore can capture and terminate the update operation
errorMessage = "Updating accounts failed because one account to update conflicts with existing accounts";
logger.error(errorMessage);
throw new IllegalStateException(errorMessage);
logAndThrowIllegalStateException(
"Updating accounts failed because one account to update conflicts with existing accounts", null);
}
}

for (Account account : this.accounts) {
try {
accountMap.put(String.valueOf(account.getId()), account.toJson(true).toString());
} catch (Exception e) {
errorMessage = "Updating accounts failed because unexpected exception occurred when updating accountId="
+ account.getId() + " accountName=" + account.getName();
// Do not depend on Helix to log, so log the error message here.
logger.error(errorMessage, e);
throw new IllegalStateException(errorMessage, e);
logAndThrowIllegalStateException(
"Updating accounts failed because unexpected exception occurred when updating accountId="
+ account.getId() + " accountName=" + account.getName(), e);
}
}

Expand All @@ -277,15 +284,27 @@ public ZNRecord update(ZNRecord znRecord) {
accountServiceMetrics.accountUpdateToAmbryTimeInMs.update(System.currentTimeMillis() - startTimeMs);
} catch (Exception e) {
accountServiceMetrics.accountUpdatesToAmbryServerErrorCount.inc();
errorMessage =
"Updating accounts failed because unexpected error occurred when uploading AccountMetadata to ambry";
logger.error(errorMessage, e);
throw new IllegalStateException(errorMessage, e);
logAndThrowIllegalStateException(
"Updating accounts failed because unexpected error occurred when uploading AccountMetadata to ambry", e);
}

// Start step 5:
accountBlobIDs.add(new BlobIDAndVersion(this.newBlobID, newVersion).toJson());
recordToUpdate.setListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY, accountBlobIDs);
if (blobIDAndVersions.size() + 1 > totalNumberOfVersionToKeep) {
Iterator<BlobIDAndVersion> iter = blobIDAndVersions.iterator();
oldBlobIDsToDelete = new ArrayList<>();
while (blobIDAndVersions.size() + 1 > totalNumberOfVersionToKeep) {
BlobIDAndVersion blobIDAndVersion = iter.next();
iter.remove();
logger.info("Adding blob " + blobIDAndVersion.getBlobID() + " at version " + blobIDAndVersion.getVersion()
+ " to delete");
oldBlobIDsToDelete.add(blobIDAndVersion.getBlobID());
}
blobIDAndVersionsJson = blobIDAndVersions.stream().map(BlobIDAndVersion::toJson).collect(Collectors.toList());
}

// Start step 6:
blobIDAndVersionsJson.add(new BlobIDAndVersion(this.newBlobID, newVersion).toJson());
recordToUpdate.setListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY, blobIDAndVersionsJson);
return recordToUpdate;
}

Expand All @@ -294,13 +313,28 @@ public void afterUpdate(boolean isUpdateSucceeded) {
if (!isUpdateSucceeded && newBlobID != null) {
// Delete the ambry blob regardless what error fails the update.
try {
logger.info("Removing blob " + newBlobID + " since the update failed");
// Block this execution? or maybe wait for a while then get out?
router.get().deleteBlob(newBlobID, SERVICE_ID).get();
} catch (Exception e) {
logger.error("Failed to delete blob={} because of {}", newBlobID, e);
logger.error("Failed to delete blob=" + newBlobID, e);
accountServiceMetrics.accountDeletesToAmbryServerErrorCount.inc();
}
}
// Notice this logic might end up with the dangling blobs, when the process crashes before the for loop.
// But since the frequency to update account metadata is pretty rare, it won't be a big problem.
if (isUpdateSucceeded && oldBlobIDsToDelete != null) {
for (String blobID : oldBlobIDsToDelete) {
try {
logger.info("Removing blob " + blobID);
// Block this execution? or maybe wait for a while then get out?
router.get().deleteBlob(blobID, SERVICE_ID).get();
} catch (Exception e) {
logger.error("Failed to delete blob=" + blobID, e);
accountServiceMetrics.accountDeletesToAmbryServerErrorCount.inc();
}
}
}
}
}

Expand Down Expand Up @@ -352,6 +386,18 @@ public String getBlobID() {
return blobID;
}

/**
 * Compares this {@link BlobIDAndVersion} with another object for equality. Two instances are
 * equal when both the blob id and the version match.
 * @param o The object to compare against.
 * @return {@code true} if {@code o} is a {@link BlobIDAndVersion} with the same blob id and version.
 */
@Override
public boolean equals(Object o) {
  if (o == this) {
    return true;
  }
  if (!(o instanceof BlobIDAndVersion)) {
    return false;
  }
  BlobIDAndVersion other = (BlobIDAndVersion) o;
  return blobID.equals(other.blobID) && version == other.version;
}

/**
 * Overridden alongside {@link #equals(Object)} to keep the {@link Object#hashCode()} contract:
 * equal instances must produce equal hash codes (required for correct behavior in hash-based
 * collections such as {@code HashSet}).
 * @return A hash code derived from the blob id and the version.
 */
@Override
public int hashCode() {
  return Objects.hash(blobID, version);
}

/**
* Deserialize a string that carries a json object to an {@link BlobIDAndVersion}.
* @param json The string that carries a json object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class HelixAccountServiceTest {
private static final String BAD_ACCOUNT_METADATA_STRING = "badAccountMetadataString";
private static final int NUM_REF_ACCOUNT = 10;
private static final int NUM_CONTAINER_PER_ACCOUNT = 4;
private static final int TOTAL_NUMBER_OF_VERSION_TO_KEEP = 100;
private static final Map<Short, Account> idToRefAccountMap = new HashMap<>();
private static final Map<Short, Map<Short, Container>> idToRefContainerMap = new HashMap<>();
private final Properties helixConfigProps = new Properties();
Expand Down Expand Up @@ -880,7 +881,7 @@ public void testFillAccountsToNewZNode() throws Exception {
HelixAccountService helixAccountService = (HelixAccountService) accountService;
RouterStore routerStore =
new RouterStore(helixAccountService.getAccountServiceMetrics(), helixAccountService.getBackupFileManager(),
helixStore, new AtomicReference<>(mockRouter), false);
helixStore, new AtomicReference<>(mockRouter), false, TOTAL_NUMBER_OF_VERSION_TO_KEEP);
Map<String, String> accountMap = routerStore.fetchAccountMetadata();
assertNotNull("Accounts should be backfilled to new znode", accountMap);
assertAccountMapEquals(accountService.getAllAccounts(), accountMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -77,7 +76,7 @@ public Future<GetBlobResult> getBlob(String blobId, GetBlobOptions options, Call
BlobInfoAndData blob = allBlobs.get(blobId);
FutureResult<GetBlobResult> future = new FutureResult<>();
if (blob == null) {
Exception e = new ExecutionException(new RouterException("NotFound", RouterErrorCode.BlobDoesNotExist));
Exception e = new RouterException("NotFound", RouterErrorCode.BlobDoesNotExist);
future.done(null, e);
if (callback != null) {
callback.onCompletion(null, e);
Expand Down Expand Up @@ -146,7 +145,7 @@ public Future<Void> deleteBlob(String blobId, String serviceId, Callback<Void> c
FutureResult<Void> future = new FutureResult<>();
BlobInfoAndData blob = allBlobs.get(blobId);
if (blob == null) {
Exception e = new ExecutionException(new RouterException("NotFound", RouterErrorCode.BlobDoesNotExist));
Exception e = new RouterException("NotFound", RouterErrorCode.BlobDoesNotExist);
future.done(null, e);
if (callback != null) {
callback.onCompletion(null, e);
Expand All @@ -171,6 +170,12 @@ public Future<Void> updateBlobTtl(String blobId, String serviceId, long expiresA

@Override
public void close() throws IOException {
return;
// close will remove all the blobs
lock.lock();
try {
allBlobs.clear();
} finally {
lock.unlock();
}
}
}
Loading