|
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
|
|
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
|
|
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
|
|
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
|
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
|
|
@@ -165,6 +166,13 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHEC
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.FS_PROTECTED_DIRECTORIES;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.FS_PROTECTED_DIRECTORIES;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY;
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY;
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT;
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY;
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT;
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION;
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT;
|
|
|
|
+
|
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
|
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
|
|
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
|
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
|
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
|
|
@@ -299,7 +307,10 @@ public class NameNode extends ReconfigurableBase implements
|
|
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
|
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
|
FS_PROTECTED_DIRECTORIES,
|
|
FS_PROTECTED_DIRECTORIES,
|
|
HADOOP_CALLER_CONTEXT_ENABLED_KEY,
|
|
HADOOP_CALLER_CONTEXT_ENABLED_KEY,
|
|
- DFS_STORAGE_POLICY_SATISFIER_MODE_KEY));
|
|
|
|
|
|
+ DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
|
|
|
|
+ DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
|
|
|
|
+ DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
|
|
|
|
+ DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION));
|
|
|
|
|
|
private static final String USAGE = "Usage: hdfs namenode ["
|
|
private static final String USAGE = "Usage: hdfs namenode ["
|
|
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
|
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
|
@@ -2125,12 +2136,63 @@ public class NameNode extends ReconfigurableBase implements
|
|
return reconfigureIPCBackoffEnabled(newVal);
|
|
return reconfigureIPCBackoffEnabled(newVal);
|
|
} else if (property.equals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY)) {
|
|
} else if (property.equals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY)) {
|
|
return reconfigureSPSModeEvent(newVal, property);
|
|
return reconfigureSPSModeEvent(newVal, property);
|
|
|
|
+ } else if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)
|
|
|
|
+ || property.equals(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)
|
|
|
|
+ || property.equals(
|
|
|
|
+ DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
|
|
|
|
+ return reconfReplicationParameters(newVal, property);
|
|
} else {
|
|
} else {
|
|
throw new ReconfigurationException(property, newVal, getConf().get(
|
|
throw new ReconfigurationException(property, newVal, getConf().get(
|
|
property));
|
|
property));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private String reconfReplicationParameters(final String newVal,
|
|
|
|
+ final String property) throws ReconfigurationException {
|
|
|
|
+ BlockManager bm = namesystem.getBlockManager();
|
|
|
|
+ int newSetting;
|
|
|
|
+ namesystem.writeLock();
|
|
|
|
+ try {
|
|
|
|
+ if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)) {
|
|
|
|
+ bm.setMaxReplicationStreams(
|
|
|
|
+ adjustNewVal(DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT, newVal));
|
|
|
|
+ newSetting = bm.getMaxReplicationStreams();
|
|
|
|
+ } else if (property.equals(
|
|
|
|
+ DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)) {
|
|
|
|
+ bm.setReplicationStreamsHardLimit(
|
|
|
|
+ adjustNewVal(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT,
|
|
|
|
+ newVal));
|
|
|
|
+ newSetting = bm.getReplicationStreamsHardLimit();
|
|
|
|
+ } else if (
|
|
|
|
+ property.equals(
|
|
|
|
+ DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
|
|
|
|
+ bm.setBlocksReplWorkMultiplier(
|
|
|
|
+ adjustNewVal(
|
|
|
|
+ DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT,
|
|
|
|
+ newVal));
|
|
|
|
+ newSetting = bm.getBlocksReplWorkMultiplier();
|
|
|
|
+ } else {
|
|
|
|
+ throw new IllegalArgumentException("Unexpected property " +
|
|
|
|
+            property + " in reconfReplicationParameters");
|
|
|
|
+ }
|
|
|
|
+ LOG.info("RECONFIGURE* changed {} to {}", property, newSetting);
|
|
|
|
+ return String.valueOf(newSetting);
|
|
|
|
+ } catch (IllegalArgumentException e) {
|
|
|
|
+ throw new ReconfigurationException(property, newVal, getConf().get(
|
|
|
|
+ property), e);
|
|
|
|
+ } finally {
|
|
|
|
+ namesystem.writeUnlock();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private int adjustNewVal(int defaultVal, String newVal) {
|
|
|
|
+ if (newVal == null) {
|
|
|
|
+ return defaultVal;
|
|
|
|
+ } else {
|
|
|
|
+ return Integer.parseInt(newVal);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
private String reconfHeartbeatInterval(final DatanodeManager datanodeManager,
|
|
private String reconfHeartbeatInterval(final DatanodeManager datanodeManager,
|
|
final String property, final String newVal)
|
|
final String property, final String newVal)
|
|
throws ReconfigurationException {
|
|
throws ReconfigurationException {
|