|
@@ -464,6 +464,16 @@ class DFSClient implements FSConstants {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /** Utility class to encapsulate data node info and its ip address. */
|
|
|
+ private static class DNAddrPair {
|
|
|
+ DatanodeInfo info;
|
|
|
+ InetSocketAddress addr;
|
|
|
+ DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
|
|
|
+ this.info = info;
|
|
|
+ this.addr = addr;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/****************************************************************
|
|
|
* DFSInputStream provides bytes from a named file. It handles
|
|
|
* negotiation of the namenode and various datanodes as necessary.
|
|
@@ -494,7 +504,7 @@ class DFSClient implements FSConstants {
|
|
|
/**
|
|
|
* Grab the open-file info from namenode
|
|
|
*/
|
|
|
- void openInfo() throws IOException {
|
|
|
+ synchronized void openInfo() throws IOException {
|
|
|
Block oldBlocks[] = this.blocks;
|
|
|
|
|
|
LocatedBlock results[] = namenode.open(src);
|
|
@@ -560,33 +570,12 @@ class DFSClient implements FSConstants {
|
|
|
// Connect to best DataNode for desired Block, with potential offset
|
|
|
//
|
|
|
int failures = 0;
|
|
|
- InetSocketAddress targetAddr = null;
|
|
|
TreeSet deadNodes = new TreeSet();
|
|
|
while (s == null) {
|
|
|
- DatanodeInfo chosenNode;
|
|
|
-
|
|
|
- try {
|
|
|
- chosenNode = bestNode(nodes[targetBlock], deadNodes);
|
|
|
- targetAddr = DataNode.createSocketAddr(chosenNode.getName());
|
|
|
- } catch (IOException ie) {
|
|
|
- String blockInfo =
|
|
|
- blocks[targetBlock]+" file="+src+" offset="+target;
|
|
|
- if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
|
|
|
- throw new IOException("Could not obtain block: " + blockInfo);
|
|
|
- }
|
|
|
- if (nodes[targetBlock] == null || nodes[targetBlock].length == 0) {
|
|
|
- LOG.info("No node available for block: " + blockInfo);
|
|
|
- }
|
|
|
- LOG.info("Could not obtain block from any node: " + ie);
|
|
|
- try {
|
|
|
- Thread.sleep(3000);
|
|
|
- } catch (InterruptedException iex) {
|
|
|
- }
|
|
|
- deadNodes.clear();
|
|
|
- openInfo();
|
|
|
- failures++;
|
|
|
- continue;
|
|
|
- }
|
|
|
+ DNAddrPair retval = chooseDataNode(targetBlock, deadNodes);
|
|
|
+ DatanodeInfo chosenNode = retval.info;
|
|
|
+ InetSocketAddress targetAddr = retval.addr;
|
|
|
+
|
|
|
try {
|
|
|
s = new Socket();
|
|
|
s.connect(targetAddr, READ_TIMEOUT);
|
|
@@ -704,11 +693,142 @@ class DFSClient implements FSConstants {
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ private DNAddrPair chooseDataNode(int blockId, TreeSet deadNodes)
|
|
|
+ throws IOException {
|
|
|
+ int failures = 0;
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ DatanodeInfo chosenNode = bestNode(nodes[blockId], deadNodes);
|
|
|
+ InetSocketAddress targetAddr = DataNode.createSocketAddr(chosenNode.getName());
|
|
|
+ return new DNAddrPair(chosenNode, targetAddr);
|
|
|
+ } catch (IOException ie) {
|
|
|
+ String blockInfo =
|
|
|
+ blocks[blockId]+" file="+src;
|
|
|
+ if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
|
|
|
+ throw new IOException("Could not obtain block: " + blockInfo);
|
|
|
+ }
|
|
|
+ if (nodes[blockId] == null || nodes[blockId].length == 0) {
|
|
|
+ LOG.info("No node available for block: " + blockInfo);
|
|
|
+ }
|
|
|
+ LOG.info("Could not obtain block from any node: " + ie);
|
|
|
+ try {
|
|
|
+ Thread.sleep(3000);
|
|
|
+ } catch (InterruptedException iex) {
|
|
|
+ }
|
|
|
+ deadNodes.clear();
|
|
|
+ openInfo();
|
|
|
+ failures++;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void fetchBlockByteRange(int blockId, long start,
|
|
|
+ long end, byte[] buf, int offset) throws IOException {
|
|
|
+ //
|
|
|
+ // Connect to best DataNode for desired Block, with potential offset
|
|
|
+ //
|
|
|
+ TreeSet deadNodes = new TreeSet();
|
|
|
+ Socket dn = null;
|
|
|
+ while (dn == null) {
|
|
|
+ DNAddrPair retval = chooseDataNode(blockId, deadNodes);
|
|
|
+ DatanodeInfo chosenNode = retval.info;
|
|
|
+ InetSocketAddress targetAddr = retval.addr;
|
|
|
+
|
|
|
+ try {
|
|
|
+ dn = new Socket();
|
|
|
+ dn.connect(targetAddr, READ_TIMEOUT);
|
|
|
+ dn.setSoTimeout(READ_TIMEOUT);
|
|
|
+
|
|
|
+ //
|
|
|
+ // Xmit header info to datanode
|
|
|
+ //
|
|
|
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(dn.getOutputStream()));
|
|
|
+ out.write(OP_READ_RANGE_BLOCK);
|
|
|
+ blocks[blockId].write(out);
|
|
|
+ out.writeLong(start);
|
|
|
+ out.writeLong(end);
|
|
|
+ out.flush();
|
|
|
+
|
|
|
+ //
|
|
|
+ // Get bytes in block, set streams
|
|
|
+ //
|
|
|
+ DataInputStream in = new DataInputStream(new BufferedInputStream(dn.getInputStream()));
|
|
|
+ long curBlockSize = in.readLong();
|
|
|
+ long actualStart = in.readLong();
|
|
|
+ long actualEnd = in.readLong();
|
|
|
+ if (curBlockSize != blocks[blockId].len) {
|
|
|
+ throw new IOException("Recorded block size is " +
|
|
|
+ blocks[blockId].len + ", but datanode reports size of " +
|
|
|
+ curBlockSize);
|
|
|
+ }
|
|
|
+ if ((actualStart != start) || (actualEnd != end)) {
|
|
|
+ throw new IOException("Asked for byte range " + start +
|
|
|
+ "-" + end + ", but only received range " + actualStart +
|
|
|
+ "-" + actualEnd);
|
|
|
+ }
|
|
|
+ int nread = in.read(buf, offset, (int)(end - start + 1));
|
|
|
+ } catch (IOException ex) {
|
|
|
+ // Put chosen node into dead list, continue
|
|
|
+ LOG.info("Failed to connect to " + targetAddr + ":" + ex);
|
|
|
+ deadNodes.add(chosenNode);
|
|
|
+ if (dn != null) {
|
|
|
+ try {
|
|
|
+ dn.close();
|
|
|
+ } catch (IOException iex) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ dn = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public int read(long position, byte[] buf, int off, int len)
|
|
|
+ throws IOException {
|
|
|
+ // sanity checks
|
|
|
+ checkOpen();
|
|
|
+ if (closed) {
|
|
|
+ throw new IOException("Stream closed");
|
|
|
+ }
|
|
|
+ if ((position < 0) || (position > filelen)) {
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ int realLen = len;
|
|
|
+ if ((position + len) > filelen) {
|
|
|
+ realLen = (int)(filelen - position);
|
|
|
+ }
|
|
|
+ // determine the block and byte range within the block
|
|
|
+ // corresponding to position and realLen
|
|
|
+ int targetBlock = -1;
|
|
|
+ long targetStart = 0;
|
|
|
+ long targetEnd = 0;
|
|
|
+ for (int idx = 0; idx < blocks.length; idx++) {
|
|
|
+ long blocklen = blocks[idx].getNumBytes();
|
|
|
+ targetEnd = targetStart + blocklen - 1;
|
|
|
+ if (position >= targetStart && position <= targetEnd) {
|
|
|
+ targetBlock = idx;
|
|
|
+ targetStart = position - targetStart;
|
|
|
+ targetEnd = Math.min(blocklen, targetStart + realLen) - 1;
|
|
|
+ realLen = (int)(targetEnd - targetStart + 1);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ targetStart += blocklen;
|
|
|
+ }
|
|
|
+ if (targetBlock < 0) {
|
|
|
+ throw new IOException(
|
|
|
+ "Impossible situation: could not find target position "+
|
|
|
+ position);
|
|
|
+ }
|
|
|
+ fetchBlockByteRange(targetBlock, targetStart, targetEnd, buf, off);
|
|
|
+ return realLen;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Seek to a new arbitrary location
|
|
|
*/
|
|
|
public synchronized void seek(long targetPos) throws IOException {
|
|
|
- if (targetPos >= filelen) {
|
|
|
+ if (targetPos > filelen) {
|
|
|
throw new IOException("Cannot seek after EOF");
|
|
|
}
|
|
|
pos = targetPos;
|