|
@@ -28,12 +28,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_
|
|
|
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
+import java.lang.reflect.Array;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.Collection;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.LinkedHashMap;
|
|
|
+import java.util.LinkedHashSet;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -179,6 +181,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
/** Category of the operation that a thread is executing. */
|
|
|
private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>();
|
|
|
|
|
|
+ // Modules implementing groups of RPC calls
|
|
|
+ /** Erasure coding calls. */
|
|
|
+ private final ErasureCoding erasureCoding;
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* Construct a router RPC server.
|
|
@@ -275,6 +281,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
// Create the client
|
|
|
this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(),
|
|
|
this.namenodeResolver, this.rpcMonitor);
|
|
|
+
|
|
|
+ // Initialize modules
|
|
|
+ this.erasureCoding = new ErasureCoding(this);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -360,7 +369,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
* client requests.
|
|
|
* @throws UnsupportedOperationException If the operation is not supported.
|
|
|
*/
|
|
|
- private void checkOperation(OperationCategory op, boolean supported)
|
|
|
+ protected void checkOperation(OperationCategory op, boolean supported)
|
|
|
throws StandbyException, UnsupportedOperationException {
|
|
|
checkOperation(op);
|
|
|
|
|
@@ -382,7 +391,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
* @throws StandbyException If the Router is in safe mode and cannot serve
|
|
|
* client requests.
|
|
|
*/
|
|
|
- private void checkOperation(OperationCategory op) throws StandbyException {
|
|
|
+ protected void checkOperation(OperationCategory op) throws StandbyException {
|
|
|
// Log the function we are currently calling.
|
|
|
if (rpcMonitor != null) {
|
|
|
rpcMonitor.startOp();
|
|
@@ -942,8 +951,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
RemoteMethod method = new RemoteMethod("getListing",
|
|
|
new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
|
|
|
new RemoteParam(), startAfter, needLocation);
|
|
|
- Map<RemoteLocation, Object> listings =
|
|
|
- rpcClient.invokeConcurrent(locations, method, false, false);
|
|
|
+ Map<RemoteLocation, DirectoryListing> listings =
|
|
|
+ rpcClient.invokeConcurrent(
|
|
|
+ locations, method, false, false, DirectoryListing.class);
|
|
|
|
|
|
Map<String, HdfsFileStatus> nnListing = new TreeMap<>();
|
|
|
int totalRemainingEntries = 0;
|
|
@@ -952,9 +962,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
if (listings != null) {
|
|
|
// Check the subcluster listing with the smallest name
|
|
|
String lastName = null;
|
|
|
- for (Entry<RemoteLocation, Object> entry : listings.entrySet()) {
|
|
|
+ for (Entry<RemoteLocation, DirectoryListing> entry :
|
|
|
+ listings.entrySet()) {
|
|
|
RemoteLocation location = entry.getKey();
|
|
|
- DirectoryListing listing = (DirectoryListing) entry.getValue();
|
|
|
+ DirectoryListing listing = entry.getValue();
|
|
|
if (listing == null) {
|
|
|
LOG.debug("Cannot get listing from {}", location);
|
|
|
} else {
|
|
@@ -1078,11 +1089,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
|
|
|
RemoteMethod method = new RemoteMethod("getStats");
|
|
|
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
|
|
- Map<FederationNamespaceInfo, Object> results =
|
|
|
- rpcClient.invokeConcurrent(nss, method, true, false);
|
|
|
+ Map<FederationNamespaceInfo, long[]> results =
|
|
|
+ rpcClient.invokeConcurrent(nss, method, true, false, long[].class);
|
|
|
long[] combinedData = new long[STATS_ARRAY_LENGTH];
|
|
|
- for (Object o : results.values()) {
|
|
|
- long[] data = (long[]) o;
|
|
|
+ for (long[] data : results.values()) {
|
|
|
for (int i = 0; i < combinedData.length && i < data.length; i++) {
|
|
|
if (data[i] >= 0) {
|
|
|
combinedData[i] += data[i];
|
|
@@ -1115,11 +1125,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
new Class<?>[] {DatanodeReportType.class}, type);
|
|
|
|
|
|
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
|
|
- Map<FederationNamespaceInfo, Object> results =
|
|
|
- rpcClient.invokeConcurrent(nss, method, true, false, timeOutMs);
|
|
|
- for (Entry<FederationNamespaceInfo, Object> entry : results.entrySet()) {
|
|
|
+ Map<FederationNamespaceInfo, DatanodeInfo[]> results =
|
|
|
+ rpcClient.invokeConcurrent(
|
|
|
+ nss, method, true, false, timeOutMs, DatanodeInfo[].class);
|
|
|
+ for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
|
|
|
+ results.entrySet()) {
|
|
|
FederationNamespaceInfo ns = entry.getKey();
|
|
|
- DatanodeInfo[] result = (DatanodeInfo[]) entry.getValue();
|
|
|
+ DatanodeInfo[] result = entry.getValue();
|
|
|
for (DatanodeInfo node : result) {
|
|
|
String nodeId = node.getXferAddr();
|
|
|
if (!datanodesMap.containsKey(nodeId)) {
|
|
@@ -1149,10 +1161,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
RemoteMethod method = new RemoteMethod("getDatanodeStorageReport",
|
|
|
new Class<?>[] {DatanodeReportType.class}, type);
|
|
|
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
|
|
- Map<FederationNamespaceInfo, Object> results =
|
|
|
- rpcClient.invokeConcurrent(nss, method, true, false);
|
|
|
- for (Object r : results.values()) {
|
|
|
- DatanodeStorageReport[] result = (DatanodeStorageReport[]) r;
|
|
|
+ Map<FederationNamespaceInfo, DatanodeStorageReport[]> results =
|
|
|
+ rpcClient.invokeConcurrent(
|
|
|
+ nss, method, true, false, DatanodeStorageReport[].class);
|
|
|
+ for (DatanodeStorageReport[] result : results.values()) {
|
|
|
for (DatanodeStorageReport node : result) {
|
|
|
String nodeId = node.getDatanodeInfo().getXferAddr();
|
|
|
if (!datanodesMap.containsKey(nodeId)) {
|
|
@@ -1180,17 +1192,14 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
new Class<?>[] {SafeModeAction.class, boolean.class},
|
|
|
action, isChecked);
|
|
|
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
|
|
- Map<FederationNamespaceInfo, Object> results =
|
|
|
- rpcClient.invokeConcurrent(nss, method, true, true);
|
|
|
+ Map<FederationNamespaceInfo, Boolean> results =
|
|
|
+ rpcClient.invokeConcurrent(nss, method, true, true, boolean.class);
|
|
|
|
|
|
// We only report true if all the name space are in safe mode
|
|
|
int numSafemode = 0;
|
|
|
- for (Object result : results.values()) {
|
|
|
- if (result instanceof Boolean) {
|
|
|
- boolean safemode = (boolean) result;
|
|
|
- if (safemode) {
|
|
|
- numSafemode++;
|
|
|
- }
|
|
|
+ for (boolean safemode : results.values()) {
|
|
|
+ if (safemode) {
|
|
|
+ numSafemode++;
|
|
|
}
|
|
|
}
|
|
|
return numSafemode == results.size();
|
|
@@ -1203,18 +1212,14 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
RemoteMethod method = new RemoteMethod("restoreFailedStorage",
|
|
|
new Class<?>[] {String.class}, arg);
|
|
|
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
|
|
- Map<FederationNamespaceInfo, Object> ret =
|
|
|
- rpcClient.invokeConcurrent(nss, method, true, false);
|
|
|
+ Map<FederationNamespaceInfo, Boolean> ret =
|
|
|
+ rpcClient.invokeConcurrent(nss, method, true, false, boolean.class);
|
|
|
|
|
|
boolean success = true;
|
|
|
- Object obj = ret;
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- Map<FederationNamespaceInfo, Boolean> results =
|
|
|
- (Map<FederationNamespaceInfo, Boolean>)obj;
|
|
|
- Collection<Boolean> sucesses = results.values();
|
|
|
- for (boolean s : sucesses) {
|
|
|
+ for (boolean s : ret.values()) {
|
|
|
if (!s) {
|
|
|
success = false;
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
return success;
|
|
@@ -1227,18 +1232,14 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
RemoteMethod method = new RemoteMethod("saveNamespace",
|
|
|
new Class<?>[] {Long.class, Long.class}, timeWindow, txGap);
|
|
|
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
|
|
- Map<FederationNamespaceInfo, Object> ret =
|
|
|
- rpcClient.invokeConcurrent(nss, method, true, false);
|
|
|
+ Map<FederationNamespaceInfo, Boolean> ret =
|
|
|
+ rpcClient.invokeConcurrent(nss, method, true, false, boolean.class);
|
|
|
|
|
|
boolean success = true;
|
|
|
- Object obj = ret;
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- Map<FederationNamespaceInfo, Boolean> results =
|
|
|
- (Map<FederationNamespaceInfo, Boolean>)obj;
|
|
|
- Collection<Boolean> sucesses = results.values();
|
|
|
- for (boolean s : sucesses) {
|
|
|
+ for (boolean s : ret.values()) {
|
|
|
if (!s) {
|
|
|
success = false;
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
return success;
|
|
@@ -1250,17 +1251,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
|
|
|
RemoteMethod method = new RemoteMethod("rollEdits", new Class<?>[] {});
|
|
|
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
|
|
- Map<FederationNamespaceInfo, Object> ret =
|
|
|
- rpcClient.invokeConcurrent(nss, method, true, false);
|
|
|
+ Map<FederationNamespaceInfo, Long> ret =
|
|
|
+ rpcClient.invokeConcurrent(nss, method, true, false, long.class);
|
|
|
|
|
|
// Return the maximum txid
|
|
|
long txid = 0;
|
|
|
- Object obj = ret;
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- Map<FederationNamespaceInfo, Long> results =
|
|
|
- (Map<FederationNamespaceInfo, Long>)obj;
|
|
|
- Collection<Long> txids = results.values();
|
|
|
- for (long t : txids) {
|
|
|
+ for (long t : ret.values()) {
|
|
|
if (t > txid) {
|
|
|
txid = t;
|
|
|
}
|
|
@@ -1295,17 +1291,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
RemoteMethod method = new RemoteMethod("rollingUpgrade",
|
|
|
new Class<?>[] {RollingUpgradeAction.class}, action);
|
|
|
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
|
|
- Map<FederationNamespaceInfo, Object> ret =
|
|
|
- rpcClient.invokeConcurrent(nss, method, true, false);
|
|
|
+ Map<FederationNamespaceInfo, RollingUpgradeInfo> ret =
|
|
|
+ rpcClient.invokeConcurrent(
|
|
|
+ nss, method, true, false, RollingUpgradeInfo.class);
|
|
|
|
|
|
// Return the first rolling upgrade info
|
|
|
RollingUpgradeInfo info = null;
|
|
|
- Object obj = ret;
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- Map<FederationNamespaceInfo, RollingUpgradeInfo> results =
|
|
|
- (Map<FederationNamespaceInfo, RollingUpgradeInfo>)obj;
|
|
|
- Collection<RollingUpgradeInfo> infos = results.values();
|
|
|
- for (RollingUpgradeInfo infoNs : infos) {
|
|
|
+ for (RollingUpgradeInfo infoNs : ret.values()) {
|
|
|
if (info == null && infoNs != null) {
|
|
|
info = infoNs;
|
|
|
}
|
|
@@ -1357,10 +1349,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
final List<RemoteLocation> locations = getLocationsForPath(path, false);
|
|
|
RemoteMethod method = new RemoteMethod("getContentSummary",
|
|
|
new Class<?>[] {String.class}, new RemoteParam());
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- Map<String, ContentSummary> results =
|
|
|
- (Map<String, ContentSummary>) ((Object)rpcClient.invokeConcurrent(
|
|
|
- locations, method, false, false));
|
|
|
+ Map<RemoteLocation, ContentSummary> results =
|
|
|
+ rpcClient.invokeConcurrent(
|
|
|
+ locations, method, false, false, ContentSummary.class);
|
|
|
summaries.addAll(results.values());
|
|
|
} catch (FileNotFoundException e) {
|
|
|
notFoundException = e;
|
|
@@ -1746,17 +1737,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
RemoteMethod method = new RemoteMethod(
|
|
|
"getCurrentEditLogTxid", new Class<?>[] {});
|
|
|
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
|
|
- Map<FederationNamespaceInfo, Object> ret =
|
|
|
- rpcClient.invokeConcurrent(nss, method, true, false);
|
|
|
+ Map<FederationNamespaceInfo, Long> ret =
|
|
|
+ rpcClient.invokeConcurrent(nss, method, true, false, long.class);
|
|
|
|
|
|
// Return the maximum txid
|
|
|
long txid = 0;
|
|
|
- Object obj = ret;
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- Map<FederationNamespaceInfo, Long> results =
|
|
|
- (Map<FederationNamespaceInfo, Long>)obj;
|
|
|
- Collection<Long> txids = results.values();
|
|
|
- for (long t : txids) {
|
|
|
+ for (long t : ret.values()) {
|
|
|
if (t > txid) {
|
|
|
txid = t;
|
|
|
}
|
|
@@ -1789,31 +1775,6 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
checkOperation(OperationCategory.WRITE, false);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
|
|
|
- throws IOException {
|
|
|
- checkOperation(OperationCategory.READ, false);
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override // ClientProtocol
|
|
|
- public ErasureCodingPolicy getErasureCodingPolicy(String src)
|
|
|
- throws IOException {
|
|
|
- checkOperation(OperationCategory.READ, false);
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override // ClientProtocol
|
|
|
- public void setErasureCodingPolicy(String src, String ecPolicyName)
|
|
|
- throws IOException {
|
|
|
- checkOperation(OperationCategory.WRITE, false);
|
|
|
- }
|
|
|
-
|
|
|
- @Override // ClientProtocol
|
|
|
- public void unsetErasureCodingPolicy(String src) throws IOException {
|
|
|
- checkOperation(OperationCategory.WRITE, false);
|
|
|
- }
|
|
|
-
|
|
|
@Override // ClientProtocol
|
|
|
public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
|
|
|
StorageType type) throws IOException {
|
|
@@ -1875,38 +1836,61 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
+ @Override // ClientProtocol
|
|
|
+ public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
|
|
|
+ throws IOException {
|
|
|
+ return erasureCoding.getErasureCodingPolicies();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override // ClientProtocol
|
|
|
+ public Map<String, String> getErasureCodingCodecs() throws IOException {
|
|
|
+ return erasureCoding.getErasureCodingCodecs();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override // ClientProtocol
|
|
|
public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
|
|
|
ErasureCodingPolicy[] policies) throws IOException {
|
|
|
- checkOperation(OperationCategory.WRITE, false);
|
|
|
- return null;
|
|
|
+ return erasureCoding.addErasureCodingPolicies(policies);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void removeErasureCodingPolicy(String arg0) throws IOException {
|
|
|
- checkOperation(OperationCategory.WRITE, false);
|
|
|
+ @Override // ClientProtocol
|
|
|
+ public void removeErasureCodingPolicy(String ecPolicyName)
|
|
|
+ throws IOException {
|
|
|
+ erasureCoding.removeErasureCodingPolicy(ecPolicyName);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void disableErasureCodingPolicy(String arg0) throws IOException {
|
|
|
- checkOperation(OperationCategory.WRITE, false);
|
|
|
+ @Override // ClientProtocol
|
|
|
+ public void disableErasureCodingPolicy(String ecPolicyName)
|
|
|
+ throws IOException {
|
|
|
+ erasureCoding.disableErasureCodingPolicy(ecPolicyName);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void enableErasureCodingPolicy(String arg0) throws IOException {
|
|
|
- checkOperation(OperationCategory.WRITE, false);
|
|
|
+ @Override // ClientProtocol
|
|
|
+ public void enableErasureCodingPolicy(String ecPolicyName)
|
|
|
+ throws IOException {
|
|
|
+ erasureCoding.enableErasureCodingPolicy(ecPolicyName);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public ECBlockGroupStats getECBlockGroupStats() throws IOException {
|
|
|
- checkOperation(OperationCategory.READ, false);
|
|
|
- return null;
|
|
|
+ @Override // ClientProtocol
|
|
|
+ public ErasureCodingPolicy getErasureCodingPolicy(String src)
|
|
|
+ throws IOException {
|
|
|
+ return erasureCoding.getErasureCodingPolicy(src);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override // ClientProtocol
|
|
|
+ public void setErasureCodingPolicy(String src, String ecPolicyName)
|
|
|
+ throws IOException {
|
|
|
+ erasureCoding.setErasureCodingPolicy(src, ecPolicyName);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override // ClientProtocol
|
|
|
+ public void unsetErasureCodingPolicy(String src) throws IOException {
|
|
|
+ erasureCoding.unsetErasureCodingPolicy(src);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public Map<String, String> getErasureCodingCodecs() throws IOException {
|
|
|
- checkOperation(OperationCategory.READ, false);
|
|
|
- return null;
|
|
|
+ public ECBlockGroupStats getECBlockGroupStats() throws IOException {
|
|
|
+ return erasureCoding.getECBlockGroupStats();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1974,7 +1958,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
* @return Prioritized list of locations in the federated cluster.
|
|
|
* @throws IOException If the location for this path cannot be determined.
|
|
|
*/
|
|
|
- private List<RemoteLocation> getLocationsForPath(
|
|
|
+ protected List<RemoteLocation> getLocationsForPath(
|
|
|
String path, boolean failIfLocked) throws IOException {
|
|
|
try {
|
|
|
// Check the location for this path
|
|
@@ -2097,4 +2081,37 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|
|
UserGroupInformation ugi = Server.getRemoteUser();
|
|
|
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Merge the outputs from multiple namespaces.
|
|
|
+ * @param map Namespace -> Output array.
|
|
|
+ * @param clazz Class of the values.
|
|
|
+ * @return Array with the outputs.
|
|
|
+ */
|
|
|
+ protected static <T> T[] merge(
|
|
|
+ Map<FederationNamespaceInfo, T[]> map, Class<T> clazz) {
|
|
|
+
|
|
|
+ // Put all results into a set to avoid repeats
|
|
|
+ Set<T> ret = new LinkedHashSet<>();
|
|
|
+ for (T[] values : map.values()) {
|
|
|
+ for (T val : values) {
|
|
|
+ ret.add(val);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return toArray(ret, clazz);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Convert a set of values into an array.
|
|
|
+ * @param set Input set.
|
|
|
+ * @param clazz Class of the values.
|
|
|
+ * @return Array with the values in set.
|
|
|
+ */
|
|
|
+ private static <T> T[] toArray(Set<T> set, Class<T> clazz) {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ T[] combinedData = (T[]) Array.newInstance(clazz, set.size());
|
|
|
+ combinedData = set.toArray(combinedData);
|
|
|
+ return combinedData;
|
|
|
+ }
|
|
|
}
|