|
@@ -1830,19 +1830,7 @@ public class DataNode extends Configured
|
|
|
|
|
|
int dataOff = checksumOff + checksumLen;
|
|
int dataOff = checksumOff + checksumLen;
|
|
|
|
|
|
- if (blockInPosition >= 0) {
|
|
|
|
- //use transferTo(). Checks on out and blockIn are already done.
|
|
|
|
-
|
|
|
|
- SocketOutputStream sockOut = (SocketOutputStream)out;
|
|
|
|
- //first write the packet
|
|
|
|
- sockOut.write(buf, 0, dataOff);
|
|
|
|
- // no need to flush. since we know out is not a buffered stream.
|
|
|
|
-
|
|
|
|
- sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
|
|
|
|
- blockInPosition, len);
|
|
|
|
-
|
|
|
|
- blockInPosition += len;
|
|
|
|
- } else {
|
|
|
|
|
|
+ if (blockInPosition < 0) {
|
|
//normal transfer
|
|
//normal transfer
|
|
IOUtils.readFully(blockIn, buf, dataOff, len);
|
|
IOUtils.readFully(blockIn, buf, dataOff, len);
|
|
|
|
|
|
@@ -1864,8 +1852,43 @@ public class DataNode extends Configured
|
|
cOff += checksumSize;
|
|
cOff += checksumSize;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ //writing is done below (mainly to handle IOException)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ if (blockInPosition >= 0) {
|
|
|
|
+ //use transferTo(). Checks on out and blockIn are already done.
|
|
|
|
+
|
|
|
|
+ SocketOutputStream sockOut = (SocketOutputStream)out;
|
|
|
|
+ //first write the packet
|
|
|
|
+ sockOut.write(buf, 0, dataOff);
|
|
|
|
+ // no need to flush. since we know out is not a buffered stream.
|
|
|
|
|
|
- out.write(buf, 0, dataOff + len);
|
|
|
|
|
|
+ sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
|
|
|
|
+ blockInPosition, len);
|
|
|
|
+
|
|
|
|
+ blockInPosition += len;
|
|
|
|
+ } else {
|
|
|
|
+ // normal transfer
|
|
|
|
+ out.write(buf, 0, dataOff + len);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ /* exception while writing to the client (well, with transferTo(),
|
|
|
|
+ * it could also be while reading from the local file). Many times
|
|
|
|
+ * this error can be ignored. We will let the callers distinguish this
|
|
|
|
+ * from other exceptions if this is not a subclass of IOException.
|
|
|
|
+ */
|
|
|
|
+ if (e.getClass().equals(IOException.class)) {
|
|
|
|
+ // "se" could be a new class in stead of SocketException.
|
|
|
|
+ IOException se = new SocketException("Original Exception : " + e);
|
|
|
|
+ se.initCause(e);
|
|
|
|
+ /* Cange the stacktrace so that original trace is not truncated
|
|
|
|
+ * when printed.*/
|
|
|
|
+ se.setStackTrace(e.getStackTrace());
|
|
|
|
+ throw se;
|
|
|
|
+ }
|
|
|
|
+ throw e;
|
|
}
|
|
}
|
|
|
|
|
|
if (throttler != null) { // rebalancing so throttle
|
|
if (throttler != null) { // rebalancing so throttle
|