|
@@ -363,7 +363,6 @@ public class DataNode extends ReconfigurableBase
|
|
|
private SecureResources secureResources = null;
|
|
|
// dataDirs must be accessed while holding the DataNode lock.
|
|
|
private List<StorageLocation> dataDirs;
|
|
|
- private Configuration conf;
|
|
|
private final String confVersion;
|
|
|
private final long maxNumberOfBlocksToLog;
|
|
|
private final boolean pipelineSupportECN;
|
|
@@ -419,7 +418,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
this.confVersion = null;
|
|
|
this.usersWithLocalPathAccess = null;
|
|
|
this.connectToDnViaHostname = false;
|
|
|
- this.blockScanner = new BlockScanner(this, conf);
|
|
|
+ this.blockScanner = new BlockScanner(this, this.getConf());
|
|
|
this.pipelineSupportECN = false;
|
|
|
this.checkDiskErrorInterval =
|
|
|
ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
|
|
@@ -438,7 +437,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
this.tracer = createTracer(conf);
|
|
|
this.tracerConfigurationManager =
|
|
|
new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
|
|
|
- this.blockScanner = new BlockScanner(this, conf);
|
|
|
+ this.blockScanner = new BlockScanner(this);
|
|
|
this.lastDiskErrorCheck = 0;
|
|
|
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
|
|
|
DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
|
|
@@ -487,7 +486,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
try {
|
|
|
hostName = getHostName(conf);
|
|
|
LOG.info("Configured hostname is " + hostName);
|
|
|
- startDataNode(conf, dataDirs, resources);
|
|
|
+ startDataNode(dataDirs, resources);
|
|
|
} catch (IOException ie) {
|
|
|
shutdown();
|
|
|
throw ie;
|
|
@@ -527,7 +526,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
try {
|
|
|
LOG.info("Reconfiguring " + property + " to " + newVal);
|
|
|
this.refreshVolumes(newVal);
|
|
|
- return conf.get(DFS_DATANODE_DATA_DIR_KEY);
|
|
|
+ return getConf().get(DFS_DATANODE_DATA_DIR_KEY);
|
|
|
} catch (IOException e) {
|
|
|
rootException = e;
|
|
|
} finally {
|
|
@@ -650,7 +649,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
|
|
|
// Use the existing StorageLocation to detect storage type changes.
|
|
|
Map<String, StorageLocation> existingLocations = new HashMap<>();
|
|
|
- for (StorageLocation loc : getStorageLocations(this.conf)) {
|
|
|
+ for (StorageLocation loc : getStorageLocations(getConf())) {
|
|
|
existingLocations.put(loc.getFile().getCanonicalPath(), loc);
|
|
|
}
|
|
|
|
|
@@ -846,7 +845,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
it.remove();
|
|
|
}
|
|
|
}
|
|
|
- conf.set(DFS_DATANODE_DATA_DIR_KEY, Joiner.on(",").join(dataDirs));
|
|
|
+ getConf().set(DFS_DATANODE_DATA_DIR_KEY, Joiner.on(",").join(dataDirs));
|
|
|
|
|
|
if (ioe != null) {
|
|
|
throw ioe;
|
|
@@ -904,14 +903,14 @@ public class DataNode extends ReconfigurableBase
|
|
|
* for information related to the different configuration options and
|
|
|
* Http Policy is decided.
|
|
|
*/
|
|
|
- private void startInfoServer(Configuration conf)
|
|
|
+ private void startInfoServer()
|
|
|
throws IOException {
|
|
|
// SecureDataNodeStarter will bind the privileged port to the channel if
|
|
|
// the DN is started by JSVC, pass it along.
|
|
|
ServerSocketChannel httpServerChannel = secureResources != null ?
|
|
|
secureResources.getHttpServerChannel() : null;
|
|
|
|
|
|
- this.httpServer = new DatanodeHttpServer(conf, this, httpServerChannel);
|
|
|
+ httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);
|
|
|
httpServer.start();
|
|
|
if (httpServer.getHttpAddress() != null) {
|
|
|
infoPort = httpServer.getHttpAddress().getPort();
|
|
@@ -933,24 +932,24 @@ public class DataNode extends ReconfigurableBase
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void initIpcServer(Configuration conf) throws IOException {
|
|
|
+ private void initIpcServer() throws IOException {
|
|
|
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
|
|
|
- conf.getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
|
|
|
+ getConf().getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
|
|
|
|
|
|
// Add all the RPC protocols that the Datanode implements
|
|
|
- RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
|
|
|
+ RPC.setProtocolEngine(getConf(), ClientDatanodeProtocolPB.class,
|
|
|
ProtobufRpcEngine.class);
|
|
|
ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator =
|
|
|
new ClientDatanodeProtocolServerSideTranslatorPB(this);
|
|
|
BlockingService service = ClientDatanodeProtocolService
|
|
|
.newReflectiveBlockingService(clientDatanodeProtocolXlator);
|
|
|
- ipcServer = new RPC.Builder(conf)
|
|
|
+ ipcServer = new RPC.Builder(getConf())
|
|
|
.setProtocol(ClientDatanodeProtocolPB.class)
|
|
|
.setInstance(service)
|
|
|
.setBindAddress(ipcAddr.getHostName())
|
|
|
.setPort(ipcAddr.getPort())
|
|
|
.setNumHandlers(
|
|
|
- conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
|
|
|
+ getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
|
|
|
DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
|
|
|
.setSecretManager(blockPoolTokenSecretManager).build();
|
|
|
|
|
@@ -958,29 +957,32 @@ public class DataNode extends ReconfigurableBase
|
|
|
= new ReconfigurationProtocolServerSideTranslatorPB(this);
|
|
|
service = ReconfigurationProtocolService
|
|
|
.newReflectiveBlockingService(reconfigurationProtocolXlator);
|
|
|
- DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class, service,
|
|
|
+ DFSUtil.addPBProtocol(getConf(), ReconfigurationProtocolPB.class, service,
|
|
|
ipcServer);
|
|
|
|
|
|
InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
|
|
|
new InterDatanodeProtocolServerSideTranslatorPB(this);
|
|
|
service = InterDatanodeProtocolService
|
|
|
.newReflectiveBlockingService(interDatanodeProtocolXlator);
|
|
|
- DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
|
|
|
+ DFSUtil.addPBProtocol(getConf(), InterDatanodeProtocolPB.class, service,
|
|
|
ipcServer);
|
|
|
|
|
|
TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
|
|
|
new TraceAdminProtocolServerSideTranslatorPB(this);
|
|
|
BlockingService traceAdminService = TraceAdminService
|
|
|
.newReflectiveBlockingService(traceAdminXlator);
|
|
|
- DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService,
|
|
|
+ DFSUtil.addPBProtocol(
|
|
|
+ getConf(),
|
|
|
+ TraceAdminProtocolPB.class,
|
|
|
+ traceAdminService,
|
|
|
ipcServer);
|
|
|
|
|
|
LOG.info("Opened IPC server at " + ipcServer.getListenerAddress());
|
|
|
|
|
|
// set service-level authorization security policy
|
|
|
- if (conf.getBoolean(
|
|
|
+ if (getConf().getBoolean(
|
|
|
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
|
|
|
- ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
|
|
+ ipcServer.refreshServiceAcl(getConf(), new HDFSPolicyProvider());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1071,17 +1073,17 @@ public class DataNode extends ReconfigurableBase
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void initDataXceiver(Configuration conf) throws IOException {
|
|
|
+ private void initDataXceiver() throws IOException {
|
|
|
// find free port or use privileged port provided
|
|
|
TcpPeerServer tcpPeerServer;
|
|
|
if (secureResources != null) {
|
|
|
tcpPeerServer = new TcpPeerServer(secureResources);
|
|
|
} else {
|
|
|
- int backlogLength = conf.getInt(
|
|
|
+ int backlogLength = getConf().getInt(
|
|
|
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
|
|
|
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
|
|
|
tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
|
|
|
- DataNode.getStreamingAddr(conf), backlogLength);
|
|
|
+ DataNode.getStreamingAddr(getConf()), backlogLength);
|
|
|
}
|
|
|
if (dnConf.getTransferSocketRecvBufferSize() > 0) {
|
|
|
tcpPeerServer.setReceiveBufferSize(
|
|
@@ -1090,24 +1092,27 @@ public class DataNode extends ReconfigurableBase
|
|
|
streamingAddr = tcpPeerServer.getStreamingAddr();
|
|
|
LOG.info("Opened streaming server at " + streamingAddr);
|
|
|
this.threadGroup = new ThreadGroup("dataXceiverServer");
|
|
|
- xserver = new DataXceiverServer(tcpPeerServer, conf, this);
|
|
|
+ xserver = new DataXceiverServer(tcpPeerServer, getConf(), this);
|
|
|
this.dataXceiverServer = new Daemon(threadGroup, xserver);
|
|
|
this.threadGroup.setDaemon(true); // auto destroy when empty
|
|
|
|
|
|
- if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
|
|
|
- HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) ||
|
|
|
- conf.getBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
|
|
- HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
|
|
|
+ if (getConf().getBoolean(
|
|
|
+ HdfsClientConfigKeys.Read.ShortCircuit.KEY,
|
|
|
+ HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) ||
|
|
|
+ getConf().getBoolean(
|
|
|
+ HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
|
|
+ HdfsClientConfigKeys
|
|
|
+ .DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
|
|
|
DomainPeerServer domainPeerServer =
|
|
|
- getDomainPeerServer(conf, streamingAddr.getPort());
|
|
|
+ getDomainPeerServer(getConf(), streamingAddr.getPort());
|
|
|
if (domainPeerServer != null) {
|
|
|
this.localDataXceiverServer = new Daemon(threadGroup,
|
|
|
- new DataXceiverServer(domainPeerServer, conf, this));
|
|
|
+ new DataXceiverServer(domainPeerServer, getConf(), this));
|
|
|
LOG.info("Listening on UNIX domain socket: " +
|
|
|
domainPeerServer.getBindPath());
|
|
|
}
|
|
|
}
|
|
|
- this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
|
|
|
+ this.shortCircuitRegistry = new ShortCircuitRegistry(getConf());
|
|
|
}
|
|
|
|
|
|
private static DomainPeerServer getDomainPeerServer(Configuration conf,
|
|
@@ -1288,26 +1293,23 @@ public class DataNode extends ReconfigurableBase
|
|
|
/**
|
|
|
* This method starts the data node with the specified conf.
|
|
|
*
|
|
|
- * @param conf - the configuration
|
|
|
- * if conf's CONFIG_PROPERTY_SIMULATED property is set
|
|
|
- * then a simulated storage based data node is created.
|
|
|
+ * If conf's CONFIG_PROPERTY_SIMULATED property is set
|
|
|
+ * then a simulated storage based data node is created.
|
|
|
*
|
|
|
* @param dataDirs - only for a non-simulated storage data node
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- void startDataNode(Configuration conf,
|
|
|
- List<StorageLocation> dataDirs,
|
|
|
+ void startDataNode(List<StorageLocation> dataDirectories,
|
|
|
SecureResources resources
|
|
|
) throws IOException {
|
|
|
|
|
|
// settings global for all BPs in the Data Node
|
|
|
this.secureResources = resources;
|
|
|
synchronized (this) {
|
|
|
- this.dataDirs = dataDirs;
|
|
|
+ this.dataDirs = dataDirectories;
|
|
|
}
|
|
|
- this.conf = conf;
|
|
|
- this.dnConf = new DNConf(conf);
|
|
|
- checkSecureConfig(dnConf, conf, resources);
|
|
|
+ this.dnConf = new DNConf(this);
|
|
|
+ checkSecureConfig(dnConf, getConf(), resources);
|
|
|
|
|
|
if (dnConf.maxLockedMemory > 0) {
|
|
|
if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
|
|
@@ -1347,10 +1349,10 @@ public class DataNode extends ReconfigurableBase
|
|
|
|
|
|
// global DN settings
|
|
|
registerMXBean();
|
|
|
- initDataXceiver(conf);
|
|
|
- startInfoServer(conf);
|
|
|
+ initDataXceiver();
|
|
|
+ startInfoServer();
|
|
|
pauseMonitor = new JvmPauseMonitor();
|
|
|
- pauseMonitor.init(conf);
|
|
|
+ pauseMonitor.init(getConf());
|
|
|
pauseMonitor.start();
|
|
|
|
|
|
// BlockPoolTokenSecretManager is required to create ipc server.
|
|
@@ -1360,24 +1362,24 @@ public class DataNode extends ReconfigurableBase
|
|
|
dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
|
|
|
LOG.info("dnUserName = " + dnUserName);
|
|
|
LOG.info("supergroup = " + supergroup);
|
|
|
- initIpcServer(conf);
|
|
|
+ initIpcServer();
|
|
|
|
|
|
- metrics = DataNodeMetrics.create(conf, getDisplayName());
|
|
|
+ metrics = DataNodeMetrics.create(getConf(), getDisplayName());
|
|
|
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
|
|
|
|
|
|
- ecWorker = new ErasureCodingWorker(conf, this);
|
|
|
+ ecWorker = new ErasureCodingWorker(getConf(), this);
|
|
|
blockRecoveryWorker = new BlockRecoveryWorker(this);
|
|
|
|
|
|
blockPoolManager = new BlockPoolManager(this);
|
|
|
- blockPoolManager.refreshNamenodes(conf);
|
|
|
+ blockPoolManager.refreshNamenodes(getConf());
|
|
|
|
|
|
// Create the ReadaheadPool from the DataNode context so we can
|
|
|
// exit without having to explicitly shutdown its thread pool.
|
|
|
readaheadPool = ReadaheadPool.getInstance();
|
|
|
- saslClient = new SaslDataTransferClient(dnConf.conf,
|
|
|
+ saslClient = new SaslDataTransferClient(dnConf.getConf(),
|
|
|
dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
|
|
|
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
|
|
|
- startMetricsLogger(conf);
|
|
|
+ startMetricsLogger();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1592,10 +1594,10 @@ public class DataNode extends ReconfigurableBase
|
|
|
// failures.
|
|
|
checkDiskError();
|
|
|
|
|
|
- data.addBlockPool(nsInfo.getBlockPoolID(), conf);
|
|
|
+ data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
|
|
|
blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
|
|
|
- initDirectoryScanner(conf);
|
|
|
- initDiskBalancer(data, conf);
|
|
|
+ initDirectoryScanner(getConf());
|
|
|
+ initDiskBalancer(data, getConf());
|
|
|
}
|
|
|
|
|
|
List<BPOfferService> getAllBpOs() {
|
|
@@ -1616,10 +1618,10 @@ public class DataNode extends ReconfigurableBase
|
|
|
*/
|
|
|
private void initStorage(final NamespaceInfo nsInfo) throws IOException {
|
|
|
final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory
|
|
|
- = FsDatasetSpi.Factory.getFactory(conf);
|
|
|
+ = FsDatasetSpi.Factory.getFactory(getConf());
|
|
|
|
|
|
if (!factory.isSimulated()) {
|
|
|
- final StartupOption startOpt = getStartupOption(conf);
|
|
|
+ final StartupOption startOpt = getStartupOption(getConf());
|
|
|
if (startOpt == null) {
|
|
|
throw new IOException("Startup option not set.");
|
|
|
}
|
|
@@ -1639,7 +1641,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
|
|
|
synchronized(this) {
|
|
|
if (data == null) {
|
|
|
- data = factory.newInstance(this, storage, conf);
|
|
|
+ data = factory.newInstance(this, storage, getConf());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1720,7 +1722,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
*/
|
|
|
DatanodeProtocolClientSideTranslatorPB connectToNN(
|
|
|
InetSocketAddress nnAddr) throws IOException {
|
|
|
- return new DatanodeProtocolClientSideTranslatorPB(nnAddr, conf);
|
|
|
+ return new DatanodeProtocolClientSideTranslatorPB(nnAddr, getConf());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1733,7 +1735,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
DatanodeLifelineProtocolClientSideTranslatorPB connectToLifelineNN(
|
|
|
InetSocketAddress lifelineNnAddr) throws IOException {
|
|
|
return new DatanodeLifelineProtocolClientSideTranslatorPB(lifelineNnAddr,
|
|
|
- conf);
|
|
|
+ getConf());
|
|
|
}
|
|
|
|
|
|
public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
|
|
@@ -2388,7 +2390,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
unbufIn = saslStreams.in;
|
|
|
|
|
|
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
|
|
|
- DFSUtilClient.getSmallBufferSize(conf)));
|
|
|
+ DFSUtilClient.getSmallBufferSize(getConf())));
|
|
|
in = new DataInputStream(unbufIn);
|
|
|
blockSender = new BlockSender(b, 0, b.getNumBytes(),
|
|
|
false, false, true, DataNode.this, null, cachingStrategy);
|
|
@@ -2508,7 +2510,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
}
|
|
|
ipcServer.setTracer(tracer);
|
|
|
ipcServer.start();
|
|
|
- startPlugins(conf);
|
|
|
+ startPlugins(getConf());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -3051,8 +3053,8 @@ public class DataNode extends ReconfigurableBase
|
|
|
@Override // ClientDatanodeProtocol
|
|
|
public void refreshNamenodes() throws IOException {
|
|
|
checkSuperuserPrivilege();
|
|
|
- conf = new Configuration();
|
|
|
- refreshNamenodes(conf);
|
|
|
+ setConf(new Configuration());
|
|
|
+ refreshNamenodes(getConf());
|
|
|
}
|
|
|
|
|
|
@Override // ClientDatanodeProtocol
|
|
@@ -3327,8 +3329,8 @@ public class DataNode extends ReconfigurableBase
|
|
|
Token<BlockTokenIdentifier> blockToken)
|
|
|
throws IOException {
|
|
|
|
|
|
- return DFSUtilClient.connectToDN(datanodeID, timeout, conf, saslClient,
|
|
|
- NetUtils.getDefaultSocketFactory(getConf()), false,
|
|
|
+ return DFSUtilClient.connectToDN(datanodeID, timeout, getConf(),
|
|
|
+ saslClient, NetUtils.getDefaultSocketFactory(getConf()), false,
|
|
|
getDataEncryptionKeyFactoryForBlock(block), blockToken);
|
|
|
}
|
|
|
|
|
@@ -3341,7 +3343,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
final int numOobTypes = oobEnd - oobStart + 1;
|
|
|
oobTimeouts = new long[numOobTypes];
|
|
|
|
|
|
- final String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY,
|
|
|
+ final String[] ele = getConf().get(DFS_DATANODE_OOB_TIMEOUT_KEY,
|
|
|
DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(",");
|
|
|
for (int i = 0; i < numOobTypes; i++) {
|
|
|
oobTimeouts[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0;
|
|
@@ -3367,10 +3369,9 @@ public class DataNode extends ReconfigurableBase
|
|
|
* Start a timer to periodically write DataNode metrics to the log file. This
|
|
|
* behavior can be disabled by configuration.
|
|
|
*
|
|
|
- * @param metricConf
|
|
|
*/
|
|
|
- protected void startMetricsLogger(Configuration metricConf) {
|
|
|
- long metricsLoggerPeriodSec = metricConf.getInt(
|
|
|
+ protected void startMetricsLogger() {
|
|
|
+ long metricsLoggerPeriodSec = getConf().getInt(
|
|
|
DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY,
|
|
|
DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT);
|
|
|
|