|
@@ -25,12 +25,14 @@ import java.io.InputStream;
|
|
|
import java.io.InputStreamReader;
|
|
|
import java.net.HttpURLConnection;
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.net.MalformedURLException;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.net.URL;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.StringTokenizer;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -62,7 +64,6 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
|
|
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
|
|
-import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
|
@@ -81,6 +82,7 @@ import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.RenewerParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
|
|
+import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
@@ -388,9 +390,9 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
|
|
|
private FileStatus makeQualified(HdfsFileStatus f, Path parent) {
|
|
|
return new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
|
|
|
- f.getBlockSize(), f.getModificationTime(),
|
|
|
- f.getAccessTime(),
|
|
|
+ f.getBlockSize(), f.getModificationTime(), f.getAccessTime(),
|
|
|
f.getPermission(), f.getOwner(), f.getGroup(),
|
|
|
+ f.isSymlink() ? new Path(f.getSymlink()) : null,
|
|
|
f.getFullPath(parent).makeQualified(getUri(), getWorkingDirectory()));
|
|
|
}
|
|
|
|
|
@@ -532,24 +534,84 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
statistics.incrementReadOps(1);
|
|
|
final HttpOpParam.Op op = GetOpParam.Op.OPEN;
|
|
|
final URL url = toUrl(op, f, new BufferSizeParam(buffersize));
|
|
|
- ByteRangeInputStream str = getByteRangeInputStream(url);
|
|
|
- return new FSDataInputStream(str);
|
|
|
+ return new FSDataInputStream(new OffsetUrlInputStream(
|
|
|
+ new OffsetUrlOpener(url), new OffsetUrlOpener(null)));
|
|
|
}
|
|
|
|
|
|
- private class URLOpener extends ByteRangeInputStream.URLOpener {
|
|
|
-
|
|
|
- public URLOpener(URL u) {
|
|
|
- super(u);
|
|
|
+ class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
|
|
|
+ /** The url with offset parameter */
|
|
|
+ private URL offsetUrl;
|
|
|
+
|
|
|
+ OffsetUrlOpener(final URL url) {
|
|
|
+ super(url);
|
|
|
}
|
|
|
|
|
|
+ /** Open connection with offset url. */
|
|
|
@Override
|
|
|
- public HttpURLConnection openConnection() throws IOException {
|
|
|
+ protected HttpURLConnection openConnection() throws IOException {
|
|
|
return getHttpUrlConnection(offsetUrl);
|
|
|
}
|
|
|
+
|
|
|
+ /** Setup offset url before open connection. */
|
|
|
+ @Override
|
|
|
+ protected HttpURLConnection openConnection(final long offset) throws IOException {
|
|
|
+ offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
|
|
|
+ final HttpURLConnection conn = openConnection();
|
|
|
+ conn.setRequestMethod("GET");
|
|
|
+ return conn;
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- private ByteRangeInputStream getByteRangeInputStream(URL url) {
|
|
|
- return new ByteRangeInputStream(new URLOpener(url), new URLOpener(null));
|
|
|
+
|
|
|
+ private static final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
|
|
|
+
|
|
|
+ /** Remove offset parameter, if there is any, from the url */
|
|
|
+ static URL removeOffsetParam(final URL url) throws MalformedURLException {
|
|
|
+ String query = url.getQuery();
|
|
|
+ if (query == null) {
|
|
|
+ return url;
|
|
|
+ }
|
|
|
+ final String lower = query.toLowerCase();
|
|
|
+ if (!lower.startsWith(OFFSET_PARAM_PREFIX)
|
|
|
+ && !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
|
|
|
+ return url;
|
|
|
+ }
|
|
|
+
|
|
|
+ //rebuild query
|
|
|
+ StringBuilder b = null;
|
|
|
+ for(final StringTokenizer st = new StringTokenizer(query, "&");
|
|
|
+ st.hasMoreTokens();) {
|
|
|
+ final String token = st.nextToken();
|
|
|
+ if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) {
|
|
|
+ if (b == null) {
|
|
|
+ b = new StringBuilder("?").append(token);
|
|
|
+ } else {
|
|
|
+ b.append('&').append(token);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ query = b == null? "": b.toString();
|
|
|
+
|
|
|
+ final String urlStr = url.toString();
|
|
|
+ return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
|
|
|
+ }
|
|
|
+
|
|
|
+ static class OffsetUrlInputStream extends ByteRangeInputStream {
|
|
|
+ OffsetUrlInputStream(URLOpener o, URLOpener r) {
|
|
|
+ super(o, r);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void checkResponseCode(final HttpURLConnection connection
|
|
|
+ ) throws IOException {
|
|
|
+ validateResponse(GetOpParam.Op.OPEN, connection);
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Remove offset parameter before returning the resolved url. */
|
|
|
+ @Override
|
|
|
+ protected URL getResolvedUrl(final HttpURLConnection connection
|
|
|
+ ) throws MalformedURLException {
|
|
|
+ return removeOffsetParam(connection.getURL());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -641,7 +703,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
final long offset, final long length) throws IOException {
|
|
|
statistics.incrementReadOps(1);
|
|
|
|
|
|
- final HttpOpParam.Op op = GetOpParam.Op.GETFILEBLOCKLOCATIONS;
|
|
|
+ final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
|
|
|
final Map<?, ?> m = run(op, p, new OffsetParam(offset),
|
|
|
new LengthParam(length));
|
|
|
return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m));
|