|
@@ -21,26 +21,21 @@ package org.apache.hadoop.hbase;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Map;
|
|
|
-import java.util.Random;
|
|
|
import java.util.TreeMap;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-
|
|
|
import org.apache.hadoop.dfs.MiniDFSCluster;
|
|
|
-
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
-
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
-
|
|
|
import org.apache.hadoop.mapred.JobClient;
|
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapred.MiniMRCluster;
|
|
|
import org.apache.hadoop.mapred.Reporter;
|
|
|
-
|
|
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
|
|
import org.apache.hadoop.hbase.io.MapWritable;
|
|
|
-
|
|
|
import org.apache.hadoop.hbase.mapred.TableMap;
|
|
|
import org.apache.hadoop.hbase.mapred.TableOutputCollector;
|
|
|
import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
|
|
@@ -49,15 +44,16 @@ import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
|
|
|
* Test Map/Reduce job over HBase tables
|
|
|
*/
|
|
|
public class TestTableMapReduce extends HBaseTestCase {
|
|
|
- static final String TABLE_NAME = "test";
|
|
|
+ private static final Log LOG =
|
|
|
+ LogFactory.getLog(TestTableMapReduce.class.getName());
|
|
|
+
|
|
|
+ static final String SINGLE_REGION_TABLE_NAME = "srtest";
|
|
|
+ static final String MULTI_REGION_TABLE_NAME = "mrtest";
|
|
|
static final String INPUT_COLUMN = "contents:";
|
|
|
static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN);
|
|
|
static final String OUTPUT_COLUMN = "text:";
|
|
|
static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);
|
|
|
|
|
|
- private Random rand;
|
|
|
- private HTableDescriptor desc;
|
|
|
-
|
|
|
private MiniDFSCluster dfsCluster = null;
|
|
|
private FileSystem fs;
|
|
|
private Path dir;
|
|
@@ -76,51 +72,23 @@ public class TestTableMapReduce extends HBaseTestCase {
|
|
|
@Override
|
|
|
public void setUp() throws Exception {
|
|
|
super.setUp();
|
|
|
- rand = new Random();
|
|
|
- desc = new HTableDescriptor("test");
|
|
|
- desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
|
|
|
- desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
|
|
|
-
|
|
|
+ // This size is picked so the table is split into two
|
|
|
+ // after addContent in testMultiRegionTableMapReduce.
|
|
|
+ conf.setLong("hbase.hregion.max.filesize", 256 * 1024);
|
|
|
dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null);
|
|
|
try {
|
|
|
fs = dfsCluster.getFileSystem();
|
|
|
dir = new Path("/hbase");
|
|
|
fs.mkdirs(dir);
|
|
|
-
|
|
|
- // create the root and meta regions and insert the data region into the meta
|
|
|
-
|
|
|
- HRegion root = createNewHRegion(dir, conf, HGlobals.rootTableDesc, 0L, null, null);
|
|
|
- HRegion meta = createNewHRegion(dir, conf, HGlobals.metaTableDesc, 1L, null, null);
|
|
|
- HRegion.addRegionToMETA(root, meta);
|
|
|
-
|
|
|
- HRegion region = createNewHRegion(dir, conf, desc, rand.nextLong(), null, null);
|
|
|
- HRegion.addRegionToMETA(meta, region);
|
|
|
-
|
|
|
- // insert some data into the test table
|
|
|
-
|
|
|
- for(int i = 0; i < values.length; i++) {
|
|
|
- long lockid = region.startUpdate(new Text("row_"
|
|
|
- + String.format("%1$05d", i)));
|
|
|
-
|
|
|
- region.put(lockid, TEXT_INPUT_COLUMN, values[i]);
|
|
|
- region.commit(lockid, System.currentTimeMillis());
|
|
|
- }
|
|
|
-
|
|
|
- region.close();
|
|
|
- region.getLog().closeAndDelete();
|
|
|
- meta.close();
|
|
|
- meta.getLog().closeAndDelete();
|
|
|
- root.close();
|
|
|
- root.getLog().closeAndDelete();
|
|
|
-
|
|
|
// Start up HBase cluster
|
|
|
-
|
|
|
hCluster = new MiniHBaseCluster(conf, 1, dfsCluster);
|
|
|
-
|
|
|
+ LOG.info("Master is at " + this.conf.get(HConstants.MASTER_ADDRESS));
|
|
|
} catch (Exception e) {
|
|
|
if (dfsCluster != null) {
|
|
|
dfsCluster.shutdown();
|
|
|
+ dfsCluster = null;
|
|
|
}
|
|
|
+ throw e;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -130,11 +98,13 @@ public class TestTableMapReduce extends HBaseTestCase {
|
|
|
@Override
|
|
|
public void tearDown() throws Exception {
|
|
|
super.tearDown();
|
|
|
-
|
|
|
if(hCluster != null) {
|
|
|
hCluster.shutdown();
|
|
|
}
|
|
|
|
|
|
+ if (dfsCluster != null) {
|
|
|
+ dfsCluster.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -190,18 +160,50 @@ public class TestTableMapReduce extends HBaseTestCase {
|
|
|
output.collect(tKey, outval);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Test HBase map/reduce
|
|
|
- * @throws IOException
|
|
|
+ * Test hbase mapreduce jobs against single region and multi-region tables.
|
|
|
*/
|
|
|
- @SuppressWarnings("static-access")
|
|
|
public void testTableMapReduce() throws IOException {
|
|
|
- System.out.println("Print table contents before map/reduce");
|
|
|
- scanTable(conf);
|
|
|
+ localTestSingleRegionTable();
|
|
|
+ localTestMultiRegionTable();
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Test against a single region.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void localTestSingleRegionTable() throws IOException {
|
|
|
+ HTableDescriptor desc = new HTableDescriptor(SINGLE_REGION_TABLE_NAME);
|
|
|
+ desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
|
|
|
+ desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
|
|
|
+
|
|
|
+ // Create a table.
|
|
|
+ HBaseAdmin admin = new HBaseAdmin(this.conf);
|
|
|
+ admin.createTable(desc);
|
|
|
+
|
|
|
+ // insert some data into the test table
|
|
|
+ HTable table = new HTable(conf, new Text(SINGLE_REGION_TABLE_NAME));
|
|
|
+
|
|
|
+ for(int i = 0; i < values.length; i++) {
|
|
|
+ long lockid = table.startUpdate(new Text("row_"
|
|
|
+ + String.format("%1$05d", i)));
|
|
|
+
|
|
|
+ try {
|
|
|
+ table.put(lockid, TEXT_INPUT_COLUMN, values[i]);
|
|
|
+ table.commit(lockid, System.currentTimeMillis());
|
|
|
+ lockid = -1;
|
|
|
+ } finally {
|
|
|
+ if (lockid != -1)
|
|
|
+ table.abort(lockid);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("Print table contents before map/reduce");
|
|
|
+ scanTable(conf, SINGLE_REGION_TABLE_NAME);
|
|
|
|
|
|
@SuppressWarnings("deprecation")
|
|
|
- MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getName(), 1);
|
|
|
+ MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
|
|
|
|
|
|
try {
|
|
|
JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
|
|
@@ -209,10 +211,11 @@ public class TestTableMapReduce extends HBaseTestCase {
|
|
|
jobConf.setNumMapTasks(1);
|
|
|
jobConf.setNumReduceTasks(1);
|
|
|
|
|
|
- ProcessContentsMapper.initJob(TABLE_NAME, INPUT_COLUMN,
|
|
|
+ TableMap.initJob(SINGLE_REGION_TABLE_NAME, INPUT_COLUMN,
|
|
|
ProcessContentsMapper.class, jobConf);
|
|
|
|
|
|
- IdentityTableReduce.initJob(TABLE_NAME, IdentityTableReduce.class, jobConf);
|
|
|
+ IdentityTableReduce.initJob(SINGLE_REGION_TABLE_NAME,
|
|
|
+ IdentityTableReduce.class, jobConf);
|
|
|
|
|
|
JobClient.runJob(jobConf);
|
|
|
|
|
@@ -220,12 +223,63 @@ public class TestTableMapReduce extends HBaseTestCase {
|
|
|
mrCluster.shutdown();
|
|
|
}
|
|
|
|
|
|
- System.out.println("Print table contents after map/reduce");
|
|
|
- scanTable(conf);
|
|
|
+ LOG.info("Print table contents after map/reduce");
|
|
|
+ scanTable(conf, SINGLE_REGION_TABLE_NAME);
|
|
|
+
|
|
|
+ // verify map-reduce results
|
|
|
+ verify(conf, SINGLE_REGION_TABLE_NAME);
|
|
|
}
|
|
|
|
|
|
- private void scanTable(Configuration conf) throws IOException {
|
|
|
- HTable table = new HTable(conf, new Text(TABLE_NAME));
|
|
|
+ /*
|
|
|
+ * Test against multiple regions.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void localTestMultiRegionTable() throws IOException {
|
|
|
+ HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
|
|
|
+ desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
|
|
|
+ desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
|
|
|
+
|
|
|
+ // Create a table.
|
|
|
+ HBaseAdmin admin = new HBaseAdmin(this.conf);
|
|
|
+ admin.createTable(desc);
|
|
|
+
|
|
|
+ // Populate a table into multiple regions
|
|
|
+ MultiRegionTable.makeMultiRegionTable(conf, hCluster, null,
|
|
|
+ MULTI_REGION_TABLE_NAME, INPUT_COLUMN);
|
|
|
+
|
|
|
+ // Verify table indeed has multiple regions
|
|
|
+ HTable table = new HTable(conf, new Text(MULTI_REGION_TABLE_NAME));
|
|
|
+ Text[] startKeys = table.getStartKeys();
|
|
|
+ assertTrue(startKeys.length > 1);
|
|
|
+
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
+ MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
|
|
|
+
|
|
|
+ try {
|
|
|
+ JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
|
|
|
+ jobConf.setJobName("process column contents");
|
|
|
+ jobConf.setNumMapTasks(2);
|
|
|
+ jobConf.setNumReduceTasks(1);
|
|
|
+
|
|
|
+ TableMap.initJob(MULTI_REGION_TABLE_NAME, INPUT_COLUMN,
|
|
|
+ ProcessContentsMapper.class, jobConf);
|
|
|
+
|
|
|
+ IdentityTableReduce.initJob(MULTI_REGION_TABLE_NAME,
|
|
|
+ IdentityTableReduce.class, jobConf);
|
|
|
+
|
|
|
+ JobClient.runJob(jobConf);
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ mrCluster.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
+ // verify map-reduce results
|
|
|
+ verify(conf, MULTI_REGION_TABLE_NAME);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void scanTable(Configuration conf, String tableName)
|
|
|
+ throws IOException {
|
|
|
+ HTable table = new HTable(conf, new Text(tableName));
|
|
|
|
|
|
Text[] columns = {
|
|
|
TEXT_INPUT_COLUMN,
|
|
@@ -239,13 +293,51 @@ public class TestTableMapReduce extends HBaseTestCase {
|
|
|
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
|
|
|
|
|
while(scanner.next(key, results)) {
|
|
|
- System.out.print("row: " + key.getRow());
|
|
|
+ LOG.info("row: " + key.getRow());
|
|
|
|
|
|
for(Map.Entry<Text, byte[]> e: results.entrySet()) {
|
|
|
- System.out.print(" column: " + e.getKey() + " value: "
|
|
|
+ LOG.info(" column: " + e.getKey() + " value: "
|
|
|
+ new String(e.getValue()));
|
|
|
}
|
|
|
- System.out.println();
|
|
|
+ }
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ scanner.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void verify(Configuration conf, String tableName) throws IOException {
|
|
|
+ HTable table = new HTable(conf, new Text(tableName));
|
|
|
+
|
|
|
+ Text[] columns = {
|
|
|
+ TEXT_INPUT_COLUMN,
|
|
|
+ TEXT_OUTPUT_COLUMN
|
|
|
+ };
|
|
|
+ HScannerInterface scanner =
|
|
|
+ table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
|
|
|
+
|
|
|
+ try {
|
|
|
+ HStoreKey key = new HStoreKey();
|
|
|
+ TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
|
|
+
|
|
|
+ while(scanner.next(key, results)) {
|
|
|
+ byte[] firstValue = null;
|
|
|
+ byte[] secondValue = null;
|
|
|
+ int count = 0;
|
|
|
+
|
|
|
+ for(Map.Entry<Text, byte[]> e: results.entrySet()) {
|
|
|
+ if (count == 0)
|
|
|
+ firstValue = e.getValue();
|
|
|
+ if (count == 1)
|
|
|
+ secondValue = e.getValue();
|
|
|
+ count++;
|
|
|
+ }
|
|
|
+
|
|
|
+ // verify second value is the reverse of the first
|
|
|
+ assertEquals(firstValue.length, secondValue.length);
|
|
|
+ for (int i=0; i<firstValue.length; i++) {
|
|
|
+ assertEquals(firstValue[i], secondValue[firstValue.length-i-1]);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
} finally {
|