|
@@ -293,9 +293,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
final StorageType[] targetStorageTypes,
|
|
|
final Token<BlockTokenIdentifier> blockToken) throws IOException {
|
|
|
//send the TRANSFER_BLOCK request
|
|
|
- new Sender(out)
|
|
|
- .transferBlock(block, blockToken, dfsClient.clientName, targets,
|
|
|
- targetStorageTypes);
|
|
|
+ new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
|
|
|
+ dfsClient.clientName, targets, targetStorageTypes);
|
|
|
out.flush();
|
|
|
//ack
|
|
|
BlockOpResponseProto transferResponse = BlockOpResponseProto
|
|
@@ -315,7 +314,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
|
|
|
private volatile boolean streamerClosed = false;
|
|
|
- private volatile ExtendedBlock block; // its length is number of bytes acked
|
|
|
+ private final BlockToWrite block; // its length is number of bytes acked
|
|
|
private Token<BlockTokenIdentifier> accessToken;
|
|
|
private DataOutputStream blockStream;
|
|
|
private DataInputStream blockReplyStream;
|
|
@@ -364,7 +363,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
|
|
|
isAppend = false;
|
|
|
isLazyPersistFile = isLazyPersist(stat);
|
|
|
- this.block = block;
|
|
|
+ this.block = new BlockToWrite(block);
|
|
|
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
|
|
}
|
|
|
|
|
@@ -379,7 +378,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
int bytesPerChecksum) throws IOException {
|
|
|
isAppend = true;
|
|
|
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
|
|
|
- block = lastBlock.getBlock();
|
|
|
+ block = new BlockToWrite(lastBlock.getBlock());
|
|
|
bytesSent = block.getNumBytes();
|
|
|
accessToken = lastBlock.getBlockToken();
|
|
|
isLazyPersistFile = isLazyPersist(stat);
|
|
@@ -1082,7 +1081,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
LocatedBlock lb;
|
|
|
//get a new datanode
|
|
|
lb = dfsClient.namenode.getAdditionalDatanode(
|
|
|
- src, fileId, block, nodes, storageIDs,
|
|
|
+ src, fileId, block.getCurrentBlock(), nodes, storageIDs,
|
|
|
exclude.toArray(new DatanodeInfo[exclude.size()]),
|
|
|
1, dfsClient.clientName);
|
|
|
// a new node was allocated by the namenode. Update nodes.
|
|
@@ -1260,7 +1259,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
|
|
|
// get a new generation stamp and an access token
|
|
|
- LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
|
|
|
+ LocatedBlock lb = dfsClient.namenode.
|
|
|
+ updateBlockForPipeline(block.getCurrentBlock(), dfsClient.clientName);
|
|
|
newGS = lb.getBlock().getGenerationStamp();
|
|
|
accessToken = lb.getBlockToken();
|
|
|
|
|
@@ -1308,16 +1308,21 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
|
|
|
if (success) {
|
|
|
// update pipeline at the namenode
|
|
|
- ExtendedBlock newBlock = new ExtendedBlock(
|
|
|
- block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
|
|
|
- dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
|
|
|
- nodes, storageIDs);
|
|
|
- // update client side generation stamp
|
|
|
- block = newBlock;
|
|
|
+ final ExtendedBlock oldBlock = block.getCurrentBlock();
|
|
|
+ // the new GS has been propagated to all DN, it should be ok to update the
|
|
|
+ // local block state
|
|
|
+ block.setGenerationStamp(newGS);
|
|
|
+ dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock,
|
|
|
+ block.getCurrentBlock(), nodes, storageIDs);
|
|
|
}
|
|
|
return false; // do not sleep, continue processing
|
|
|
}
|
|
|
|
|
|
+ DatanodeInfo[] getExcludedNodes() {
|
|
|
+ return excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
|
|
|
+ .keySet().toArray(new DatanodeInfo[0]);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Open a DataOutputStream to a DataNode so that it can be written to.
|
|
|
* This happens when a file is created and each time a new block is allocated.
|
|
@@ -1330,20 +1335,17 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
StorageType[] storageTypes = null;
|
|
|
int count = dfsClient.getConf().nBlockWriteRetry;
|
|
|
boolean success = false;
|
|
|
- ExtendedBlock oldBlock = block;
|
|
|
+ ExtendedBlock oldBlock = block.getCurrentBlock();
|
|
|
do {
|
|
|
hasError = false;
|
|
|
lastException.set(null);
|
|
|
errorIndex = -1;
|
|
|
success = false;
|
|
|
|
|
|
- DatanodeInfo[] excluded =
|
|
|
- excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
|
|
|
- .keySet()
|
|
|
- .toArray(new DatanodeInfo[0]);
|
|
|
- block = oldBlock;
|
|
|
- lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
|
|
|
- block = lb.getBlock();
|
|
|
+ DatanodeInfo[] excluded = getExcludedNodes();
|
|
|
+ lb = locateFollowingBlock(
|
|
|
+ excluded.length > 0 ? excluded : null, oldBlock);
|
|
|
+ block.setCurrentBlock(lb.getBlock());
|
|
|
block.setNumBytes(0);
|
|
|
bytesSent = 0;
|
|
|
accessToken = lb.getBlockToken();
|
|
@@ -1357,9 +1359,9 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
|
|
|
if (!success) {
|
|
|
DFSClient.LOG.info("Abandoning " + block);
|
|
|
- dfsClient.namenode.abandonBlock(block, fileId, src,
|
|
|
- dfsClient.clientName);
|
|
|
- block = null;
|
|
|
+ dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
|
|
|
+ fileId, src, dfsClient.clientName);
|
|
|
+ block.setCurrentBlock(null);
|
|
|
DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
|
|
|
excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
|
|
|
}
|
|
@@ -1421,7 +1423,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
|
|
|
// We cannot change the block length in 'block' as it counts the number
|
|
|
// of bytes ack'ed.
|
|
|
- ExtendedBlock blockCopy = new ExtendedBlock(block);
|
|
|
+ ExtendedBlock blockCopy = block.getCurrentBlock();
|
|
|
blockCopy.setNumBytes(blockSize);
|
|
|
|
|
|
boolean[] targetPinnings = getPinnings(nodes, true);
|
|
@@ -1539,7 +1541,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException {
|
|
|
+ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
|
|
|
+ ExtendedBlock oldBlock) throws IOException {
|
|
|
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
|
|
|
long sleeptime = 400;
|
|
|
while (true) {
|
|
@@ -1547,7 +1550,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
while (true) {
|
|
|
try {
|
|
|
return dfsClient.namenode.addBlock(src, dfsClient.clientName,
|
|
|
- block, excludedNodes, fileId, favoredNodes);
|
|
|
+ oldBlock, excluded, fileId, favoredNodes);
|
|
|
} catch (RemoteException e) {
|
|
|
IOException ue =
|
|
|
e.unwrapRemoteException(FileNotFoundException.class,
|
|
@@ -1591,7 +1594,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
|
|
|
ExtendedBlock getBlock() {
|
|
|
- return block;
|
|
|
+ return block.getCurrentBlock();
|
|
|
}
|
|
|
|
|
|
DatanodeInfo[] getNodes() {
|
|
@@ -1607,6 +1610,42 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ static class BlockToWrite {
|
|
|
+ private ExtendedBlock currentBlock;
|
|
|
+
|
|
|
+ BlockToWrite(ExtendedBlock block) {
|
|
|
+ setCurrentBlock(block);
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized ExtendedBlock getCurrentBlock() {
|
|
|
+ return currentBlock == null ? null : new ExtendedBlock(currentBlock);
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized long getNumBytes() {
|
|
|
+ return currentBlock == null ? 0 : currentBlock.getNumBytes();
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void setCurrentBlock(ExtendedBlock block) {
|
|
|
+ currentBlock = (block == null || block.getLocalBlock() == null) ?
|
|
|
+ null : new ExtendedBlock(block);
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void setNumBytes(long numBytes) {
|
|
|
+ assert currentBlock != null;
|
|
|
+ currentBlock.setNumBytes(numBytes);
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void setGenerationStamp(long generationStamp) {
|
|
|
+ assert currentBlock != null;
|
|
|
+ currentBlock.setGenerationStamp(generationStamp);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized String toString() {
|
|
|
+ return currentBlock == null ? "null" : currentBlock.toString();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Create a socket for a write pipeline
|
|
|
* @param first the first datanode
|
|
@@ -2169,8 +2208,11 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
// update the block length first time irrespective of flag
|
|
|
if (updateLength || persistBlocks.get()) {
|
|
|
synchronized (this) {
|
|
|
- if (streamer != null && streamer.block != null) {
|
|
|
- lastBlockLength = streamer.block.getNumBytes();
|
|
|
+ if (streamer != null && !streamer.streamerClosed) {
|
|
|
+ final ExtendedBlock block = streamer.getBlock();
|
|
|
+ if (block != null) {
|
|
|
+ lastBlockLength = block.getNumBytes();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|