|
@@ -56,14 +56,12 @@ import org.apache.hadoop.fs.permission.AclEntry;
|
|
|
import org.apache.hadoop.fs.permission.AclStatus;
|
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
-import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
-import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
|
|
import org.apache.hadoop.hdfs.HAUtilClient;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.protocol.HdfsConstantsClient;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
|
|
import org.apache.hadoop.hdfs.web.resources.*;
|
|
|
import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
|
|
|
import org.apache.hadoop.io.Text;
|
|
@@ -145,8 +143,8 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
setConf(conf);
|
|
|
/** set user pattern based on configuration file */
|
|
|
UserParam.setUserPattern(conf.get(
|
|
|
- DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
|
|
|
- DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
|
|
|
+ HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
|
|
|
+ HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
|
|
|
|
|
|
connectionFactory = URLConnectionFactory
|
|
|
.newDefaultURLConnectionFactory(conf);
|
|
@@ -173,7 +171,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_DEFAULT,
|
|
|
HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_KEY,
|
|
|
HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_DEFAULT,
|
|
|
- SafeModeException.class);
|
|
|
+ HdfsConstantsClient.SAFEMODE_EXCEPTION_CLASS_NAME);
|
|
|
} else {
|
|
|
|
|
|
int maxFailoverAttempts = conf.getInt(
|
|
@@ -224,7 +222,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
if(LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Using UGI token: " + token);
|
|
|
}
|
|
|
- canRefreshDelegationToken = false;
|
|
|
+ canRefreshDelegationToken = false;
|
|
|
} else {
|
|
|
token = getDelegationToken(null);
|
|
|
if (token != null) {
|
|
@@ -256,14 +254,14 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
|
|
|
@Override
|
|
|
protected int getDefaultPort() {
|
|
|
- return DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
|
|
|
+ return HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public URI getUri() {
|
|
|
return this.uri;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
protected URI canonicalizeUri(URI uri) {
|
|
|
return NetUtils.getCanonicalUri(uri, getDefaultPort());
|
|
@@ -287,8 +285,8 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
@Override
|
|
|
public synchronized void setWorkingDirectory(final Path dir) {
|
|
|
String result = makeAbsolute(dir).toUri().getPath();
|
|
|
- if (!DFSUtil.isValidName(result)) {
|
|
|
- throw new IllegalArgumentException("Invalid DFS directory name " +
|
|
|
+ if (!DFSUtilClient.isValidName(result)) {
|
|
|
+ throw new IllegalArgumentException("Invalid DFS directory name " +
|
|
|
result);
|
|
|
}
|
|
|
workingDir = makeAbsolute(dir);
|
|
@@ -367,10 +365,10 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
|
|
|
/**
|
|
|
 * Convert an exception to an IOException.
|
|
|
- *
|
|
|
+ *
|
|
|
* For a non-IOException, wrap it with IOException.
|
|
|
* For a RemoteException, unwrap it.
|
|
|
- * For an IOException which is not a RemoteException, return it.
|
|
|
+ * For an IOException which is not a RemoteException, return it.
|
|
|
*/
|
|
|
private static IOException toIOException(Exception e) {
|
|
|
if (!(e instanceof IOException)) {
|
|
@@ -413,9 +411,9 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
}
|
|
|
return url;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
Param<?,?>[] getAuthParameters(final HttpOpParam.Op op) throws IOException {
|
|
|
- List<Param<?,?>> authParams = Lists.newArrayList();
|
|
|
+ List<Param<?,?>> authParams = Lists.newArrayList();
|
|
|
// Skip adding delegation token for token operations because these
|
|
|
// operations require authentication.
|
|
|
Token<?> token = null;
|
|
@@ -494,11 +492,11 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
|
|
|
/**
|
|
|
* Two-step requests redirected to a DN
|
|
|
- *
|
|
|
+ *
|
|
|
* Create/Append:
|
|
|
- * Step 1) Submit a Http request with neither auto-redirect nor data.
|
|
|
+ * Step 1) Submit a Http request with neither auto-redirect nor data.
|
|
|
* Step 2) Submit another Http request with the URL from the Location header with data.
|
|
|
- *
|
|
|
+ *
|
|
|
 * The reason for having two-step create/append is to prevent clients from
|


|
 * sending out the data before the redirect. This issue is addressed by the
|
|
|
* "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
|
|
@@ -506,7 +504,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
* and Java 6 http client), which do not correctly implement "Expect:
|
|
|
* 100-continue". The two-step create/append is a temporary workaround for
|
|
|
* the software library bugs.
|
|
|
- *
|
|
|
+ *
|
|
|
* Open/Checksum
|
|
|
* Also implements two-step connects for other operations redirected to
|
|
|
* a DN such as open and checksum
|
|
@@ -515,7 +513,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
//redirect hostname and port
|
|
|
String redirectHost = null;
|
|
|
|
|
|
-
|
|
|
+
|
|
|
// resolve redirects for a DN operation unless already resolved
|
|
|
if (op.getRedirect() && !redirected) {
|
|
|
final HttpOpParam.Op redirectOp =
|
|
@@ -545,7 +543,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
}
|
|
|
}
|
|
|
throw ioe;
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
|
|
@@ -557,7 +555,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
conn.setInstanceFollowRedirects(false);
|
|
|
switch (op.getType()) {
|
|
|
// if not sending a message body for a POST or PUT operation, need
|
|
|
- // to ensure the server/proxy knows this
|
|
|
+ // to ensure the server/proxy knows this
|
|
|
case POST:
|
|
|
case PUT: {
|
|
|
conn.setDoOutput(true);
|
|
@@ -665,21 +663,21 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
|
|
|
private final Path fspath;
|
|
|
private final Param<?,?>[] parameters;
|
|
|
-
|
|
|
+
|
|
|
AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
|
|
|
Param<?,?>... parameters) {
|
|
|
super(op, false);
|
|
|
this.fspath = fspath;
|
|
|
this.parameters = parameters;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
AbstractFsPathRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
|
|
|
final Path fspath) {
|
|
|
super(op, false);
|
|
|
this.fspath = fspath;
|
|
|
this.parameters = parameters;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
protected URL getUrl() throws IOException {
|
|
|
if (excludeDatanodes.getValue() != null) {
|
|
@@ -700,7 +698,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
|
|
|
super(op, fspath, parameters);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
Void getResponse(HttpURLConnection conn) throws IOException {
|
|
|
return null;
|
|
@@ -715,12 +713,12 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
Param<?,?>... parameters) {
|
|
|
super(op, fspath, parameters);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
FsPathResponseRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
|
|
|
final Path fspath) {
|
|
|
super(op, parameters, fspath);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
final T getResponse(HttpURLConnection conn) throws IOException {
|
|
|
try {
|
|
@@ -743,7 +741,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
conn.disconnect();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
abstract T decodeResponse(Map<?,?> json) throws IOException;
|
|
|
}
|
|
|
|
|
@@ -754,7 +752,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
|
|
|
super(op, fspath, parameters);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
Boolean decodeResponse(Map<?,?> json) throws IOException {
|
|
|
return (Boolean)json.get("boolean");
|
|
@@ -766,13 +764,13 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
*/
|
|
|
class FsPathOutputStreamRunner extends AbstractFsPathRunner<FSDataOutputStream> {
|
|
|
private final int bufferSize;
|
|
|
-
|
|
|
+
|
|
|
FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
|
|
|
Param<?,?>... parameters) {
|
|
|
super(op, fspath, parameters);
|
|
|
this.bufferSize = bufferSize;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
FSDataOutputStream getResponse(final HttpURLConnection conn)
|
|
|
throws IOException {
|
|
@@ -804,7 +802,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
return conn;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Used by open() which tracks the resolved url itself
|
|
|
*/
|
|
@@ -918,26 +916,26 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
new RenameOptionSetParam(options)
|
|
|
).run();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
- public void setXAttr(Path p, String name, byte[] value,
|
|
|
+ public void setXAttr(Path p, String name, byte[] value,
|
|
|
EnumSet<XAttrSetFlag> flag) throws IOException {
|
|
|
statistics.incrementWriteOps(1);
|
|
|
final HttpOpParam.Op op = PutOpParam.Op.SETXATTR;
|
|
|
if (value != null) {
|
|
|
new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam(
|
|
|
- XAttrCodec.encodeValue(value, XAttrCodec.HEX)),
|
|
|
+ XAttrCodec.encodeValue(value, XAttrCodec.HEX)),
|
|
|
new XAttrSetFlagParam(flag)).run();
|
|
|
} else {
|
|
|
- new FsPathRunner(op, p, new XAttrNameParam(name),
|
|
|
+ new FsPathRunner(op, p, new XAttrNameParam(name),
|
|
|
new XAttrSetFlagParam(flag)).run();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public byte[] getXAttr(Path p, final String name) throws IOException {
|
|
|
final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
|
|
|
- return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name),
|
|
|
+ return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name),
|
|
|
new XAttrEncodingParam(XAttrCodec.HEX)) {
|
|
|
@Override
|
|
|
byte[] decodeResponse(Map<?, ?> json) throws IOException {
|
|
@@ -945,11 +943,11 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
}
|
|
|
}.run();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public Map<String, byte[]> getXAttrs(Path p) throws IOException {
|
|
|
final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
|
|
|
- return new FsPathResponseRunner<Map<String, byte[]>>(op, p,
|
|
|
+ return new FsPathResponseRunner<Map<String, byte[]>>(op, p,
|
|
|
new XAttrEncodingParam(XAttrCodec.HEX)) {
|
|
|
@Override
|
|
|
Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
|
|
@@ -957,18 +955,18 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
}
|
|
|
}.run();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
- public Map<String, byte[]> getXAttrs(Path p, final List<String> names)
|
|
|
+ public Map<String, byte[]> getXAttrs(Path p, final List<String> names)
|
|
|
throws IOException {
|
|
|
- Preconditions.checkArgument(names != null && !names.isEmpty(),
|
|
|
+ Preconditions.checkArgument(names != null && !names.isEmpty(),
|
|
|
"XAttr names cannot be null or empty.");
|
|
|
Param<?,?>[] parameters = new Param<?,?>[names.size() + 1];
|
|
|
for (int i = 0; i < parameters.length - 1; i++) {
|
|
|
parameters[i] = new XAttrNameParam(names.get(i));
|
|
|
}
|
|
|
parameters[parameters.length - 1] = new XAttrEncodingParam(XAttrCodec.HEX);
|
|
|
-
|
|
|
+
|
|
|
final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
|
|
|
return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) {
|
|
|
@Override
|
|
@@ -977,7 +975,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
}
|
|
|
}.run();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public List<String> listXAttrs(Path p) throws IOException {
|
|
|
final HttpOpParam.Op op = GetOpParam.Op.LISTXATTRS;
|
|
@@ -1057,7 +1055,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public Path createSnapshot(final Path path, final String snapshotName)
|
|
|
+ public Path createSnapshot(final Path path, final String snapshotName)
|
|
|
throws IOException {
|
|
|
statistics.incrementWriteOps(1);
|
|
|
final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT;
|
|
@@ -1111,14 +1109,14 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
|
|
|
@Override
|
|
|
public long getDefaultBlockSize() {
|
|
|
- return getConf().getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
|
|
|
- DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
|
|
|
+ return getConf().getLongBytes(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY,
|
|
|
+ HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public short getDefaultReplication() {
|
|
|
- return (short)getConf().getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
|
|
|
- DFSConfigKeys.DFS_REPLICATION_DEFAULT);
|
|
|
+ return (short)getConf().getInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY,
|
|
|
+ HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1228,7 +1226,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
final URL offsetUrl = offset == 0L? url
|
|
|
: new URL(url + "&" + new OffsetParam(offset));
|
|
|
return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private static final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
|
|
@@ -1359,7 +1357,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
new TokenArgumentParam(token.encodeToUrlString())
|
|
|
).run();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public BlockLocation[] getFileBlockLocations(final FileStatus status,
|
|
|
final long offset, final long length) throws IOException {
|
|
@@ -1370,7 +1368,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public BlockLocation[] getFileBlockLocations(final Path p,
|
|
|
+ public BlockLocation[] getFileBlockLocations(final Path p,
|
|
|
final long offset, final long length) throws IOException {
|
|
|
statistics.incrementReadOps(1);
|
|
|
|
|
@@ -1379,7 +1377,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
new OffsetParam(offset), new LengthParam(length)) {
|
|
|
@Override
|
|
|
BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
|
|
|
- return DFSUtil.locatedBlocks2Locations(
|
|
|
+ return DFSUtilClient.locatedBlocks2Locations(
|
|
|
JsonUtilClient.toLocatedBlocks(json));
|
|
|
}
|
|
|
}.run();
|
|
@@ -1408,7 +1406,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
|
|
|
) throws IOException {
|
|
|
statistics.incrementReadOps(1);
|
|
|
-
|
|
|
+
|
|
|
final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
|
|
|
return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
|
|
|
@Override
|