|
@@ -22,12 +22,15 @@ import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.net.HttpURLConnection;
|
|
|
import java.net.URL;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.StringTokenizer;
|
|
|
|
|
|
import org.apache.commons.io.input.BoundedInputStream;
|
|
|
import org.apache.hadoop.fs.FSInputStream;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
+import com.google.common.net.HttpHeaders;
|
|
|
|
|
|
/**
|
|
|
* To support HTTP byte streams, a new connection to an HTTP server needs to be
|
|
@@ -70,7 +73,7 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
|
|
protected URLOpener resolvedURL;
|
|
|
protected long startPos = 0;
|
|
|
protected long currentPos = 0;
|
|
|
- protected long filelength;
|
|
|
+ protected Long fileLength = null;
|
|
|
|
|
|
StreamStatus status = StreamStatus.SEEK;
|
|
|
|
|
@@ -120,28 +123,60 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
|
|
connection.connect();
|
|
|
checkResponseCode(connection);
|
|
|
|
|
|
- final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
|
|
- if (cl == null) {
|
|
|
- throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
|
|
|
+ resolvedURL.setURL(getResolvedUrl(connection));
|
|
|
+
|
|
|
+ InputStream in = connection.getInputStream();
|
|
|
+ final Map<String, List<String>> headers = connection.getHeaderFields();
|
|
|
+ if (isChunkedTransferEncoding(headers)) {
|
|
|
+ // file length is not known
|
|
|
+ fileLength = null;
|
|
|
+ } else {
|
|
|
+ // for non-chunked transfer-encoding, get content-length
|
|
|
+ final String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
|
|
|
+ if (cl == null) {
|
|
|
+ throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
|
|
|
+ + headers);
|
|
|
+ }
|
|
|
+ final long streamlength = Long.parseLong(cl);
|
|
|
+ fileLength = startPos + streamlength;
|
|
|
+
|
|
|
+ // Java has a bug with >2GB request streams. It won't bounds check
|
|
|
+ // the reads so the transfer blocks until the server times out
|
|
|
+ in = new BoundedInputStream(in, streamlength);
|
|
|
}
|
|
|
- final long streamlength = Long.parseLong(cl);
|
|
|
- filelength = startPos + streamlength;
|
|
|
- // Java has a bug with >2GB request streams. It won't bounds check
|
|
|
- // the reads so the transfer blocks until the server times out
|
|
|
- InputStream is =
|
|
|
- new BoundedInputStream(connection.getInputStream(), streamlength);
|
|
|
|
|
|
- resolvedURL.setURL(getResolvedUrl(connection));
|
|
|
-
|
|
|
- return is;
|
|
|
+ return in;
|
|
|
}
|
|
|
|
|
|
+ private static boolean isChunkedTransferEncoding(
|
|
|
+ final Map<String, List<String>> headers) {
|
|
|
+ return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
|
|
|
+ || contains(headers, HttpHeaders.TE, "chunked");
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Does the HTTP header map contain the given key, value pair? */
|
|
|
+ private static boolean contains(final Map<String, List<String>> headers,
|
|
|
+ final String key, final String value) {
|
|
|
+ final List<String> values = headers.get(key);
|
|
|
+ if (values != null) {
|
|
|
+ for(String v : values) {
|
|
|
+ for(final StringTokenizer t = new StringTokenizer(v, ",");
|
|
|
+ t.hasMoreTokens(); ) {
|
|
|
+ if (value.equalsIgnoreCase(t.nextToken())) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
private int update(final int n) throws IOException {
|
|
|
if (n != -1) {
|
|
|
currentPos += n;
|
|
|
- } else if (currentPos < filelength) {
|
|
|
+ } else if (fileLength != null && currentPos < fileLength) {
|
|
|
throw new IOException("Got EOF but currentPos = " + currentPos
|
|
|
- + " < filelength = " + filelength);
|
|
|
+ + " < filelength = " + fileLength);
|
|
|
}
|
|
|
return n;
|
|
|
}
|