@@ -56,14 +56,12 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HAUtilClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstantsClient;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.web.resources.*;
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
 import org.apache.hadoop.io.Text;
@@ -145,8 +143,8 @@ public class WebHdfsFileSystem extends FileSystem
     setConf(conf);
     /** set user pattern based on configuration file */
     UserParam.setUserPattern(conf.get(
-        DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
-        DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
+        HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
+        HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));

     connectionFactory = URLConnectionFactory
         .newDefaultURLConnectionFactory(conf);
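For context, a sketch of how the relocated key is consumed. This mirrors what initialize() does in the hunk above; the pattern value itself is an assumption for illustration:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.web.resources.UserParam;

public class UserPatternExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Illustrative pattern permitting a trailing '$' (e.g. machine accounts).
    conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
        "^[A-Za-z_][A-Za-z0-9._-]*[$]?$");
    // Same call the file system makes during initialize().
    UserParam.setUserPattern(conf.get(
        HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
        HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
  }
}
```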
@@ -172,7 +170,7 @@ public class WebHdfsFileSystem extends FileSystem
           HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_DEFAULT,
           HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_KEY,
           HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_DEFAULT,
-          SafeModeException.class);
+          HdfsConstantsClient.SAFEMODE_EXCEPTION_CLASS_NAME);
     } else {

       int maxFailoverAttempts = conf.getInt(
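The client module can no longer depend on the server-side SafeModeException class, so the retry policy now identifies it by fully qualified name instead of a Class reference. A minimal sketch of that name-based matching, assuming the constant holds the exception's FQCN:

```java
import org.apache.hadoop.ipc.RemoteException;

public class SafeModeCheckExample {
  // Assumed value of HdfsConstantsClient.SAFEMODE_EXCEPTION_CLASS_NAME.
  static final String SAFEMODE_FQCN =
      "org.apache.hadoop.hdfs.server.namenode.SafeModeException";

  static boolean isSafeModeException(RemoteException re) {
    // RemoteException carries the server-side class name as a string,
    // so no compile-time dependency on the server jar is needed.
    return SAFEMODE_FQCN.equals(re.getClassName());
  }
}
```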
@@ -209,8 +207,9 @@ public class WebHdfsFileSystem extends FileSystem

   /** Is WebHDFS enabled in conf? */
   public static boolean isEnabled(final Configuration conf, final Log log) {
-    final boolean b = conf.getBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY,
-        DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT);
+    final boolean b = conf.getBoolean(
+        HdfsClientConfigKeys.DFS_WEBHDFS_ENABLED_KEY,
+        HdfsClientConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT);
     return b;
   }

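A brief usage sketch for the check above; the Log and Configuration wiring here is illustrative:

```java
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;

public class WebHdfsEnabledExample {
  private static final Log LOG = LogFactory.getLog(WebHdfsEnabledExample.class);

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    if (WebHdfsFileSystem.isEnabled(conf, LOG)) {
      LOG.info("WebHDFS is enabled; webhdfs:// URIs should work.");
    }
  }
}
```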
@@ -230,7 +229,7 @@ public class WebHdfsFileSystem extends FileSystem
       if(LOG.isDebugEnabled()) {
         LOG.debug("Using UGI token: " + token);
       }
-      canRefreshDelegationToken = false;
+      canRefreshDelegationToken = false;
     } else {
       token = getDelegationToken(null);
       if (token != null) {
@@ -263,15 +262,15 @@ public class WebHdfsFileSystem extends FileSystem
   @Override
   @VisibleForTesting
   public int getDefaultPort() {
-    return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
-        DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
+    return getConf().getInt(HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
+        HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
   }

   @Override
   public URI getUri() {
     return this.uri;
   }
-
+
   @Override
   protected URI canonicalizeUri(URI uri) {
     return NetUtils.getCanonicalUri(uri, getDefaultPort());
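A short illustration of the default-port behavior; 50070 stands in for whatever the configured NameNode HTTP port resolves to:

```java
import java.net.URI;
import org.apache.hadoop.net.NetUtils;

public class CanonicalUriExample {
  public static void main(String[] args) {
    // getCanonicalUri fills in the default port when the URI omits one
    // (it may also canonicalize the host to a fully qualified name).
    URI u = NetUtils.getCanonicalUri(URI.create("webhdfs://nn1/"), 50070);
    System.out.println(u); // e.g. webhdfs://nn1:50070/
  }
}
```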
@@ -295,8 +294,8 @@ public class WebHdfsFileSystem extends FileSystem
   @Override
   public synchronized void setWorkingDirectory(final Path dir) {
     String result = makeAbsolute(dir).toUri().getPath();
-    if (!DFSUtil.isValidName(result)) {
-      throw new IllegalArgumentException("Invalid DFS directory name " +
+    if (!DFSUtilClient.isValidName(result)) {
+      throw new IllegalArgumentException("Invalid DFS directory name " +
           result);
     }
     workingDir = makeAbsolute(dir);
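Path validation now goes through the client-side helper. A sketch of what it accepts and rejects, summarizing the behavior of the DFSUtil version it replaces:

```java
import org.apache.hadoop.hdfs.DFSUtilClient;

public class ValidNameExample {
  public static void main(String[] args) {
    System.out.println(DFSUtilClient.isValidName("/user/alice/data"));    // true
    System.out.println(DFSUtilClient.isValidName("/user/alice/../data")); // false: '..' component
    System.out.println(DFSUtilClient.isValidName("relative/path"));       // false: not absolute
  }
}
```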
@@ -375,10 +374,10 @@ public class WebHdfsFileSystem extends FileSystem

   /**
    * Convert an exception to an IOException.
-   *
+   *
    * For a non-IOException, wrap it with IOException.
    * For a RemoteException, unwrap it.
-   * For an IOException which is not a RemoteException, return it.
+   * For an IOException which is not a RemoteException, return it.
    */
   private static IOException toIOException(Exception e) {
     if (!(e instanceof IOException)) {
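A sketch of the three rules the javadoc describes (not the method's exact body):

```java
import java.io.IOException;
import org.apache.hadoop.ipc.RemoteException;

public class ToIOExceptionSketch {
  static IOException toIOException(Exception e) {
    if (!(e instanceof IOException)) {
      return new IOException(e);                 // non-IOException: wrap
    }
    if (e instanceof RemoteException) {
      return ((RemoteException) e).unwrapRemoteException(); // RemoteException: unwrap
    }
    return (IOException) e;                      // plain IOException: pass through
  }
}
```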
@@ -421,9 +420,9 @@ public class WebHdfsFileSystem extends FileSystem
     }
     return url;
   }
-
+
   Param<?,?>[] getAuthParameters(final HttpOpParam.Op op) throws IOException {
-    List<Param<?,?>> authParams = Lists.newArrayList();
+    List<Param<?,?>> authParams = Lists.newArrayList();
     // Skip adding delegation token for token operations because these
     // operations require authentication.
     Token<?> token = null;
@@ -502,11 +501,11 @@ public class WebHdfsFileSystem extends FileSystem

   /**
    * Two-step requests redirected to a DN
-   *
+   *
    * Create/Append:
-   * Step 1) Submit a Http request with neither auto-redirect nor data.
+   * Step 1) Submit a Http request with neither auto-redirect nor data.
    * Step 2) Submit another Http request with the URL from the Location header with data.
-   *
+   *
    * The reason for the two-step create/append is to prevent clients from
    * sending out the data before the redirect. This issue is addressed by the
    * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
@@ -514,7 +513,7 @@ public class WebHdfsFileSystem extends FileSystem
    * and Java 6 http client), which do not correctly implement "Expect:
    * 100-continue". The two-step create/append is a temporary workaround for
    * the software library bugs.
-   *
+   *
    * Open/Checksum
    * Also implements two-step connects for other operations redirected to
    * a DN such as open and checksum
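A minimal sketch of the two-step create this comment describes, using raw HttpURLConnection; the host, port, user, and payload are illustrative:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class TwoStepCreateExample {
  public static void main(String[] args) throws Exception {
    URL create = new URL(
        "http://namenode:50070/webhdfs/v1/tmp/f?op=CREATE&user.name=alice");

    // Step 1: no auto-redirect and no data; we only want the Location header.
    HttpURLConnection step1 = (HttpURLConnection) create.openConnection();
    step1.setRequestMethod("PUT");
    step1.setInstanceFollowRedirects(false);
    step1.connect();
    String dataNodeUrl = step1.getHeaderField("Location");
    step1.disconnect();

    // Step 2: send the bytes to the DataNode URL from the redirect.
    HttpURLConnection step2 =
        (HttpURLConnection) new URL(dataNodeUrl).openConnection();
    step2.setRequestMethod("PUT");
    step2.setDoOutput(true);
    try (OutputStream out = step2.getOutputStream()) {
      out.write("hello".getBytes(StandardCharsets.UTF_8));
    }
    System.out.println(step2.getResponseCode()); // 201 on success
  }
}
```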
@@ -523,7 +522,7 @@ public class WebHdfsFileSystem extends FileSystem
       //redirect hostname and port
       String redirectHost = null;

-
+
       // resolve redirects for a DN operation unless already resolved
       if (op.getRedirect() && !redirected) {
         final HttpOpParam.Op redirectOp =
@@ -553,7 +552,7 @@ public class WebHdfsFileSystem extends FileSystem
           }
         }
         throw ioe;
-      }
+      }
     }

     private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
@@ -565,7 +564,7 @@ public class WebHdfsFileSystem extends FileSystem
       conn.setInstanceFollowRedirects(false);
       switch (op.getType()) {
         // if not sending a message body for a POST or PUT operation, need
-        // to ensure the server/proxy knows this
+        // to ensure the server/proxy knows this
         case POST:
         case PUT: {
           conn.setDoOutput(true);
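For the no-body POST/PUT case flagged in that comment, one common way to tell the server or an intervening proxy not to wait for data is an explicit zero-length body. A hedged fragment, assuming conn is the HttpURLConnection being prepared:

```java
// POST/PUT with no payload: advertise Content-Length: 0 up front so the
// server/proxy does not block waiting for a request body.
conn.setDoOutput(true);
conn.setFixedLengthStreamingMode(0);
```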
@@ -673,21 +672,21 @@ public class WebHdfsFileSystem extends FileSystem
   abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
     private final Path fspath;
     private final Param<?,?>[] parameters;
-
+
     AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
         Param<?,?>... parameters) {
       super(op, false);
       this.fspath = fspath;
       this.parameters = parameters;
     }
-
+
     AbstractFsPathRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
         final Path fspath) {
       super(op, false);
       this.fspath = fspath;
       this.parameters = parameters;
     }
-
+
     @Override
     protected URL getUrl() throws IOException {
       if (excludeDatanodes.getValue() != null) {
@@ -708,7 +707,7 @@ public class WebHdfsFileSystem extends FileSystem
     FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
       super(op, fspath, parameters);
     }
-
+
     @Override
     Void getResponse(HttpURLConnection conn) throws IOException {
       return null;
@@ -723,12 +722,12 @@ public class WebHdfsFileSystem extends FileSystem
         Param<?,?>... parameters) {
       super(op, fspath, parameters);
     }
-
+
     FsPathResponseRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
         final Path fspath) {
       super(op, parameters, fspath);
     }
-
+
     @Override
     final T getResponse(HttpURLConnection conn) throws IOException {
       try {
@@ -751,7 +750,7 @@ public class WebHdfsFileSystem extends FileSystem
         conn.disconnect();
       }
     }
-
+
     abstract T decodeResponse(Map<?,?> json) throws IOException;
   }

@@ -762,7 +761,7 @@ public class WebHdfsFileSystem extends FileSystem
     FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
       super(op, fspath, parameters);
     }
-
+
     @Override
     Boolean decodeResponse(Map<?,?> json) throws IOException {
       return (Boolean)json.get("boolean");
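The runner's decode contract in miniature: the NameNode answers JSON such as {"boolean":true}, and decodeResponse maps it onto the FileSystem return type. An illustrative stand-in:

```java
import java.util.HashMap;
import java.util.Map;

public class DecodeBooleanExample {
  public static void main(String[] args) {
    Map<String, Object> json = new HashMap<>();
    json.put("boolean", Boolean.TRUE);      // stand-in for the parsed server reply
    Boolean result = (Boolean) json.get("boolean");
    System.out.println(result);             // true, e.g. for a successful DELETE
  }
}
```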
@@ -774,13 +773,13 @@ public class WebHdfsFileSystem extends FileSystem
    */
   class FsPathOutputStreamRunner extends AbstractFsPathRunner<FSDataOutputStream> {
     private final int bufferSize;
-
+
     FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
         Param<?,?>... parameters) {
       super(op, fspath, parameters);
       this.bufferSize = bufferSize;
     }
-
+
     @Override
     FSDataOutputStream getResponse(final HttpURLConnection conn)
         throws IOException {
@@ -812,7 +811,7 @@ public class WebHdfsFileSystem extends FileSystem
       return conn;
     }
   }
-
+
   /**
    * Used by open() which tracks the resolved url itself
    */
@@ -926,26 +925,26 @@ public class WebHdfsFileSystem extends FileSystem
         new RenameOptionSetParam(options)
     ).run();
   }
-
+
   @Override
-  public void setXAttr(Path p, String name, byte[] value,
+  public void setXAttr(Path p, String name, byte[] value,
       EnumSet<XAttrSetFlag> flag) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETXATTR;
     if (value != null) {
       new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam(
-          XAttrCodec.encodeValue(value, XAttrCodec.HEX)),
+          XAttrCodec.encodeValue(value, XAttrCodec.HEX)),
           new XAttrSetFlagParam(flag)).run();
     } else {
-      new FsPathRunner(op, p, new XAttrNameParam(name),
+      new FsPathRunner(op, p, new XAttrNameParam(name),
           new XAttrSetFlagParam(flag)).run();
     }
   }
-
+
   @Override
   public byte[] getXAttr(Path p, final String name) throws IOException {
     final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
-    return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name),
+    return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name),
         new XAttrEncodingParam(XAttrCodec.HEX)) {
       @Override
       byte[] decodeResponse(Map<?, ?> json) throws IOException {
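Usage sketch for the xattr methods above; the URI, path, and attribute name are illustrative ("user." is one of the standard xattr namespaces):

```java
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrSetFlag;

public class XAttrExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(
        URI.create("webhdfs://namenode:50070/"), new Configuration());
    Path f = new Path("/tmp/f");
    fs.setXAttr(f, "user.origin", "web".getBytes(StandardCharsets.UTF_8),
        EnumSet.of(XAttrSetFlag.CREATE));
    byte[] one = fs.getXAttr(f, "user.origin");  // single attribute
    Map<String, byte[]> all = fs.getXAttrs(f);   // all attributes visible to the caller
    System.out.println(new String(one, StandardCharsets.UTF_8) + " / " + all.size());
  }
}
```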
@@ -953,11 +952,11 @@ public class WebHdfsFileSystem extends FileSystem
       }
     }.run();
   }
-
+
   @Override
   public Map<String, byte[]> getXAttrs(Path p) throws IOException {
     final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
-    return new FsPathResponseRunner<Map<String, byte[]>>(op, p,
+    return new FsPathResponseRunner<Map<String, byte[]>>(op, p,
         new XAttrEncodingParam(XAttrCodec.HEX)) {
       @Override
       Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
@@ -965,18 +964,18 @@ public class WebHdfsFileSystem extends FileSystem
       }
     }.run();
   }
-
+
   @Override
-  public Map<String, byte[]> getXAttrs(Path p, final List<String> names)
+  public Map<String, byte[]> getXAttrs(Path p, final List<String> names)
       throws IOException {
-    Preconditions.checkArgument(names != null && !names.isEmpty(),
+    Preconditions.checkArgument(names != null && !names.isEmpty(),
         "XAttr names cannot be null or empty.");
     Param<?,?>[] parameters = new Param<?,?>[names.size() + 1];
     for (int i = 0; i < parameters.length - 1; i++) {
       parameters[i] = new XAttrNameParam(names.get(i));
     }
     parameters[parameters.length - 1] = new XAttrEncodingParam(XAttrCodec.HEX);
-
+
     final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
     return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) {
       @Override
@@ -985,7 +984,7 @@ public class WebHdfsFileSystem extends FileSystem
       }
     }.run();
   }
-
+
   @Override
   public List<String> listXAttrs(Path p) throws IOException {
     final HttpOpParam.Op op = GetOpParam.Op.LISTXATTRS;
@@ -1065,7 +1064,7 @@ public class WebHdfsFileSystem extends FileSystem
   }

   @Override
-  public Path createSnapshot(final Path path, final String snapshotName)
+  public Path createSnapshot(final Path path, final String snapshotName)
       throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT;
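Usage sketch: the directory must already be snapshottable (e.g. via `hdfs dfsadmin -allowSnapshot /data`), and the path and snapshot name are illustrative; fs is a FileSystem as in the xattr example above:

```java
Path snapshot = fs.createSnapshot(new Path("/data"), "s1");
System.out.println(snapshot); // typically /data/.snapshot/s1
```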
@@ -1119,14 +1118,14 @@ public class WebHdfsFileSystem extends FileSystem

   @Override
   public long getDefaultBlockSize() {
-    return getConf().getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
-        DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+    return getConf().getLongBytes(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY,
+        HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
   }

   @Override
   public short getDefaultReplication() {
-    return (short)getConf().getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
-        DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+    return (short)getConf().getInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY,
+        HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT);
   }

   @Override
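Both defaults now resolve through HdfsClientConfigKeys. Note getLongBytes, which accepts size suffixes; a sketch with an illustrative override:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class ClientDefaultsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, "128m"); // suffix parsed by getLongBytes
    long blockSize = conf.getLongBytes(
        HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY,
        HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
    short replication = (short) conf.getInt(
        HdfsClientConfigKeys.DFS_REPLICATION_KEY,
        HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT);
    System.out.println(blockSize + " / " + replication); // 134217728 / 3
  }
}
```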
@@ -1236,7 +1235,7 @@ public class WebHdfsFileSystem extends FileSystem
       final URL offsetUrl = offset == 0L? url
           : new URL(url + "&" + new OffsetParam(offset));
       return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
-    }
+    }
   }

   private static final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
@@ -1367,7 +1366,7 @@ public class WebHdfsFileSystem extends FileSystem
         new TokenArgumentParam(token.encodeToUrlString())
     ).run();
   }
-
+
   @Override
   public BlockLocation[] getFileBlockLocations(final FileStatus status,
       final long offset, final long length) throws IOException {
@@ -1378,7 +1377,7 @@ public class WebHdfsFileSystem extends FileSystem
   }

   @Override
-  public BlockLocation[] getFileBlockLocations(final Path p,
+  public BlockLocation[] getFileBlockLocations(final Path p,
       final long offset, final long length) throws IOException {
     statistics.incrementReadOps(1);

@@ -1387,7 +1386,7 @@ public class WebHdfsFileSystem extends FileSystem
         new OffsetParam(offset), new LengthParam(length)) {
       @Override
       BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
-        return DFSUtil.locatedBlocks2Locations(
+        return DFSUtilClient.locatedBlocks2Locations(
             JsonUtilClient.toLocatedBlocks(json));
       }
     }.run();
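Usage sketch: the JSON LocatedBlocks reply is converted to the generic BlockLocation[] by the client-side helper; offsets are illustrative and fs is a WebHDFS-backed FileSystem as in the earlier examples:

```java
import java.util.Arrays;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;

BlockLocation[] locations =
    fs.getFileBlockLocations(new Path("/tmp/f"), 0L, 1024L);
for (BlockLocation loc : locations) {
  System.out.println(loc.getOffset() + " -> " + Arrays.toString(loc.getHosts()));
}
```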
@@ -1416,7 +1415,7 @@ public class WebHdfsFileSystem extends FileSystem
   public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
       ) throws IOException {
     statistics.incrementReadOps(1);
-
+
     final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
     return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
       @Override
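Finally, a usage sketch for getFileChecksum; the MD5-of-MD5-of-CRC32 value is comparable across clusters only when their block size and bytes-per-checksum settings match. As before, fs is a WebHDFS-backed FileSystem:

```java
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.Path;

FileChecksum checksum = fs.getFileChecksum(new Path("/tmp/f"));
System.out.println(checksum.getAlgorithmName() + " " + checksum);
```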