|
@@ -0,0 +1,304 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.yarn.server.resourcemanager.federation;
|
|
|
+
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
+import org.apache.hadoop.net.NetUtils;
|
|
|
+import org.apache.hadoop.service.AbstractService;
|
|
|
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
|
|
+import org.apache.hadoop.yarn.server.records.Version;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
|
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Implements {@link FederationStateStore} and provides a service for
|
|
|
+ * participating in the federation membership.
|
|
|
+ */
|
|
|
+public class FederationStateStoreService extends AbstractService
|
|
|
+ implements FederationStateStore {
|
|
|
+
|
|
|
+ public static final Logger LOG =
|
|
|
+ LoggerFactory.getLogger(FederationStateStoreService.class);
|
|
|
+
|
|
|
+ private Configuration config;
|
|
|
+ private ScheduledExecutorService scheduledExecutorService;
|
|
|
+ private FederationStateStoreHeartbeat stateStoreHeartbeat;
|
|
|
+ private FederationStateStore stateStoreClient = null;
|
|
|
+ private SubClusterId subClusterId;
|
|
|
+ private long heartbeatInterval;
|
|
|
+ private RMContext rmContext;
|
|
|
+
|
|
|
+ public FederationStateStoreService(RMContext rmContext) {
|
|
|
+ super(FederationStateStoreService.class.getName());
|
|
|
+ LOG.info("FederationStateStoreService initialized");
|
|
|
+ this.rmContext = rmContext;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void serviceInit(Configuration conf) throws Exception {
|
|
|
+
|
|
|
+ this.config = conf;
|
|
|
+
|
|
|
+ RetryPolicy retryPolicy =
|
|
|
+ FederationStateStoreFacade.createRetryPolicy(conf);
|
|
|
+
|
|
|
+ this.stateStoreClient =
|
|
|
+ (FederationStateStore) FederationStateStoreFacade.createRetryInstance(
|
|
|
+ conf, YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
|
|
|
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS,
|
|
|
+ FederationStateStore.class, retryPolicy);
|
|
|
+ this.stateStoreClient.init(conf);
|
|
|
+ LOG.info("Initialized state store client class");
|
|
|
+
|
|
|
+ this.subClusterId =
|
|
|
+ SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
|
|
|
+
|
|
|
+ heartbeatInterval = conf.getLong(
|
|
|
+ YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS,
|
|
|
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
|
|
|
+ if (heartbeatInterval <= 0) {
|
|
|
+ heartbeatInterval =
|
|
|
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS;
|
|
|
+ }
|
|
|
+ LOG.info("Initialized federation membership service.");
|
|
|
+
|
|
|
+ super.serviceInit(conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void serviceStart() throws Exception {
|
|
|
+
|
|
|
+ registerAndInitializeHeartbeat();
|
|
|
+
|
|
|
+ super.serviceStart();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void serviceStop() throws Exception {
|
|
|
+ Exception ex = null;
|
|
|
+ try {
|
|
|
+ if (this.scheduledExecutorService != null
|
|
|
+ && !this.scheduledExecutorService.isShutdown()) {
|
|
|
+ this.scheduledExecutorService.shutdown();
|
|
|
+ LOG.info("Stopped federation membership heartbeat");
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Failed to shutdown ScheduledExecutorService", e);
|
|
|
+ ex = e;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (this.stateStoreClient != null) {
|
|
|
+ try {
|
|
|
+ deregisterSubCluster(SubClusterDeregisterRequest
|
|
|
+ .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
|
|
|
+ } finally {
|
|
|
+ this.stateStoreClient.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (ex != null) {
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Return a client accessible string representation of the service address.
|
|
|
+ private String getServiceAddress(InetSocketAddress address) {
|
|
|
+ InetSocketAddress socketAddress = NetUtils.getConnectAddress(address);
|
|
|
+ return socketAddress.getAddress().getHostAddress() + ":"
|
|
|
+ + socketAddress.getPort();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void registerAndInitializeHeartbeat() {
|
|
|
+ String clientRMAddress =
|
|
|
+ getServiceAddress(rmContext.getClientRMService().getBindAddress());
|
|
|
+ String amRMAddress = getServiceAddress(
|
|
|
+ rmContext.getApplicationMasterService().getBindAddress());
|
|
|
+ String rmAdminAddress = getServiceAddress(
|
|
|
+ config.getSocketAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
|
|
|
+ YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
|
|
|
+ YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
|
|
|
+ String webAppAddress =
|
|
|
+ WebAppUtils.getResolvedRemoteRMWebAppURLWithoutScheme(config);
|
|
|
+
|
|
|
+ SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
|
|
|
+ amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
|
|
|
+ SubClusterState.SC_NEW, ResourceManager.getClusterTimeStamp(), "");
|
|
|
+ try {
|
|
|
+ registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo));
|
|
|
+ LOG.info("Successfully registered for federation subcluster: {}",
|
|
|
+ subClusterInfo);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new YarnRuntimeException(
|
|
|
+ "Failed to register Federation membership with the StateStore", e);
|
|
|
+ }
|
|
|
+ stateStoreHeartbeat = new FederationStateStoreHeartbeat(subClusterId,
|
|
|
+ stateStoreClient, rmContext.getScheduler());
|
|
|
+ scheduledExecutorService =
|
|
|
+ HadoopExecutors.newSingleThreadScheduledExecutor();
|
|
|
+ scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat,
|
|
|
+ heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS);
|
|
|
+ LOG.info("Started federation membership heartbeat with interval: {}",
|
|
|
+ heartbeatInterval);
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public FederationStateStore getStateStoreClient() {
|
|
|
+ return stateStoreClient;
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public FederationStateStoreHeartbeat getStateStoreHeartbeatThread() {
|
|
|
+ return stateStoreHeartbeat;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Version getCurrentVersion() {
|
|
|
+ return stateStoreClient.getCurrentVersion();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Version loadVersion() {
|
|
|
+ return stateStoreClient.getCurrentVersion();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
|
|
|
+ GetSubClusterPolicyConfigurationRequest request) throws YarnException {
|
|
|
+ return stateStoreClient.getPolicyConfiguration(request);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
|
|
|
+ SetSubClusterPolicyConfigurationRequest request) throws YarnException {
|
|
|
+ return stateStoreClient.setPolicyConfiguration(request);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
|
|
|
+ GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
|
|
|
+ return stateStoreClient.getPoliciesConfigurations(request);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public SubClusterRegisterResponse registerSubCluster(
|
|
|
+ SubClusterRegisterRequest registerSubClusterRequest)
|
|
|
+ throws YarnException {
|
|
|
+ return stateStoreClient.registerSubCluster(registerSubClusterRequest);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public SubClusterDeregisterResponse deregisterSubCluster(
|
|
|
+ SubClusterDeregisterRequest subClusterDeregisterRequest)
|
|
|
+ throws YarnException {
|
|
|
+ return stateStoreClient.deregisterSubCluster(subClusterDeregisterRequest);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public SubClusterHeartbeatResponse subClusterHeartbeat(
|
|
|
+ SubClusterHeartbeatRequest subClusterHeartbeatRequest)
|
|
|
+ throws YarnException {
|
|
|
+ return stateStoreClient.subClusterHeartbeat(subClusterHeartbeatRequest);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public GetSubClusterInfoResponse getSubCluster(
|
|
|
+ GetSubClusterInfoRequest subClusterRequest) throws YarnException {
|
|
|
+ return stateStoreClient.getSubCluster(subClusterRequest);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public GetSubClustersInfoResponse getSubClusters(
|
|
|
+ GetSubClustersInfoRequest subClustersRequest) throws YarnException {
|
|
|
+ return stateStoreClient.getSubClusters(subClustersRequest);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
|
|
|
+ AddApplicationHomeSubClusterRequest request) throws YarnException {
|
|
|
+ return stateStoreClient.addApplicationHomeSubCluster(request);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
|
|
|
+ UpdateApplicationHomeSubClusterRequest request) throws YarnException {
|
|
|
+ return stateStoreClient.updateApplicationHomeSubCluster(request);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
|
|
|
+ GetApplicationHomeSubClusterRequest request) throws YarnException {
|
|
|
+ return stateStoreClient.getApplicationHomeSubCluster(request);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
|
|
|
+ GetApplicationsHomeSubClusterRequest request) throws YarnException {
|
|
|
+ return stateStoreClient.getApplicationsHomeSubCluster(request);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
|
|
|
+ DeleteApplicationHomeSubClusterRequest request) throws YarnException {
|
|
|
+ return stateStoreClient.deleteApplicationHomeSubCluster(request);
|
|
|
+ }
|
|
|
+}
|