|
@@ -18,17 +18,13 @@
|
|
|
|
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
-import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.net.HttpURLConnection;
|
|
|
-import java.net.MalformedURLException;
|
|
|
import java.net.URL;
|
|
|
-import java.util.StringTokenizer;
|
|
|
|
|
|
import org.apache.hadoop.fs.FSInputStream;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
|
|
-import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
|
|
|
|
|
/**
|
|
|
* To support HTTP byte streams, a new connection to an HTTP server needs to be
|
|
@@ -37,16 +33,14 @@ import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
|
|
* is made on the successive read(). The normal input stream functions are
|
|
|
* connected to the currently active input stream.
|
|
|
*/
|
|
|
-public class ByteRangeInputStream extends FSInputStream {
|
|
|
+public abstract class ByteRangeInputStream extends FSInputStream {
|
|
|
|
|
|
/**
|
|
|
* This class wraps a URL and provides method to open connection.
|
|
|
* It can be overridden to change how a connection is opened.
|
|
|
*/
|
|
|
- public static class URLOpener {
|
|
|
+ public static abstract class URLOpener {
|
|
|
protected URL url;
|
|
|
- /** The url with offset parameter */
|
|
|
- protected URL offsetUrl;
|
|
|
|
|
|
public URLOpener(URL u) {
|
|
|
url = u;
|
|
@@ -60,52 +54,9 @@ public class ByteRangeInputStream extends FSInputStream {
|
|
|
return url;
|
|
|
}
|
|
|
|
|
|
- protected HttpURLConnection openConnection() throws IOException {
|
|
|
- return (HttpURLConnection)offsetUrl.openConnection();
|
|
|
- }
|
|
|
+ protected abstract HttpURLConnection openConnection() throws IOException;
|
|
|
|
|
|
- private HttpURLConnection openConnection(final long offset) throws IOException {
|
|
|
- offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
|
|
|
- final HttpURLConnection conn = openConnection();
|
|
|
- conn.setRequestMethod("GET");
|
|
|
- if (offset != 0L) {
|
|
|
- conn.setRequestProperty("Range", "bytes=" + offset + "-");
|
|
|
- }
|
|
|
- return conn;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- static private final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
|
|
|
-
|
|
|
- /** Remove offset parameter, if there is any, from the url */
|
|
|
- static URL removeOffsetParam(final URL url) throws MalformedURLException {
|
|
|
- String query = url.getQuery();
|
|
|
- if (query == null) {
|
|
|
- return url;
|
|
|
- }
|
|
|
- final String lower = query.toLowerCase();
|
|
|
- if (!lower.startsWith(OFFSET_PARAM_PREFIX)
|
|
|
- && !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
|
|
|
- return url;
|
|
|
- }
|
|
|
-
|
|
|
- //rebuild query
|
|
|
- StringBuilder b = null;
|
|
|
- for(final StringTokenizer st = new StringTokenizer(query, "&");
|
|
|
- st.hasMoreTokens();) {
|
|
|
- final String token = st.nextToken();
|
|
|
- if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) {
|
|
|
- if (b == null) {
|
|
|
- b = new StringBuilder("?").append(token);
|
|
|
- } else {
|
|
|
- b.append('&').append(token);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- query = b == null? "": b.toString();
|
|
|
-
|
|
|
- final String urlStr = url.toString();
|
|
|
- return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
|
|
|
+ protected abstract HttpURLConnection openConnection(final long offset) throws IOException;
|
|
|
}
|
|
|
|
|
|
enum StreamStatus {
|
|
@@ -120,11 +71,6 @@ public class ByteRangeInputStream extends FSInputStream {
|
|
|
|
|
|
StreamStatus status = StreamStatus.SEEK;
|
|
|
|
|
|
- /** Create an input stream with the URL. */
|
|
|
- public ByteRangeInputStream(final URL url) {
|
|
|
- this(new URLOpener(url), new URLOpener(null));
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Create with the specified URLOpeners. Original url is used to open the
|
|
|
* stream for the first time. Resolved url is used in subsequent requests.
|
|
@@ -136,6 +82,12 @@ public class ByteRangeInputStream extends FSInputStream {
|
|
|
this.resolvedURL = r;
|
|
|
}
|
|
|
|
|
|
+ protected abstract void checkResponseCode(final HttpURLConnection connection
|
|
|
+ ) throws IOException;
|
|
|
+
|
|
|
+ protected abstract URL getResolvedUrl(final HttpURLConnection connection
|
|
|
+ ) throws IOException;
|
|
|
+
|
|
|
private InputStream getInputStream() throws IOException {
|
|
|
if (status != StreamStatus.NORMAL) {
|
|
|
|
|
@@ -150,32 +102,14 @@ public class ByteRangeInputStream extends FSInputStream {
|
|
|
(resolvedURL.getURL() == null) ? originalURL : resolvedURL;
|
|
|
|
|
|
final HttpURLConnection connection = opener.openConnection(startPos);
|
|
|
- try {
|
|
|
- connection.connect();
|
|
|
- final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
|
|
- filelength = (cl == null) ? -1 : Long.parseLong(cl);
|
|
|
- if (HftpFileSystem.LOG.isDebugEnabled()) {
|
|
|
- HftpFileSystem.LOG.debug("filelength = " + filelength);
|
|
|
- }
|
|
|
- in = connection.getInputStream();
|
|
|
- } catch (FileNotFoundException fnfe) {
|
|
|
- throw fnfe;
|
|
|
- } catch (IOException ioe) {
|
|
|
- HftpFileSystem.throwIOExceptionFromConnection(connection, ioe);
|
|
|
- }
|
|
|
-
|
|
|
- int respCode = connection.getResponseCode();
|
|
|
- if (startPos != 0 && respCode != HttpURLConnection.HTTP_PARTIAL) {
|
|
|
- // We asked for a byte range but did not receive a partial content
|
|
|
- // response...
|
|
|
- throw new IOException("HTTP_PARTIAL expected, received " + respCode);
|
|
|
- } else if (startPos == 0 && respCode != HttpURLConnection.HTTP_OK) {
|
|
|
- // We asked for all bytes from the beginning but didn't receive a 200
|
|
|
- // response (none of the other 2xx codes are valid here)
|
|
|
- throw new IOException("HTTP_OK expected, received " + respCode);
|
|
|
- }
|
|
|
+ connection.connect();
|
|
|
+ checkResponseCode(connection);
|
|
|
+
|
|
|
+ final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
|
|
+ filelength = (cl == null) ? -1 : Long.parseLong(cl);
|
|
|
+ in = connection.getInputStream();
|
|
|
|
|
|
- resolvedURL.setURL(removeOffsetParam(connection.getURL()));
|
|
|
+ resolvedURL.setURL(getResolvedUrl(connection));
|
|
|
status = StreamStatus.NORMAL;
|
|
|
}
|
|
|
|