|
@@ -542,8 +542,8 @@ class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
|
|
|
|
|
|
|
static class ActiveFile {
|
|
static class ActiveFile {
|
|
- File file;
|
|
|
|
- List<Thread> threads = new ArrayList<Thread>(2);
|
|
|
|
|
|
+ final File file;
|
|
|
|
+ final List<Thread> threads = new ArrayList<Thread>(2);
|
|
|
|
|
|
ActiveFile(File f, List<Thread> list) {
|
|
ActiveFile(File f, List<Thread> list) {
|
|
file = f;
|
|
file = f;
|
|
@@ -763,38 +763,64 @@ class FSDataset implements FSConstants, FSDatasetInterface {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- /** interrupt and wait for all ongoing create threads */
|
|
|
|
- private synchronized void interruptOngoingCreates(Block b) {
|
|
|
|
- //remove ongoingCreates threads
|
|
|
|
- ActiveFile activefile = ongoingCreates.get(b);
|
|
|
|
- if (activefile != null) {
|
|
|
|
- for(Thread t : activefile.threads) {
|
|
|
|
|
|
+ /** {@inheritDoc} */
|
|
|
|
+ public void updateBlock(Block oldblock, Block newblock) throws IOException {
|
|
|
|
+ if (oldblock.getBlockId() != newblock.getBlockId()) {
|
|
|
|
+ throw new IOException("Cannot update oldblock (=" + oldblock
|
|
|
|
+ + ") to newblock (=" + newblock + ").");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for(;;) {
|
|
|
|
+ final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
|
|
|
|
+ if (threads == null) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // interrupt and wait for all ongoing create threads
|
|
|
|
+ for(Thread t : threads) {
|
|
t.interrupt();
|
|
t.interrupt();
|
|
}
|
|
}
|
|
- for(Thread t : activefile.threads) {
|
|
|
|
|
|
+ for(Thread t : threads) {
|
|
try {
|
|
try {
|
|
t.join();
|
|
t.join();
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
- DataNode.LOG.warn("interruptOngoingCreates: b=" + b
|
|
|
|
- + ", activeFile=" + activefile + ", t=" + t, e);
|
|
|
|
|
|
+ DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- activefile.threads.clear();
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- /** {@inheritDoc} */
|
|
|
|
- public synchronized void updateBlock(Block oldblock, Block newblock
|
|
|
|
- ) throws IOException {
|
|
|
|
- if (oldblock.getBlockId() != newblock.getBlockId()) {
|
|
|
|
- throw new IOException("Cannot update oldblock (=" + oldblock
|
|
|
|
- + ") to newblock (=" + newblock + ").");
|
|
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Try to update an old block to a new block.
|
|
|
|
+ * If there are ongoing create threads running for the old block,
|
|
|
|
+ * the threads will be returned without updating the block.
|
|
|
|
+ *
|
|
|
|
+ * @return ongoing create threads if there is any. Otherwise, return null.
|
|
|
|
+ */
|
|
|
|
+ private synchronized List<Thread> tryUpdateBlock(
|
|
|
|
+ Block oldblock, Block newblock) throws IOException {
|
|
|
|
+ //check ongoing create threads
|
|
|
|
+ final ActiveFile activefile = ongoingCreates.get(oldblock);
|
|
|
|
+ if (activefile != null && !activefile.threads.isEmpty()) {
|
|
|
|
+ //remove dead threads
|
|
|
|
+ for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
|
|
|
|
+ final Thread t = i.next();
|
|
|
|
+ if (!t.isAlive()) {
|
|
|
|
+ i.remove();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //return living threads
|
|
|
|
+ if (!activefile.threads.isEmpty()) {
|
|
|
|
+ return new ArrayList<Thread>(activefile.threads);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ //No ongoing create threads is alive. Update block.
|
|
File blockFile = findBlockFile(oldblock.getBlockId());
|
|
File blockFile = findBlockFile(oldblock.getBlockId());
|
|
if (blockFile == null) {
|
|
if (blockFile == null) {
|
|
throw new IOException("Block " + oldblock + " does not exist.");
|
|
throw new IOException("Block " + oldblock + " does not exist.");
|
|
}
|
|
}
|
|
- interruptOngoingCreates(oldblock);
|
|
|
|
|
|
|
|
File oldMetaFile = findMetaFile(blockFile);
|
|
File oldMetaFile = findMetaFile(blockFile);
|
|
long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
|
|
long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
|
|
@@ -830,6 +856,7 @@ class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
|
|
|
updateBlockMap(ongoingCreates, oldblock, newblock);
|
|
updateBlockMap(ongoingCreates, oldblock, newblock);
|
|
updateBlockMap(volumeMap, oldblock, newblock);
|
|
updateBlockMap(volumeMap, oldblock, newblock);
|
|
|
|
+ return null;
|
|
}
|
|
}
|
|
|
|
|
|
static private void truncateBlock(File blockFile, File metaFile,
|
|
static private void truncateBlock(File blockFile, File metaFile,
|