|
@@ -154,19 +154,18 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
return f.isAbsolute()? f: new Path(workingDir, f);
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- private static <T> T jsonParse(final InputStream in) throws IOException {
|
|
|
+ private static Map<?, ?> jsonParse(final InputStream in) throws IOException {
|
|
|
if (in == null) {
|
|
|
throw new IOException("The input stream is null.");
|
|
|
}
|
|
|
- return (T)JSON.parse(new InputStreamReader(in));
|
|
|
+ return (Map<?, ?>)JSON.parse(new InputStreamReader(in));
|
|
|
}
|
|
|
|
|
|
- private static void validateResponse(final HttpOpParam.Op op,
|
|
|
+ private static Map<?, ?> validateResponse(final HttpOpParam.Op op,
|
|
|
final HttpURLConnection conn) throws IOException {
|
|
|
final int code = conn.getResponseCode();
|
|
|
if (code != op.getExpectedHttpResponseCode()) {
|
|
|
- final Map<String, Object> m;
|
|
|
+ final Map<?, ?> m;
|
|
|
try {
|
|
|
m = jsonParse(conn.getErrorStream());
|
|
|
} catch(IOException e) {
|
|
@@ -175,6 +174,10 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
+ ", message=" + conn.getResponseMessage(), e);
|
|
|
}
|
|
|
|
|
|
+ if (m.get(RemoteException.class.getSimpleName()) == null) {
|
|
|
+ return m;
|
|
|
+ }
|
|
|
+
|
|
|
final RemoteException re = JsonUtil.toRemoteException(m);
|
|
|
throw re.unwrapRemoteException(AccessControlException.class,
|
|
|
DSQuotaExceededException.class,
|
|
@@ -185,6 +188,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
NSQuotaExceededException.class,
|
|
|
UnresolvedPathException.class);
|
|
|
}
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
URL toUrl(final HttpOpParam.Op op, final Path fspath,
|
|
@@ -235,15 +239,15 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
* @param op http operation
|
|
|
* @param fspath file system path
|
|
|
* @param parameters parameters for the operation
|
|
|
- * @return a JSON object, e.g. Object[], Map<String, Object>, etc.
|
|
|
+ * @return a JSON object, e.g. Object[], Map<?, ?>, etc.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private <T> T run(final HttpOpParam.Op op, final Path fspath,
|
|
|
+ private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
|
|
|
final Param<?,?>... parameters) throws IOException {
|
|
|
final HttpURLConnection conn = httpConnect(op, fspath, parameters);
|
|
|
- validateResponse(op, conn);
|
|
|
try {
|
|
|
- return WebHdfsFileSystem.<T>jsonParse(conn.getInputStream());
|
|
|
+ final Map<?, ?> m = validateResponse(op, conn);
|
|
|
+ return m != null? m: jsonParse(conn.getInputStream());
|
|
|
} finally {
|
|
|
conn.disconnect();
|
|
|
}
|
|
@@ -258,7 +262,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
|
|
|
private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
|
|
|
final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
|
|
|
- final Map<String, Object> json = run(op, f);
|
|
|
+ final Map<?, ?> json = run(op, f);
|
|
|
final HdfsFileStatus status = JsonUtil.toFileStatus(json, true);
|
|
|
if (status == null) {
|
|
|
throw new FileNotFoundException("File does not exist: " + f);
|
|
@@ -284,7 +288,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
|
|
statistics.incrementWriteOps(1);
|
|
|
final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
|
|
|
- final Map<String, Object> json = run(op, f,
|
|
|
+ final Map<?, ?> json = run(op, f,
|
|
|
new PermissionParam(applyUMask(permission)));
|
|
|
return (Boolean)json.get("boolean");
|
|
|
}
|
|
@@ -293,7 +297,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
public boolean rename(final Path src, final Path dst) throws IOException {
|
|
|
statistics.incrementWriteOps(1);
|
|
|
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
|
|
|
- final Map<String, Object> json = run(op, src,
|
|
|
+ final Map<?, ?> json = run(op, src,
|
|
|
new DestinationParam(makeQualified(dst).toUri().getPath()));
|
|
|
return (Boolean)json.get("boolean");
|
|
|
}
|
|
@@ -333,8 +337,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
) throws IOException {
|
|
|
statistics.incrementWriteOps(1);
|
|
|
final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
|
|
|
- final Map<String, Object> json = run(op, p,
|
|
|
- new ReplicationParam(replication));
|
|
|
+ final Map<?, ?> json = run(op, p, new ReplicationParam(replication));
|
|
|
return (Boolean)json.get("boolean");
|
|
|
}
|
|
|
|
|
@@ -403,7 +406,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
@Override
|
|
|
public boolean delete(Path f, boolean recursive) throws IOException {
|
|
|
final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
|
|
|
- final Map<String, Object> json = run(op, f, new RecursiveParam(recursive));
|
|
|
+ final Map<?, ?> json = run(op, f, new RecursiveParam(recursive));
|
|
|
return (Boolean)json.get("boolean");
|
|
|
}
|
|
|
|
|
@@ -428,8 +431,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
//convert FileStatus
|
|
|
final FileStatus[] statuses = new FileStatus[array.length];
|
|
|
for(int i = 0; i < array.length; i++) {
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- final Map<String, Object> m = (Map<String, Object>)array[i];
|
|
|
+ final Map<?, ?> m = (Map<?, ?>)array[i];
|
|
|
statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f);
|
|
|
}
|
|
|
return statuses;
|
|
@@ -439,7 +441,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
public Token<DelegationTokenIdentifier> getDelegationToken(final String renewer
|
|
|
) throws IOException {
|
|
|
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
|
|
|
- final Map<String, Object> m = run(op, null, new RenewerParam(renewer));
|
|
|
+ final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
|
|
|
final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
|
|
|
token.setService(new Text(getCanonicalServiceName()));
|
|
|
return token;
|
|
@@ -467,7 +469,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
statistics.incrementReadOps(1);
|
|
|
|
|
|
final HttpOpParam.Op op = GetOpParam.Op.GETFILEBLOCKLOCATIONS;
|
|
|
- final Map<String, Object> m = run(op, p, new OffsetParam(offset),
|
|
|
+ final Map<?, ?> m = run(op, p, new OffsetParam(offset),
|
|
|
new LengthParam(length));
|
|
|
return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m));
|
|
|
}
|
|
@@ -477,7 +479,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
statistics.incrementReadOps(1);
|
|
|
|
|
|
final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
|
|
|
- final Map<String, Object> m = run(op, p);
|
|
|
+ final Map<?, ?> m = run(op, p);
|
|
|
return JsonUtil.toContentSummary(m);
|
|
|
}
|
|
|
|
|
@@ -487,7 +489,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
statistics.incrementReadOps(1);
|
|
|
|
|
|
final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
|
|
|
- final Map<String, Object> m = run(op, p);
|
|
|
+ final Map<?, ?> m = run(op, p);
|
|
|
return JsonUtil.toMD5MD5CRC32FileChecksum(m);
|
|
|
}
|
|
|
}
|