Skip to content

Commit

Permalink
Merge pull request #3 from Seagate/features/asokvad260
Browse files Browse the repository at this point in the history
Features/asokvad260
  • Loading branch information
Marshall Pierce committed May 15, 2014
2 parents 973678b + 229b907 commit 18c22f3
Show file tree
Hide file tree
Showing 17 changed files with 532 additions and 246 deletions.
7 changes: 4 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ set(PREFIX "${CMAKE_BINARY_DIR}/vendor")

include(ExternalProject)

set(PROTOBUFUTIL_VERSION "v0.2.2")
set(PROTOBUFUTIL_MD5 "e1de2f4b5068b1d6464bacd3b3af3b72")
set(PROTOBUFUTIL_VERSION "0.2.3")
set(PROTOBUFUTIL_MD5 "b60a0ff47222ab70e0ddb7a8e6ef6900")

ExternalProject_add(
protobufutil
Expand Down Expand Up @@ -133,7 +133,8 @@ add_library(kinetic_client
src/main/nonblocking_string.cc
src/main/socket_wrapper.cc
src/main/blocking_kinetic_connection.cc
src/main/connection_handle.cc
src/main/threadsafe_blocking_kinetic_connection.cc

src/main/key_range_iterator.cc
)
add_dependencies(kinetic_client
Expand Down
33 changes: 15 additions & 18 deletions DoxygenMainPage.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,23 @@ Next, create a connection options object to indicate the IP, port, user identity

Finally, open the connection:

std::unique_ptr<kinetic::ConnectionHandle> connection;
kinetic::KineticStatus status = kinetic_connection_factory.NewConnection(options, timeout_in_seconds, connection);
std::unique_ptr<kinetic::NonblockingKineticConnection> connection;
kinetic::KineticStatus status = kinetic_connection_factory.NewNonblockingConnection(options, timeout_in_seconds, connection);

*Note*: If the connection needs to be shared between threads, call `NewThreadsafeConnection` instead.
*Note*: If the connection needs to be shared between threads, call `NewThreadsafeNonblockingConnection` instead.

To check whether the connection succeeded, check the value of `status.ok()`. If it's false, additional error information is available by calling `status.statusCode()` and `status.message()`.

The underlying connection can be accessed by calling `connection.blocking()` or `connection.nonblocking()`. The interface returned by `blocking()` and `nonblocking()` both access the same underlying TCP connection.
The factory can create threadasafe and non-threadsafe variants of blocking and nonblocking clients. Blocking clients can also be created from an existing nonblocking client using the `BlockingKineticClient` constructor which takes a `NonblockingKineticConnection`.
The benefit of this approach is that both clients will use the same underyling connection. This is not necessarily threadsafe. If you use a nonblocking and blocking client which use the same underlying connection, then there are no guarantees about which thread is will process callbacks.

Performing blocking GET/PUT operations
--------------------------------------
PUT and GET calls make use of the `kientic::KineticRecord` class, which bundles the value/version/tag/algorithm fourpule allows construction via a series of `shared_ptr` instances (avoiding a copy) or a series of `string`s (avoiding extra typing).

To PUT a key:

connection->blocking().Put(
blocking_connection->Put(
key,
version,
kinetic::IGNORE_VERSION,
Expand All @@ -52,7 +53,7 @@ Like most operations in the Kinetic C++ client, `Put` returns a `KineticStatus`
GETting a key works similarly:

std::unique_ptr<KineticRecord> record;
connection->blocking().Get(key, record);
blocking_connection->Get(key, record);

Performing non-blocking GET/PUT operations
------------------------------------------
Expand Down Expand Up @@ -80,20 +81,20 @@ class to receive success and failure messages:
Then enqueue a series of operations:

auto record = make_shared<KineticRecord>(value, version, tag, Message_Algorithm_SHA1);
connection->nonblocking().Put(key1, version, kinetic::IGNORE_VERSION, record, callback);
nonblocking_connection->Put(key1, version, kinetic::IGNORE_VERSION, record, callback);
Message_Algorithm_SHA1);
connection->nonblocking().Put(key2, version, kinetic::IGNORE_VERSION, record, callback);
nonblocking_connection->Put(key2, version, kinetic::IGNORE_VERSION, record, callback);
Message_Algorithm_SHA1);
connection->nonblocking().Put(key3, version, kinetic::IGNORE_VERSION, record, callback);
nonblocking_connection->Put(key3, version, kinetic::IGNORE_VERSION, record, callback);

Finally, call `Run` repeatedly to actually execute the operations:

fd_set read_fds, write_fds;
int nfd = 0;
connection->nonblocking().Run(&read_fds, &write_fds, &nfd);
nonblocking_connection->Run(&read_fds, &write_fds, &nfd);
while (there_is_work_to_do) {
while (select(nfd + 1, &read_fds, &write_fds, NULL, NULL) <= 0);
connection->nonblocking().Run(&read_fds, &write_fds, &num_fds);
nonblocking_connection->.Run(&read_fds, &write_fds, &num_fds);
}

If desired, other fds can be added to the `fd_set`s so that the `select` call can wait for IO to be ready on fds controlled by the Kinetic API or other parts of the application.
Expand All @@ -117,12 +118,8 @@ GETs work very similarly. First, a callback implementation:

Next, enqueue some operations:

connection->nonblocking().Get(key_1, callback);
connection->nonblocking().Get(key_2, callback);
connection->nonblocking().Get(key_3, callback);
nonblocking_connection->Get(key_1, callback);
nonblocking_connection->Get(key_2, callback);
nonblocking_connection->Get(key_3, callback);

The `Run` loop works exactly the same way as it does for the PUT case. Multiple GET/PUT/management operations can all be enqueued and they will be executed one at a time in order by repeated `Run` calls.

Closing a connection
--------------------
The `ConnectionHandle` destructor closes the connection. This means that the he connection remains open until the `unique_ptr` goes out of scope or the `ConnectionHandle` is manually freed.
9 changes: 7 additions & 2 deletions include/kinetic/blocking_kinetic_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,13 @@ class BlockingKineticConnection {
/// @param[in] network_timeout_seconds If an operation goes more than network_timeout_seconds
/// seconds without receiving data the operation will fail
explicit BlockingKineticConnection(
NonblockingKineticConnection* nonblocking_connection,
shared_ptr<NonblockingKineticConnection> nonblocking_connection,
unsigned int network_timeout_seconds);

explicit BlockingKineticConnection(
unique_ptr<NonblockingKineticConnection> nonblocking_connection,
unsigned int network_timeout_seconds);

virtual ~BlockingKineticConnection();

/// If the drive has a non-zero cluster version, requests will fail unless the developer
Expand Down Expand Up @@ -157,7 +162,7 @@ class BlockingKineticConnection {
/// Helper method for translating a StatusCode from the drive into an API client KineticStatus
/// object
KineticStatus GetKineticStatus(StatusCode code);
NonblockingKineticConnection* nonblocking_connection_;
shared_ptr<NonblockingKineticConnection> nonblocking_connection_;
const unsigned int network_timeout_seconds_;
DISALLOW_COPY_AND_ASSIGN(BlockingKineticConnection);
};
Expand Down
60 changes: 0 additions & 60 deletions include/kinetic/connection_handle.h

This file was deleted.

72 changes: 56 additions & 16 deletions include/kinetic/kinetic_connection_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
#include "protobufutil/message_stream.h"

#include "kinetic/connection_options.h"
#include "kinetic/connection_handle.h"
#include "kinetic/hmac_provider.h"
#include "kinetic/nonblocking_kinetic_connection.h"
#include "kinetic/blocking_kinetic_connection.h"
#include "kinetic/nonblocking_kinetic_connection.h"
#include "kinetic/threadsafe_nonblocking_connection.h"
#include "kinetic/threadsafe_blocking_kinetic_connection.h"
#include "kinetic/status.h"

namespace kinetic {
Expand All @@ -41,7 +42,7 @@ class KineticConnectionFactory {
public:
explicit KineticConnectionFactory(HmacProvider hmac_provider);

/// Creates and opens a new connection using the given options. If the returned
/// Creates and opens a new nonblocking connection using the given options. If the returned
/// Status indicates success then the connection is ready to perform
/// actions and the caller should delete it when done using it. If the
/// Status indicates failure, then no connection will be created and
Expand All @@ -50,24 +51,63 @@ class KineticConnectionFactory {
/// @param[in] options Specifies host, port, user id, etc
/// @param[in] network_timeout_seconds If an operation goes more than this many seconds without
/// data the operation fails
/// @param[out] connection Populated with a ConnectionHandle if the request
/// @param[out] connection Populated with a NonblockingKineticConnection if the request
/// succeeds
virtual Status NewConnection(
const ConnectionOptions &options,
unsigned int network_timeout_seconds,
unique_ptr<ConnectionHandle>& connection);
virtual Status NewNonblockingConnection(
const ConnectionOptions& options,
unique_ptr <NonblockingKineticConnection>& connection);

virtual Status NewNonblockingConnection(
const ConnectionOptions& options,
shared_ptr <NonblockingKineticConnection>& connection);

/// Like NewNonblockingConnection, except the connection is safe for use by multiple threads.
virtual Status NewThreadsafeNonblockingConnection(
const ConnectionOptions& options,
unique_ptr <NonblockingKineticConnection>& connection);

virtual Status NewThreadsafeNonblockingConnection(
const ConnectionOptions& options,
shared_ptr <NonblockingKineticConnection>& connection);

/// Creates and opens a new blocking connection using the given options. If the returned
/// Status indicates success then the connection is ready to perform
/// actions and the caller should delete it when done using it. If the
/// Status indicates failure, then no connection will be created and
/// the caller must not attempt to use or delete it.
///
/// @param[in] options Specifies host, port, user id, etc
/// @param[in] network_timeout_seconds If an operation goes more than this many seconds without
/// data the operation fails
/// @param[out] connection Populated with a BlockingKineticConnection if the request
/// succeeds
virtual Status NewBlockingConnection(
const ConnectionOptions& options,
unique_ptr <BlockingKineticConnection>& connection,
unsigned int network_timeout_seconds);

virtual Status NewBlockingConnection(
const ConnectionOptions& options,
shared_ptr <BlockingKineticConnection>& connection,
unsigned int network_timeout_seconds);

/// Like NewBlockingConnection, except the connection is safe for use by multiple threads
virtual Status NewThreadsafeBlockingConnection(
const ConnectionOptions& options,
unique_ptr <BlockingKineticConnection>& connection,
unsigned int network_timeout_seconds);

/// Like NewConnection, except the connections available via the ConnectionHandle are safe for
/// use by multiple threads.
virtual Status NewThreadsafeConnection(const ConnectionOptions &options,
unsigned int network_timeout_seconds,
unique_ptr<ConnectionHandle>& connection);
virtual Status NewThreadsafeBlockingConnection(
const ConnectionOptions& options,
shared_ptr <BlockingKineticConnection>& connection,
unsigned int network_timeout_seconds);

private:
HmacProvider hmac_provider_;
Status doNewConnection(ConnectionOptions const &options,
unsigned int network_timeout_seconds,
unique_ptr<ConnectionHandle>& connection, bool threadsafe);
Status doNewConnection(
ConnectionOptions const& options,
unique_ptr <NonblockingKineticConnection>& connection,
bool threadsafe);
};

/// Helper method that creates a new KineticConnectionFactory with
Expand Down
Loading

0 comments on commit 18c22f3

Please sign in to comment.