|
@@ -1134,32 +1134,39 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // interrupt and wait for all ongoing create threads
|
|
|
- for(Thread t : threads) {
|
|
|
- t.interrupt();
|
|
|
- }
|
|
|
- for(Thread t : threads) {
|
|
|
- try {
|
|
|
- t.join();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
|
|
|
- break; // retry with new threadlist from the beginning
|
|
|
- }
|
|
|
+ interruptAndJoinThreads(threads);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Try to interrupt all of the given threads, and join on them.
|
|
|
+ * If interrupted, returns false, indicating some threads may
|
|
|
+ * still be running.
|
|
|
+ */
|
|
|
+ private boolean interruptAndJoinThreads(List<Thread> threads) {
|
|
|
+ // interrupt and wait for all ongoing create threads
|
|
|
+ for(Thread t : threads) {
|
|
|
+ t.interrupt();
|
|
|
+ }
|
|
|
+ for(Thread t : threads) {
|
|
|
+ try {
|
|
|
+ t.join();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
|
|
|
+ return false;
|
|
|
}
|
|
|
}
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
- * Try to update an old block to a new block.
|
|
|
- * If there are ongoing create threads running for the old block,
|
|
|
- * the threads will be returned without updating the block.
|
|
|
- *
|
|
|
- * @return ongoing create threads if there is any. Otherwise, return null.
|
|
|
+ * Return a list of active writer threads for the given block.
|
|
|
+ * @return null if there are no such threads or the file is
|
|
|
+ * not being created
|
|
|
*/
|
|
|
- private synchronized List<Thread> tryUpdateBlock(
|
|
|
- Block oldblock, Block newblock) throws IOException {
|
|
|
- //check ongoing create threads
|
|
|
- final ActiveFile activefile = ongoingCreates.get(oldblock);
|
|
|
+ private synchronized ArrayList<Thread> getActiveThreads(Block block) {
|
|
|
+ final ActiveFile activefile = ongoingCreates.get(block);
|
|
|
if (activefile != null && !activefile.threads.isEmpty()) {
|
|
|
//remove dead threads
|
|
|
for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
|
|
@@ -1168,13 +1175,30 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
i.remove();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
//return living threads
|
|
|
if (!activefile.threads.isEmpty()) {
|
|
|
return new ArrayList<Thread>(activefile.threads);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Try to update an old block to a new block.
|
|
|
+ * If there are ongoing create threads running for the old block,
|
|
|
+ * the threads will be returned without updating the block.
|
|
|
+ *
|
|
|
+ * @return ongoing create threads if there is any. Otherwise, return null.
|
|
|
+ */
|
|
|
+ private synchronized List<Thread> tryUpdateBlock(
|
|
|
+ Block oldblock, Block newblock) throws IOException {
|
|
|
+ //check ongoing create threads
|
|
|
+ ArrayList<Thread> activeThreads = getActiveThreads(oldblock);
|
|
|
+ if (activeThreads != null) {
|
|
|
+ return activeThreads;
|
|
|
+ }
|
|
|
+
|
|
|
//No ongoing create threads is alive. Update block.
|
|
|
File blockFile = findBlockFile(oldblock.getBlockId());
|
|
|
if (blockFile == null) {
|
|
@@ -1945,30 +1969,42 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public synchronized BlockRecoveryInfo getBlockRecoveryInfo(long blockId)
|
|
|
- throws IOException {
|
|
|
+ public BlockRecoveryInfo startBlockRecovery(long blockId)
|
|
|
+ throws IOException {
|
|
|
Block stored = getStoredBlock(blockId);
|
|
|
|
|
|
if (stored == null) {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- ActiveFile activeFile = ongoingCreates.get(stored);
|
|
|
- boolean isRecovery = (activeFile != null) && activeFile.wasRecoveredOnStartup;
|
|
|
-
|
|
|
+ // It's important that this loop not be synchronized - otherwise
|
|
|
+ // this will deadlock against the thread it's joining against!
|
|
|
+ while (true) {
|
|
|
+ DataNode.LOG.debug(
|
|
|
+ "Interrupting active writer threads for block " + stored);
|
|
|
+ List<Thread> activeThreads = getActiveThreads(stored);
|
|
|
+ if (activeThreads == null) break;
|
|
|
+ if (interruptAndJoinThreads(activeThreads))
|
|
|
+ break;
|
|
|
+ }
|
|
|
|
|
|
- BlockRecoveryInfo info = new BlockRecoveryInfo(
|
|
|
- stored, isRecovery);
|
|
|
- if (DataNode.LOG.isDebugEnabled()) {
|
|
|
- DataNode.LOG.debug("getBlockMetaDataInfo successful block=" + stored +
|
|
|
- " length " + stored.getNumBytes() +
|
|
|
- " genstamp " + stored.getGenerationStamp());
|
|
|
+ synchronized (this) {
|
|
|
+ ActiveFile activeFile = ongoingCreates.get(stored);
|
|
|
+ boolean isRecovery = (activeFile != null) && activeFile.wasRecoveredOnStartup;
|
|
|
+
|
|
|
+
|
|
|
+ BlockRecoveryInfo info = new BlockRecoveryInfo(
|
|
|
+ stored, isRecovery);
|
|
|
+ if (DataNode.LOG.isDebugEnabled()) {
|
|
|
+ DataNode.LOG.debug("getBlockMetaDataInfo successful block=" + stored +
|
|
|
+ " length " + stored.getNumBytes() +
|
|
|
+ " genstamp " + stored.getGenerationStamp());
|
|
|
+ }
|
|
|
+
|
|
|
+ // paranoia! verify that the contents of the stored block
|
|
|
+ // matches the block file on disk.
|
|
|
+ validateBlockMetadata(stored);
|
|
|
+ return info;
|
|
|
}
|
|
|
-
|
|
|
- // paranoia! verify that the contents of the stored block
|
|
|
- // matches the block file on disk.
|
|
|
- validateBlockMetadata(stored);
|
|
|
-
|
|
|
- return info;
|
|
|
}
|
|
|
}
|