|
@@ -21,10 +21,14 @@ package org.apache.hadoop.hbase;
|
|
|
import java.io.IOException;
|
|
|
import java.util.TreeMap;
|
|
|
|
|
|
+import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
|
|
|
+import org.apache.hadoop.hbase.util.Writables;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
|
|
|
/**
|
|
|
- * Tests user specifiable time stamps
|
|
|
+ * Tests user specifiable time stamps putting, getting and scanning. Also
|
|
|
+ * tests same in presence of deletes. Test cores are written so can be
|
|
|
+ * run against an HRegion and against an HTable: i.e. both local and remote.
|
|
|
*/
|
|
|
public class TestTimestamp extends HBaseTestCase {
|
|
|
private static final long T0 = 10L;
|
|
@@ -32,74 +36,196 @@ public class TestTimestamp extends HBaseTestCase {
|
|
|
private static final long T2 = 200L;
|
|
|
|
|
|
private static final String COLUMN_NAME = "contents:";
|
|
|
- private static final String TABLE_NAME = "test";
|
|
|
- private static final String VERSION1 = "version1";
|
|
|
- private static final String LATEST = "latest";
|
|
|
|
|
|
private static final Text COLUMN = new Text(COLUMN_NAME);
|
|
|
- private static final Text[] COLUMNS = {
|
|
|
- COLUMN
|
|
|
- };
|
|
|
- private static final Text TABLE = new Text(TABLE_NAME);
|
|
|
+ private static final Text[] COLUMNS = {COLUMN};
|
|
|
private static final Text ROW = new Text("row");
|
|
|
+
|
|
|
+ // When creating column descriptor, how many versions of a cell to allow.
|
|
|
+ private static final int VERSIONS = 3;
|
|
|
|
|
|
/**
|
|
|
* Test that delete works according to description in <a
|
|
|
- * href="https://issues.apache.org/jira/browse/HADOOP-1784">hadoop-1784</a>
|
|
|
- * when it comes to timestamps.
|
|
|
+ * href="https://issues.apache.org/jira/browse/HADOOP-1784">hadoop-1784</a>.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void testDelete() throws IOException {
|
|
|
- HRegion r = createRegion();
|
|
|
+ final HRegion r = createRegion();
|
|
|
try {
|
|
|
- HRegionLoader loader = new HRegionLoader(r);
|
|
|
- // Add a couple of values for three different timestamps.
|
|
|
- addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T0);
|
|
|
- addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T1);
|
|
|
- addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T2);
|
|
|
- addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"));
|
|
|
- // If I delete w/o specifying a timestamp, this means I'm deleting the
|
|
|
- // latest.
|
|
|
- delete(r, System.currentTimeMillis());
|
|
|
- // Verify that I get back T2 through T0.
|
|
|
+ doTestDelete(new HRegionIncommon(r), new FlushCache() {
|
|
|
+ public void flushcache() throws IOException {
|
|
|
+ r.flushcache(false);
|
|
|
+ }
|
|
|
+ });
|
|
|
} finally {
|
|
|
r.close();
|
|
|
r.getLog().closeAndDelete();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- private void delete(final HRegion r, final long ts) throws IOException {
|
|
|
- long lockid = r.startUpdate(ROW);
|
|
|
- r.delete(lockid, COLUMN);
|
|
|
- r.commit(lockid, ts == -1? System.currentTimeMillis(): ts);
|
|
|
- }
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Test scanning against different timestamps.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void testTimestampScanning() throws IOException {
|
|
|
- HRegion r = createRegion();
|
|
|
+ final HRegion r = createRegion();
|
|
|
try {
|
|
|
- HRegionLoader loader = new HRegionLoader(r);
|
|
|
- // Add a couple of values for three different timestamps.
|
|
|
- addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T0);
|
|
|
- addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T1);
|
|
|
- addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"));
|
|
|
- // Get count of latest items.
|
|
|
- int count = assertScanContentTimestamp(r, System.currentTimeMillis());
|
|
|
- // Assert I get same count when I scan at each timestamp.
|
|
|
- assertEquals(count, assertScanContentTimestamp(r, T0));
|
|
|
- assertEquals(count, assertScanContentTimestamp(r, T1));
|
|
|
- // Flush everything out to disk and then retry
|
|
|
- r.flushcache(false);
|
|
|
- assertEquals(count, assertScanContentTimestamp(r, T0));
|
|
|
- assertEquals(count, assertScanContentTimestamp(r, T1));
|
|
|
+ doTestTimestampScanning(new HRegionIncommon(r), new FlushCache() {
|
|
|
+ public void flushcache() throws IOException {
|
|
|
+ r.flushcache(false);
|
|
|
+ }
|
|
|
+ });
|
|
|
} finally {
|
|
|
r.close();
|
|
|
r.getLog().closeAndDelete();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Basic test of timestamps.
|
|
|
+ * Do the above tests from client side.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public void testTimestamps() throws IOException {
|
|
|
+ final MiniHBaseCluster cluster = new MiniHBaseCluster(this.conf, 1);
|
|
|
+ try {
|
|
|
+ HTable t = createTable();
|
|
|
+ Incommon incommon = new HTableIncommon(t);
|
|
|
+ doTestDelete(incommon, new FlushCache() {
|
|
|
+ public void flushcache() throws IOException {
|
|
|
+ cluster.flushcache();
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // Perhaps drop and readd the table between tests so the former does
|
|
|
+ // not pollute this latter? Or put into separate tests.
|
|
|
+ doTestTimestampScanning(incommon, new FlushCache() {
|
|
|
+ public void flushcache() throws IOException {
|
|
|
+ cluster.flushcache();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (Exception e) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Run test that delete works according to description in <a
|
|
|
+ * href="https://issues.apache.org/jira/browse/HADOOP-1784">hadoop-1784</a>.
|
|
|
+ * @param incommon
|
|
|
+ * @param flusher
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void doTestDelete(final Incommon incommon, FlushCache flusher)
|
|
|
+ throws IOException {
|
|
|
+ // Add values at various timestamps (Values are timestampes as bytes).
|
|
|
+ put(incommon, T0);
|
|
|
+ put(incommon, T1);
|
|
|
+ put(incommon, T2);
|
|
|
+ put(incommon);
|
|
|
+ // Verify that returned versions match passed timestamps.
|
|
|
+ assertVersions(incommon, new long [] {HConstants.LATEST_TIMESTAMP, T2, T1});
|
|
|
+ // If I delete w/o specifying a timestamp, this means I'm deleting the
|
|
|
+ // latest.
|
|
|
+ delete(incommon);
|
|
|
+ // Verify that I get back T2 through T1 -- that the latest version has
|
|
|
+ // been deleted.
|
|
|
+ assertVersions(incommon, new long [] {T2, T1, T0});
|
|
|
+
|
|
|
+ // Flush everything out to disk and then retry
|
|
|
+ flusher.flushcache();
|
|
|
+ assertVersions(incommon, new long [] {T2, T1, T0});
|
|
|
+
|
|
|
+ // Now add, back a latest so I can test remove other than the latest.
|
|
|
+ put(incommon);
|
|
|
+ assertVersions(incommon, new long [] {HConstants.LATEST_TIMESTAMP, T2, T1});
|
|
|
+ delete(incommon, T2);
|
|
|
+ assertVersions(incommon, new long [] {HConstants.LATEST_TIMESTAMP, T1, T0});
|
|
|
+ // Flush everything out to disk and then retry
|
|
|
+ flusher.flushcache();
|
|
|
+ assertVersions(incommon, new long [] {HConstants.LATEST_TIMESTAMP, T1, T0});
|
|
|
+
|
|
|
+ // Now try deleting all from T2 back inclusive (We first need to add T2
|
|
|
+ // back into the mix and to make things a little interesting, delete and
|
|
|
+ // then readd T1.
|
|
|
+ put(incommon, T2);
|
|
|
+ delete(incommon, T1);
|
|
|
+ put(incommon, T1);
|
|
|
+ incommon.deleteAll(ROW, COLUMN, T2);
|
|
|
+ // Should only be current value in set. Assert this is so
|
|
|
+ assertOnlyLatest(incommon, HConstants.LATEST_TIMESTAMP);
|
|
|
+
|
|
|
+ // Flush everything out to disk and then redo above tests
|
|
|
+ flusher.flushcache();
|
|
|
+ assertOnlyLatest(incommon, HConstants.LATEST_TIMESTAMP);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void assertOnlyLatest(final Incommon incommon,
|
|
|
+ final long currentTime)
|
|
|
+ throws IOException {
|
|
|
+ byte [][] bytesBytes = incommon.get(ROW, COLUMN, 3/*Ask for too much*/);
|
|
|
+ assertEquals(1, bytesBytes.length);
|
|
|
+ long time = Writables.bytesToLong(bytesBytes[0]);
|
|
|
+ assertEquals(time, currentTime);
|
|
|
+ assertNull(incommon.get(ROW, COLUMN, T1, 3 /*Too many*/));
|
|
|
+ assertTrue(assertScanContentTimestamp(incommon, T1) == 0);
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Assert that returned versions match passed in timestamps and that results
|
|
|
+ * are returned in the right order. Assert that values when converted to
|
|
|
+ * longs match the corresponding passed timestamp.
|
|
|
+ * @param r
|
|
|
+ * @param tss
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void assertVersions(final Incommon incommon, final long [] tss)
|
|
|
+ throws IOException {
|
|
|
+ // Assert that 'latest' is what we expect.
|
|
|
+ byte [] bytes = incommon.get(ROW, COLUMN);
|
|
|
+ assertEquals(Writables.bytesToLong(bytes), tss[0]);
|
|
|
+ // Now assert that if we ask for multiple versions, that they come out in
|
|
|
+ // order.
|
|
|
+ byte [][] bytesBytes = incommon.get(ROW, COLUMN, tss.length);
|
|
|
+ assertEquals(bytesBytes.length, tss.length);
|
|
|
+ for (int i = 0; i < bytesBytes.length; i++) {
|
|
|
+ long ts = Writables.bytesToLong(bytesBytes[i]);
|
|
|
+ assertEquals(ts, tss[i]);
|
|
|
+ }
|
|
|
+ // Specify a timestamp get multiple versions.
|
|
|
+ bytesBytes = incommon.get(ROW, COLUMN, tss[0], bytesBytes.length - 1);
|
|
|
+ for (int i = 1; i < bytesBytes.length; i++) {
|
|
|
+ long ts = Writables.bytesToLong(bytesBytes[i]);
|
|
|
+ assertEquals(ts, tss[i]);
|
|
|
+ }
|
|
|
+ // Test scanner returns expected version
|
|
|
+ assertScanContentTimestamp(incommon, tss[0]);
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Run test scanning different timestamps.
|
|
|
+ * @param incommon
|
|
|
+ * @param flusher
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void doTestTimestampScanning(final Incommon incommon,
|
|
|
+ final FlushCache flusher)
|
|
|
+ throws IOException {
|
|
|
+ // Add a couple of values for three different timestamps.
|
|
|
+ put(incommon, T0);
|
|
|
+ put(incommon, T1);
|
|
|
+ put(incommon, HConstants.LATEST_TIMESTAMP);
|
|
|
+ // Get count of latest items.
|
|
|
+ int count = assertScanContentTimestamp(incommon,
|
|
|
+ HConstants.LATEST_TIMESTAMP);
|
|
|
+ // Assert I get same count when I scan at each timestamp.
|
|
|
+ assertEquals(count, assertScanContentTimestamp(incommon, T0));
|
|
|
+ assertEquals(count, assertScanContentTimestamp(incommon, T1));
|
|
|
+ // Flush everything out to disk and then retry
|
|
|
+ flusher.flushcache();
|
|
|
+ assertEquals(count, assertScanContentTimestamp(incommon, T0));
|
|
|
+ assertEquals(count, assertScanContentTimestamp(incommon, T1));
|
|
|
+ }
|
|
|
|
|
|
/*
|
|
|
* Assert that the scan returns only values < timestamp.
|
|
@@ -108,19 +234,21 @@ public class TestTimestamp extends HBaseTestCase {
|
|
|
* @return Count of items scanned.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private int assertScanContentTimestamp(final HRegion r, final long ts)
|
|
|
+ private int assertScanContentTimestamp(final Incommon in, final long ts)
|
|
|
throws IOException {
|
|
|
+ HScannerInterface scanner =
|
|
|
+ in.getScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts);
|
|
|
int count = 0;
|
|
|
- HInternalScannerInterface scanner =
|
|
|
- r.getScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts, null);
|
|
|
try {
|
|
|
HStoreKey key = new HStoreKey();
|
|
|
TreeMap<Text, byte []>value = new TreeMap<Text, byte[]>();
|
|
|
while (scanner.next(key, value)) {
|
|
|
assertTrue(key.getTimestamp() <= ts);
|
|
|
- Text row = key.getRow();
|
|
|
- assertEquals(row.toString(),
|
|
|
- new String(value.get(COLUMN), HConstants.UTF8_ENCODING));
|
|
|
+ // Content matches the key or HConstants.LATEST_TIMESTAMP.
|
|
|
+ // (Key does not match content if we 'put' with LATEST_TIMESTAMP).
|
|
|
+ long l = Writables.bytesToLong(value.get(COLUMN));
|
|
|
+ assertTrue(key.getTimestamp() == l ||
|
|
|
+ HConstants.LATEST_TIMESTAMP == l);
|
|
|
count++;
|
|
|
value.clear();
|
|
|
}
|
|
@@ -129,118 +257,48 @@ public class TestTimestamp extends HBaseTestCase {
|
|
|
}
|
|
|
return count;
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Basic test of timestamps.
|
|
|
- * TODO: Needs rewrite after hadoop-1784 gets fixed.
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public void testTimestamps() throws IOException {
|
|
|
- MiniHBaseCluster cluster = new MiniHBaseCluster(this.conf, 1);
|
|
|
- try {
|
|
|
- HTable table = createTable();
|
|
|
-
|
|
|
- // store a value specifying an update time
|
|
|
- put(table, VERSION1.getBytes(HConstants.UTF8_ENCODING), T0);
|
|
|
-
|
|
|
- // store a value specifying 'now' as the update time
|
|
|
- put(table, LATEST.getBytes(HConstants.UTF8_ENCODING), -1);
|
|
|
-
|
|
|
- // delete values older than T1
|
|
|
- long lockid = table.startUpdate(ROW);
|
|
|
- table.delete(lockid, COLUMN);
|
|
|
- table.commit(lockid, T1);
|
|
|
-
|
|
|
- // now retrieve...
|
|
|
- assertGets(table);
|
|
|
-
|
|
|
- // flush everything out to disk
|
|
|
- HRegionServer s = cluster.regionThreads.get(0).getRegionServer();
|
|
|
- for(HRegion r: s.onlineRegions.values() ) {
|
|
|
- r.flushcache(false);
|
|
|
- }
|
|
|
-
|
|
|
- // now retrieve...
|
|
|
- assertGets(table);
|
|
|
-
|
|
|
- // Test scanners
|
|
|
- assertScanCount(table, -1, 1);
|
|
|
- assertScanCount(table, T1, 0);
|
|
|
- } catch (Exception e) {
|
|
|
- cluster.shutdown();
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
- /*
|
|
|
- * Test count of results scanning.
|
|
|
- * @param table
|
|
|
- * @param ts
|
|
|
- * @param expectedCount
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private void assertScanCount(final HTable table, final long ts,
|
|
|
- final int expectedCount)
|
|
|
+ private void put(final Incommon loader, final long ts)
|
|
|
throws IOException {
|
|
|
- HScannerInterface scanner = (ts == -1)?
|
|
|
- table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW):
|
|
|
- table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts);
|
|
|
- try {
|
|
|
- HStoreKey key = new HStoreKey();
|
|
|
- TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
|
|
- int count = 0;
|
|
|
- while(scanner.next(key, results)) {
|
|
|
- count++;
|
|
|
- }
|
|
|
- assertEquals(count, expectedCount);
|
|
|
- assertEquals(results.size(), expectedCount);
|
|
|
-
|
|
|
- } finally {
|
|
|
- scanner.close();
|
|
|
- }
|
|
|
+ put(loader, Writables.longToBytes(ts), ts);
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * Test can do basic gets.
|
|
|
- * Used by testTimestamp above.
|
|
|
- * @param table
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private void assertGets(final HTable table) throws IOException {
|
|
|
- // the most recent version:
|
|
|
- byte[] bytes = table.get(ROW, COLUMN);
|
|
|
- assertTrue(bytes != null && bytes.length != 0);
|
|
|
- assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING)));
|
|
|
-
|
|
|
- // any version <= time T1
|
|
|
- byte[][] values = table.get(ROW, COLUMN, T1, 3);
|
|
|
- assertNull(values);
|
|
|
-
|
|
|
- // the version from T0
|
|
|
- values = table.get(ROW, COLUMN, T0, 3);
|
|
|
- assertTrue(values.length == 1
|
|
|
- && VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING)));
|
|
|
-
|
|
|
- // three versions older than now
|
|
|
- values = table.get(ROW, COLUMN, 3);
|
|
|
- assertTrue(values.length == 1
|
|
|
- && LATEST.equals(new String(values[0], HConstants.UTF8_ENCODING)));
|
|
|
+ private void put(final Incommon loader)
|
|
|
+ throws IOException {
|
|
|
+ long ts = HConstants.LATEST_TIMESTAMP;
|
|
|
+ put(loader, Writables.longToBytes(ts), ts);
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
- * Put values.
|
|
|
- * @param table
|
|
|
+ * Put values.
|
|
|
+ * @param loader
|
|
|
* @param bytes
|
|
|
* @param ts
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private void put(final HTable table, final byte [] bytes, final long ts)
|
|
|
+ private void put(final Incommon loader, final byte [] bytes,
|
|
|
+ final long ts)
|
|
|
throws IOException {
|
|
|
- long lockid = table.startUpdate(ROW);
|
|
|
- table.put(lockid, COLUMN, bytes);
|
|
|
- if (ts == -1) {
|
|
|
- table.commit(lockid);
|
|
|
+ long lockid = loader.startBatchUpdate(ROW);
|
|
|
+ loader.put(lockid, COLUMN, bytes);
|
|
|
+ if (ts == HConstants.LATEST_TIMESTAMP) {
|
|
|
+ loader.commit(lockid);
|
|
|
+ } else {
|
|
|
+ loader.commit(lockid, ts);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void delete(final Incommon loader) throws IOException {
|
|
|
+ delete(loader, HConstants.LATEST_TIMESTAMP);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void delete(final Incommon loader, final long ts) throws IOException {
|
|
|
+ long lockid = loader.startBatchUpdate(ROW);
|
|
|
+ loader.delete(lockid, COLUMN);
|
|
|
+ if (ts == HConstants.LATEST_TIMESTAMP) {
|
|
|
+ loader.commit(lockid);
|
|
|
} else {
|
|
|
- table.commit(lockid, ts);
|
|
|
+ loader.commit(lockid, ts);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -250,17 +308,18 @@ public class TestTimestamp extends HBaseTestCase {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
private HTable createTable() throws IOException {
|
|
|
- HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
|
|
|
+ HTableDescriptor desc = new HTableDescriptor(getName());
|
|
|
desc.addFamily(new HColumnDescriptor(COLUMN_NAME));
|
|
|
HBaseAdmin admin = new HBaseAdmin(conf);
|
|
|
admin.createTable(desc);
|
|
|
- return new HTable(conf, TABLE);
|
|
|
+ return new HTable(conf, new Text(getName()));
|
|
|
}
|
|
|
|
|
|
private HRegion createRegion() throws IOException {
|
|
|
HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
|
|
|
HTableDescriptor htd = createTableDescriptor(getName());
|
|
|
- htd.addFamily(new HColumnDescriptor(COLUMN_NAME));
|
|
|
+ htd.addFamily(new HColumnDescriptor(COLUMN, VERSIONS,
|
|
|
+ CompressionType.NONE, false, Integer.MAX_VALUE, null));
|
|
|
HRegionInfo hri = new HRegionInfo(1, htd, null, null);
|
|
|
return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
|
|
|
}
|