|
@@ -23,6 +23,7 @@ import java.io.InputStream;
|
|
import java.net.HttpURLConnection;
|
|
import java.net.HttpURLConnection;
|
|
import java.net.URL;
|
|
import java.net.URL;
|
|
|
|
|
|
|
|
+import org.apache.commons.io.input.BoundedInputStream;
|
|
import org.apache.hadoop.fs.FSInputStream;
|
|
import org.apache.hadoop.fs.FSInputStream;
|
|
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
|
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
|
|
|
|
|
@@ -106,8 +107,14 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
|
checkResponseCode(connection);
|
|
checkResponseCode(connection);
|
|
|
|
|
|
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
|
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
|
- filelength = (cl == null) ? -1 : Long.parseLong(cl);
|
|
|
|
- in = connection.getInputStream();
|
|
|
|
|
|
+ if (cl == null) {
|
|
|
|
+ throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
|
|
|
|
+ }
|
|
|
|
+ final long streamlength = Long.parseLong(cl);
|
|
|
|
+ filelength = startPos + streamlength;
|
|
|
|
+ // Java has a bug with >2GB request streams. It won't bounds check
|
|
|
|
+ // the reads so the transfer blocks until the server times out
|
|
|
|
+ in = new BoundedInputStream(connection.getInputStream(), streamlength);
|
|
|
|
|
|
resolvedURL.setURL(getResolvedUrl(connection));
|
|
resolvedURL.setURL(getResolvedUrl(connection));
|
|
status = StreamStatus.NORMAL;
|
|
status = StreamStatus.NORMAL;
|
|
@@ -116,21 +123,27 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
|
return in;
|
|
return in;
|
|
}
|
|
}
|
|
|
|
|
|
- private void update(final boolean isEOF, final int n)
|
|
|
|
- throws IOException {
|
|
|
|
- if (!isEOF) {
|
|
|
|
|
|
+ private int update(final int n) throws IOException {
|
|
|
|
+ if (n != -1) {
|
|
currentPos += n;
|
|
currentPos += n;
|
|
} else if (currentPos < filelength) {
|
|
} else if (currentPos < filelength) {
|
|
throw new IOException("Got EOF but currentPos = " + currentPos
|
|
throw new IOException("Got EOF but currentPos = " + currentPos
|
|
+ " < filelength = " + filelength);
|
|
+ " < filelength = " + filelength);
|
|
}
|
|
}
|
|
|
|
+ return n;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
public int read() throws IOException {
|
|
public int read() throws IOException {
|
|
final int b = getInputStream().read();
|
|
final int b = getInputStream().read();
|
|
- update(b == -1, 1);
|
|
|
|
|
|
+ update((b == -1) ? -1 : 1);
|
|
return b;
|
|
return b;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public int read(byte b[], int off, int len) throws IOException {
|
|
|
|
+ return update(getInputStream().read(b, off, len));
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Seek to the given offset from the start of the file.
|
|
* Seek to the given offset from the start of the file.
|