|
@@ -299,11 +299,17 @@ class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
if (detachDir.exists()) {
|
|
|
recoverDetachedBlocks(currentDir, detachDir);
|
|
|
}
|
|
|
- this.dataDir = new FSDir(currentDir);
|
|
|
+
|
|
|
+ // Files that were being written when the datanode was last shutdown
|
|
|
+ // are now moved back to the data directory. It is possible that
|
|
|
+ // in the future, we might want to do some sort of datanode-local
|
|
|
+ // recovery for these blocks. For example, crc validation.
|
|
|
+ //
|
|
|
this.tmpDir = new File(parent, "tmp");
|
|
|
if (tmpDir.exists()) {
|
|
|
- FileUtil.fullyDelete(tmpDir);
|
|
|
+ recoverDetachedBlocks(currentDir, tmpDir);
|
|
|
}
|
|
|
+ this.dataDir = new FSDir(currentDir);
|
|
|
if (!tmpDir.mkdirs()) {
|
|
|
if (!tmpDir.isDirectory()) {
|
|
|
throw new IOException("Mkdirs failed to create " + tmpDir.toString());
|
|
@@ -651,8 +657,6 @@ class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
private HashMap<Block,DatanodeBlockInfo> volumeMap = null;
|
|
|
static Random random = new Random();
|
|
|
|
|
|
- long blockWriteTimeout = 3600 * 1000;
|
|
|
-
|
|
|
/**
|
|
|
* An FSDataset has a directory where it loads its data files.
|
|
|
*/
|
|
@@ -665,8 +669,6 @@ class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
volumes = new FSVolumeSet(volArray);
|
|
|
volumeMap = new HashMap<Block, DatanodeBlockInfo>();
|
|
|
volumes.getVolumeMap(volumeMap);
|
|
|
- blockWriteTimeout = Math.max(
|
|
|
- conf.getInt("dfs.datanode.block.write.timeout.sec", 3600), 1) * 1000L;
|
|
|
registerMBean(storage.getStorageID());
|
|
|
}
|
|
|
|
|
@@ -910,18 +912,8 @@ class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
threads = activeFile.threads;
|
|
|
|
|
|
if (!isRecovery) {
|
|
|
- // check how old is the temp file - wait 1 hour
|
|
|
- if ((System.currentTimeMillis() - f.lastModified()) <
|
|
|
- blockWriteTimeout) {
|
|
|
- throw new IOException("Block " + b +
|
|
|
+ throw new IOException("Block " + b +
|
|
|
" has already been started (though not completed), and thus cannot be created.");
|
|
|
- } else {
|
|
|
- // stale temp file - remove
|
|
|
- if (!f.delete()) {
|
|
|
- throw new IOException("Can't write the block - unable to remove stale temp file " + f);
|
|
|
- }
|
|
|
- f = null;
|
|
|
- }
|
|
|
} else {
|
|
|
for (Thread thread:threads) {
|
|
|
thread.interrupt();
|
|
@@ -1016,7 +1008,11 @@ class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
* Complete the block write!
|
|
|
*/
|
|
|
public synchronized void finalizeBlock(Block b) throws IOException {
|
|
|
- File f = ongoingCreates.get(b).file;
|
|
|
+ ActiveFile activeFile = ongoingCreates.get(b);
|
|
|
+ if (activeFile == null) {
|
|
|
+ throw new IOException("Block " + b + " is already finalized.");
|
|
|
+ }
|
|
|
+ File f = activeFile.file;
|
|
|
if (f == null || !f.exists()) {
|
|
|
throw new IOException("No temporary file " + f + " for block " + b);
|
|
|
}
|