|
@@ -38,6 +38,7 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.*;
|
|
|
import java.util.concurrent.Executor;
|
|
|
|
|
|
import javax.management.NotCompliantMBeanException;
|
|
@@ -95,6 +96,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
|
import org.apache.hadoop.io.MultipleIOException;
|
|
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
|
|
import org.apache.hadoop.metrics2.util.MBeans;
|
|
|
+import org.apache.hadoop.util.Daemon;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
|
@@ -118,7 +120,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
@Override // FsDatasetSpi
|
|
|
public List<FsVolumeImpl> getVolumes() {
|
|
|
return volumes.volumes;
|
|
@@ -211,11 +212,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
final FsVolumeList volumes;
|
|
|
final Map<String, DatanodeStorage> storageMap;
|
|
|
final FsDatasetAsyncDiskService asyncDiskService;
|
|
|
+ final Daemon lazyWriter;
|
|
|
final FsDatasetCache cacheManager;
|
|
|
private final Configuration conf;
|
|
|
private final int validVolsRequired;
|
|
|
+ private volatile boolean fsRunning;
|
|
|
|
|
|
final ReplicaMap volumeMap;
|
|
|
+ final LazyWriteReplicaTracker lazyWriteReplicaTracker;
|
|
|
+
|
|
|
+ private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
|
|
|
+
|
|
|
|
|
|
// Used for synchronizing access to usage stats
|
|
|
private final Object statsLock = new Object();
|
|
@@ -225,6 +232,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
*/
|
|
|
FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
|
|
|
) throws IOException {
|
|
|
+ this.fsRunning = true;
|
|
|
this.datanode = datanode;
|
|
|
this.dataStorage = storage;
|
|
|
this.conf = conf;
|
|
@@ -255,6 +263,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
|
|
|
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
|
|
|
volumeMap = new ReplicaMap(this);
|
|
|
+ lazyWriteReplicaTracker = new LazyWriteReplicaTracker(this);
|
|
|
+
|
|
|
@SuppressWarnings("unchecked")
|
|
|
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
|
|
|
ReflectionUtils.newInstance(conf.getClass(
|
|
@@ -264,11 +274,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
volumes = new FsVolumeList(volsFailed, blockChooserImpl);
|
|
|
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
|
|
|
|
|
|
+    // TODO: Initialize lazyWriteReplicaTracker from blocks on disk.
|
|
|
+
|
|
|
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
|
|
addVolume(dataLocations, storage.getStorageDir(idx));
|
|
|
}
|
|
|
|
|
|
cacheManager = new FsDatasetCache(this);
|
|
|
+ lazyWriter = new Daemon(new LazyWriter(
|
|
|
+ conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
|
|
|
+ DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC)));
|
|
|
+ lazyWriter.start();
|
|
|
registerMBean(datanode.getDatanodeUuid());
|
|
|
}
|
|
|
|
|
@@ -664,8 +680,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
return new ReplicaInputStreams(blockInFile.getFD(), metaInFile.getFD());
|
|
|
}
|
|
|
|
|
|
- static File moveBlockFiles(Block b, File srcfile, File destdir
|
|
|
- ) throws IOException {
|
|
|
+ static File moveBlockFiles(Block b, File srcfile, File destdir)
|
|
|
+ throws IOException {
|
|
|
final File dstfile = new File(destdir, b.getBlockName());
|
|
|
final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
|
|
|
final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
|
|
@@ -688,6 +704,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
return dstfile;
|
|
|
}
|
|
|
|
|
|
+  /**
+   * Copy a block file and its meta file into {@code destdir}, leaving the
+   * source files in place.
+   *
+   * The meta file is copied first, then the block file. If the block file
+   * copy fails, the freshly copied meta file is removed (best effort) so the
+   * destination is not left with a meta file that has no block file.
+   *
+   * @param b block whose name and generation stamp determine the names of
+   *          the destination files.
+   * @param srcfile block file to copy from.
+   * @param destdir directory that receives the copies.
+   * @return the destination block file.
+   * @throws IOException if either copy fails; the underlying exception is
+   *                     attached as the cause.
+   */
+  static File copyBlockFiles(Block b, File srcfile, File destdir)
+      throws IOException {
+    final File dstfile = new File(destdir, b.getBlockName());
+    final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
+    final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
+    try {
+      FileUtils.copyFile(srcmeta, dstmeta);
+    } catch (IOException e) {
+      throw new IOException("Failed to copy meta file for " + b
+          + " from " + srcmeta + " to " + dstmeta, e);
+    }
+    try {
+      FileUtils.copyFile(srcfile, dstfile);
+    } catch (IOException e) {
+      // Do not leave a meta file behind without its block file.
+      if (dstmeta.exists() && !dstmeta.delete()) {
+        LOG.warn("Failed to delete meta file " + dstmeta
+            + " after failed copy of block file for " + b);
+      }
+      throw new IOException("Failed to copy block file for " + b
+          + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
+    }
+    if (LOG.isDebugEnabled()) {
+      // The files are copied, not moved: the sources are still present.
+      LOG.debug("copyBlockFiles: Copied " + srcmeta + " to " + dstmeta
+          + " and " + srcfile + " to " + dstfile);
+    }
+    return dstfile;
+  }
|
|
|
+
|
|
|
static private void truncateBlock(File blockFile, File metaFile,
|
|
|
long oldlen, long newlen) throws IOException {
|
|
|
LOG.info("truncateBlock: blockFile=" + blockFile
|
|
@@ -950,6 +990,83 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Attempt to evict one or more transient block replicas we have at least
|
|
|
+ * spaceNeeded bytes free.
|
|
|
+ *
|
|
|
+ * @return true if we were able to free up at least spaceNeeded bytes, false
|
|
|
+ * otherwise.
|
|
|
+ */
|
|
|
+ private boolean tryToEvictBlocks(final String bpid, final long spaceNeeded)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ boolean isAvailable = false;
|
|
|
+
|
|
|
+ LOG.info("Attempting to evict blocks from transient storage");
|
|
|
+
|
|
|
+ // Reverse the map so we can iterate in order of replica creation times,
|
|
|
+ // evicting oldest replicas one at a time until we have sufficient space.
|
|
|
+ TreeMultimap<Long, LazyWriteReplicaTracker.ReplicaState> lruMap =
|
|
|
+ lazyWriteReplicaTracker.getLruMap();
|
|
|
+ int blocksEvicted = 0;
|
|
|
+
|
|
|
+ // TODO: It is really inefficient to do this with the Object lock held!
|
|
|
+ // TODO: This logic is here just for prototyping.
|
|
|
+ // TODO: We should replace it with proactive discard when ram_disk free space
|
|
|
+ // TODO: falls below a low watermark. That way we avoid fs operations on the
|
|
|
+ // TODO: hot path with the lock held.
|
|
|
+ synchronized (this) {
|
|
|
+ long currentTime = System.currentTimeMillis() / 1000;
|
|
|
+ for (Map.Entry<Long, LazyWriteReplicaTracker.ReplicaState> entry : lruMap.entries()) {
|
|
|
+ LazyWriteReplicaTracker.ReplicaState lazyWriteReplica = entry.getValue();
|
|
|
+ LOG.info("RAM_DISK: Evicting blockId=" + lazyWriteReplica.blockId +
|
|
|
+ "; block LMT=" + entry.getKey() +
|
|
|
+ "; currentTime=" + currentTime);
|
|
|
+ ReplicaInfo replicaInfo = getReplicaInfo(bpid, lazyWriteReplica.blockId);
|
|
|
+ Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
|
|
|
+ File blockFile = replicaInfo.getBlockFile();
|
|
|
+ File metaFile = replicaInfo.getMetaFile();
|
|
|
+ long used = blockFile.length() + metaFile.length();
|
|
|
+ lazyWriteReplicaTracker.discardReplica(bpid, entry.getValue().blockId, false);
|
|
|
+
|
|
|
+ // Move the persisted replica to the finalized directory of
|
|
|
+ // the target volume.
|
|
|
+ BlockPoolSlice bpSlice =
|
|
|
+ lazyWriteReplica.lazyPersistVolume.getBlockPoolSlice(bpid);
|
|
|
+ File newBlockFile = bpSlice.activateSavedReplica(
|
|
|
+ replicaInfo, lazyWriteReplica.savedBlockFile);
|
|
|
+
|
|
|
+ ReplicaInfo newReplicaInfo =
|
|
|
+ new FinalizedReplica(replicaInfo.getBlockId(),
|
|
|
+ replicaInfo.getBytesOnDisk(),
|
|
|
+ replicaInfo.getGenerationStamp(),
|
|
|
+ lazyWriteReplica.lazyPersistVolume,
|
|
|
+ newBlockFile.getParentFile());
|
|
|
+
|
|
|
+ // Update the volumeMap entry. This removes the old entry.
|
|
|
+ volumeMap.add(bpid, newReplicaInfo);
|
|
|
+
|
|
|
+ // Remove the old replicas.
|
|
|
+ blockFile.delete();
|
|
|
+ metaFile.delete();
|
|
|
+ ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, used);
|
|
|
+ ++blocksEvicted;
|
|
|
+
|
|
|
+ if (replicaInfo.getVolume().getAvailable() > spaceNeeded) {
|
|
|
+ LOG.info("RAM_DISK: freed up " + spaceNeeded + " bytes for new block");
|
|
|
+ isAvailable = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (blocksEvicted == MAX_BLOCK_EVICTIONS_PER_ITERATION) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return isAvailable;
|
|
|
+ }
|
|
|
+
|
|
|
@Override // FsDatasetSpi
|
|
|
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
|
|
|
ExtendedBlock b, boolean allowLazyPersist) throws IOException {
|
|
@@ -972,7 +1089,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
}
|
|
|
} catch (DiskOutOfSpaceException de) {
|
|
|
if (allowLazyPersist) {
|
|
|
- allowLazyPersist = false;
|
|
|
+ if (!tryToEvictBlocks(b.getBlockPoolId(), b.getNumBytes())) {
|
|
|
+ // Eviction did not work, we'll just fallback to DEFAULT storage.
|
|
|
+ LOG.info("RAM_DISK: Failed to free up " + b.getNumBytes() +
|
|
|
+ " bytes for new block. Will fallback to DEFAULT " +
|
|
|
+ "storage");
|
|
|
+ allowLazyPersist = false;
|
|
|
+ }
|
|
|
continue;
|
|
|
}
|
|
|
throw de;
|
|
@@ -984,6 +1107,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
|
|
|
b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
|
|
|
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
|
|
+
|
|
|
return newReplicaInfo;
|
|
|
}
|
|
|
|
|
@@ -1129,7 +1253,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
|
|
|
b.getGenerationStamp(), v, f.getParentFile(), 0);
|
|
|
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
|
|
-
|
|
|
return newReplicaInfo;
|
|
|
}
|
|
|
|
|
@@ -1196,8 +1319,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
File dest = v.addFinalizedBlock(
|
|
|
bpid, replicaInfo, f, replicaInfo.getBytesReserved());
|
|
|
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
|
|
|
+
|
|
|
+ if (v.isTransientStorage()) {
|
|
|
+ lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
|
|
|
+
|
|
|
+ // Schedule a checkpoint.
|
|
|
+ ((LazyWriter) lazyWriter.getRunnable())
|
|
|
+ .addReplicaToLazyWriteQueue(bpid, replicaInfo.getBlockId());
|
|
|
+ }
|
|
|
}
|
|
|
volumeMap.add(bpid, newReplicaInfo);
|
|
|
+
|
|
|
return newReplicaInfo;
|
|
|
}
|
|
|
|
|
@@ -1217,6 +1349,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
replicaInfo.getMetaFile(), b.getLocalBlock())) {
|
|
|
LOG.warn("Block " + b + " unfinalized and removed. " );
|
|
|
}
|
|
|
+ if (replicaInfo.getVolume().isTransientStorage()) {
|
|
|
+ lazyWriteReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1312,6 +1447,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
return finalized;
|
|
|
}
|
|
|
|
|
|
+  /**
+   * Get copies of the finalized replicas of a block pool that are stored on
+   * non-transient volumes, as recorded in the in-memory replica map.
+   */
+  @Override
+  public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
+    final ArrayList<FinalizedReplica> persisted =
+        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+    for (ReplicaInfo replica : volumeMap.replicas(bpid)) {
+      // Replicas on transient storage are not reported here.
+      if (replica.getVolume().isTransientStorage()) {
+        continue;
+      }
+      if (replica.getState() != ReplicaState.FINALIZED) {
+        continue;
+      }
+      persisted.add(new FinalizedReplica((FinalizedReplica) replica));
+    }
+    return persisted;
+  }
|
|
|
+
|
|
|
/**
|
|
|
* Check whether the given block is a valid one.
|
|
|
* valid means finalized
|
|
@@ -1429,6 +1580,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
volumeMap.remove(bpid, invalidBlks[i]);
|
|
|
}
|
|
|
|
|
|
+ if (v.isTransientStorage()) {
|
|
|
+ lazyWriteReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
|
|
|
+ }
|
|
|
+
|
|
|
// If a DFSClient has the replica in its cache of short-circuit file
|
|
|
// descriptors (and the client is using ShortCircuitShm), invalidate it.
|
|
|
datanode.getShortCircuitRegistry().processBlockInvalidation(
|
|
@@ -1649,8 +1804,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
|
|
|
@Override // FsDatasetSpi
|
|
|
public void shutdown() {
|
|
|
- if (mbeanName != null)
|
|
|
+ fsRunning = false;
|
|
|
+
|
|
|
+ ((LazyWriter) lazyWriter.getRunnable()).stop();
|
|
|
+ lazyWriter.interrupt();
|
|
|
+
|
|
|
+ if (mbeanName != null) {
|
|
|
MBeans.unregister(mbeanName);
|
|
|
+ }
|
|
|
|
|
|
if (asyncDiskService != null) {
|
|
|
asyncDiskService.shutdown();
|
|
@@ -1659,6 +1820,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
if(volumes != null) {
|
|
|
volumes.shutdown();
|
|
|
}
|
|
|
+
|
|
|
+ try {
|
|
|
+ lazyWriter.join();
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " +
|
|
|
+ "from LazyWriter.join");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override // FSDatasetMBean
|
|
@@ -1691,7 +1859,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
*/
|
|
|
@Override
|
|
|
public void checkAndUpdate(String bpid, long blockId, File diskFile,
|
|
|
- File diskMetaFile, FsVolumeSpi vol) {
|
|
|
+ File diskMetaFile, FsVolumeSpi vol) throws IOException {
|
|
|
Block corruptBlock = null;
|
|
|
ReplicaInfo memBlockInfo;
|
|
|
synchronized (this) {
|
|
@@ -1724,6 +1892,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
if (blockScanner != null) {
|
|
|
blockScanner.deleteBlock(bpid, new Block(blockId));
|
|
|
}
|
|
|
+ if (vol.isTransientStorage()) {
|
|
|
+ lazyWriteReplicaTracker.discardReplica(bpid, blockId, true);
|
|
|
+ }
|
|
|
LOG.warn("Removed block " + blockId
|
|
|
+ " from memory with missing block file on the disk");
|
|
|
// Finally remove the metadata file
|
|
@@ -1747,6 +1918,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
if (blockScanner != null) {
|
|
|
blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
|
|
|
}
|
|
|
+ if (vol.isTransientStorage()) {
|
|
|
+ lazyWriteReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
|
|
|
+ }
|
|
|
LOG.warn("Added missing block to memory " + diskBlockInfo);
|
|
|
return;
|
|
|
}
|
|
@@ -1924,9 +2098,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
final String bpid = oldBlock.getBlockPoolId();
|
|
|
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
|
|
|
LOG.info("updateReplica: " + oldBlock
|
|
|
- + ", recoveryId=" + recoveryId
|
|
|
- + ", length=" + newlength
|
|
|
- + ", replica=" + replica);
|
|
|
+ + ", recoveryId=" + recoveryId
|
|
|
+ + ", length=" + newlength
|
|
|
+ + ", replica=" + replica);
|
|
|
|
|
|
//check replica
|
|
|
if (replica == null) {
|
|
@@ -2196,5 +2370,123 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset,
|
|
|
nbytes, flags);
|
|
|
}
|
|
|
+
|
|
|
+ private static class BlockIdPair {
|
|
|
+ final String bpid;
|
|
|
+ final long blockId;
|
|
|
+
|
|
|
+ BlockIdPair(final String bpid, final long blockId) {
|
|
|
+ this.bpid = bpid;
|
|
|
+ this.blockId = blockId;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class LazyWriter implements Runnable {
|
|
|
+ private volatile boolean shouldRun = true;
|
|
|
+ final int checkpointerInterval;
|
|
|
+
|
|
|
+ final private Queue<BlockIdPair> blocksPendingCheckpoint;
|
|
|
+
|
|
|
+ public LazyWriter(final int checkpointerInterval) {
|
|
|
+ this.checkpointerInterval = checkpointerInterval;
|
|
|
+ blocksPendingCheckpoint = new LinkedList<BlockIdPair>();
|
|
|
+ }
|
|
|
+
|
|
|
+ // Schedule a replica for writing to persistent storage.
|
|
|
+ public synchronized void addReplicaToLazyWriteQueue(
|
|
|
+ String bpid, long blockId) {
|
|
|
+ LOG.info("Block with blockId=" + blockId + "; bpid=" + bpid + " added to lazy writer queue");
|
|
|
+ blocksPendingCheckpoint.add(new BlockIdPair(bpid, blockId));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void moveReplicaToNewVolume(String bpid, long blockId)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid);
|
|
|
+
|
|
|
+ FsVolumeImpl targetVolume = null;
|
|
|
+ Block block = null;
|
|
|
+ File blockFile = null;
|
|
|
+
|
|
|
+ synchronized (this) {
|
|
|
+ block = getStoredBlock(bpid, blockId);
|
|
|
+ blockFile = getFile(bpid, blockId);
|
|
|
+
|
|
|
+ if (block == null) {
|
|
|
+ // The block was deleted before it could be checkpointed.
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Pick a target volume for the block.
|
|
|
+ targetVolume = volumes.getNextVolume(
|
|
|
+ StorageType.DEFAULT, block.getNumBytes());
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
|
|
|
+ lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
|
|
|
+ File savedBlockFile = targetVolume.getBlockPoolSlice(bpid)
|
|
|
+ .lazyPersistReplica(block, blockFile);
|
|
|
+ lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile);
|
|
|
+ LOG.info("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
|
|
|
+ " to file " + savedBlockFile);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Checkpoint a pending replica to persistent storage now.
|
|
|
+ * @return true if there is more work to be done, false otherwise.
|
|
|
+ */
|
|
|
+ private boolean saveNextReplica() {
|
|
|
+ BlockIdPair blockIdPair = null;
|
|
|
+ int moreWorkThreshold = 0;
|
|
|
+
|
|
|
+ try {
|
|
|
+ synchronized (this) {
|
|
|
+ // Dequeue the next replica waiting to be checkpointed.
|
|
|
+ blockIdPair = blocksPendingCheckpoint.poll();
|
|
|
+ if (blockIdPair == null) {
|
|
|
+ LOG.info("LazyWriter has no blocks to persist. " +
|
|
|
+ "Thread going to sleep.");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Move the replica outside the lock.
|
|
|
+ moveReplicaToNewVolume(blockIdPair.bpid, blockIdPair.blockId);
|
|
|
+
|
|
|
+ } catch(IOException ioe) {
|
|
|
+ // If we failed, put the block on the queue and let a retry
|
|
|
+ // interval elapse before we try again so we don't try to keep
|
|
|
+ // checkpointing the same block in a tight loop.
|
|
|
+ synchronized (this) {
|
|
|
+ blocksPendingCheckpoint.add(blockIdPair);
|
|
|
+ ++moreWorkThreshold;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized (this) {
|
|
|
+ return blocksPendingCheckpoint.size() > moreWorkThreshold;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ while (fsRunning && shouldRun) {
|
|
|
+ try {
|
|
|
+ if (!saveNextReplica()) {
|
|
|
+ Thread.sleep(checkpointerInterval * 1000);
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.info("LazyWriter was interrupted, exiting");
|
|
|
+ break;
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Ignoring exception in LazyWriter:", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void stop() {
|
|
|
+ shouldRun = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|