|
@@ -696,6 +696,7 @@ class DFSClient implements FSConstants {
|
|
|
*/
|
|
|
static class BlockReader extends FSInputChecker {
|
|
|
|
|
|
+ private Socket dnSock; //for now just sending checksumOk.
|
|
|
private DataInputStream in;
|
|
|
private DataChecksum checksum;
|
|
|
private long lastChunkOffset = -1;
|
|
@@ -739,7 +740,12 @@ class DFSClient implements FSConstants {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- return super.read(buf, off, len);
|
|
|
+ int nRead = super.read(buf, off, len);
|
|
|
+ if (nRead >= 0 && gotEOS && needChecksum()) {
|
|
|
+ //checksum is verified and there are no errors.
|
|
|
+ checksumOk(dnSock);
|
|
|
+ }
|
|
|
+ return nRead;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -892,13 +898,15 @@ class DFSClient implements FSConstants {
|
|
|
|
|
|
private BlockReader( String file, long blockId, DataInputStream in,
|
|
|
DataChecksum checksum, boolean verifyChecksum,
|
|
|
- long startOffset, long firstChunkOffset ) {
|
|
|
+ long startOffset, long firstChunkOffset,
|
|
|
+ Socket dnSock ) {
|
|
|
super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
|
|
|
1, verifyChecksum,
|
|
|
checksum.getChecksumSize() > 0? checksum : null,
|
|
|
checksum.getBytesPerChecksum(),
|
|
|
checksum.getChecksumSize());
|
|
|
|
|
|
+ this.dnSock = dnSock;
|
|
|
this.in = in;
|
|
|
this.checksum = checksum;
|
|
|
this.startOffset = Math.max( startOffset, 0 );
|
|
@@ -965,7 +973,7 @@ class DFSClient implements FSConstants {
|
|
|
}
|
|
|
|
|
|
return new BlockReader( file, blockId, in, checksum, verifyChecksum,
|
|
|
- startOffset, firstChunkOffset );
|
|
|
+ startOffset, firstChunkOffset, sock );
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -986,7 +994,7 @@ class DFSClient implements FSConstants {
|
|
|
* errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that
|
|
|
* checksum was verified and there was no error.
|
|
|
*/
|
|
|
- void checksumOk(Socket sock) {
|
|
|
+ private void checksumOk(Socket sock) {
|
|
|
try {
|
|
|
OutputStream out = NetUtils.getOutputStream(sock, WRITE_TIMEOUT);
|
|
|
byte buf[] = { (OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
|
|
@@ -1297,9 +1305,6 @@ class DFSClient implements FSConstants {
|
|
|
|
|
|
if (result >= 0) {
|
|
|
pos += result;
|
|
|
- if ( pos > blockEnd ) {
|
|
|
- blockReader.checksumOk(s);
|
|
|
- }
|
|
|
} else {
|
|
|
// got a EOS from reader though we expect more data on it.
|
|
|
throw new IOException("Unexpected EOS from the reader");
|
|
@@ -1368,6 +1373,7 @@ class DFSClient implements FSConstants {
|
|
|
DNAddrPair retval = chooseDataNode(block);
|
|
|
DatanodeInfo chosenNode = retval.info;
|
|
|
InetSocketAddress targetAddr = retval.addr;
|
|
|
+ BlockReader reader = null;
|
|
|
|
|
|
try {
|
|
|
dn = socketFactory.createSocket();
|
|
@@ -1376,9 +1382,10 @@ class DFSClient implements FSConstants {
|
|
|
|
|
|
int len = (int) (end - start + 1);
|
|
|
|
|
|
- BlockReader reader =
|
|
|
- BlockReader.newBlockReader(dn, src, block.getBlock().getBlockId(),
|
|
|
- start, len, buffersize, verifyChecksum);
|
|
|
+ reader = BlockReader.newBlockReader(dn, src,
|
|
|
+ block.getBlock().getBlockId(),
|
|
|
+ start, len, buffersize,
|
|
|
+ verifyChecksum);
|
|
|
int nread = reader.readAll(buf, offset, len);
|
|
|
if (nread != len) {
|
|
|
throw new IOException("truncated return from reader.read(): " +
|
|
@@ -1397,16 +1404,13 @@ class DFSClient implements FSConstants {
|
|
|
" for file " + src +
|
|
|
" for block " + block.getBlock().getBlockId() + ":" +
|
|
|
StringUtils.stringifyException(e));
|
|
|
- }
|
|
|
- // Put chosen node into dead list, continue
|
|
|
- addToDeadNodes(chosenNode);
|
|
|
- if (dn != null) {
|
|
|
- try {
|
|
|
- dn.close();
|
|
|
- } catch (IOException iex) {
|
|
|
- }
|
|
|
+ } finally {
|
|
|
+ IOUtils.closeStream(reader);
|
|
|
+ IOUtils.closeSocket(dn);
|
|
|
dn = null;
|
|
|
}
|
|
|
+ // Put chosen node into dead list, continue
|
|
|
+ addToDeadNodes(chosenNode);
|
|
|
}
|
|
|
throw (ioe == null) ? new IOException("Could not read data") : ioe;
|
|
|
}
|