-
Notifications
You must be signed in to change notification settings - Fork 171
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve tombstone management #128
Changes from 28 commits
eb182ba
1a9c994
d594f90
8805790
a331587
239ed71
afc5c3f
1712650
dd1c8b4
a99916f
2af49f0
662fba8
990b5ca
4019fd0
89711f3
dd068bf
0695ba1
0c3699a
c0f8760
91a781b
3c4ffa5
f27c640
a981d53
2690354
cef207b
72d87aa
78a2667
7bb2db0
01d4538
1a444dd
f0d5ffc
2518628
1563feb
9db9248
432f3b1
a50a1c0
959ecce
5f4d145
c9bf3a3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,16 @@ | |
|
||
#include <stdio.h> | ||
|
||
void DEBUG2(const char *fmt, ...) { } | ||
/* #include <stdarg.h> */ | ||
/* void DEBUG2(const char *fmt, ...) */ | ||
/* { */ | ||
/* va_list ap; */ | ||
/* va_start(ap, fmt); */ | ||
/* vfprintf(stderr, fmt, ap); */ | ||
/* va_end(ap); */ | ||
/* } */ | ||
|
||
#ifdef BITCASK_DEBUG | ||
#include <stdarg.h> | ||
void DEBUG(const char *fmt, ...) | ||
|
@@ -159,7 +169,7 @@ typedef struct | |
uint32_t newest_folder; // Start time for the last keyfolder | ||
uint64_t iter_generation; | ||
uint64_t pending_updated; | ||
uint64_t pending_start; // os:timestamp() as 64-bit integer | ||
uint64_t pending_start; // UNIX epoch seconds (since 1970) | ||
ErlNifPid* pending_awaken; // processes to wake once pending merged into entries | ||
unsigned int pending_awaken_count; | ||
unsigned int pending_awaken_size; | ||
|
@@ -212,6 +222,11 @@ typedef struct | |
#define set_pending_tombstone(e) {(e)->tstamp = 0; \ | ||
(e)->offset = 0; } | ||
|
||
// Use a magic number for signaling that a database is both in read-write | ||
// mode and that we want to do a get while ignoring the iteration status | ||
// of the keydir. | ||
#define MAGIC_OVERRIDE_ITERATING_STATUS 0x42424242 | ||
|
||
// Atoms (initialized in on_load) | ||
static ERL_NIF_TERM ATOM_ALLOCATION_ERROR; | ||
static ERL_NIF_TERM ATOM_ALREADY_EXISTS; | ||
|
@@ -291,7 +306,7 @@ static ErlNifFunc nif_funcs[] = | |
{"keydir_new", 0, bitcask_nifs_keydir_new0}, | ||
{"keydir_new", 1, bitcask_nifs_keydir_new1}, | ||
{"keydir_mark_ready", 1, bitcask_nifs_keydir_mark_ready}, | ||
{"keydir_put_int", 9, bitcask_nifs_keydir_put_int}, | ||
{"keydir_put_int", 10, bitcask_nifs_keydir_put_int}, | ||
{"keydir_get_int", 4, bitcask_nifs_keydir_get_int}, | ||
{"keydir_remove", 3, bitcask_nifs_keydir_remove}, | ||
{"keydir_remove_int", 6, bitcask_nifs_keydir_remove}, | ||
|
@@ -304,6 +319,7 @@ static ErlNifFunc nif_funcs[] = | |
{"keydir_trim_fstats", 2, bitcask_nifs_keydir_trim_fstats}, | ||
|
||
{"increment_file_id", 1, bitcask_nifs_increment_file_id}, | ||
{"increment_file_id", 2, bitcask_nifs_increment_file_id}, | ||
|
||
{"lock_acquire_int", 2, bitcask_nifs_lock_acquire}, | ||
{"lock_release_int", 1, bitcask_nifs_lock_release}, | ||
|
@@ -1058,6 +1074,7 @@ ERL_NIF_TERM bitcask_nifs_keydir_put_int(ErlNifEnv* env, int argc, const ERL_NIF | |
bitcask_keydir_handle* handle; | ||
bitcask_keydir_entry_proxy entry; | ||
ErlNifBinary key; | ||
uint32_t nowsec; | ||
uint32_t newest_put; | ||
uint32_t old_file_id; | ||
uint64_t old_offset; | ||
|
@@ -1068,15 +1085,17 @@ ERL_NIF_TERM bitcask_nifs_keydir_put_int(ErlNifEnv* env, int argc, const ERL_NIF | |
enif_get_uint(env, argv[3], &(entry.total_sz)) && | ||
enif_get_uint64_bin(env, argv[4], &(entry.offset)) && | ||
enif_get_uint(env, argv[5], &(entry.tstamp)) && | ||
enif_get_uint(env, argv[6], &(newest_put)) && | ||
enif_get_uint(env, argv[7], &(old_file_id)) && | ||
enif_get_uint64_bin(env, argv[8], &(old_offset))) | ||
enif_get_uint(env, argv[6], &(nowsec)) && | ||
enif_get_uint(env, argv[7], &(newest_put)) && | ||
enif_get_uint(env, argv[8], &(old_file_id)) && | ||
enif_get_uint64_bin(env, argv[9], &(old_offset))) | ||
{ | ||
bitcask_keydir* keydir = handle->keydir; | ||
entry.key = (char*)key.data; | ||
entry.key_sz = key.size; | ||
|
||
LOCK(keydir); | ||
DEBUG2("LINE %d put\r\n", __LINE__); | ||
|
||
DEBUG("+++ Put key = %d file_id=%d offset=%d total_sz=%d tstamp=%u old_file_id=%d\r\n", | ||
(int)(key.data[3]), | ||
|
@@ -1090,21 +1109,35 @@ ERL_NIF_TERM bitcask_nifs_keydir_put_int(ErlNifEnv* env, int argc, const ERL_NIF | |
// If conditional put and not found, bail early | ||
if (!f.found && old_file_id != 0) | ||
{ | ||
DEBUG2("LINE %d put -> already_exists\r\n", __LINE__); | ||
UNLOCK(keydir); | ||
return ATOM_ALREADY_EXISTS; | ||
} | ||
|
||
// If put would resize and iterating, start pending hash | ||
if (kh_put_will_resize(entries, keydir->entries) && | ||
keydir->keyfolders != 0 && | ||
if (keydir->keyfolders != 0 && | ||
(keydir->pending == NULL)) | ||
{ | ||
keydir->pending = kh_init(entries); | ||
keydir->pending_start = time(NULL); | ||
keydir->pending_start = nowsec; | ||
} | ||
|
||
if (!f.found || f.is_tombstone) | ||
{ | ||
if ((newest_put && | ||
(entry.file_id < keydir->biggest_file_id)) || | ||
old_file_id != 0) { | ||
/* | ||
* Really, it doesn't exist. But the atom 'already_exists' | ||
* is also a signal that a merge has incremented the | ||
* keydir->biggest_file_id and that we need to retry this | ||
* operation after Erlang-land has re-written the key & val | ||
* to a new location in the same-or-bigger file id. | ||
*/ | ||
DEBUG2("LINE %d put -> already_exists\r\n", __LINE__); | ||
UNLOCK(keydir); | ||
return ATOM_ALREADY_EXISTS; | ||
} | ||
|
||
keydir->key_count++; | ||
keydir->key_bytes += key.size; | ||
|
||
|
@@ -1117,6 +1150,7 @@ ERL_NIF_TERM bitcask_nifs_keydir_put_int(ErlNifEnv* env, int argc, const ERL_NIF | |
DEBUG("+++ Put new\r\n"); | ||
DEBUG_KEYDIR(keydir); | ||
|
||
DEBUG2("LINE %d put -> ok (!found || !tombstone)\r\n", __LINE__); | ||
UNLOCK(keydir); | ||
return ATOM_OK; | ||
} | ||
|
@@ -1127,6 +1161,7 @@ ERL_NIF_TERM bitcask_nifs_keydir_put_int(ErlNifEnv* env, int argc, const ERL_NIF | |
old_offset == f.proxy.offset)) | ||
{ | ||
DEBUG("++ Conditional not match\r\n"); | ||
DEBUG2("LINE %d put -> already_exists/cond bad match\r\n", __LINE__); | ||
UNLOCK(keydir); | ||
return ATOM_ALREADY_EXISTS; | ||
} | ||
|
@@ -1163,6 +1198,7 @@ ERL_NIF_TERM bitcask_nifs_keydir_put_int(ErlNifEnv* env, int argc, const ERL_NIF | |
} | ||
|
||
put_entry(keydir, &f, &entry); | ||
DEBUG2("LINE %d put -> ok\r\n", __LINE__); | ||
UNLOCK(keydir); | ||
DEBUG("Finished put\r\n"); | ||
DEBUG_KEYDIR(keydir); | ||
|
@@ -1176,6 +1212,7 @@ ERL_NIF_TERM bitcask_nifs_keydir_put_int(ErlNifEnv* env, int argc, const ERL_NIF | |
update_fstats(env, keydir, entry.file_id, entry.tstamp, | ||
0, 1, 0, entry.total_sz); | ||
} | ||
DEBUG2("LINE %d put -> already_exists end\r\n", __LINE__); | ||
UNLOCK(keydir); | ||
DEBUG("No update\r\n"); | ||
return ATOM_ALREADY_EXISTS; | ||
|
@@ -1206,8 +1243,10 @@ ERL_NIF_TERM bitcask_nifs_keydir_get_int(ErlNifEnv* env, int argc, const ERL_NIF | |
|
||
DEBUG("+++ Get issued\r\n"); | ||
|
||
int iterating_status = (rw_p == MAGIC_OVERRIDE_ITERATING_STATUS) ? | ||
0 : handle->iterating; | ||
find_result f; | ||
find_keydir_entry(keydir, &key, time, handle->iterating, &f); | ||
find_keydir_entry(keydir, &key, time, iterating_status, &f); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To make names clear, we should probably change some names. In find_keydir_entry, iterating should be called use_snapshot, get_from_snapshot or similar. This is not from your PR. I wrote that one. My bad :). A similar name should be given to iterating_status. |
||
|
||
if (f.found && !f.is_tombstone && (rw_p || !f.no_snapshot)) | ||
{ | ||
|
@@ -1283,7 +1322,7 @@ ERL_NIF_TERM bitcask_nifs_keydir_remove(ErlNifEnv* env, int argc, const ERL_NIF_ | |
{ | ||
UNLOCK(keydir); | ||
DEBUG("+++Conditional no match\r\n"); | ||
return ATOM_OK; | ||
return ATOM_ALREADY_EXISTS; | ||
} | ||
|
||
// Remove the key from the keydir stats | ||
|
@@ -1297,12 +1336,14 @@ ERL_NIF_TERM bitcask_nifs_keydir_remove(ErlNifEnv* env, int argc, const ERL_NIF_ | |
// If found an entry in the pending hash, convert it to a tombstone | ||
if (fr.pending_entry) | ||
{ | ||
DEBUG2("LINE %d pending put\r\n", __LINE__); | ||
set_pending_tombstone(fr.pending_entry); | ||
} | ||
// If frozen, add tombstone to pending hash (iteration must have | ||
// started between put/remove call in bitcask:delete). | ||
else if (keydir->pending) | ||
{ | ||
DEBUG2("LINE %d pending put\r\n", __LINE__); | ||
bitcask_keydir_entry* pending_entry = | ||
add_entry(keydir, keydir->pending, &fr.proxy); | ||
set_pending_tombstone(pending_entry); | ||
|
@@ -1401,6 +1442,7 @@ ERL_NIF_TERM bitcask_nifs_keydir_copy(ErlNifEnv* env, int argc, const ERL_NIF_TE | |
} | ||
if (keydir->pending != NULL) | ||
{ | ||
DEBUG2("LINE %d pending copy\r\n", __LINE__); | ||
for (itr = kh_begin(keydir->pending); itr != kh_end(keydir->pending); ++itr) | ||
{ | ||
// Allocate our entry to be inserted into the new table and copy the record | ||
|
@@ -1450,14 +1492,17 @@ static int can_itr_keydir(bitcask_keydir* keydir, uint64_t ts, int maxage, int m | |
if (keydir->pending == NULL || // not frozen or caller wants to reuse | ||
(maxage < 0 && maxputs < 0)) // the existing freeze | ||
{ | ||
DEBUG2("LINE %d can_itr\r\n", __LINE__); | ||
return 1; | ||
} | ||
else if (ts == 0 || ts < keydir->pending_start) | ||
{ // if clock skew (or forced wait), force key folding to wait | ||
DEBUG2("LINE %d can_itr\r\n", __LINE__); | ||
return 0; // which will fix keydir->pending_start | ||
} | ||
else | ||
{ | ||
DEBUG2("LINE %d can_itr\r\n", __LINE__); | ||
uint64_t age = ts - keydir->pending_start; | ||
return ((maxage < 0 || age <= maxage) && | ||
(maxputs < 0 || keydir->pending_updated <= maxputs)); | ||
|
@@ -1500,6 +1545,7 @@ ERL_NIF_TERM bitcask_nifs_keydir_itr(ErlNifEnv* env, int argc, const ERL_NIF_TER | |
keydir->newest_folder = ts; | ||
keydir->keyfolders++; | ||
handle->iterator = kh_begin(keydir->entries); | ||
DEBUG2("LINE %d itr started, keydir->pending = 0x%lx\r\n", __LINE__, keydir->pending); | ||
UNLOCK(handle->keydir); | ||
return ATOM_OK; | ||
} | ||
|
@@ -1521,6 +1567,7 @@ ERL_NIF_TERM bitcask_nifs_keydir_itr(ErlNifEnv* env, int argc, const ERL_NIF_TER | |
} | ||
enif_self(env, &keydir->pending_awaken[keydir->pending_awaken_count]); | ||
keydir->pending_awaken_count++; | ||
DEBUG2("LINE %d itr\r\n", __LINE__); | ||
UNLOCK(handle->keydir); | ||
return ATOM_OUT_OF_DATE; | ||
} | ||
|
@@ -1553,6 +1600,7 @@ ERL_NIF_TERM bitcask_nifs_keydir_itr_next(ErlNifEnv* env, int argc, const ERL_NI | |
{ | ||
if (kh_exist(keydir->entries, handle->iterator)) | ||
{ | ||
DEBUG2("LINE %d itr_next\r\n", __LINE__); | ||
bitcask_keydir_entry* entry = kh_key(keydir->entries, handle->iterator); | ||
ErlNifBinary key; | ||
bitcask_keydir_entry_proxy proxy; | ||
|
@@ -1627,6 +1675,7 @@ ERL_NIF_TERM bitcask_nifs_keydir_itr_release(ErlNifEnv* env, int argc, const ERL | |
// If last iterator closing, unfreeze keydir and merge pending entries. | ||
if (handle->keydir->keyfolders == 0 && handle->keydir->pending != NULL) | ||
{ | ||
DEBUG2("LINE %d itr_release\r\n", __LINE__); | ||
merge_pending_entries(env, handle->keydir); | ||
handle->keydir->iter_generation++; | ||
} | ||
|
@@ -1716,12 +1765,22 @@ ERL_NIF_TERM bitcask_nifs_keydir_release(ErlNifEnv* env, int argc, const ERL_NIF | |
ERL_NIF_TERM bitcask_nifs_increment_file_id(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | ||
{ | ||
bitcask_keydir_handle* handle; | ||
uint32_t conditional_file_id = 0; | ||
|
||
if (enif_get_resource(env, argv[0], bitcask_keydir_RESOURCE, (void**)&handle)) | ||
{ | ||
|
||
if (argv[1] != 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it really valid to test that a ERL_NIF_TERM is non-zero? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I imagine that what you really want to do here is check argc > 1 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in a soon-to-be-poooshed commit. |
||
enif_get_uint(env, argv[1], &(conditional_file_id)); | ||
} | ||
LOCK(handle->keydir); | ||
(handle->keydir->biggest_file_id)++; | ||
if (conditional_file_id == 0) { | ||
(handle->keydir->biggest_file_id)++; | ||
} else { | ||
if (conditional_file_id > handle->keydir->biggest_file_id) { | ||
handle->keydir->biggest_file_id = conditional_file_id; | ||
} | ||
} | ||
uint32_t id = handle->keydir->biggest_file_id; | ||
UNLOCK(handle->keydir); | ||
return enif_make_tuple2(env, ATOM_OK, enif_make_uint(env, id)); | ||
|
@@ -2348,6 +2407,7 @@ static void merge_pending_entries(ErlNifEnv* env, bitcask_keydir* keydir) | |
|
||
// Free all resources for keydir folding | ||
kh_destroy(entries, keydir->pending); | ||
DEBUG2("LINE %d keydir->pending = NULL\r\n", __LINE__); | ||
keydir->pending = NULL; | ||
|
||
keydir->pending_updated = 0; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to kill the new multiple folds improvement. Is it intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we can't do this, given how reliant we now are on AAE.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand, sorry, based on my sketchy knowledge of khash's internals, the multi-fold work, and the fact that if you restore that kh_put_will_resize() check then the PULSE test will start spewing failures very quickly because the keydir isn't properly frozen.
If the put is going to resize the hash, I certainly agree that a freeze is required in order to keep the folders' sorting order stable. However, this function is doing a mutation, and with a mutation of any kind (if there are any keyfolders) we must freeze. My brain can't think of a reason why you'd have keyfolders != 0 and doing a mutation where you would not want to freeze ... but then again, it's -18F here in Minneapolis today, my brain may be slushy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of the work for multi-folds was aimed at making this possible: delaying the inevitable freeze until a put changed the number of khash slots. Iterators are linked to a timestamp and tolerate finding entries added after the iteration was started. Entries can now be simple entries or linked lists, which can contain multiple versions of an entry. Puts then do not replace the entry, but add another version to this list (or convert a plain entry to a list). Iterators will choose from this list of timestamped entries the one from the snapshot they belong to. When the pending hash is merged (freeze is over!), all entries are merged back to good ol' plain entries. It's a lot of fun, you should try it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ha, then how about a change to the PULSE model to deal with freezes that aren't really freezes? Or change all of the other logic to compare timestamps in the same way so that real frozenness returns?
Is the goal of delaying the freeze is simply a RAM-consumption optimization?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe it is there to make sure the number of concurrent keyfolders is not limited by when the next write is coming, which is usually "there, it's already here". In fact, memory consumption could be higher with this, as you can end up with many versions of values. It depends on initial value of the khash table and typical growth during freeze I suppose. I have not looked at the implications for the PULSE model, but it does sound like we'll need to work on that soon to merge this work before 2.0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I apparently do not understand the goals of the multiple folds work. I'll try to review the original PR & comments today, because I'm stuck in a pit of befuddlement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright ... I put the will_resize check back in the code. The
bitcask_pulse
test finds this problem with a single folder (the fork_merge
step in the counterexample): The problem is that the fold in step # 37 sees the key 14 twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, I've a different counterexample that skips keys: a fold doesn't see keys that have been put and have never been deleted.
Here is an annotated timeline of the race:
If the keydir is frozen, then the keydir update by the merge causes the keydir to freeze, and the folder sees <<"kk02">>'s entry in file # 2; the keydir's frozenness will allow the folder to see that <<"kk02">>'s keydir entry is in that same place in file # 2.
But I'm not seeing an easy way to fix this merge race without consuming more memory ... I need to sleep on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hrm, well, my comment about a fix without using more RAM may or may not survive.
For the 2nd problem mentioned above, the one with the annotated timeline. I am not understanding why the mutation made by the "merging proc" isn't visible by the "folding proc" at the place where the fold reaches that key in file # 2.