Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mv iterator refresh #88

Merged
merged 4 commits into from
Dec 21, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion c_src/build_deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
fi
unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well

LEVELDB_VSN="mv-fadvise-control-2.0"
LEVELDB_VSN="mv-iterator-refresh"

SNAPPY_VSN="1.0.4"

Expand Down
11 changes: 7 additions & 4 deletions c_src/eleveldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ ERL_NIF_TERM ATOM_ERROR_DB_PUT;
ERL_NIF_TERM ATOM_NOT_FOUND;
ERL_NIF_TERM ATOM_VERIFY_CHECKSUMS;
ERL_NIF_TERM ATOM_FILL_CACHE;
ERL_NIF_TERM ATOM_ITERATOR_REFRESH;
ERL_NIF_TERM ATOM_SYNC;
ERL_NIF_TERM ATOM_ERROR_DB_DELETE;
ERL_NIF_TERM ATOM_CLEAR;
Expand Down Expand Up @@ -423,6 +424,8 @@ ERL_NIF_TERM parse_read_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::ReadO
opts.verify_checksums = (option[1] == eleveldb::ATOM_TRUE);
else if (option[0] == eleveldb::ATOM_FILL_CACHE)
opts.fill_cache = (option[1] == eleveldb::ATOM_TRUE);
else if (option[0] == eleveldb::ATOM_ITERATOR_REFRESH)
opts.iterator_refresh = (option[1] == eleveldb::ATOM_TRUE);
}

return eleveldb::ATOM_OK;
Expand Down Expand Up @@ -732,7 +735,7 @@ async_iterator_move(
return enif_make_badarg(env);

// Reuse ref from iterator creation
const ERL_NIF_TERM& caller_ref = itr_ptr->m_Snapshot->itr_ref;
const ERL_NIF_TERM& caller_ref = itr_ptr->itr_ref;

/* We can be invoked with two different arities from Erlang. If our "action_atom" parameter is not
in fact an atom, then it is actually a seek target. Let's find out which we are: */
Expand All @@ -748,7 +751,6 @@ async_iterator_move(
if(ATOM_PREFETCH == action_or_target) action = eleveldb::MoveTask::PREFETCH;
} // if


//
// Three situations:
// #1 not a PREFETCH next call
Expand All @@ -763,7 +765,7 @@ async_iterator_move(
itr_ptr->ReleaseReuseMove();

submit_new_request=true;
ret_term = enif_make_copy(env, itr_ptr->m_Snapshot->itr_ref);
ret_term = enif_make_copy(env, itr_ptr->itr_ref);

// force reply to be a message
itr_ptr->m_Iter->m_HandoffAtomic=1;
Expand All @@ -775,7 +777,7 @@ async_iterator_move(
else if (eleveldb::compare_and_swap(&itr_ptr->m_Iter->m_HandoffAtomic, 0, 1))
{
// nope, no prefetch ... await a message to erlang queue
ret_term = enif_make_copy(env, itr_ptr->m_Snapshot->itr_ref);
ret_term = enif_make_copy(env, itr_ptr->itr_ref);

// is this truly a wait for prefetch ... or actually the first prefetch request
if (!itr_ptr->m_Iter->m_PrefetchStarted)
Expand Down Expand Up @@ -1121,6 +1123,7 @@ try
ATOM(eleveldb::ATOM_NOT_FOUND, "not_found");
ATOM(eleveldb::ATOM_VERIFY_CHECKSUMS, "verify_checksums");
ATOM(eleveldb::ATOM_FILL_CACHE,"fill_cache");
ATOM(eleveldb::ATOM_ITERATOR_REFRESH,"iterator_refresh");
ATOM(eleveldb::ATOM_SYNC, "sync");
ATOM(eleveldb::ATOM_ERROR_DB_DELETE, "db_delete");
ATOM(eleveldb::ATOM_CLEAR, "clear");
Expand Down
5 changes: 3 additions & 2 deletions c_src/refobjects.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ DbObject::~DbObject()
void
DbObject::Shutdown()
{
#if 1
bool again;
ItrObject * itr_ptr;

Expand All @@ -348,7 +347,6 @@ DbObject::Shutdown()
ItrObject::InitiateCloseRequest(itr_ptr);

} while(again);
#endif

RefDec();

Expand Down Expand Up @@ -496,6 +494,9 @@ ItrObject::~ItrObject()

delete m_ReadOptions;

if (NULL!=itr_ref_env)
enif_free_env(itr_ref_env);

if (NULL!=m_DbPtr.get())
m_DbPtr->RemoveReference(this);

Expand Down
119 changes: 57 additions & 62 deletions c_src/refobjects.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#define INCL_REFOBJECTS_H

#include <stdint.h>
#include <sys/time.h>
#include <list>

#include "leveldb/db.h"
Expand Down Expand Up @@ -212,52 +213,6 @@ class DbObject : public ErlRefObject
}; // class DbObject


/**
* A self deleting wrapper to contain leveldb snapshot pointer.
* Needed because multiple LevelIteratorWrappers could be using
* it ... and finishing at different times.
*/

class LevelSnapshotWrapper : public RefObject
{
public:
    ReferencePtr<DbObject> m_DbPtr;       //!< keeps the db open until this object is deleted
    const leveldb::Snapshot * m_Snapshot; //!< snapshot handed back to leveldb on destruction

    // The caller's iterator reference is parked here only because
    // its lifespan matches the snapshot's exactly.
    ERL_NIF_TERM itr_ref;
    ErlNifEnv *itr_ref_env;               //!< freed in the destructor when non-NULL

    LevelSnapshotWrapper(DbObject * DbPtr, const leveldb::Snapshot * Snapshot)
        : m_DbPtr(DbPtr),
          m_Snapshot(Snapshot),
          itr_ref_env(NULL)
    {
    }

    virtual ~LevelSnapshotWrapper()
    {
        // release the Erlang environment first, if one was ever attached
        if (itr_ref_env != NULL)
        {
            enif_free_env(itr_ref_env);
        }

        if (m_Snapshot != NULL)
        {
            // leveldb performs the actual "delete" call on m_Snapshot's pointer
            m_DbPtr->m_Db->ReleaseSnapshot(m_Snapshot);
            m_Snapshot = NULL;
        }
    } // ~LevelSnapshotWrapper

    // Raw access to the wrapped snapshot pointer.
    const leveldb::Snapshot * get() { return m_Snapshot; }
    const leveldb::Snapshot * operator->() { return m_Snapshot; }

private:
    LevelSnapshotWrapper(const LevelSnapshotWrapper &);            // no copy
    LevelSnapshotWrapper& operator=(const LevelSnapshotWrapper &); // no assignment

}; // LevelSnapshotWrapper



/**
* A self deleting wrapper to contain leveldb iterator.
* Used when an ItrObject needs to skip around and might
Expand All @@ -269,40 +224,75 @@ class LevelIteratorWrapper : public RefObject
{
public:
ReferencePtr<DbObject> m_DbPtr; //!< need to keep db open for delete of this object
ReferencePtr<LevelSnapshotWrapper> m_Snap;//!< keep snapshot active while this object is
const leveldb::Snapshot * m_Snapshot;
leveldb::Iterator * m_Iterator;
volatile uint32_t m_HandoffAtomic; //!< matthew's atomic foreground/background prefetch flag.
bool m_KeysOnly; //!< only return key values
bool m_PrefetchStarted; //!< true after first prefetch command

LevelIteratorWrapper(DbObject * DbPtr, LevelSnapshotWrapper * Snapshot,
leveldb::Iterator * Iterator, bool KeysOnly)
: m_DbPtr(DbPtr), m_Snap(Snapshot), m_Iterator(Iterator),
m_HandoffAtomic(0), m_KeysOnly(KeysOnly), m_PrefetchStarted(false)
leveldb::ReadOptions * m_Options; //!< shared copy of ItrObject::options
ERL_NIF_TERM itr_ref; //!< shared copy of ItrObject::itr_ref

// only used if m_Options.iterator_refresh == true
std::string m_RecentKey; //!< Most recent key returned
time_t m_IteratorStale; //!< time iterator should refresh
bool m_StillUse; //!< true if no error or key end seen

LevelIteratorWrapper(DbObject * DbPtr, bool KeysOnly,
leveldb::ReadOptions * Options, ERL_NIF_TERM itr_ref)
: m_DbPtr(DbPtr), m_Snapshot(NULL), m_Iterator(NULL),
m_HandoffAtomic(0), m_KeysOnly(KeysOnly), m_PrefetchStarted(false),
m_Options(Options), itr_ref(itr_ref),
m_IteratorStale(0), m_StillUse(true)
{
RebuildIterator();
};

virtual ~LevelIteratorWrapper()
{
if (NULL!=m_Iterator)
{
delete m_Iterator;
m_Iterator=NULL;
} // if
PurgeIterator();
} // ~LevelIteratorWrapper

leveldb::Iterator * get() {return(m_Iterator);};
leveldb::Iterator * operator->() {return(m_Iterator);};

bool Valid() {return(m_Iterator->Valid());};
bool Valid() {return(NULL!=m_Iterator && m_Iterator->Valid());};
leveldb::Slice key() {return(m_Iterator->key());};
leveldb::Slice value() {return(m_Iterator->value());};

// iterator_refresh related routines
void PurgeIterator()
{
    // Hand the snapshot back before touching the iterator; leveldb
    // performs the actual "delete" call on m_Snapshot's pointer.
    if (m_Snapshot != NULL)
    {
        m_DbPtr->m_Db->ReleaseSnapshot(m_Snapshot);
        m_Snapshot = NULL;
    }

    // Destroy the iterator (if any) and leave the member NULL so that
    // get() and Valid() report that nothing is held.
    if (m_Iterator != NULL)
    {
        delete m_Iterator;
        m_Iterator = NULL;
    }
} // PurgeIterator

void RebuildIterator()
{
struct timeval tv;

gettimeofday(&tv, NULL);
m_IteratorStale=tv.tv_sec + 300; // +5min

PurgeIterator();
m_Snapshot = m_DbPtr->m_Db->GetSnapshot();
m_Options->snapshot = m_Snapshot;
m_Iterator = m_DbPtr->m_Db->NewIterator(*m_Options);
} // RebuildIterator

private:
LevelIteratorWrapper(const LevelIteratorWrapper &); // no copy
LevelIteratorWrapper& operator=(const LevelIteratorWrapper &); // no assignment


}; // LevelIteratorWrapper


Expand All @@ -314,15 +304,19 @@ class ItrObject : public ErlRefObject
{
public:
ReferencePtr<LevelIteratorWrapper> m_Iter;
ReferencePtr<LevelSnapshotWrapper> m_Snapshot;
// ReferencePtr<LevelSnapshotWrapper> m_Snapshot;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line should be deleted


bool keys_only;
leveldb::ReadOptions * m_ReadOptions;
leveldb::ReadOptions * m_ReadOptions; //!< Owned by this object, must delete

volatile class MoveTask * reuse_move;//!< iterator work object that is reused instead of lots malloc/free
volatile class MoveTask * reuse_move; //!< iterator work object that is reused instead of lots malloc/free

ReferencePtr<DbObject> m_DbPtr;

ERL_NIF_TERM itr_ref; //!< what was caller ref to async_iterator
ErlNifEnv *itr_ref_env; //!< Erlang Env to hold itr_ref


protected:
static ErlNifResourceType* m_Itr_RESOURCE;

Expand All @@ -348,6 +342,7 @@ class ItrObject : public ErlRefObject
ItrObject();
ItrObject(const ItrObject &); // no copy
ItrObject & operator=(const ItrObject &); // no assignment

}; // class ItrObject

} // namespace eleveldb
Expand Down
59 changes: 56 additions & 3 deletions c_src/workitems.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,45 @@ OpenTask::operator()()
work_result
MoveTask::operator()()
{
leveldb::Iterator* itr = m_ItrWrap->get();
leveldb::Iterator* itr;

itr=m_ItrWrap->get();


//
// race condition of prefetch clearing db iterator while
// async_iterator_move looking at it.
//

// iterator_refresh operation
if (m_ItrWrap->m_Options->iterator_refresh && m_ItrWrap->m_StillUse)
{
struct timeval tv;

gettimeofday(&tv, NULL);

if (m_ItrWrap->m_IteratorStale < tv.tv_sec || NULL==itr)
{
m_ItrWrap->RebuildIterator();
itr=m_ItrWrap->get();

// recover position
if (NULL!=itr && 0!=m_ItrWrap->m_RecentKey.size())
{
leveldb::Slice key_slice(m_ItrWrap->m_RecentKey);

itr->Seek(key_slice);
m_ItrWrap->m_StillUse=itr->Valid();
if (!m_ItrWrap->m_StillUse)
{
itr=NULL;
m_ItrWrap->PurgeIterator();
} // if
} // if
} // if
} // if

// back to normal operation
if(NULL == itr)
return work_result(local_env(), ATOM_ERROR, ATOM_ITERATOR_CLOSED);

Expand Down Expand Up @@ -215,6 +252,22 @@ MoveTask::operator()()

} // switch

// Post processing before telling the world the results
// (while only one thread might be looking at objects)
if (m_ItrWrap->m_Options->iterator_refresh)
{
if (itr->Valid())
{
m_ItrWrap->m_RecentKey.assign(itr->key().data(), itr->key().size());
} // if
else
{
// release iterator now, not later
m_ItrWrap->m_StillUse=false;
m_ItrWrap->PurgeIterator();
itr=NULL;
} // else
} // if

// who got back first, us or the erlang loop
if (compare_and_swap(&m_ItrWrap->m_HandoffAtomic, 0, 1))
Expand All @@ -228,7 +281,7 @@ MoveTask::operator()()
// setup next race for the response
m_ItrWrap->m_HandoffAtomic=0;

if(itr->Valid())
if(NULL!=itr && itr->Valid())
{
if (PREFETCH==action)
prepare_recycle();
Expand Down Expand Up @@ -260,7 +313,7 @@ MoveTask::local_env()

if (!terms_set)
{
caller_ref_term = enif_make_copy(local_env_, m_ItrWrap->m_Snap->itr_ref);
caller_ref_term = enif_make_copy(local_env_, m_ItrWrap->itr_ref);
caller_pid_term = enif_make_pid(local_env_, &local_pid);
terms_set=true;
} // if
Expand Down
16 changes: 4 additions & 12 deletions c_src/workitems.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,24 +274,16 @@ class IterTask : public WorkTask
virtual work_result operator()()
{
ItrObject * itr_ptr;
const leveldb::Snapshot * snapshot;
leveldb::Iterator * iterator;

// NOTE: transferring ownership of options to ItrObject
itr_ptr=ItrObject::CreateItrObject(m_DbPtr.get(), keys_only, options);

snapshot = m_DbPtr->m_Db->GetSnapshot();
itr_ptr->m_Snapshot.assign(new LevelSnapshotWrapper(m_DbPtr.get(), snapshot));
options->snapshot = snapshot;

// Copy caller_ref to reuse in future iterator_move calls
itr_ptr->m_Snapshot->itr_ref_env = enif_alloc_env();
itr_ptr->m_Snapshot->itr_ref = enif_make_copy(itr_ptr->m_Snapshot->itr_ref_env,
caller_ref());
itr_ptr->itr_ref_env = enif_alloc_env();
itr_ptr->itr_ref = enif_make_copy(itr_ptr->itr_ref_env, caller_ref());

iterator = m_DbPtr->m_Db->NewIterator(*options);
itr_ptr->m_Iter.assign(new LevelIteratorWrapper(m_DbPtr.get(), itr_ptr->m_Snapshot.get(),
iterator, keys_only));
itr_ptr->m_Iter.assign(new LevelIteratorWrapper(m_DbPtr.get(), keys_only,
options, itr_ptr->itr_ref));

ERL_NIF_TERM result = enif_make_resource(local_env(), itr_ptr);

Expand Down
Loading