|
@@ -27,6 +27,8 @@ import org.apache.commons.io.input.BoundedInputStream;
|
|
|
import org.apache.hadoop.fs.FSInputStream;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+
|
|
|
/**
|
|
|
* To support HTTP byte streams, a new connection to an HTTP server needs to be
|
|
|
* created each time. This class hides the complexity of those multiple
|
|
@@ -61,7 +63,7 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
|
|
}
|
|
|
|
|
|
enum StreamStatus {
|
|
|
- NORMAL, SEEK
|
|
|
+ NORMAL, SEEK, CLOSED
|
|
|
}
|
|
|
protected InputStream in;
|
|
|
protected URLOpener originalURL;
|
|
@@ -89,40 +91,51 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
|
|
protected abstract URL getResolvedUrl(final HttpURLConnection connection
|
|
|
) throws IOException;
|
|
|
|
|
|
- private InputStream getInputStream() throws IOException {
|
|
|
- if (status != StreamStatus.NORMAL) {
|
|
|
-
|
|
|
- if (in != null) {
|
|
|
- in.close();
|
|
|
- in = null;
|
|
|
- }
|
|
|
-
|
|
|
- // Use the original url if no resolved url exists, eg. if
|
|
|
- // it's the first time a request is made.
|
|
|
- final URLOpener opener =
|
|
|
- (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
|
|
|
-
|
|
|
- final HttpURLConnection connection = opener.openConnection(startPos);
|
|
|
- connection.connect();
|
|
|
- checkResponseCode(connection);
|
|
|
-
|
|
|
- final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
|
|
- if (cl == null) {
|
|
|
- throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
|
|
|
- }
|
|
|
- final long streamlength = Long.parseLong(cl);
|
|
|
- filelength = startPos + streamlength;
|
|
|
- // Java has a bug with >2GB request streams. It won't bounds check
|
|
|
- // the reads so the transfer blocks until the server times out
|
|
|
- in = new BoundedInputStream(connection.getInputStream(), streamlength);
|
|
|
-
|
|
|
- resolvedURL.setURL(getResolvedUrl(connection));
|
|
|
- status = StreamStatus.NORMAL;
|
|
|
+ @VisibleForTesting
|
|
|
+ protected InputStream getInputStream() throws IOException {
|
|
|
+ switch (status) {
|
|
|
+ case NORMAL:
|
|
|
+ break;
|
|
|
+ case SEEK:
|
|
|
+ if (in != null) {
|
|
|
+ in.close();
|
|
|
+ }
|
|
|
+ in = openInputStream();
|
|
|
+ status = StreamStatus.NORMAL;
|
|
|
+ break;
|
|
|
+ case CLOSED:
|
|
|
+ throw new IOException("Stream closed");
|
|
|
}
|
|
|
-
|
|
|
return in;
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ protected InputStream openInputStream() throws IOException {
|
|
|
+ // Use the original url if no resolved url exists, eg. if
|
|
|
+ // it's the first time a request is made.
|
|
|
+ final URLOpener opener =
|
|
|
+ (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
|
|
|
+
|
|
|
+ final HttpURLConnection connection = opener.openConnection(startPos);
|
|
|
+ connection.connect();
|
|
|
+ checkResponseCode(connection);
|
|
|
+
|
|
|
+ final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
|
|
+ if (cl == null) {
|
|
|
+ throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
|
|
|
+ }
|
|
|
+ final long streamlength = Long.parseLong(cl);
|
|
|
+ filelength = startPos + streamlength;
|
|
|
+ // Java has a bug with >2GB request streams. It won't bounds check
|
|
|
+ // the reads so the transfer blocks until the server times out
|
|
|
+ InputStream is =
|
|
|
+ new BoundedInputStream(connection.getInputStream(), streamlength);
|
|
|
+
|
|
|
+ resolvedURL.setURL(getResolvedUrl(connection));
|
|
|
+
|
|
|
+ return is;
|
|
|
+ }
|
|
|
+
|
|
|
private int update(final int n) throws IOException {
|
|
|
if (n != -1) {
|
|
|
currentPos += n;
|
|
@@ -150,17 +163,21 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
|
|
* The next read() will be from that location. Can't
|
|
|
* seek past the end of the file.
|
|
|
*/
|
|
|
+ @Override
|
|
|
public void seek(long pos) throws IOException {
|
|
|
if (pos != currentPos) {
|
|
|
startPos = pos;
|
|
|
currentPos = pos;
|
|
|
- status = StreamStatus.SEEK;
|
|
|
+ if (status != StreamStatus.CLOSED) {
|
|
|
+ status = StreamStatus.SEEK;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Return the current offset from the start of the file
|
|
|
*/
|
|
|
+ @Override
|
|
|
public long getPos() throws IOException {
|
|
|
return currentPos;
|
|
|
}
|
|
@@ -169,7 +186,17 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
|
|
* Seeks a different copy of the data. Returns true if
|
|
|
* found a new source, false otherwise.
|
|
|
*/
|
|
|
+ @Override
|
|
|
public boolean seekToNewSource(long targetPos) throws IOException {
|
|
|
return false;
|
|
|
}
|
|
|
-}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ if (in != null) {
|
|
|
+ in.close();
|
|
|
+ in = null;
|
|
|
+ }
|
|
|
+ status = StreamStatus.CLOSED;
|
|
|
+ }
|
|
|
+}
|