|
@@ -30,7 +30,6 @@ import org.apache.hadoop.dfs.MiniDFSCluster;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.hbase.HBaseAdmin;
|
|
import org.apache.hadoop.hbase.HBaseAdmin;
|
|
-import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
|
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
|
import org.apache.hadoop.hbase.HConstants;
|
|
import org.apache.hadoop.hbase.HConstants;
|
|
import org.apache.hadoop.hbase.HScannerInterface;
|
|
import org.apache.hadoop.hbase.HScannerInterface;
|
|
@@ -55,7 +54,7 @@ public class TestTableMapReduce extends MultiRegionTable {
|
|
@SuppressWarnings("hiding")
|
|
@SuppressWarnings("hiding")
|
|
private static final Log LOG =
|
|
private static final Log LOG =
|
|
LogFactory.getLog(TestTableMapReduce.class.getName());
|
|
LogFactory.getLog(TestTableMapReduce.class.getName());
|
|
-
|
|
|
|
|
|
+
|
|
static final String SINGLE_REGION_TABLE_NAME = "srtest";
|
|
static final String SINGLE_REGION_TABLE_NAME = "srtest";
|
|
static final String MULTI_REGION_TABLE_NAME = "mrtest";
|
|
static final String MULTI_REGION_TABLE_NAME = "mrtest";
|
|
static final String INPUT_COLUMN = "contents:";
|
|
static final String INPUT_COLUMN = "contents:";
|
|
@@ -63,6 +62,11 @@ public class TestTableMapReduce extends MultiRegionTable {
|
|
static final String OUTPUT_COLUMN = "text:";
|
|
static final String OUTPUT_COLUMN = "text:";
|
|
static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);
|
|
static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);
|
|
|
|
|
|
|
|
+ private static final Text[] columns = {
|
|
|
|
+ TEXT_INPUT_COLUMN,
|
|
|
|
+ TEXT_OUTPUT_COLUMN
|
|
|
|
+ };
|
|
|
|
+
|
|
private MiniDFSCluster dfsCluster = null;
|
|
private MiniDFSCluster dfsCluster = null;
|
|
private FileSystem fs;
|
|
private FileSystem fs;
|
|
private Path dir;
|
|
private Path dir;
|
|
@@ -232,7 +236,7 @@ public class TestTableMapReduce extends MultiRegionTable {
|
|
}
|
|
}
|
|
|
|
|
|
LOG.info("Print table contents before map/reduce");
|
|
LOG.info("Print table contents before map/reduce");
|
|
- scanTable(conf, SINGLE_REGION_TABLE_NAME, true);
|
|
|
|
|
|
+ scanTable(SINGLE_REGION_TABLE_NAME, true);
|
|
|
|
|
|
@SuppressWarnings("deprecation")
|
|
@SuppressWarnings("deprecation")
|
|
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
|
|
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
|
|
@@ -256,10 +260,10 @@ public class TestTableMapReduce extends MultiRegionTable {
|
|
}
|
|
}
|
|
|
|
|
|
LOG.info("Print table contents after map/reduce");
|
|
LOG.info("Print table contents after map/reduce");
|
|
- scanTable(conf, SINGLE_REGION_TABLE_NAME, true);
|
|
|
|
|
|
+ scanTable(SINGLE_REGION_TABLE_NAME, true);
|
|
|
|
|
|
// verify map-reduce results
|
|
// verify map-reduce results
|
|
- verify(conf, SINGLE_REGION_TABLE_NAME);
|
|
|
|
|
|
+ verify(SINGLE_REGION_TABLE_NAME);
|
|
|
|
|
|
} finally {
|
|
} finally {
|
|
table.close();
|
|
table.close();
|
|
@@ -311,21 +315,17 @@ public class TestTableMapReduce extends MultiRegionTable {
|
|
}
|
|
}
|
|
|
|
|
|
// verify map-reduce results
|
|
// verify map-reduce results
|
|
- verify(conf, MULTI_REGION_TABLE_NAME);
|
|
|
|
|
|
+ verify(MULTI_REGION_TABLE_NAME);
|
|
|
|
|
|
} finally {
|
|
} finally {
|
|
table.close();
|
|
table.close();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void scanTable(HBaseConfiguration conf, String tableName,
|
|
|
|
- boolean printValues) throws IOException {
|
|
|
|
|
|
+ private void scanTable(String tableName, boolean printValues)
|
|
|
|
+ throws IOException {
|
|
HTable table = new HTable(conf, new Text(tableName));
|
|
HTable table = new HTable(conf, new Text(tableName));
|
|
|
|
|
|
- Text[] columns = {
|
|
|
|
- TEXT_INPUT_COLUMN,
|
|
|
|
- TEXT_OUTPUT_COLUMN
|
|
|
|
- };
|
|
|
|
HScannerInterface scanner =
|
|
HScannerInterface scanner =
|
|
table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
|
|
table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
|
|
|
|
|
|
@@ -350,14 +350,17 @@ public class TestTableMapReduce extends MultiRegionTable {
|
|
}
|
|
}
|
|
|
|
|
|
@SuppressWarnings("null")
|
|
@SuppressWarnings("null")
|
|
- private void verify(HBaseConfiguration conf, String tableName)
|
|
|
|
- throws IOException {
|
|
|
|
|
|
+ private void verify(String tableName) throws IOException {
|
|
|
|
+ // Sleep before we start the verify to ensure that when the scanner takes
|
|
|
|
+ // its snapshot, all the updates have made it into the cache.
|
|
|
|
+ try {
|
|
|
|
+ Thread.sleep(conf.getLong("hbase.regionserver.optionalcacheflushinterval",
|
|
|
|
+ 60L * 1000L));
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ // ignore
|
|
|
|
+ }
|
|
HTable table = new HTable(conf, new Text(tableName));
|
|
HTable table = new HTable(conf, new Text(tableName));
|
|
|
|
|
|
- Text[] columns = {
|
|
|
|
- TEXT_INPUT_COLUMN,
|
|
|
|
- TEXT_OUTPUT_COLUMN
|
|
|
|
- };
|
|
|
|
HScannerInterface scanner =
|
|
HScannerInterface scanner =
|
|
table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
|
|
table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
|
|
|
|
|