|
@@ -102,54 +102,69 @@ public class PerformanceEvaluation implements HConstants {
|
|
SEQUENTIAL_WRITE,
|
|
SEQUENTIAL_WRITE,
|
|
SCAN});
|
|
SCAN});
|
|
|
|
|
|
- private final Configuration conf;
|
|
|
|
- private final HClient client;
|
|
|
|
|
|
+ volatile Configuration conf;
|
|
private boolean miniCluster = false;
|
|
private boolean miniCluster = false;
|
|
private int N = 1;
|
|
private int N = 1;
|
|
private int R = ROWS_PER_GB;
|
|
private int R = ROWS_PER_GB;
|
|
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
|
|
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
|
|
|
|
|
|
- /*
|
|
|
|
|
|
+ /**
|
|
* Regex to parse lines in input file passed to mapreduce task.
|
|
* Regex to parse lines in input file passed to mapreduce task.
|
|
*/
|
|
*/
|
|
public static final Pattern LINE_PATTERN =
|
|
public static final Pattern LINE_PATTERN =
|
|
Pattern.compile("startRow=(\\d+),\\s+" +
|
|
Pattern.compile("startRow=(\\d+),\\s+" +
|
|
"perClientRunRows=(\\d+),\\s+totalRows=(\\d+),\\s+clients=(\\d+)");
|
|
"perClientRunRows=(\\d+),\\s+totalRows=(\\d+),\\s+clients=(\\d+)");
|
|
|
|
|
|
- /*
|
|
|
|
|
|
+ /**
|
|
* Enum for map metrics. Keep it out here rather than inside in the Map
|
|
* Enum for map metrics. Keep it out here rather than inside in the Map
|
|
* inner-class so we can find associated properties.
|
|
* inner-class so we can find associated properties.
|
|
*/
|
|
*/
|
|
- protected static enum Counter {ELAPSED_TIME, ROWS}
|
|
|
|
|
|
+ protected static enum Counter {
|
|
|
|
+ /** elapsed time */
|
|
|
|
+ ELAPSED_TIME,
|
|
|
|
+ /** number of rows */
|
|
|
|
+ ROWS}
|
|
|
|
|
|
|
|
|
|
- public PerformanceEvaluation(final HBaseConfiguration c) {
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Constructor
|
|
|
|
+ * @param c Configuration object
|
|
|
|
+ */
|
|
|
|
+ public PerformanceEvaluation(final Configuration c) {
|
|
this.conf = c;
|
|
this.conf = c;
|
|
- this.client = new HClient(conf);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- /*
|
|
|
|
|
|
+ /**
|
|
* Implementations can have their status set.
|
|
* Implementations can have their status set.
|
|
*/
|
|
*/
|
|
static interface Status {
|
|
static interface Status {
|
|
|
|
+ /**
|
|
|
|
+ * Sets status
|
|
|
|
+ * @param msg status message
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
void setStatus(final String msg) throws IOException;
|
|
void setStatus(final String msg) throws IOException;
|
|
}
|
|
}
|
|
|
|
|
|
- /*
|
|
|
|
|
|
+ /**
|
|
* MapReduce job that runs a performance evaluation client in each map task.
|
|
* MapReduce job that runs a performance evaluation client in each map task.
|
|
*/
|
|
*/
|
|
public static class EvaluationMapTask extends MapReduceBase
|
|
public static class EvaluationMapTask extends MapReduceBase
|
|
implements Mapper {
|
|
implements Mapper {
|
|
|
|
+ /** configuration parameter name that contains the command */
|
|
public final static String CMD_KEY = "EvaluationMapTask.command";
|
|
public final static String CMD_KEY = "EvaluationMapTask.command";
|
|
private String cmd;
|
|
private String cmd;
|
|
private PerformanceEvaluation pe;
|
|
private PerformanceEvaluation pe;
|
|
|
|
|
|
|
|
+ /** {@inheritDoc} */
|
|
@Override
|
|
@Override
|
|
public void configure(JobConf j) {
|
|
public void configure(JobConf j) {
|
|
this.cmd = j.get(CMD_KEY);
|
|
this.cmd = j.get(CMD_KEY);
|
|
- this.pe = new PerformanceEvaluation(new HBaseConfiguration());
|
|
|
|
|
|
+
|
|
|
|
+ this.pe = new PerformanceEvaluation(j);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /** {@inheritDoc} */
|
|
public void map(@SuppressWarnings("unused") final WritableComparable key,
|
|
public void map(@SuppressWarnings("unused") final WritableComparable key,
|
|
final Writable value, final OutputCollector output,
|
|
final Writable value, final OutputCollector output,
|
|
final Reporter reporter)
|
|
final Reporter reporter)
|
|
@@ -160,7 +175,7 @@ public class PerformanceEvaluation implements HConstants {
|
|
int perClientRunRows = Integer.parseInt(m.group(2));
|
|
int perClientRunRows = Integer.parseInt(m.group(2));
|
|
int totalRows = Integer.parseInt(m.group(3));
|
|
int totalRows = Integer.parseInt(m.group(3));
|
|
Status status = new Status() {
|
|
Status status = new Status() {
|
|
- public void setStatus(String msg) throws IOException {
|
|
|
|
|
|
+ public void setStatus(String msg) {
|
|
reporter.setStatus(msg);
|
|
reporter.setStatus(msg);
|
|
}
|
|
}
|
|
};
|
|
};
|
|
@@ -182,8 +197,8 @@ public class PerformanceEvaluation implements HConstants {
|
|
* @return True if we created the table.
|
|
* @return True if we created the table.
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- private boolean checkTable(final HClient c) throws IOException {
|
|
|
|
- HTableDescriptor [] extantTables = c.listTables();
|
|
|
|
|
|
+ private boolean checkTable(HBaseAdmin admin) throws IOException {
|
|
|
|
+ HTableDescriptor [] extantTables = admin.listTables();
|
|
boolean tableExists = false;
|
|
boolean tableExists = false;
|
|
if (extantTables.length > 0) {
|
|
if (extantTables.length > 0) {
|
|
// Check to see if our table already exists. Print warning if it does.
|
|
// Check to see if our table already exists. Print warning if it does.
|
|
@@ -196,7 +211,7 @@ public class PerformanceEvaluation implements HConstants {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if (!tableExists) {
|
|
if (!tableExists) {
|
|
- c.createTable(tableDescriptor);
|
|
|
|
|
|
+ admin.createTable(tableDescriptor);
|
|
LOG.info("Table " + tableDescriptor + " created");
|
|
LOG.info("Table " + tableDescriptor + " created");
|
|
}
|
|
}
|
|
return !tableExists;
|
|
return !tableExists;
|
|
@@ -210,7 +225,7 @@ public class PerformanceEvaluation implements HConstants {
|
|
*/
|
|
*/
|
|
private void runNIsMoreThanOne(final String cmd)
|
|
private void runNIsMoreThanOne(final String cmd)
|
|
throws IOException {
|
|
throws IOException {
|
|
- checkTable(this.client);
|
|
|
|
|
|
+ checkTable(new HBaseAdmin(conf));
|
|
|
|
|
|
// Run a mapreduce job. Run as many maps as asked-for clients.
|
|
// Run a mapreduce job. Run as many maps as asked-for clients.
|
|
// Before we start up the job, write out an input file with instruction
|
|
// Before we start up the job, write out an input file with instruction
|
|
@@ -269,20 +284,23 @@ public class PerformanceEvaluation implements HConstants {
|
|
*/
|
|
*/
|
|
static abstract class Test {
|
|
static abstract class Test {
|
|
protected final Random rand = new Random(System.currentTimeMillis());
|
|
protected final Random rand = new Random(System.currentTimeMillis());
|
|
- protected final HClient client;
|
|
|
|
protected final int startRow;
|
|
protected final int startRow;
|
|
protected final int perClientRunRows;
|
|
protected final int perClientRunRows;
|
|
protected final int totalRows;
|
|
protected final int totalRows;
|
|
private final Status status;
|
|
private final Status status;
|
|
|
|
+ protected HBaseAdmin admin;
|
|
|
|
+ protected HTable table;
|
|
|
|
+ protected volatile Configuration conf;
|
|
|
|
|
|
- Test(final HClient c, final int startRow, final int perClientRunRows,
|
|
|
|
- final int totalRows, final Status status) {
|
|
|
|
|
|
+ Test(final Configuration conf, final int startRow,
|
|
|
|
+ final int perClientRunRows, final int totalRows, final Status status) {
|
|
super();
|
|
super();
|
|
- this.client = c;
|
|
|
|
this.startRow = startRow;
|
|
this.startRow = startRow;
|
|
this.perClientRunRows = perClientRunRows;
|
|
this.perClientRunRows = perClientRunRows;
|
|
this.totalRows = totalRows;
|
|
this.totalRows = totalRows;
|
|
this.status = status;
|
|
this.status = status;
|
|
|
|
+ this.table = null;
|
|
|
|
+ this.conf = conf;
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
/*
|
|
@@ -305,9 +323,11 @@ public class PerformanceEvaluation implements HConstants {
|
|
}
|
|
}
|
|
|
|
|
|
void testSetup() throws IOException {
|
|
void testSetup() throws IOException {
|
|
- this.client.openTable(tableDescriptor.getName());
|
|
|
|
|
|
+ this.admin = new HBaseAdmin(conf);
|
|
|
|
+ this.table = new HTable(conf, tableDescriptor.getName());
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ @SuppressWarnings("unused")
|
|
void testTakedown() throws IOException {
|
|
void testTakedown() throws IOException {
|
|
// Empty
|
|
// Empty
|
|
}
|
|
}
|
|
@@ -355,16 +375,17 @@ public class PerformanceEvaluation implements HConstants {
|
|
}
|
|
}
|
|
|
|
|
|
class RandomReadTest extends Test {
|
|
class RandomReadTest extends Test {
|
|
- RandomReadTest(final HClient c, final int startRow,
|
|
|
|
- final int perClientRunRows, final int totalRows, final Status status) {
|
|
|
|
- super(c, startRow, perClientRunRows, totalRows, status);
|
|
|
|
|
|
+ RandomReadTest(final Configuration conf, final int startRow,
|
|
|
|
+ final int perClientRunRows, final int totalRows, final Status status) {
|
|
|
|
+ super(conf, startRow, perClientRunRows, totalRows, status);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
void testRow(@SuppressWarnings("unused") final int i) throws IOException {
|
|
void testRow(@SuppressWarnings("unused") final int i) throws IOException {
|
|
- this.client.get(getRandomRow(), COLUMN_NAME);
|
|
|
|
|
|
+ this.table.get(getRandomRow(), COLUMN_NAME);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ @Override
|
|
protected int getReportingPeriod() {
|
|
protected int getReportingPeriod() {
|
|
//
|
|
//
|
|
return this.perClientRunRows / 100;
|
|
return this.perClientRunRows / 100;
|
|
@@ -377,17 +398,17 @@ public class PerformanceEvaluation implements HConstants {
|
|
}
|
|
}
|
|
|
|
|
|
class RandomWriteTest extends Test {
|
|
class RandomWriteTest extends Test {
|
|
- RandomWriteTest(final HClient c, final int startRow,
|
|
|
|
- final int perClientRunRows, final int totalRows, final Status status) {
|
|
|
|
- super(c, startRow, perClientRunRows, totalRows, status);
|
|
|
|
|
|
+ RandomWriteTest(final Configuration conf, final int startRow,
|
|
|
|
+ final int perClientRunRows, final int totalRows, final Status status) {
|
|
|
|
+ super(conf, startRow, perClientRunRows, totalRows, status);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
void testRow(@SuppressWarnings("unused") final int i) throws IOException {
|
|
void testRow(@SuppressWarnings("unused") final int i) throws IOException {
|
|
Text row = getRandomRow();
|
|
Text row = getRandomRow();
|
|
- long lockid = client.startUpdate(row);
|
|
|
|
- client.put(lockid, COLUMN_NAME, generateValue());
|
|
|
|
- client.commit(lockid);
|
|
|
|
|
|
+ long lockid = table.startUpdate(row);
|
|
|
|
+ table.put(lockid, COLUMN_NAME, generateValue());
|
|
|
|
+ table.commit(lockid);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -401,15 +422,15 @@ public class PerformanceEvaluation implements HConstants {
|
|
private HStoreKey key = new HStoreKey();
|
|
private HStoreKey key = new HStoreKey();
|
|
private TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
|
private TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
|
|
|
|
|
- ScanTest(final HClient c, final int startRow, final int perClientRunRows,
|
|
|
|
- final int totalRows, final Status status) {
|
|
|
|
- super(c, startRow, perClientRunRows, totalRows, status);
|
|
|
|
|
|
+ ScanTest(final Configuration conf, final int startRow,
|
|
|
|
+ final int perClientRunRows, final int totalRows, final Status status) {
|
|
|
|
+ super(conf, startRow, perClientRunRows, totalRows, status);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
void testSetup() throws IOException {
|
|
void testSetup() throws IOException {
|
|
super.testSetup();
|
|
super.testSetup();
|
|
- this.testScanner = client.obtainScanner(new Text[] {COLUMN_NAME},
|
|
|
|
|
|
+ this.testScanner = table.obtainScanner(new Text[] {COLUMN_NAME},
|
|
new Text(Integer.toString(this.startRow)));
|
|
new Text(Integer.toString(this.startRow)));
|
|
}
|
|
}
|
|
|
|
|
|
@@ -435,14 +456,14 @@ public class PerformanceEvaluation implements HConstants {
|
|
}
|
|
}
|
|
|
|
|
|
class SequentialReadTest extends Test {
|
|
class SequentialReadTest extends Test {
|
|
- SequentialReadTest(final HClient c, final int startRow,
|
|
|
|
|
|
+ SequentialReadTest(final Configuration conf, final int startRow,
|
|
final int perClientRunRows, final int totalRows, final Status status) {
|
|
final int perClientRunRows, final int totalRows, final Status status) {
|
|
- super(c, startRow, perClientRunRows, totalRows, status);
|
|
|
|
|
|
+ super(conf, startRow, perClientRunRows, totalRows, status);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
void testRow(final int i) throws IOException {
|
|
void testRow(final int i) throws IOException {
|
|
- client.get(new Text(Integer.toString(i)), COLUMN_NAME);
|
|
|
|
|
|
+ table.get(new Text(Integer.toString(i)), COLUMN_NAME);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -452,16 +473,16 @@ public class PerformanceEvaluation implements HConstants {
|
|
}
|
|
}
|
|
|
|
|
|
class SequentialWriteTest extends Test {
|
|
class SequentialWriteTest extends Test {
|
|
- SequentialWriteTest(final HClient c, final int startRow,
|
|
|
|
|
|
+ SequentialWriteTest(final Configuration conf, final int startRow,
|
|
final int perClientRunRows, final int totalRows, final Status status) {
|
|
final int perClientRunRows, final int totalRows, final Status status) {
|
|
- super(c, startRow, perClientRunRows, totalRows, status);
|
|
|
|
|
|
+ super(conf, startRow, perClientRunRows, totalRows, status);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
void testRow(final int i) throws IOException {
|
|
void testRow(final int i) throws IOException {
|
|
- long lockid = client.startUpdate(new Text(Integer.toString(i)));
|
|
|
|
- client.put(lockid, COLUMN_NAME, generateValue());
|
|
|
|
- client.commit(lockid);
|
|
|
|
|
|
+ long lockid = table.startUpdate(new Text(Integer.toString(i)));
|
|
|
|
+ table.put(lockid, COLUMN_NAME, generateValue());
|
|
|
|
+ table.commit(lockid);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -477,25 +498,25 @@ public class PerformanceEvaluation implements HConstants {
|
|
perClientRunRows + " rows");
|
|
perClientRunRows + " rows");
|
|
long totalElapsedTime = 0;
|
|
long totalElapsedTime = 0;
|
|
if (cmd.equals(RANDOM_READ)) {
|
|
if (cmd.equals(RANDOM_READ)) {
|
|
- Test t = new RandomReadTest(this.client, startRow, perClientRunRows,
|
|
|
|
|
|
+ Test t = new RandomReadTest(this.conf, startRow, perClientRunRows,
|
|
totalRows, status);
|
|
totalRows, status);
|
|
totalElapsedTime = t.test();
|
|
totalElapsedTime = t.test();
|
|
} else if (cmd.equals(RANDOM_READ_MEM)) {
|
|
} else if (cmd.equals(RANDOM_READ_MEM)) {
|
|
throw new UnsupportedOperationException("Not yet implemented");
|
|
throw new UnsupportedOperationException("Not yet implemented");
|
|
} else if (cmd.equals(RANDOM_WRITE)) {
|
|
} else if (cmd.equals(RANDOM_WRITE)) {
|
|
- Test t = new RandomWriteTest(this.client, startRow, perClientRunRows,
|
|
|
|
|
|
+ Test t = new RandomWriteTest(this.conf, startRow, perClientRunRows,
|
|
totalRows, status);
|
|
totalRows, status);
|
|
totalElapsedTime = t.test();
|
|
totalElapsedTime = t.test();
|
|
} else if (cmd.equals(SCAN)) {
|
|
} else if (cmd.equals(SCAN)) {
|
|
- Test t = new ScanTest(this.client, startRow, perClientRunRows,
|
|
|
|
|
|
+ Test t = new ScanTest(this.conf, startRow, perClientRunRows,
|
|
totalRows, status);
|
|
totalRows, status);
|
|
totalElapsedTime = t.test();
|
|
totalElapsedTime = t.test();
|
|
} else if (cmd.equals(SEQUENTIAL_READ)) {
|
|
} else if (cmd.equals(SEQUENTIAL_READ)) {
|
|
- Test t = new SequentialReadTest(this.client, startRow, perClientRunRows,
|
|
|
|
|
|
+ Test t = new SequentialReadTest(this.conf, startRow, perClientRunRows,
|
|
totalRows, status);
|
|
totalRows, status);
|
|
totalElapsedTime = t.test();
|
|
totalElapsedTime = t.test();
|
|
} else if (cmd.equals(SEQUENTIAL_WRITE)) {
|
|
} else if (cmd.equals(SEQUENTIAL_WRITE)) {
|
|
- Test t = new SequentialWriteTest(this.client, startRow, perClientRunRows,
|
|
|
|
|
|
+ Test t = new SequentialWriteTest(this.conf, startRow, perClientRunRows,
|
|
totalRows, status);
|
|
totalRows, status);
|
|
totalElapsedTime = t.test();
|
|
totalElapsedTime = t.test();
|
|
} else {
|
|
} else {
|
|
@@ -513,9 +534,11 @@ public class PerformanceEvaluation implements HConstants {
|
|
LOG.info(msg);
|
|
LOG.info(msg);
|
|
}
|
|
}
|
|
};
|
|
};
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ HBaseAdmin admin = null;
|
|
try {
|
|
try {
|
|
- checkTable(this.client);
|
|
|
|
|
|
+ admin = new HBaseAdmin(this.conf);
|
|
|
|
+ checkTable(admin);
|
|
|
|
|
|
if (cmd.equals(RANDOM_READ) || cmd.equals(RANDOM_READ_MEM) ||
|
|
if (cmd.equals(RANDOM_READ) || cmd.equals(RANDOM_READ_MEM) ||
|
|
cmd.equals(SCAN) || cmd.equals(SEQUENTIAL_READ)) {
|
|
cmd.equals(SCAN) || cmd.equals(SEQUENTIAL_READ)) {
|
|
@@ -529,7 +552,9 @@ public class PerformanceEvaluation implements HConstants {
|
|
LOG.error("Failed", e);
|
|
LOG.error("Failed", e);
|
|
} finally {
|
|
} finally {
|
|
LOG.info("Deleting table " + tableDescriptor.getName());
|
|
LOG.info("Deleting table " + tableDescriptor.getName());
|
|
- this.client.deleteTable(tableDescriptor.getName());
|
|
|
|
|
|
+ if (admin != null) {
|
|
|
|
+ admin.deleteTable(tableDescriptor.getName());
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|