Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MongoDBClient cleanup #351

Closed
wants to merge 9 commits into from
121 changes: 44 additions & 77 deletions mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;

import com.mongodb.client.model.InsertManyOptions;
import org.bson.Document;
import org.bson.types.Binary;

Expand Down Expand Up @@ -53,14 +54,16 @@
*/
public class MongoDbClient extends DB {

/** Update options to do an upsert. */
private static final UpdateOptions UPSERT = new UpdateOptions()
.upsert(true);

/** Used to include a field in a response. */
protected static final Integer INCLUDE = Integer.valueOf(1);

/** The database name to access. */
/** The options to use for inserting many documents */
protected static final InsertManyOptions INSERT_MANY_OPTIONS =
new InsertManyOptions().ordered(false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be renamed to show what it stands for instead of the type of object it is. Maybe INSERT_UNORDERED?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Done.


/**
* The database name to access.
*/
private static String databaseName;

/** The database name to access. */
Expand All @@ -85,7 +88,7 @@ public class MongoDbClient extends DB {
private static int batchSize;

/** The bulk inserts pending for the thread. */
private final List<InsertOneModel<Document>> bulkInserts = new ArrayList<InsertOneModel<Document>>();
private final List<Document> bulkInserts = new ArrayList<Document>();

/**
* Cleanup any state for this DB. Called once per DB instance; there is one
Expand Down Expand Up @@ -199,7 +202,9 @@ public void init() throws DBException {
writeConcern = uri.getOptions().getWriteConcern();

mongoClient = new MongoClient(uri);
database = mongoClient.getDatabase(databaseName);
database = mongoClient.getDatabase(databaseName)
.withReadPreference(readPreference)
.withWriteConcern(writeConcern);

System.out.println("mongo client connection created with "
+ url);
Expand Down Expand Up @@ -234,59 +239,21 @@ public int insert(String table, String key,
try {
MongoCollection<Document> collection = database
.getCollection(table);
Document criteria = new Document("_id", key);
Document toInsert = new Document("_id", key);
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
toInsert.put(entry.getKey(), entry.getValue().toArray());
}

// Do a single upsert.
if (batchSize <= 1) {
UpdateResult result = collection.withWriteConcern(writeConcern)
.replaceOne(criteria, toInsert, UPSERT);
if (!result.wasAcknowledged()
|| result.getMatchedCount() > 0
|| (result.isModifiedCountAvailable() && (result
.getModifiedCount() > 0))
|| result.getUpsertedId() != null) {
return 0;
}

System.err.println("Nothing inserted for key " + key);
return 1;
}

// Use a bulk insert.
try {
bulkInserts.add(new InsertOneModel<Document>(toInsert));
if (bulkInserts.size() < batchSize) {
return 0;
}

BulkWriteResult result = collection.withWriteConcern(
writeConcern).bulkWrite(bulkInserts,
new BulkWriteOptions().ordered(false));
if (!result.wasAcknowledged()
|| result.getInsertedCount() == bulkInserts.size()) {
bulkInserts.clear();
return 0;
}

System.err
.println("Number of inserted documents doesn't match the number sent, "
+ result.getInsertedCount()
+ " inserted, sent " + bulkInserts.size());
bulkInserts.add(toInsert);
if (bulkInserts.size() == batchSize) {
collection.insertMany(bulkInserts, INSERT_MANY_OPTIONS);
bulkInserts.clear();
return 1;
}
catch (Exception e) {
System.err.println("Exception while trying bulk insert with "
+ bulkInserts.size());
e.printStackTrace();
return 1;
}
return 0;
}
catch (Exception e) {
System.err.println("Exception while trying bulk insert with "
+ bulkInserts.size());
e.printStackTrace();
return 1;
}
Expand Down Expand Up @@ -314,22 +281,19 @@ public int read(String table, String key, Set<String> fields,
MongoCollection<Document> collection = database
.getCollection(table);
Document query = new Document("_id", key);
Document fieldsToReturn = new Document();

Document queryResult = null;
FindIterable<Document> findIterable = collection.find(query);

if (fields != null) {
Iterator<String> iter = fields.iterator();
while (iter.hasNext()) {
fieldsToReturn.put(iter.next(), INCLUDE);
Document projection = new Document();
for (String field : fields) {
projection.put(field, INCLUDE);
}
queryResult = collection.withReadPreference(readPreference)
.find(query).projection(fieldsToReturn).first();
}
else {
queryResult = collection.withReadPreference(readPreference)
.find(query).first();
findIterable.projection(projection);
}

Document queryResult = findIterable.first();

if (queryResult != null) {
fillMap(result, queryResult);
}
Expand Down Expand Up @@ -362,36 +326,40 @@ public int read(String table, String key, Set<String> fields,
@Override
public int scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
FindIterable<Document> cursor = null;
MongoCursor<Document> iter = null;
MongoCursor<Document> cursor = null;
try {
MongoCollection<Document> collection = database
.getCollection(table);

Document scanRange = new Document("$gte", startkey);
Document query = new Document("_id", scanRange);
Document sort = new Document("_id", INCLUDE);
Document projection = null;

FindIterable<Document> findIterable = collection.find(query)
.sort(sort)
.limit(recordcount);

if (fields != null) {
projection = new Document();
Document projection = new Document();
for (String fieldName : fields) {
projection.put(fieldName, INCLUDE);
}
findIterable.projection(projection);
}

cursor = collection.withReadPreference(readPreference).find(query)
.projection(projection).sort(sort).limit(recordcount);
cursor = findIterable.iterator();

// Do the query.
iter = cursor.iterator();
if (!iter.hasNext()) {
if (!cursor.hasNext()) {
System.err.println("Nothing found in scan for key " + startkey);
return 1;
}
while (iter.hasNext()) {

result.ensureCapacity(recordcount);

while (cursor.hasNext()) {
HashMap<String, ByteIterator> resultMap = new HashMap<String, ByteIterator>();

Document obj = iter.next();
Document obj = cursor.next();
fillMap(resultMap, obj);

result.add(resultMap);
Expand All @@ -404,8 +372,8 @@ public int scan(String table, String startkey, int recordcount,
return 1;
}
finally {
if (iter != null) {
iter.close();
if (cursor != null) {
cursor.close();
}
}
}
Expand Down Expand Up @@ -438,8 +406,7 @@ public int update(String table, String key,
}
Document update = new Document("$set", fieldsToSet);

UpdateResult result = collection.withWriteConcern(writeConcern)
.updateOne(query, update);
UpdateResult result = collection.updateOne(query, update);
if (result.wasAcknowledged() && result.getMatchedCount() == 0) {
System.err.println("Nothing updated for key " + key);
return 1;
Expand Down
4 changes: 2 additions & 2 deletions mongodb/src/main/java/com/yahoo/ycsb/db/OptionsSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public final class OptionsSupport {

/** Value for an unavailable property. */
protected static final String UNAVAILABLE = "n/a";
private static final String UNAVAILABLE = "n/a";

/**
* Updates the URL with the appropriate attributes if legacy properties are
Expand Down Expand Up @@ -76,7 +76,7 @@ else if ("acknowledged".equals(writeConcernType)) {
result = addUrlOption(result, "w", "1");
}
else if ("journaled".equals(writeConcernType)) {
result = addUrlOption(result, "journal", "true");
result = addUrlOption(result, "j", "true");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation for the standard URI says to use "journal":

http://docs.mongodb.org/manual/reference/connection-string/#uri.journal

Does that not work with the Java Driver?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It never has, actually. I reported it as https://jira.mongodb.org/browse/JAVA-1890, but for now let's switch to "j".

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh.. Hummm. The problem is that the async driver only supports the "journal" parameter. Since "journal" is the documented name I think we should leave that. I wouldn't be opposed to adding both a "journal" and "j" parameter to the URI with a reference to remove it once the Jira ticket is released.

}
else if ("replica_acknowledged".equals(writeConcernType)) {
result = addUrlOption(result, "w", "2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testUpdateUrlWriteConcern() {
assertThat(
updateUrl("mongodb://locahost:27017/?foo=bar",
props("mongodb.writeConcern", "journaled")),
is("mongodb://locahost:27017/?foo=bar&journal=true"));
is("mongodb://locahost:27017/?foo=bar&j=true"));
assertThat(
updateUrl("mongodb://locahost:27017/?foo=bar",
props("mongodb.writeConcern", "replica_acknowledged")),
Expand Down