|
@@ -56,7 +56,6 @@ import org.apache.hadoop.hdfs.HAUtil;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
|
|
-import org.apache.hadoop.hdfs.web.TokenAspect.DTSelecorByKind;
|
|
|
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
|
@@ -99,7 +98,6 @@ import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
import org.mortbay.util.ajax.JSON;
|
|
|
|
|
|
-import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Charsets;
|
|
|
import com.google.common.collect.Lists;
|
|
|
|
|
@@ -119,8 +117,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
|
|
|
/** Delegation token kind */
|
|
|
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
|
|
|
- protected TokenAspect<WebHdfsFileSystem> tokenAspect = new TokenAspect<WebHdfsFileSystem>(
|
|
|
- this, TOKEN_KIND);
|
|
|
+ protected TokenAspect<WebHdfsFileSystem> tokenAspect;
|
|
|
|
|
|
private UserGroupInformation ugi;
|
|
|
private URI uri;
|
|
@@ -141,17 +138,44 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
return SCHEME;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns the underlying transport protocol (http / https).
|
|
|
+ */
|
|
|
+ protected String getTransportScheme() {
|
|
|
+ return "http";
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Initialize tokenAspect. This function is intended to
|
|
|
+ * be overridden by SWebHdfsFileSystem.
|
|
|
+ */
|
|
|
+ protected synchronized void initializeTokenAspect() {
|
|
|
+ tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, TOKEN_KIND);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Initialize connectionFactory. This function is intended to
|
|
|
+ * be overridden by SWebHdfsFileSystem.
|
|
|
+ */
|
|
|
+ protected void initializeConnectionFactory(Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+ connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public synchronized void initialize(URI uri, Configuration conf
|
|
|
) throws IOException {
|
|
|
super.initialize(uri, conf);
|
|
|
setConf(conf);
|
|
|
+ initializeTokenAspect();
|
|
|
+ initializeConnectionFactory(conf);
|
|
|
+
|
|
|
ugi = UserGroupInformation.getCurrentUser();
|
|
|
|
|
|
try {
|
|
|
this.uri = new URI(uri.getScheme(), uri.getAuthority(), null,
|
|
|
null, null);
|
|
|
- this.nnAddrs = DFSUtil.resolve(this.uri, getDefaultPort(), conf);
|
|
|
+ this.nnAddrs = DFSUtil.resolveWebHdfsUri(this.uri, conf);
|
|
|
} catch (URISyntaxException e) {
|
|
|
throw new IllegalArgumentException(e);
|
|
|
}
|
|
@@ -343,7 +367,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
*/
|
|
|
private URL getNamenodeURL(String path, String query) throws IOException {
|
|
|
InetSocketAddress nnAddr = getCurrentNNAddr();
|
|
|
- final URL url = new URL("http", nnAddr.getHostName(),
|
|
|
+ final URL url = new URL(getTransportScheme(), nnAddr.getHostName(),
|
|
|
nnAddr.getPort(), path + '?' + query);
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
LOG.trace("url=" + url);
|
|
@@ -841,7 +865,9 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
@Override
|
|
|
public void close() throws IOException {
|
|
|
super.close();
|
|
|
- tokenAspect.removeRenewAction();
|
|
|
+ synchronized (this) {
|
|
|
+ tokenAspect.removeRenewAction();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
|