@@ -18,7 +18,10 @@ package org.apache.hadoop.hbase;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
@@ -31,7 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -40,14 +43,14 @@ import org.apache.hadoop.io.WritableComparable;
 
 import org.onelab.filter.*;
 
-/*******************************************************************************
+/**
  * HStore maintains a bunch of data files.  It is responsible for maintaining
  * the memory/file hierarchy and for periodic flushes to disk and compacting
  * edits to the file.
 *
 * Locking and transactions are handled at a higher level.  This API should not
 * be called directly by any writer, but rather by an HRegion manager.
- ******************************************************************************/
+ */
 class HStore implements HConstants {
   private static final Log LOG = LogFactory.getLog(HStore.class);
 
@@ -71,8 +74,8 @@ class HStore implements HConstants {
   Path filterDir;
   Filter bloomFilter;
 
-  Integer compactLock = 0;
-  Integer flushLock = 0;
+  Integer compactLock = new Integer(0);
+  Integer flushLock = new Integer(0);
 
   final HLocking lock = new HLocking();
 
@@ -81,10 +84,6 @@ class HStore implements HConstants {
 
   Random rand = new Random();
 
-  //////////////////////////////////////////////////////////////////////////////
-  // Constructors, destructors, etc
-  //////////////////////////////////////////////////////////////////////////////
-
   /**
    * An HStore is a set of zero or more MapFiles, which stretch backwards over
    * time.  A given HStore is responsible for a certain set of columns for a
@@ -109,12 +108,12 @@ class HStore implements HConstants {
    * <p>It's assumed that after this constructor returns, the reconstructionLog
    * file will be deleted (by whoever has instantiated the HStore).
    *
-   * @param dir - log file directory
-   * @param regionName - name of region
-   * @param family - name of column family
-   * @param fs - file system object
-   * @param reconstructionLog - existing log file to apply if any
-   * @param conf - configuration object
+   * @param dir log file directory
+   * @param regionName name of region
+   * @param family name of column family
+   * @param fs file system object
+   * @param reconstructionLog existing log file to apply if any
+   * @param conf configuration object
    * @throws IOException
    */
   HStore(Path dir, Text regionName, HColumnDescriptor family,
@@ -178,9 +177,8 @@ class HStore implements HConstants {
     // file, the entry in 'mapdir' must be deleted.
     Vector<HStoreFile> hstoreFiles
       = HStoreFile.loadHStoreFiles(conf, dir, regionName, familyName, fs);
-    for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext(); ) {
-      HStoreFile hsf = it.next();
-      mapFiles.put(hsf.loadInfo(fs), hsf);
+    for(HStoreFile hsf: hstoreFiles) {
+      mapFiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
     }
 
     // Now go through all the HSTORE_LOGINFOFILEs and figure out the
@@ -192,8 +190,7 @@ class HStore implements HConstants {
     // means it was built prior to the previous run of HStore, and so it cannot
     // contain any updates also contained in the log.
     long maxSeqID = -1;
-    for (Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext(); ) {
-      HStoreFile hsf = it.next();
+    for (HStoreFile hsf: hstoreFiles) {
       long seqid = hsf.loadInfo(fs);
       if(seqid > 0) {
         if(seqid > maxSeqID) {
@@ -202,68 +199,8 @@ class HStore implements HConstants {
       }
     }
 
-    // If a bloom filter is enabled, try to read it in.
-    // If it doesn't exist, create it.
+    doReconstructionLog(reconstructionLog, maxSeqID);
 
-    // Read the reconstructionLog to see whether we need to build a brand-new
-    // MapFile out of non-flushed log entries.
-    //
-    // We can ignore any log message that has a sequence ID that's equal to or
-    // lower than maxSeqID.  (Because we know such log messages are already
-    // reflected in the MapFiles.)
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("reading reconstructionLog");
-    }
-    if(reconstructionLog != null && fs.exists(reconstructionLog)) {
-      long maxSeqIdInLog = -1;
-      TreeMap<HStoreKey, BytesWritable> reconstructedCache
-        = new TreeMap<HStoreKey, BytesWritable>();
-      SequenceFile.Reader login
-        = new SequenceFile.Reader(fs, reconstructionLog, conf);
-      try {
-        HLogKey key = new HLogKey();
-        HLogEdit val = new HLogEdit();
-        while(login.next(key, val)) {
-          maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
-          if (key.getLogSeqNum() <= maxSeqID) {
-            continue;
-          }
-          // Check this edit is for me. Also, guard against writing
-          // METACOLUMN info such as HBASE::CACHEFLUSH entries
-          Text column = val.getColumn();
-          if (column.equals(HLog.METACOLUMN)
-              || !key.getRegionName().equals(this.regionName)
-              || !HStoreKey.extractFamily(column).equals(this.familyName)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Passing on edit " + key.getRegionName() + ", "
-                  + column.toString() + ": " + new String(val.getVal().get())
-                  + ", my region: " + this.regionName + ", my column: "
-                  + this.familyName);
-            }
-            continue;
-          }
-          byte[] bytes = new byte[val.getVal().getSize()];
-          System.arraycopy(val.getVal().get(), 0, bytes, 0, bytes.length);
-          HStoreKey k = new HStoreKey(key.getRow(), column,val.getTimestamp());
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Applying edit " + k.toString() + "="
-                + new String(bytes, UTF8_ENCODING));
-          }
-          reconstructedCache.put(k, new BytesWritable(bytes));
-        }
-      } finally {
-        login.close();
-      }
-
-      if(reconstructedCache.size() > 0) {
-        // We create a "virtual flush" at maxSeqIdInLog+1.
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("flushing reconstructionCache");
-        }
-        flushCacheHelper(reconstructedCache, maxSeqIdInLog+1, true);
-      }
-    }
-
     // Compact all the MapFiles into a single file.  The resulting MapFile
     // should be "timeless"; that is, it should not have an associated seq-ID,
     // because all log messages have been reflected in the TreeMaps at this
@@ -286,6 +223,70 @@ class HStore implements HConstants {
     LOG.info("HStore online for " + this.regionName + "/" + this.familyName);
   }
 
+  /*
+   * Read the reconstructionLog to see whether we need to build a brand-new
+   * MapFile out of non-flushed log entries.
+   *
+   * We can ignore any log message that has a sequence ID that's equal to or
+   * lower than maxSeqID.  (Because we know such log messages are already
+   * reflected in the MapFiles.)
+   */
+  private void doReconstructionLog(final Path reconstructionLog,
+      final long maxSeqID)
+  throws UnsupportedEncodingException, IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("reading reconstructionLog");
+    }
+    if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
+      return;
+    }
+    long maxSeqIdInLog = -1;
+    TreeMap<HStoreKey, byte []> reconstructedCache =
+      new TreeMap<HStoreKey, byte []>();
+    SequenceFile.Reader login =
+      new SequenceFile.Reader(this.fs, reconstructionLog, this.conf);
+    try {
+      HLogKey key = new HLogKey();
+      HLogEdit val = new HLogEdit();
+      while (login.next(key, val)) {
+        maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
+        if (key.getLogSeqNum() <= maxSeqID) {
+          continue;
+        }
+        // Check this edit is for me. Also, guard against writing
+        // METACOLUMN info such as HBASE::CACHEFLUSH entries
+        Text column = val.getColumn();
+        if (column.equals(HLog.METACOLUMN)
+            || !key.getRegionName().equals(this.regionName)
+            || !HStoreKey.extractFamily(column).equals(this.familyName)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Passing on edit " + key.getRegionName() + ", "
+                + column.toString() + ": " + new String(val.getVal())
+                + ", my region: " + this.regionName + ", my column: "
+                + this.familyName);
+          }
+          continue;
+        }
+        HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Applying edit " + k.toString() + "=" +
+            new String(val.getVal(), UTF8_ENCODING));
+        }
+        reconstructedCache.put(k, val.getVal());
+      }
+    } finally {
+      login.close();
+    }
+
+    if (reconstructedCache.size() > 0) {
+      // We create a "virtual flush" at maxSeqIdInLog+1.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("flushing reconstructionCache");
+      }
+      flushCacheHelper(reconstructedCache, maxSeqIdInLog + 1, true);
+    }
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // Bloom filters
   //////////////////////////////////////////////////////////////////////////////
@@ -423,15 +424,20 @@ class HStore implements HConstants {
 
   /**
    * Get a MapFile writer
-   * This allows us to substitute a BloomFilterWriter if a bloom filter is enabled
+   * This allows us to substitute a BloomFilterWriter if a bloom filter is
+   * enabled
+   *
+   * @param dirName Directory with store files.
+   * @return Map file.
+   * @throws IOException
    */
   MapFile.Writer getMapFileWriter(String dirName) throws IOException {
-    if(bloomFilter != null) {
+    if (bloomFilter != null) {
       return new BloomFilterWriter(conf, fs, dirName, HStoreKey.class,
-          BytesWritable.class, compression);
+          ImmutableBytesWritable.class, compression);
     }
     return new MapFile.Writer(conf, fs, dirName, HStoreKey.class,
-        BytesWritable.class, compression);
+        ImmutableBytesWritable.class, compression);
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -440,6 +446,7 @@ class HStore implements HConstants {
 
   /**
    * Turn off all the MapFile readers
+   *
    * @throws IOException
    */
   void close() throws IOException {
@@ -478,14 +485,15 @@ class HStore implements HConstants {
    * @return - Vector of all the HStoreFiles in use
    * @throws IOException
    */
-  Vector<HStoreFile> flushCache(TreeMap<HStoreKey, BytesWritable> inputCache,
-      long logCacheFlushId) throws IOException {
-
+  Vector<HStoreFile> flushCache(TreeMap<HStoreKey, byte []> inputCache,
+      long logCacheFlushId)
+  throws IOException {
     return flushCacheHelper(inputCache, logCacheFlushId, true);
   }
 
-  Vector<HStoreFile> flushCacheHelper(TreeMap<HStoreKey, BytesWritable> inputCache,
-      long logCacheFlushId, boolean addToAvailableMaps) throws IOException {
+  Vector<HStoreFile> flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache,
+      long logCacheFlushId, boolean addToAvailableMaps)
+  throws IOException {
 
     synchronized(flushLock) {
       if(LOG.isDebugEnabled()) {
@@ -503,12 +511,11 @@ class HStore implements HConstants {
       }
 
       MapFile.Writer out = getMapFileWriter(mapfile.toString());
-
      try {
-        for (Map.Entry<HStoreKey, BytesWritable> es: inputCache.entrySet()) {
+        for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
          HStoreKey curkey = es.getKey();
          if (this.familyName.equals(HStoreKey.extractFamily(curkey.getColumn()))) {
-            out.append(curkey, es.getValue());
+            out.append(curkey, new ImmutableBytesWritable(es.getValue()));
          }
        }
        if(LOG.isDebugEnabled()) {
@@ -539,13 +546,13 @@ class HStore implements HConstants {
     this.lock.obtainWriteLock();
 
     try {
-      maps.put(logCacheFlushId, getMapFileReader(mapfile.toString()));
-      mapFiles.put(logCacheFlushId, flushedFile);
+      Long flushid = Long.valueOf(logCacheFlushId);
+      maps.put(flushid, getMapFileReader(mapfile.toString()));
+      mapFiles.put(flushid, flushedFile);
       if(LOG.isDebugEnabled()) {
         LOG.debug("HStore available for " + this.regionName + "/"
             + this.familyName + " flush id=" + logCacheFlushId);
       }
-
     } finally {
       this.lock.releaseWriteLock();
     }
@@ -627,7 +634,7 @@ class HStore implements HConstants {
       }
     }
     if(LOG.isDebugEnabled()) {
-      LOG.debug("max sequence id =" + maxSeenSeqID);
+      LOG.debug("max sequence id: " + maxSeenSeqID);
     }
 
     HStoreFile compactedOutputFile
@@ -645,10 +652,8 @@ class HStore implements HConstants {
     }
 
     // Step through them, writing to the brand-new TreeMap
-
     MapFile.Writer compactedOut =
       getMapFileWriter(compactedOutputFile.getMapFilePath().toString());
-
     try {
 
       // We create a new set of MapFile.Reader objects so we don't screw up
@@ -665,14 +670,15 @@ class HStore implements HConstants {
 
       MapFile.Reader[] readers = new MapFile.Reader[toCompactFiles.size()];
       HStoreKey[] keys = new HStoreKey[toCompactFiles.size()];
-      BytesWritable[] vals = new BytesWritable[toCompactFiles.size()];
+      ImmutableBytesWritable[] vals =
+        new ImmutableBytesWritable[toCompactFiles.size()];
       boolean[] done = new boolean[toCompactFiles.size()];
       int pos = 0;
       for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
         HStoreFile hsf = it.next();
         readers[pos] = getMapFileReader(hsf.getMapFilePath().toString());
         keys[pos] = new HStoreKey();
-        vals[pos] = new BytesWritable();
+        vals[pos] = new ImmutableBytesWritable();
         done[pos] = false;
         pos++;
       }
@@ -942,7 +948,7 @@ class HStore implements HConstants {
 
       // Fail here?  No worries.
 
-      long orderVal = finalCompactedFile.loadInfo(fs);
+      Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
 
       // 6. Loading the new TreeMap.
 
@@ -973,27 +979,24 @@ class HStore implements HConstants {
    *
    * The returned object should map column names to byte arrays (byte[]).
    */
-  void getFull(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException {
+  void getFull(HStoreKey key, TreeMap<Text, byte []> results)
+  throws IOException {
     this.lock.obtainReadLock();
     try {
       MapFile.Reader[] maparray
         = maps.values().toArray(new MapFile.Reader[maps.size()]);
-
-      for(int i = maparray.length-1; i >= 0; i--) {
+      for (int i = maparray.length - 1; i >= 0; i--) {
         MapFile.Reader map = maparray[i];
-
         synchronized(map) {
-          BytesWritable readval = new BytesWritable();
           map.reset();
+          ImmutableBytesWritable readval = new ImmutableBytesWritable();
          HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
-
          do {
            Text readcol = readkey.getColumn();
-            if(results.get(readcol) == null
+            if (results.get(readcol) == null
                && key.matchesWithoutColumn(readkey)) {
-              results.put(new Text(readcol), readval);
-              readval = new BytesWritable();
-
+              results.put(new Text(readcol), readval.get());
+              readval = new ImmutableBytesWritable();
            } else if(key.getRow().compareTo(readkey.getRow()) > 0) {
              break;
            }
@@ -1013,12 +1016,12 @@ class HStore implements HConstants {
    *
    * If 'numVersions' is negative, the method returns all available versions.
    */
-  BytesWritable[] get(HStoreKey key, int numVersions) throws IOException {
+  byte [][] get(HStoreKey key, int numVersions) throws IOException {
     if (numVersions <= 0) {
       throw new IllegalArgumentException("Number of versions must be > 0");
     }
 
-    Vector<BytesWritable> results = new Vector<BytesWritable>();
+    List<byte []> results = new ArrayList<byte []>();
     this.lock.obtainReadLock();
     try {
       MapFile.Reader[] maparray
@@ -1028,7 +1031,7 @@ class HStore implements HConstants {
         MapFile.Reader map = maparray[i];
 
         synchronized(map) {
-          BytesWritable readval = new BytesWritable();
+          ImmutableBytesWritable readval = new ImmutableBytesWritable();
           map.reset();
           HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
           if (readkey == null) {
@@ -1039,14 +1042,14 @@ class HStore implements HConstants {
             continue;
           }
           if (readkey.matchesRowCol(key)) {
-            results.add(readval);
-            readval = new BytesWritable();
+            results.add(readval.get());
+            readval = new ImmutableBytesWritable();
             while(map.next(readkey, readval) && readkey.matchesRowCol(key)) {
               if (numVersions > 0 && (results.size() >= numVersions)) {
                 break;
               }
-              results.add(readval);
-              readval = new BytesWritable();
+              results.add(readval.get());
+              readval = new ImmutableBytesWritable();
             }
           }
         }
@@ -1056,8 +1059,7 @@ class HStore implements HConstants {
       }
 
       return results.size() == 0 ?
-        null :results.toArray(new BytesWritable[results.size()]);
-
+        null : ImmutableBytesWritable.toArray(results);
     } finally {
       this.lock.releaseReadLock();
     }
@@ -1077,17 +1079,12 @@ class HStore implements HConstants {
 
     this.lock.obtainReadLock();
     try {
-      long mapIndex = 0L;
-
+      Long mapIndex = Long.valueOf(0L);
      // Iterate through all the MapFiles
-
-      for(Iterator<Map.Entry<Long, HStoreFile>> it = mapFiles.entrySet().iterator();
-          it.hasNext(); ) {
-
-        Map.Entry<Long, HStoreFile> e = it.next();
+      for(Map.Entry<Long, HStoreFile> e: mapFiles.entrySet()) {
        HStoreFile curHSF = e.getValue();
-        long size = fs.getLength(new Path(curHSF.getMapFilePath(), MapFile.DATA_FILE_NAME));
-
+        long size = fs.getLength(
+          new Path(curHSF.getMapFilePath(), MapFile.DATA_FILE_NAME));
        if(size > maxSize) {              // This is the largest one so far
          maxSize = size;
          mapIndex = e.getKey();
@@ -1095,12 +1092,9 @@ class HStore implements HConstants {
      }
 
      MapFile.Reader r = maps.get(mapIndex);
-
      midKey.set(((HStoreKey)r.midKey()).getRow());
-
    } catch(IOException e) {
      LOG.warn(e);
-
    } finally {
      this.lock.releaseReadLock();
    }
@@ -1171,14 +1165,12 @@ class HStore implements HConstants {
     }
 
     this.keys = new HStoreKey[readers.length];
-    this.vals = new BytesWritable[readers.length];
+    this.vals = new byte[readers.length][];
 
     // Advance the readers to the first pos.
-
     for(i = 0; i < readers.length; i++) {
       keys[i] = new HStoreKey();
-      vals[i] = new BytesWritable();
-
+
       if(firstRow.getLength() != 0) {
         if(findFirstRow(i, firstRow)) {
           continue;
@@ -1208,16 +1200,15 @@ class HStore implements HConstants {
      */
     @Override
     boolean findFirstRow(int i, Text firstRow) throws IOException {
+      ImmutableBytesWritable ibw = new ImmutableBytesWritable();
       HStoreKey firstKey
-        = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), vals[i]);
-
-      if(firstKey == null) {
-
+        = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), ibw);
+      if (firstKey == null) {
         // Didn't find it. Close the scanner and return TRUE
-
         closeSubScanner(i);
         return true;
       }
+      this.vals[i] = ibw.get();
       keys[i].setRow(firstKey.getRow());
       keys[i].setColumn(firstKey.getColumn());
       keys[i].setVersion(firstKey.getTimestamp());
@@ -1232,11 +1223,12 @@ class HStore implements HConstants {
 
      */
     @Override
     boolean getNext(int i) throws IOException {
-      vals[i] = new BytesWritable();
-      if(! readers[i].next(keys[i], vals[i])) {
+      ImmutableBytesWritable ibw = new ImmutableBytesWritable();
+      if (!readers[i].next(keys[i], ibw)) {
        closeSubScanner(i);
        return false;
      }
+      vals[i] = ibw.get();
      return true;
    }
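
Note on the get() change above: the method now accumulates values in a
List<byte []> and flattens it through ImmutableBytesWritable.toArray(results)
into the byte [][] it returns. A minimal standalone sketch of what such a
helper presumably does (the helper name comes from the patch itself, but this
body is an assumption, not the actual HBase implementation):

import java.util.ArrayList;
import java.util.List;

public class ToArraySketch {
  // Presumed behavior of ImmutableBytesWritable.toArray(List): flatten a
  // list of byte arrays into a byte[][] without copying the arrays.
  static byte[][] toArray(final List<byte[]> results) {
    byte[][] out = new byte[results.size()][];
    for (int i = 0; i < results.size(); i++) {
      out[i] = results.get(i);
    }
    return out;
  }

  public static void main(String[] args) throws Exception {
    List<byte[]> results = new ArrayList<byte[]>();
    results.add("newest".getBytes("UTF-8"));
    results.add("older".getBytes("UTF-8"));
    byte[][] versions = toArray(results);
    // Prints: 2 versions, first=newest
    System.out.println(versions.length + " versions, first=" +
        new String(versions[0], "UTF-8"));
  }
}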
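
The recurring pattern results.add(readval.get()); readval = new
ImmutableBytesWritable(); is what lets the patch drop the defensive
System.arraycopy the removed reconstruction-log code needed:
MapFile.Reader.next() and getClosest() refill whatever Writable they are
handed, so a stored value must not share a buffer with the next read. A
standalone model of that hazard, using a hypothetical stand-in class rather
than the real BytesWritable:

import java.util.ArrayList;
import java.util.List;

public class BufferReuseSketch {
  // Stand-in for a Writable whose internal buffer is refilled in place
  // across reads, the way a reused BytesWritable behaves.
  static class ReusableValue {
    private final byte[] buf = new byte[6];
    void fill(byte[] next) {
      System.arraycopy(next, 0, buf, 0, next.length);
    }
    byte[] get() {
      return buf; // hands out the shared backing buffer
    }
  }

  public static void main(String[] args) throws Exception {
    ReusableValue readval = new ReusableValue();
    List<byte[]> results = new ArrayList<byte[]>();
    for (String cell : new String[] {"value2", "value1"}) {
      readval.fill(cell.getBytes("UTF-8")); // models reader.next(key, readval)
      results.add(readval.get());           // stores a reference, not a copy
    }
    // Both entries alias one buffer, so this prints "value1" twice:
    for (byte[] b : results) {
      System.out.println(new String(b, "UTF-8"));
    }
  }
}

Allocating a fresh wrapper after each stored value, as the patched loops do,
breaks the aliasing without a per-cell copy.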