|
@@ -30,6 +30,9 @@ import org.apache.hadoop.ipc.RPC;
|
|
import org.apache.hadoop.metrics2.util.MBeans;
|
|
import org.apache.hadoop.metrics2.util.MBeans;
|
|
import org.apache.hadoop.ozone.OzoneClientUtils;
|
|
import org.apache.hadoop.ozone.OzoneClientUtils;
|
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
|
|
|
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos;
|
|
|
|
+import org.apache.hadoop.ozone.protocolPB
|
|
|
|
+ .ScmBlockLocationProtocolServerSideTranslatorPB;
|
|
import org.apache.hadoop.ozone.scm.block.BlockManager;
|
|
import org.apache.hadoop.ozone.scm.block.BlockManager;
|
|
import org.apache.hadoop.ozone.scm.block.BlockManagerImpl;
|
|
import org.apache.hadoop.ozone.scm.block.BlockManagerImpl;
|
|
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
|
|
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
|
|
@@ -73,6 +76,7 @@ import org.apache.hadoop.ozone.protocol.proto
|
|
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
|
|
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
|
|
import org.apache.hadoop.ozone.protocolPB
|
|
import org.apache.hadoop.ozone.protocolPB
|
|
.StorageContainerDatanodeProtocolServerSideTranslatorPB;
|
|
.StorageContainerDatanodeProtocolServerSideTranslatorPB;
|
|
|
|
+import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
|
|
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
|
|
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
|
|
import org.apache.hadoop.ozone.protocolPB
|
|
import org.apache.hadoop.ozone.protocolPB
|
|
.StorageContainerLocationProtocolServerSideTranslatorPB;
|
|
.StorageContainerLocationProtocolServerSideTranslatorPB;
|
|
@@ -97,8 +101,10 @@ import java.util.Set;
|
|
import java.util.UUID;
|
|
import java.util.UUID;
|
|
|
|
|
|
import static org.apache.hadoop.ozone.protocol.proto
|
|
import static org.apache.hadoop.ozone.protocol.proto
|
|
- .StorageContainerLocationProtocolProtos.DeleteScmBlockResult.Result;
|
|
|
|
|
|
+ .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
|
|
|
|
|
|
|
|
+import static org.apache.hadoop.scm.ScmConfigKeys
|
|
|
|
+ .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
|
|
import static org.apache.hadoop.scm.ScmConfigKeys
|
|
import static org.apache.hadoop.scm.ScmConfigKeys
|
|
.OZONE_SCM_CLIENT_ADDRESS_KEY;
|
|
.OZONE_SCM_CLIENT_ADDRESS_KEY;
|
|
import static org.apache.hadoop.scm.ScmConfigKeys
|
|
import static org.apache.hadoop.scm.ScmConfigKeys
|
|
@@ -146,6 +152,10 @@ public class StorageContainerManager
|
|
private final RPC.Server clientRpcServer;
|
|
private final RPC.Server clientRpcServer;
|
|
private final InetSocketAddress clientRpcAddress;
|
|
private final InetSocketAddress clientRpcAddress;
|
|
|
|
|
|
|
|
+ /** The RPC server that listens to requests from block service clients. */
|
|
|
|
+ private final RPC.Server blockRpcServer;
|
|
|
|
+ private final InetSocketAddress blockRpcAddress;
|
|
|
|
+
|
|
/** SCM mxbean. */
|
|
/** SCM mxbean. */
|
|
private ObjectName scmInfoBeanName;
|
|
private ObjectName scmInfoBeanName;
|
|
|
|
|
|
@@ -173,6 +183,8 @@ public class StorageContainerManager
|
|
ProtobufRpcEngine.class);
|
|
ProtobufRpcEngine.class);
|
|
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
|
|
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
|
|
ProtobufRpcEngine.class);
|
|
ProtobufRpcEngine.class);
|
|
|
|
+ RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
|
|
|
|
+ ProtobufRpcEngine.class);
|
|
|
|
|
|
BlockingService dnProtoPbService = StorageContainerDatanodeProtocolProtos.
|
|
BlockingService dnProtoPbService = StorageContainerDatanodeProtocolProtos.
|
|
StorageContainerDatanodeProtocolService.newReflectiveBlockingService(
|
|
StorageContainerDatanodeProtocolService.newReflectiveBlockingService(
|
|
@@ -186,12 +198,12 @@ public class StorageContainerManager
|
|
datanodeRpcAddress = updateListenAddress(conf,
|
|
datanodeRpcAddress = updateListenAddress(conf,
|
|
OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
|
|
OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
|
|
|
|
|
|
|
|
+ // SCM Container Service RPC
|
|
BlockingService storageProtoPbService =
|
|
BlockingService storageProtoPbService =
|
|
StorageContainerLocationProtocolProtos
|
|
StorageContainerLocationProtocolProtos
|
|
.StorageContainerLocationProtocolService
|
|
.StorageContainerLocationProtocolService
|
|
.newReflectiveBlockingService(
|
|
.newReflectiveBlockingService(
|
|
- new StorageContainerLocationProtocolServerSideTranslatorPB(
|
|
|
|
- this, this));
|
|
|
|
|
|
+ new StorageContainerLocationProtocolServerSideTranslatorPB(this));
|
|
|
|
|
|
final InetSocketAddress scmAddress =
|
|
final InetSocketAddress scmAddress =
|
|
OzoneClientUtils.getScmClientBindAddress(conf);
|
|
OzoneClientUtils.getScmClientBindAddress(conf);
|
|
@@ -201,6 +213,22 @@ public class StorageContainerManager
|
|
clientRpcAddress = updateListenAddress(conf,
|
|
clientRpcAddress = updateListenAddress(conf,
|
|
OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer);
|
|
OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer);
|
|
|
|
|
|
|
|
+
|
|
|
|
+ // SCM Block Service RPC
|
|
|
|
+ BlockingService blockProtoPbService =
|
|
|
|
+ ScmBlockLocationProtocolProtos
|
|
|
|
+ .ScmBlockLocationProtocolService
|
|
|
|
+ .newReflectiveBlockingService(
|
|
|
|
+ new ScmBlockLocationProtocolServerSideTranslatorPB(this));
|
|
|
|
+
|
|
|
|
+ final InetSocketAddress scmBlockAddress =
|
|
|
|
+ OzoneClientUtils.getScmBlockClientBindAddress(conf);
|
|
|
|
+ blockRpcServer = startRpcServer(conf, scmBlockAddress,
|
|
|
|
+ ScmBlockLocationProtocolPB.class, blockProtoPbService,
|
|
|
|
+ handlerCount);
|
|
|
|
+ blockRpcAddress = updateListenAddress(conf,
|
|
|
|
+ OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, blockRpcServer);
|
|
|
|
+
|
|
registerMXBean();
|
|
registerMXBean();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -447,6 +475,9 @@ public class StorageContainerManager
|
|
LOG.info(buildRpcServerStartMessage(
|
|
LOG.info(buildRpcServerStartMessage(
|
|
"StorageContainerLocationProtocol RPC server", clientRpcAddress));
|
|
"StorageContainerLocationProtocol RPC server", clientRpcAddress));
|
|
clientRpcServer.start();
|
|
clientRpcServer.start();
|
|
|
|
+ LOG.info(buildRpcServerStartMessage(
|
|
|
|
+ "ScmBlockLocationProtocol RPC server", blockRpcAddress));
|
|
|
|
+ blockRpcServer.start();
|
|
LOG.info(buildRpcServerStartMessage("RPC server for DataNodes",
|
|
LOG.info(buildRpcServerStartMessage("RPC server for DataNodes",
|
|
datanodeRpcAddress));
|
|
datanodeRpcAddress));
|
|
datanodeRpcServer.start();
|
|
datanodeRpcServer.start();
|
|
@@ -456,6 +487,8 @@ public class StorageContainerManager
|
|
* Stop service.
|
|
* Stop service.
|
|
*/
|
|
*/
|
|
public void stop() {
|
|
public void stop() {
|
|
|
|
+ LOG.info("Stopping block service RPC server");
|
|
|
|
+ blockRpcServer.stop();
|
|
LOG.info("Stopping the StorageContainerLocationProtocol RPC server");
|
|
LOG.info("Stopping the StorageContainerLocationProtocol RPC server");
|
|
clientRpcServer.stop();
|
|
clientRpcServer.stop();
|
|
LOG.info("Stopping the RPC server for DataNodes");
|
|
LOG.info("Stopping the RPC server for DataNodes");
|
|
@@ -470,6 +503,7 @@ public class StorageContainerManager
|
|
*/
|
|
*/
|
|
public void join() {
|
|
public void join() {
|
|
try {
|
|
try {
|
|
|
|
+ blockRpcServer.join();
|
|
clientRpcServer.join();
|
|
clientRpcServer.join();
|
|
datanodeRpcServer.join();
|
|
datanodeRpcServer.join();
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|