|
@@ -225,6 +225,7 @@ public class HRegion implements HConstants {
|
|
protected final long threadWakeFrequency;
|
|
protected final long threadWakeFrequency;
|
|
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
|
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
|
private final Integer updateLock = new Integer(0);
|
|
private final Integer updateLock = new Integer(0);
|
|
|
|
+ private final Integer splitLock = new Integer(0);
|
|
private final long desiredMaxFileSize;
|
|
private final long desiredMaxFileSize;
|
|
private final long minSequenceId;
|
|
private final long minSequenceId;
|
|
private final String encodedRegionName;
|
|
private final String encodedRegionName;
|
|
@@ -381,54 +382,56 @@ public class HRegion implements HConstants {
|
|
LOG.info("region " + this.regionInfo.getRegionName() + " already closed");
|
|
LOG.info("region " + this.regionInfo.getRegionName() + " already closed");
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
- lock.writeLock().lock();
|
|
|
|
- try {
|
|
|
|
- synchronized (writestate) {
|
|
|
|
- while (writestate.compacting || writestate.flushing) {
|
|
|
|
- try {
|
|
|
|
- writestate.wait();
|
|
|
|
- } catch (InterruptedException iex) {
|
|
|
|
- // continue
|
|
|
|
|
|
+ synchronized (splitLock) {
|
|
|
|
+ lock.writeLock().lock();
|
|
|
|
+ try {
|
|
|
|
+ synchronized (writestate) {
|
|
|
|
+ while (writestate.compacting || writestate.flushing) {
|
|
|
|
+ try {
|
|
|
|
+ writestate.wait();
|
|
|
|
+ } catch (InterruptedException iex) {
|
|
|
|
+ // continue
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ // Disable compacting and flushing by background threads for this
|
|
|
|
+ // region.
|
|
|
|
+ writestate.writesEnabled = false;
|
|
}
|
|
}
|
|
- // Disable compacting and flushing by background threads for this
|
|
|
|
- // region.
|
|
|
|
- writestate.writesEnabled = false;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Wait for active scanners to finish. The write lock we hold will prevent
|
|
|
|
- // new scanners from being created.
|
|
|
|
-
|
|
|
|
- synchronized (activeScannerCount) {
|
|
|
|
- while (activeScannerCount.get() != 0) {
|
|
|
|
- try {
|
|
|
|
- activeScannerCount.wait();
|
|
|
|
-
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- // continue
|
|
|
|
|
|
+
|
|
|
|
+ // Wait for active scanners to finish. The write lock we hold will prevent
|
|
|
|
+ // new scanners from being created.
|
|
|
|
+
|
|
|
|
+ synchronized (activeScannerCount) {
|
|
|
|
+ while (activeScannerCount.get() != 0) {
|
|
|
|
+ try {
|
|
|
|
+ activeScannerCount.wait();
|
|
|
|
+
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ // continue
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- }
|
|
|
|
-
|
|
|
|
- // Write lock means no more row locks can be given out. Wait on
|
|
|
|
- // outstanding row locks to come in before we close so we do not drop
|
|
|
|
- // outstanding updates.
|
|
|
|
- waitOnRowLocks();
|
|
|
|
-
|
|
|
|
- // Don't flush the cache if we are aborting
|
|
|
|
- if (!abort) {
|
|
|
|
- internalFlushcache(snapshotMemcaches());
|
|
|
|
- }
|
|
|
|
|
|
|
|
- List<HStoreFile> result = new ArrayList<HStoreFile>();
|
|
|
|
- for (HStore store: stores.values()) {
|
|
|
|
- result.addAll(store.close());
|
|
|
|
|
|
+ // Write lock means no more row locks can be given out. Wait on
|
|
|
|
+ // outstanding row locks to come in before we close so we do not drop
|
|
|
|
+ // outstanding updates.
|
|
|
|
+ waitOnRowLocks();
|
|
|
|
+
|
|
|
|
+ // Don't flush the cache if we are aborting
|
|
|
|
+ if (!abort) {
|
|
|
|
+ internalFlushcache(snapshotMemcaches());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ List<HStoreFile> result = new ArrayList<HStoreFile>();
|
|
|
|
+ for (HStore store: stores.values()) {
|
|
|
|
+ result.addAll(store.close());
|
|
|
|
+ }
|
|
|
|
+ this.closed.set(true);
|
|
|
|
+ LOG.info("closed " + this.regionInfo.getRegionName());
|
|
|
|
+ return result;
|
|
|
|
+ } finally {
|
|
|
|
+ lock.writeLock().unlock();
|
|
}
|
|
}
|
|
- this.closed.set(true);
|
|
|
|
- LOG.info("closed " + this.regionInfo.getRegionName());
|
|
|
|
- return result;
|
|
|
|
- } finally {
|
|
|
|
- lock.writeLock().unlock();
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -541,89 +544,91 @@ public class HRegion implements HConstants {
|
|
HRegion[] splitRegion(final RegionUnavailableListener listener)
|
|
HRegion[] splitRegion(final RegionUnavailableListener listener)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
|
|
- Text midKey = new Text();
|
|
|
|
- if (!needsSplit(midKey)) {
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
- long startTime = System.currentTimeMillis();
|
|
|
|
- Path splits = getSplitsDir();
|
|
|
|
- HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
|
|
|
|
- this.regionInfo.getStartKey(), midKey);
|
|
|
|
- Path dirA = getSplitRegionDir(splits,
|
|
|
|
- HRegionInfo.encodeRegionName(regionAInfo.getRegionName()));
|
|
|
|
- if(fs.exists(dirA)) {
|
|
|
|
- throw new IOException("Cannot split; target file collision at " + dirA);
|
|
|
|
- }
|
|
|
|
- HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
|
|
|
|
- midKey, null);
|
|
|
|
- Path dirB = getSplitRegionDir(splits,
|
|
|
|
- HRegionInfo.encodeRegionName(regionBInfo.getRegionName()));
|
|
|
|
- if(this.fs.exists(dirB)) {
|
|
|
|
- throw new IOException("Cannot split; target file collision at " + dirB);
|
|
|
|
- }
|
|
|
|
|
|
+ synchronized (splitLock) {
|
|
|
|
+ Text midKey = new Text();
|
|
|
|
+ if (closed.get() || !needsSplit(midKey)) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
|
+ Path splits = getSplitsDir();
|
|
|
|
+ HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
|
|
|
|
+ this.regionInfo.getStartKey(), midKey);
|
|
|
|
+ Path dirA = getSplitRegionDir(splits,
|
|
|
|
+ HRegionInfo.encodeRegionName(regionAInfo.getRegionName()));
|
|
|
|
+ if(fs.exists(dirA)) {
|
|
|
|
+ throw new IOException("Cannot split; target file collision at " + dirA);
|
|
|
|
+ }
|
|
|
|
+ HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
|
|
|
|
+ midKey, null);
|
|
|
|
+ Path dirB = getSplitRegionDir(splits,
|
|
|
|
+ HRegionInfo.encodeRegionName(regionBInfo.getRegionName()));
|
|
|
|
+ if(this.fs.exists(dirB)) {
|
|
|
|
+ throw new IOException("Cannot split; target file collision at " + dirB);
|
|
|
|
+ }
|
|
|
|
|
|
- // Notify the caller that we are about to close the region. This moves
|
|
|
|
- // us to the 'retiring' queue. Means no more updates coming in -- just
|
|
|
|
- // whatever is outstanding.
|
|
|
|
- if (listener != null) {
|
|
|
|
- listener.closing(getRegionName());
|
|
|
|
- }
|
|
|
|
|
|
+ // Notify the caller that we are about to close the region. This moves
|
|
|
|
+ // us to the 'retiring' queue. Means no more updates coming in -- just
|
|
|
|
+ // whatever is outstanding.
|
|
|
|
+ if (listener != null) {
|
|
|
|
+ listener.closing(getRegionName());
|
|
|
|
+ }
|
|
|
|
|
|
- // Now close the HRegion. Close returns all store files or null if not
|
|
|
|
- // supposed to close (? What to do in this case? Implement abort of close?)
|
|
|
|
- // Close also does wait on outstanding rows and calls a flush just-in-case.
|
|
|
|
- List<HStoreFile> hstoreFilesToSplit = close();
|
|
|
|
- if (hstoreFilesToSplit == null) {
|
|
|
|
- LOG.warn("Close came back null (Implement abort of close?)");
|
|
|
|
- throw new RuntimeException("close returned empty vector of HStoreFiles");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Tell listener that region is now closed and that they can therefore
|
|
|
|
- // clean up any outstanding references.
|
|
|
|
- if (listener != null) {
|
|
|
|
- listener.closed(this.getRegionName());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Split each store file.
|
|
|
|
- for(HStoreFile h: hstoreFilesToSplit) {
|
|
|
|
- // A reference to the bottom half of the hsf store file.
|
|
|
|
- HStoreFile.Reference aReference = new HStoreFile.Reference(
|
|
|
|
- this.encodedRegionName, h.getFileId(), new HStoreKey(midKey),
|
|
|
|
- HStoreFile.Range.bottom);
|
|
|
|
- HStoreFile a = new HStoreFile(this.conf, splits,
|
|
|
|
- HRegionInfo.encodeRegionName(regionAInfo.getRegionName()),
|
|
|
|
- h.getColFamily(), Math.abs(rand.nextLong()), aReference);
|
|
|
|
- // Reference to top half of the hsf store file.
|
|
|
|
- HStoreFile.Reference bReference = new HStoreFile.Reference(
|
|
|
|
- this.encodedRegionName, h.getFileId(), new HStoreKey(midKey),
|
|
|
|
- HStoreFile.Range.top);
|
|
|
|
- HStoreFile b = new HStoreFile(this.conf, splits,
|
|
|
|
- HRegionInfo.encodeRegionName(regionBInfo.getRegionName()),
|
|
|
|
- h.getColFamily(), Math.abs(rand.nextLong()), bReference);
|
|
|
|
- h.splitStoreFile(a, b, this.fs);
|
|
|
|
- }
|
|
|
|
|
|
+ // Now close the HRegion. Close returns all store files or null if not
|
|
|
|
+ // supposed to close (? What to do in this case? Implement abort of close?)
|
|
|
|
+ // Close also does wait on outstanding rows and calls a flush just-in-case.
|
|
|
|
+ List<HStoreFile> hstoreFilesToSplit = close();
|
|
|
|
+ if (hstoreFilesToSplit == null) {
|
|
|
|
+ LOG.warn("Close came back null (Implement abort of close?)");
|
|
|
|
+ throw new RuntimeException("close returned empty vector of HStoreFiles");
|
|
|
|
+ }
|
|
|
|
|
|
- // Done!
|
|
|
|
- // Opening the region copies the splits files from the splits directory
|
|
|
|
- // under each region.
|
|
|
|
- HRegion regionA =
|
|
|
|
- new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null);
|
|
|
|
- regionA.close();
|
|
|
|
- HRegion regionB =
|
|
|
|
- new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null);
|
|
|
|
- regionB.close();
|
|
|
|
-
|
|
|
|
- // Cleanup
|
|
|
|
- boolean deleted = fs.delete(splits); // Get rid of splits directory
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("Cleaned up " + splits.toString() + " " + deleted);
|
|
|
|
|
|
+ // Tell listener that region is now closed and that they can therefore
|
|
|
|
+ // clean up any outstanding references.
|
|
|
|
+ if (listener != null) {
|
|
|
|
+ listener.closed(this.getRegionName());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Split each store file.
|
|
|
|
+ for(HStoreFile h: hstoreFilesToSplit) {
|
|
|
|
+ // A reference to the bottom half of the hsf store file.
|
|
|
|
+ HStoreFile.Reference aReference = new HStoreFile.Reference(
|
|
|
|
+ this.encodedRegionName, h.getFileId(), new HStoreKey(midKey),
|
|
|
|
+ HStoreFile.Range.bottom);
|
|
|
|
+ HStoreFile a = new HStoreFile(this.conf, splits,
|
|
|
|
+ HRegionInfo.encodeRegionName(regionAInfo.getRegionName()),
|
|
|
|
+ h.getColFamily(), Math.abs(rand.nextLong()), aReference);
|
|
|
|
+ // Reference to top half of the hsf store file.
|
|
|
|
+ HStoreFile.Reference bReference = new HStoreFile.Reference(
|
|
|
|
+ this.encodedRegionName, h.getFileId(), new HStoreKey(midKey),
|
|
|
|
+ HStoreFile.Range.top);
|
|
|
|
+ HStoreFile b = new HStoreFile(this.conf, splits,
|
|
|
|
+ HRegionInfo.encodeRegionName(regionBInfo.getRegionName()),
|
|
|
|
+ h.getColFamily(), Math.abs(rand.nextLong()), bReference);
|
|
|
|
+ h.splitStoreFile(a, b, this.fs);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Done!
|
|
|
|
+ // Opening the region copies the splits files from the splits directory
|
|
|
|
+ // under each region.
|
|
|
|
+ HRegion regionA =
|
|
|
|
+ new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null);
|
|
|
|
+ regionA.close();
|
|
|
|
+ HRegion regionB =
|
|
|
|
+ new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null);
|
|
|
|
+ regionB.close();
|
|
|
|
+
|
|
|
|
+ // Cleanup
|
|
|
|
+ boolean deleted = fs.delete(splits); // Get rid of splits directory
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Cleaned up " + splits.toString() + " " + deleted);
|
|
|
|
+ }
|
|
|
|
+ HRegion regions[] = new HRegion [] {regionA, regionB};
|
|
|
|
+ LOG.info("Region split of " + this.regionInfo.getRegionName() +
|
|
|
|
+ " complete; " + "new regions: " + regions[0].getRegionName() + ", " +
|
|
|
|
+ regions[1].getRegionName() + ". Split took " +
|
|
|
|
+ StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
|
|
|
|
+ return regions;
|
|
}
|
|
}
|
|
- HRegion regions[] = new HRegion [] {regionA, regionB};
|
|
|
|
- LOG.info("Region split of " + this.regionInfo.getRegionName() +
|
|
|
|
- " complete; " + "new regions: " + regions[0].getRegionName() + ", " +
|
|
|
|
- regions[1].getRegionName() + ". Split took " +
|
|
|
|
- StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
|
|
|
|
- return regions;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
/*
|
|
@@ -1030,6 +1035,7 @@ public class HRegion implements HConstants {
|
|
* avoid a bunch of disk activity.
|
|
* avoid a bunch of disk activity.
|
|
*
|
|
*
|
|
* @param row
|
|
* @param row
|
|
|
|
+ * @param ts
|
|
* @return Map<columnName, byte[]> values
|
|
* @return Map<columnName, byte[]> values
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
@@ -1282,6 +1288,7 @@ public class HRegion implements HConstants {
|
|
* @param row The row to operate on
|
|
* @param row The row to operate on
|
|
* @param family The column family to match
|
|
* @param family The column family to match
|
|
* @param timestamp Timestamp to match
|
|
* @param timestamp Timestamp to match
|
|
|
|
+ * @throws IOException
|
|
*/
|
|
*/
|
|
public void deleteFamily(Text row, Text family, long timestamp)
|
|
public void deleteFamily(Text row, Text family, long timestamp)
|
|
throws IOException{
|
|
throws IOException{
|