|
@@ -377,7 +377,7 @@ class DFSClient implements FSConstants {
|
|
|
}
|
|
|
try {
|
|
|
s = new Socket(targetAddr.getAddress(), targetAddr.getPort());
|
|
|
- s.setSoTimeout(READ_TIMEOUT);
|
|
|
+ //s.setSoTimeout(READ_TIMEOUT);
|
|
|
|
|
|
//
|
|
|
// Xmit header info to datanode
|
|
@@ -528,11 +528,8 @@ class DFSClient implements FSConstants {
|
|
|
private UTF8 src;
|
|
|
boolean closingDown = false;
|
|
|
private boolean overwrite;
|
|
|
- private boolean blockStreamWorking;
|
|
|
private DataOutputStream blockStream;
|
|
|
private DataInputStream blockReplyStream;
|
|
|
- private File backupFile;
|
|
|
- private OutputStream backupStream;
|
|
|
private Block block;
|
|
|
private DatanodeInfo targets[];
|
|
|
private long filePos = 0;
|
|
@@ -546,9 +543,7 @@ class DFSClient implements FSConstants {
|
|
|
this.overwrite = overwrite;
|
|
|
this.blockStream = null;
|
|
|
this.blockReplyStream = null;
|
|
|
- this.blockStreamWorking = false;
|
|
|
- this.backupFile = File.createTempFile("dfsout", "bak");
|
|
|
- this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
|
|
|
+
|
|
|
nextBlockOutputStream(true);
|
|
|
}
|
|
|
|
|
@@ -558,12 +553,6 @@ class DFSClient implements FSConstants {
|
|
|
* Must get block ID and the IDs of the destinations from the namenode.
|
|
|
*/
|
|
|
private synchronized void nextBlockOutputStream(boolean firstTime) throws IOException {
|
|
|
- if (! firstTime && blockStreamWorking) {
|
|
|
- blockStream.close();
|
|
|
- blockReplyStream.close();
|
|
|
- blockStreamWorking = false;
|
|
|
- }
|
|
|
-
|
|
|
boolean retry = false;
|
|
|
long start = System.currentTimeMillis();
|
|
|
do {
|
|
@@ -602,7 +591,7 @@ class DFSClient implements FSConstants {
|
|
|
Socket s = null;
|
|
|
try {
|
|
|
s = new Socket(target.getAddress(), target.getPort());
|
|
|
- s.setSoTimeout(READ_TIMEOUT);
|
|
|
+ //s.setSoTimeout(READ_TIMEOUT);
|
|
|
} catch (IOException ie) {
|
|
|
// Connection failed. Let's wait a little bit and retry
|
|
|
try {
|
|
@@ -636,7 +625,6 @@ class DFSClient implements FSConstants {
|
|
|
bytesWrittenToBlock = 0;
|
|
|
blockStream = out;
|
|
|
blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
|
|
|
- blockStreamWorking = true;
|
|
|
} while (retry);
|
|
|
}
|
|
|
|
|
@@ -717,27 +705,21 @@ class DFSClient implements FSConstants {
|
|
|
//
|
|
|
// To the blockStream, write length, then bytes
|
|
|
//
|
|
|
- if (blockStreamWorking) {
|
|
|
- try {
|
|
|
- blockStream.writeLong(workingPos);
|
|
|
- blockStream.write(outBuf, 0, workingPos);
|
|
|
- } catch (IOException ie) {
|
|
|
- try {
|
|
|
- blockStream.close();
|
|
|
- } catch (IOException ie2) {
|
|
|
- }
|
|
|
- try {
|
|
|
- blockReplyStream.close();
|
|
|
- } catch (IOException ie2) {
|
|
|
- }
|
|
|
- namenode.abandonBlock(block, src.toString());
|
|
|
- blockStreamWorking = false;
|
|
|
- }
|
|
|
+ try {
|
|
|
+ blockStream.writeLong(workingPos);
|
|
|
+ blockStream.write(outBuf, 0, workingPos);
|
|
|
+ } catch (IOException ie) {
|
|
|
+ try {
|
|
|
+ blockStream.close();
|
|
|
+ } catch (IOException ie2) {
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ blockReplyStream.close();
|
|
|
+ } catch (IOException ie2) {
|
|
|
+ }
|
|
|
+ namenode.abandonBlock(block, src.toString());
|
|
|
+ throw ie;
|
|
|
}
|
|
|
- //
|
|
|
- // To the local block backup, write just the bytes
|
|
|
- //
|
|
|
- backupStream.write(outBuf, 0, workingPos);
|
|
|
|
|
|
//
|
|
|
// Track position
|
|
@@ -752,79 +734,20 @@ class DFSClient implements FSConstants {
|
|
|
* We're done writing to the current block.
|
|
|
*/
|
|
|
private synchronized void endBlock() throws IOException {
|
|
|
- boolean mustRecover = ! blockStreamWorking;
|
|
|
-
|
|
|
- //
|
|
|
- // A zero-length set of data indicates the end of the block
|
|
|
- //
|
|
|
- if (blockStreamWorking) {
|
|
|
- try {
|
|
|
- internalClose();
|
|
|
- } catch (IOException ie) {
|
|
|
- try {
|
|
|
- blockStream.close();
|
|
|
- } catch (IOException ie2) {
|
|
|
- }
|
|
|
- try {
|
|
|
- blockReplyStream.close();
|
|
|
- } catch (IOException ie2) {
|
|
|
- }
|
|
|
- namenode.abandonBlock(block, src.toString());
|
|
|
- mustRecover = true;
|
|
|
- } finally {
|
|
|
- blockStreamWorking = false;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- // Done with local copy
|
|
|
- //
|
|
|
- backupStream.close();
|
|
|
-
|
|
|
- //
|
|
|
- // If necessary, recover from a failed datanode connection.
|
|
|
- //
|
|
|
- while (mustRecover) {
|
|
|
- nextBlockOutputStream(false);
|
|
|
- InputStream in = new FileInputStream(backupFile);
|
|
|
- try {
|
|
|
- byte buf[] = new byte[BUFFER_SIZE];
|
|
|
- int bytesRead = in.read(buf);
|
|
|
- while (bytesRead >= 0) {
|
|
|
- blockStream.writeLong((long) bytesRead);
|
|
|
- blockStream.write(buf, 0, bytesRead);
|
|
|
- bytesRead = in.read(buf);
|
|
|
- }
|
|
|
- internalClose();
|
|
|
- LOG.info("Recovered from failed datanode connection");
|
|
|
- mustRecover = false;
|
|
|
- } catch (IOException ie) {
|
|
|
- try {
|
|
|
- blockStream.close();
|
|
|
- } catch (IOException ie2) {
|
|
|
- }
|
|
|
- try {
|
|
|
- blockReplyStream.close();
|
|
|
- } catch (IOException ie2) {
|
|
|
- }
|
|
|
- namenode.abandonBlock(block, src.toString());
|
|
|
- blockStreamWorking = false;
|
|
|
- }
|
|
|
+ try {
|
|
|
+ internalClose();
|
|
|
+ } catch (IOException ie) {
|
|
|
+ namenode.abandonBlock(block, src.toString());
|
|
|
+ throw ie;
|
|
|
}
|
|
|
-
|
|
|
- //
|
|
|
- // Delete local backup, start new one
|
|
|
- //
|
|
|
- backupFile.delete();
|
|
|
- backupFile = File.createTempFile("dfsout", "bak");
|
|
|
- backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Close down stream to remote datanode. Called from two places
|
|
|
- * in endBlock();
|
|
|
+ * Close down stream to remote datanode.
|
|
|
*/
|
|
|
private synchronized void internalClose() throws IOException {
|
|
|
+ try {
|
|
|
+ // A zero-length set of data indicates the end of the block
|
|
|
blockStream.writeLong(0);
|
|
|
blockStream.flush();
|
|
|
|
|
@@ -838,8 +761,16 @@ class DFSClient implements FSConstants {
|
|
|
lb.readFields(blockReplyStream);
|
|
|
namenode.reportWrittenBlock(lb);
|
|
|
|
|
|
- blockStream.close();
|
|
|
- blockReplyStream.close();
|
|
|
+ } finally {
|
|
|
+ try {
|
|
|
+ blockStream.close();
|
|
|
+ } catch (IOException ie2) {
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ blockReplyStream.close();
|
|
|
+ } catch (IOException ie2) {
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -855,14 +786,9 @@ class DFSClient implements FSConstants {
|
|
|
flush();
|
|
|
endBlock();
|
|
|
|
|
|
- backupStream.close();
|
|
|
- backupFile.delete();
|
|
|
+ blockStream.close();
|
|
|
+ blockReplyStream.close();
|
|
|
|
|
|
- if (blockStreamWorking) {
|
|
|
- blockStream.close();
|
|
|
- blockReplyStream.close();
|
|
|
- blockStreamWorking = false;
|
|
|
- }
|
|
|
super.close();
|
|
|
|
|
|
long localstart = System.currentTimeMillis();
|