|
@@ -36,7 +36,6 @@ import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CreateFlag;
|
|
|
import org.apache.hadoop.fs.FSOutputSummer;
|
|
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|
@@ -63,7 +62,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
|
|
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
|
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
|
|
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
|
|
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
|
|
import org.apache.hadoop.io.EnumSetWritable;
|
|
@@ -606,6 +604,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
try {
|
|
|
blockStream.close();
|
|
|
} catch (IOException e) {
|
|
|
+ setLastException(e);
|
|
|
} finally {
|
|
|
blockStream = null;
|
|
|
}
|
|
@@ -614,10 +613,20 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
try {
|
|
|
blockReplyStream.close();
|
|
|
} catch (IOException e) {
|
|
|
+ setLastException(e);
|
|
|
} finally {
|
|
|
blockReplyStream = null;
|
|
|
}
|
|
|
}
|
|
|
+ if (null != s) {
|
|
|
+ try {
|
|
|
+ s.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ setLastException(e);
|
|
|
+ } finally {
|
|
|
+ s = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
//
|
|
@@ -1003,16 +1012,20 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
persistBlocks.set(true);
|
|
|
|
|
|
boolean result = false;
|
|
|
+ DataOutputStream out = null;
|
|
|
try {
|
|
|
+ assert null == s : "Previous socket unclosed";
|
|
|
s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
|
|
|
long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
|
|
|
|
|
|
//
|
|
|
// Xmit header info to datanode
|
|
|
//
|
|
|
- DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
|
|
+ out = new DataOutputStream(new BufferedOutputStream(
|
|
|
NetUtils.getOutputStream(s, writeTimeout),
|
|
|
FSConstants.SMALL_BUFFER_SIZE));
|
|
|
+
|
|
|
+ assert null == blockReplyStream : "Previous blockReplyStream unclosed";
|
|
|
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
|
|
|
|
|
|
// send the request
|
|
@@ -1038,7 +1051,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
+ firstBadLink);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+ assert null == blockStream : "Previous blockStream unclosed";
|
|
|
blockStream = out;
|
|
|
result = true; // success
|
|
|
|
|
@@ -1059,12 +1072,15 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
}
|
|
|
hasError = true;
|
|
|
setLastException(ie);
|
|
|
- blockReplyStream = null;
|
|
|
result = false; // error
|
|
|
} finally {
|
|
|
if (!result) {
|
|
|
IOUtils.closeSocket(s);
|
|
|
s = null;
|
|
|
+ IOUtils.closeStream(out);
|
|
|
+ out = null;
|
|
|
+ IOUtils.closeStream(blockReplyStream);
|
|
|
+ blockReplyStream = null;
|
|
|
}
|
|
|
}
|
|
|
return result;
|