From f32fc02e39ab4a907198d5e3ab02fdc09c824eb5 Mon Sep 17 00:00:00 2001
From: Josh Elser
Date: Thu, 6 Apr 2017 17:46:20 -0400
Subject: [PATCH] [accumulo] A general "refresh" to the Accumulo binding (#947)

* Expand on the README, covering table creation and best practices for table config
* Avoid unnecessary Text object creations (in loops and instead of byte[] usage)
* Use a ConcurrentHashMap to better match the DB API
* Fix error messages and always call printStackTrace() on exceptions
* Use BATCHED_OK instead of OK in insert() (more correct)
---
 accumulo/README.md                                 |  37 ++-
 .../ycsb/db/accumulo/AccumuloClient.java           | 293 ++++++++++--------
 2 files changed, 194 insertions(+), 136 deletions(-)

diff --git a/accumulo/README.md b/accumulo/README.md
index fd9b4e8d7a..38e444cb7c 100644
--- a/accumulo/README.md
+++ b/accumulo/README.md
@@ -36,7 +36,42 @@ Git clone YCSB and compile:
     cd YCSB
     mvn -pl com.yahoo.ycsb:accumulo-binding -am clean package
 
-### 3. Load Data and Run Tests
+### 3. Create the Accumulo table
+
+By default, YCSB uses a table named "usertable". Users must create this table before loading
+data into Accumulo. For maximum Accumulo performance, the table should be pre-split. A simple
+Ruby script, based on the HBase README, can generate adequate split points. Tens of tablets per
+TabletServer is a good starting point. Unless otherwise specified, the following commands should
+work on any version of Accumulo.
+
+    $ echo 'num_splits = 20; puts (1..num_splits).map {|i| "user#{1000+i*(9999-1000)/num_splits}"}' | ruby > /tmp/splits.txt
+    $ accumulo shell -u <user> -p <password> -e "createtable usertable"
+    $ accumulo shell -u <user> -p <password> -e "addsplits -t usertable -sf /tmp/splits.txt"
+    $ accumulo shell -u <user> -p <password> -e "config -t usertable -s table.cache.block.enable=true"
+
+Additionally, there are some other configuration properties which can increase performance. These
+can be set on the Accumulo table via the shell after it is created. Setting the table durability
+to `flush` relaxes the constraints on data durability during hard power outages (it avoids calls
+to fsync). Accumulo defaults table compression to `gzip`, which is not particularly fast; `snappy`
+is a faster and similarly efficient option. The mutation queue property controls how many writes
+Accumulo will buffer in memory before performing a flush; it should be set relative to the amount
+of JVM heap the TabletServers are given.
+
+Please note that the `table.durability` and `tserver.total.mutation.queue.max` properties only
+exist in Accumulo 1.7 and later. There are no concise replacements for these properties in
+earlier versions.
+
+    accumulo> config -s table.durability=flush
+    accumulo> config -s tserver.total.mutation.queue.max=256M
+    accumulo> config -t usertable -s table.file.compress.type=snappy
+
+On repeated data loads, the following commands may be helpful to quickly reset the state of the
+table.
+
+    accumulo> createtable tmp --copy-splits usertable --copy-config usertable
+    accumulo> deletetable --force usertable
+    accumulo> renametable tmp usertable
+    accumulo> compact --wait -t accumulo.metadata
+
+### 4. Load Data and Run Tests
 
 Load the data:
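[Editorial note: if shell access is inconvenient, the table setup the README describes above can also be scripted against Accumulo's Java `TableOperations` API. A minimal sketch, not part of this patch; the class and method names below are illustrative only, and it assumes an already-built `Connector`. The split points mirror the Ruby one-liner.]

    import java.util.TreeSet;

    import org.apache.accumulo.core.client.AccumuloException;
    import org.apache.accumulo.core.client.AccumuloSecurityException;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.TableExistsException;
    import org.apache.accumulo.core.client.TableNotFoundException;
    import org.apache.accumulo.core.client.admin.TableOperations;
    import org.apache.hadoop.io.Text;

    public final class CreateUsertable {
      // Hypothetical helper, not part of the patch: creates and pre-splits "usertable".
      public static void createAndSplit(Connector connector, int numSplits)
          throws AccumuloException, AccumuloSecurityException,
          TableExistsException, TableNotFoundException {
        TableOperations ops = connector.tableOperations();
        ops.create("usertable");
        // Same split points as the Ruby one-liner: user1449, user1899, ...
        TreeSet<Text> splits = new TreeSet<>();
        for (int i = 1; i <= numSplits; i++) {
          splits.add(new Text("user" + (1000 + i * (9999 - 1000) / numSplits)));
        }
        ops.addSplits("usertable", splits);
        ops.setProperty("usertable", "table.cache.block.enable", "true");
      }
    }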
diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java
index 96b869e2b7..41d6f7f6fa 100644
--- a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java
+++ b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java
@@ -18,17 +18,25 @@
 package com.yahoo.ycsb.db.accumulo;
 
-import com.yahoo.ycsb.ByteArrayByteIterator;
-import com.yahoo.ycsb.ByteIterator;
-import com.yahoo.ycsb.DB;
-import com.yahoo.ycsb.DBException;
-import com.yahoo.ycsb.Status;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -39,16 +47,16 @@
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.CleanUp;
 import org.apache.hadoop.io.Text;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.TimeUnit;
+import com.yahoo.ycsb.ByteArrayByteIterator;
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.DB;
+import com.yahoo.ycsb.DBException;
+import com.yahoo.ycsb.Status;
 
 /**
  * Accumulo binding for YCSB.
@@ -57,14 +65,11 @@ public class AccumuloClient extends DB {
 
   private ZooKeeperInstance inst;
   private Connector connector;
-  private String table = "";
-  private BatchWriter bw = null;
   private Text colFam = new Text("");
-  private Scanner singleScanner = null; // A scanner for reads/deletes.
-  private Scanner scanScanner = null; // A scanner for use by scan()
+  private byte[] colFamBytes = new byte[0];
+  private final ConcurrentHashMap<String, BatchWriter> writers = new ConcurrentHashMap<>();
 
   static {
-
     Runtime.getRuntime().addShutdownHook(new Thread() {
       @Override
       public void run() {
@@ -76,6 +81,7 @@ public void run() {
   @Override
   public void init() throws DBException {
     colFam = new Text(getProperties().getProperty("accumulo.columnFamily"));
+    colFamBytes = colFam.toString().getBytes(UTF_8);
 
     inst = new ZooKeeperInstance(
         getProperties().getProperty("accumulo.instanceName"),
@@ -85,9 +91,7 @@ public void init() throws DBException {
       AuthenticationToken token =
           new PasswordToken(getProperties().getProperty("accumulo.password"));
       connector = inst.getConnector(principal, token);
-    } catch (AccumuloException e) {
-      throw new DBException(e);
-    } catch (AccumuloSecurityException e) {
+    } catch (AccumuloException | AccumuloSecurityException e) {
       throw new DBException(e);
     }
 
@@ -100,45 +104,56 @@ public void init() throws DBException {
   @Override
   public void cleanup() throws DBException {
     try {
-      if (bw != null) {
-        bw.close();
+      Iterator<BatchWriter> iterator = writers.values().iterator();
+      while (iterator.hasNext()) {
+        BatchWriter writer = iterator.next();
+        writer.close();
+        iterator.remove();
       }
     } catch (MutationsRejectedException e) {
       throw new DBException(e);
     }
   }
 
-  /**
-   * Commonly repeated functionality: Before doing any operation, make sure
-   * we're working on the correct table. If not, open the correct one.
-   *
-   * @param t
-   *          The table to open.
-   */
-  public void checkTable(String t) throws TableNotFoundException {
-    if (!table.equals(t)) {
-      getTable(t);
-    }
-  }
-
   /**
    * Called when the user specifies a table that isn't the same as the existing
    * table. Connect to it and if necessary, close our current connection.
    *
-   * @param t
+   * @param table
    *          The table to open.
    */
-  public void getTable(String t) throws TableNotFoundException {
-    if (bw != null) { // Close the existing writer if necessary.
-      try {
-        bw.close();
-      } catch (MutationsRejectedException e) {
-        // Couldn't spit out the mutations we wanted.
-        // Ignore this for now.
-        System.err.println("MutationsRejectedException: " + e.getMessage());
+  public BatchWriter getWriter(String table) throws TableNotFoundException {
+    // tl;dr We're paying a cost for the ConcurrentHashMap here to deal with the DB API.
+    // We know that YCSB is really only ever going to send us data for one table, so using
+    // a concurrent data structure is overkill (especially in such a hot code path).
+    // However, the impact seems to be relatively negligible in trivial local tests and it's
+    // "more correct" WRT the API.
+    BatchWriter writer = writers.get(table);
+    if (null == writer) {
+      BatchWriter newWriter = createBatchWriter(table);
+      BatchWriter oldWriter = writers.putIfAbsent(table, newWriter);
+      // Someone beat us to creating a BatchWriter for this table; use theirs.
+      if (null != oldWriter) {
+        try {
+          // Make sure to clean up our new BatchWriter!
+          newWriter.close();
+        } catch (MutationsRejectedException e) {
+          throw new RuntimeException(e);
+        }
+        writer = oldWriter;
+      } else {
+        writer = newWriter;
+      }
+    }
+    return writer;
+  }
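[Editorial note: on Java 8, the get/putIfAbsent dance above is often written with `ConcurrentHashMap.computeIfAbsent`, which avoids constructing a BatchWriter that then loses the race. It is not a drop-in replacement here because `createBatchWriter` throws the checked `TableNotFoundException`, which has to be smuggled across the lambda boundary. A hedged sketch of that alternative, written against the fields defined above and not part of this patch:]

    // Alternative sketch only; the patch's get/putIfAbsent form is what ships.
    private BatchWriter getWriterViaCompute(String table) throws TableNotFoundException {
      try {
        return writers.computeIfAbsent(table, t -> {
          try {
            return createBatchWriter(t);
          } catch (TableNotFoundException e) {
            // computeIfAbsent's mapping function cannot throw checked exceptions,
            // so wrap here and unwrap outside the lambda.
            throw new RuntimeException(e);
          }
        });
      } catch (RuntimeException e) {
        if (e.getCause() instanceof TableNotFoundException) {
          throw (TableNotFoundException) e.getCause();
        }
        throw e;
      }
    }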
+
+  /**
+   * Creates a BatchWriter with the expected configuration.
+   *
+   * @param table The table to write to
+   */
+  private BatchWriter createBatchWriter(String table) throws TableNotFoundException {
     BatchWriterConfig bwc = new BatchWriterConfig();
     bwc.setMaxLatency(
         Long.parseLong(getProperties()
@@ -146,16 +161,15 @@ public void getTable(String t) throws TableNotFoundException {
         TimeUnit.MILLISECONDS);
     bwc.setMaxMemory(Long.parseLong(
         getProperties().getProperty("accumulo.batchWriterSize", "100000")));
-    bwc.setMaxWriteThreads(Integer.parseInt(
-        getProperties().getProperty("accumulo.batchWriterThreads", "1")));
-
-    bw = connector.createBatchWriter(t, bwc);
-
-    // Create our scanners
-    singleScanner = connector.createScanner(t, Authorizations.EMPTY);
-    scanScanner = connector.createScanner(t, Authorizations.EMPTY);
-
-    table = t; // Store the name of the table we have open.
+    final String numThreadsValue = getProperties().getProperty("accumulo.batchWriterThreads");
+    // Try to saturate the client machine.
+    int numThreads = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
+    if (null != numThreadsValue) {
+      numThreads = Integer.parseInt(numThreadsValue);
+    }
+    System.err.println("Using " + numThreads + " threads to write data");
+    bwc.setMaxWriteThreads(numThreads);
+    return connector.createBatchWriter(table, bwc);
   }
 
   /**
@@ -165,120 +179,120 @@ public void getTable(String t) throws TableNotFoundException {
    * @param fields the set of columns to scan
    * @return an Accumulo {@link Scanner} bound to the given row and columns
    */
-  private Scanner getRow(Text row, Set<String> fields) {
-    singleScanner.clearColumns();
-    singleScanner.setRange(new Range(row));
+  private Scanner getRow(String table, Text row, Set<String> fields) throws TableNotFoundException {
+    Scanner scanner = connector.createScanner(table, Authorizations.EMPTY);
+    scanner.setRange(new Range(row));
     if (fields != null) {
       for (String field : fields) {
-        singleScanner.fetchColumn(colFam, new Text(field));
+        scanner.fetchColumn(colFam, new Text(field));
       }
     }
-    return singleScanner;
+    return scanner;
   }
 
   @Override
-  public Status read(String t, String key, Set<String> fields,
+  public Status read(String table, String key, Set<String> fields,
       HashMap<String, ByteIterator> result) {
+    Scanner scanner = null;
     try {
-      checkTable(t);
-    } catch (TableNotFoundException e) {
-      System.err.println("Error trying to connect to Accumulo table." + e);
-      return Status.ERROR;
-    }
-
-    try {
+      scanner = getRow(table, new Text(key), null);
       // Pick out the results we care about.
-      for (Entry<Key, Value> entry : getRow(new Text(key), null)) {
+      final Text cq = new Text();
+      for (Entry<Key, Value> entry : scanner) {
+        entry.getKey().getColumnQualifier(cq);
         Value v = entry.getValue();
         byte[] buf = v.get();
-        result.put(entry.getKey().getColumnQualifier().toString(),
+        result.put(cq.toString(),
             new ByteArrayByteIterator(buf));
       }
     } catch (Exception e) {
-      System.err.println("Error trying to reading Accumulo table" + key + e);
+      System.err.println("Error trying to read Accumulo table " + table + " " + key);
+      e.printStackTrace();
      return Status.ERROR;
+    } finally {
+      if (null != scanner) {
+        scanner.close();
+      }
    }
    return Status.OK;
  }
 
   @Override
-  public Status scan(String t, String startkey, int recordcount,
+  public Status scan(String table, String startkey, int recordcount,
       Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
-    try {
-      checkTable(t);
-    } catch (TableNotFoundException e) {
-      System.err.println("Error trying to connect to Accumulo table." + e);
-      return Status.ERROR;
-    }
-
-    // There doesn't appear to be a way to create a range for a given
-    // LENGTH. Just start and end keys. So we'll do this the hard way for
-    // now:
-    // Just make the end 'infinity' and only read as much as we need.
So we'll do this the hard way for - // now: // Just make the end 'infinity' and only read as much as we need. - scanScanner.clearColumns(); - scanScanner.setRange(new Range(new Text(startkey), null)); - - // Batch size is how many key/values to try to get per call. Here, I'm - // guessing that the number of keys in a row is equal to the number of - // fields we're interested in. - - // We try to fetch one more so as to tell when we've run out of fields. - - // If no fields are provided, we assume one column/row. - if (fields != null) { - // And add each of them as fields we want. - for (String field : fields) { - scanScanner.fetchColumn(colFam, new Text(field)); + Scanner scanner = null; + try { + scanner = connector.createScanner(table, Authorizations.EMPTY); + scanner.setRange(new Range(new Text(startkey), null)); + + // Have Accumulo send us complete rows, serialized in a single Key-Value pair + IteratorSetting cfg = new IteratorSetting(100, WholeRowIterator.class); + scanner.addScanIterator(cfg); + + // If no fields are provided, we assume one column/row. + if (fields != null) { + // And add each of them as fields we want. + for (String field : fields) { + scanner.fetchColumn(colFam, new Text(field)); + } } - } - String rowKey = ""; - HashMap currentHM = null; - int count = 0; - - // Begin the iteration. - for (Entry entry : scanScanner) { - // Check for a new row. - if (!rowKey.equals(entry.getKey().getRow().toString())) { + int count = 0; + for (Entry entry : scanner) { + // Deserialize the row + SortedMap row = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue()); + HashMap rowData; + if (null != fields) { + rowData = new HashMap<>(fields.size()); + } else { + rowData = new HashMap<>(); + } + result.add(rowData); + // Parse the data in the row, avoid unnecessary Text object creation + final Text cq = new Text(); + for (Entry rowEntry : row.entrySet()) { + rowEntry.getKey().getColumnQualifier(cq); + rowData.put(cq.toString(), new ByteArrayByteIterator(rowEntry.getValue().get())); + } if (count++ == recordcount) { // Done reading the last row. break; } - rowKey = entry.getKey().getRow().toString(); - if (fields != null) { - // Initial Capacity for all keys. - currentHM = new HashMap(fields.size()); - } else { - // An empty result map. - currentHM = new HashMap(); - } - result.add(currentHM); } - // Now add the key to the hashmap. - Value v = entry.getValue(); - byte[] buf = v.get(); - currentHM.put(entry.getKey().getColumnQualifier().toString(), - new ByteArrayByteIterator(buf)); + } catch (TableNotFoundException e) { + System.err.println("Error trying to connect to Accumulo table."); + e.printStackTrace(); + return Status.ERROR; + } catch (IOException e) { + System.err.println("Error deserializing data from Accumulo."); + e.printStackTrace(); + return Status.ERROR; + } finally { + if (null != scanner) { + scanner.close(); + } } return Status.OK; } @Override - public Status update(String t, String key, + public Status update(String table, String key, HashMap values) { + BatchWriter bw = null; try { - checkTable(t); + bw = getWriter(table); } catch (TableNotFoundException e) { - System.err.println("Error trying to connect to Accumulo table." 
+      System.err.println("Error opening batch writer to Accumulo table " + table);
+      e.printStackTrace();
       return Status.ERROR;
     }
 
-    Mutation mutInsert = new Mutation(new Text(key));
+    Mutation mutInsert = new Mutation(key.getBytes(UTF_8));
     for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
-      mutInsert.put(colFam, new Text(entry.getKey()),
-          System.currentTimeMillis(), new Value(entry.getValue().toArray()));
+      mutInsert.put(colFamBytes, entry.getKey().getBytes(UTF_8), entry.getValue().toArray());
     }
 
     try {
@@ -289,7 +303,7 @@ public Status update(String t, String key,
       return Status.ERROR;
     }
 
-    return Status.OK;
+    return Status.BATCHED_OK;
   }
 
   @Override
@@ -299,17 +313,19 @@ public Status insert(String t, String key,
   }
 
   @Override
-  public Status delete(String t, String key) {
+  public Status delete(String table, String key) {
+    BatchWriter bw;
     try {
-      checkTable(t);
+      bw = getWriter(table);
     } catch (TableNotFoundException e) {
-      System.err.println("Error trying to connect to Accumulo table." + e);
+      System.err.println("Error trying to connect to Accumulo table.");
+      e.printStackTrace();
       return Status.ERROR;
     }
 
     try {
-      deleteRow(new Text(key));
-    } catch (MutationsRejectedException e) {
+      deleteRow(table, new Text(key), bw);
+    } catch (TableNotFoundException | MutationsRejectedException e) {
       System.err.println("Error performing delete.");
       e.printStackTrace();
       return Status.ERROR;
@@ -323,24 +339,31 @@ public Status delete(String t, String key) {
   }
 
   // These functions are adapted from RowOperations.java:
-  private void deleteRow(Text row) throws MutationsRejectedException {
-    deleteRow(getRow(row, null));
+  private void deleteRow(String table, Text row, BatchWriter bw) throws MutationsRejectedException,
+      TableNotFoundException {
+    // TODO Use a BatchDeleter instead
+    deleteRow(getRow(table, row, null), bw);
   }
 
   /**
    * Deletes a row, given a Scanner of JUST that row.
    */
-  private void deleteRow(Scanner scanner) throws MutationsRejectedException {
+  private void deleteRow(Scanner scanner, BatchWriter bw) throws MutationsRejectedException {
     Mutation deleter = null;
     // iterate through the keys
+    final Text row = new Text();
+    final Text cf = new Text();
+    final Text cq = new Text();
     for (Entry<Key, Value> entry : scanner) {
       // create a mutation for the row
       if (deleter == null) {
-        deleter = new Mutation(entry.getKey().getRow());
+        entry.getKey().getRow(row);
+        deleter = new Mutation(row);
       }
+      entry.getKey().getColumnFamily(cf);
+      entry.getKey().getColumnQualifier(cq);
       // the remove function adds the key with the delete flag set to true
-      deleter.putDelete(entry.getKey().getColumnFamily(),
-          entry.getKey().getColumnQualifier());
+      deleter.putDelete(cf, cq);
     }
     bw.addMutation(deleter);
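[Editorial note: the patch is truncated here. The TODO above points at Accumulo's `BatchDeleter` as the cleaner way to drop a row without reading it back through a plain Scanner. A minimal sketch of what that could look like, not part of this patch; it assumes the class's existing `connector` field and needs `java.util.Collections` and `org.apache.accumulo.core.client.BatchDeleter` imports.]

    // Sketch only: row deletion via BatchDeleter, as the TODO suggests.
    private void deleteRowWithBatchDeleter(String table, Text row)
        throws TableNotFoundException, MutationsRejectedException {
      BatchDeleter deleter = connector.createBatchDeleter(
          table, Authorizations.EMPTY, 1 /* query threads */, new BatchWriterConfig());
      try {
        // Restrict the deleter to exactly this one row and delete everything in it.
        deleter.setRanges(Collections.singleton(new Range(row)));
        deleter.delete();
      } finally {
        deleter.close();
      }
    }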