瀏覽代碼

YARN-5609. Expose upgrade and restart API in ContainerManagementProtocol. Contributed by Arun Suresh

Jian He 8 年之前
父節點
當前提交
fe644bafe7
共有 20 個文件被更改,包括 732 次插入45 次删除
  1. 30 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
  2. 30 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
  3. 54 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
  4. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
  5. 18 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
  6. 73 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
  7. 84 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
  8. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
  9. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
  10. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
  11. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
  12. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMAuditLogger.java
  13. 45 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  14. 69 23
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
  15. 26 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
  16. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
  17. 91 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
  18. 29 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
  19. 31 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
  20. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java

+ 30 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java

@@ -30,10 +30,15 @@ import java.util.Map;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
 import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -476,5 +481,30 @@ public class TestContainerLauncher {
         ResourceLocalizationRequest request) throws YarnException, IOException {
       return null;
     }
+
+    @Override
+    public ReInitializeContainerResponse reInitializeContainer(
+        ReInitializeContainerRequest request) throws YarnException,
+        IOException {
+      return null;
+    }
+
+    @Override
+    public RestartContainerResponse restartContainer(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public RollbackResponse rollbackLastReInitialization(
+        ContainerId containerId) throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public CommitResponse commitLastReInitialization(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
   }
 }

+ 30 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java

@@ -46,12 +46,17 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@@ -481,6 +486,31 @@ public class TestContainerLauncherImpl {
         ResourceLocalizationRequest request) throws YarnException, IOException {
       return null;
     }
+
+    @Override
+    public ReInitializeContainerResponse reInitializeContainer(
+        ReInitializeContainerRequest request) throws YarnException,
+        IOException {
+      return null;
+    }
+
+    @Override
+    public RestartContainerResponse restartContainer(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public RollbackResponse rollbackLastReInitialization(
+        ContainerId containerId) throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public CommitResponse commitLastReInitialization(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
   }
   
   @SuppressWarnings("serial")

+ 54 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java

@@ -23,12 +23,17 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -215,4 +220,53 @@ public interface ContainerManagementProtocol {
   @Unstable
   ResourceLocalizationResponse localize(ResourceLocalizationRequest request)
     throws YarnException, IOException;
+
+  /**
+   * ReInitialize the Container with a new Launch Context.
+   * @param request Specify the new ContainerLaunchContext.
+   * @return Response that the ReInitialize request is accepted.
+   * @throws YarnException Exception specific to YARN.
+   * @throws IOException IOException thrown from the RPC layer.
+   */
+  @Public
+  @Unstable
+  ReInitializeContainerResponse reInitializeContainer(
+      ReInitializeContainerRequest request) throws YarnException, IOException;
+
+  /**
+   * Restart the container.
+   * @param containerId Container Id.
+   * @return Response that the restart request is accepted.
+   * @throws YarnException Exception specific to YARN.
+   * @throws IOException IOException thrown from the RPC layer.
+   */
+  @Public
+  @Unstable
+  RestartContainerResponse restartContainer(ContainerId containerId)
+      throws YarnException, IOException;
+
+  /**
+   * Rollback the Last ReInitialization if possible.
+   * @param containerId Container Id.
+   * @return Response that the rollback request is accepted.
+   * @throws YarnException Exception specific to YARN.
+   * @throws IOException IOException thrown from the RPC layer.
+   */
+  @Public
+  @Unstable
+  RollbackResponse rollbackLastReInitialization(ContainerId containerId)
+      throws YarnException, IOException;
+
+  /**
+   * Commit the Last ReInitialization if possible. Once the reinitialization
+   * has been committed, It cannot be rolled back.
+   * @param containerId Container Id.
+   * @return Response that the commit request is accepted.
+   * @throws YarnException Exception specific to YARN.
+   * @throws IOException IOException thrown from the RPC layer.
+   */
+  @Public
+  @Unstable
+  CommitResponse commitLastReInitialization(ContainerId containerId)
+      throws YarnException, IOException;
 }

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto

@@ -28,6 +28,7 @@ option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 package hadoop.yarn;
 
+import "yarn_protos.proto";
 import "yarn_service_protos.proto";
 
 service ContainerManagementProtocolService {
@@ -37,4 +38,9 @@ service ContainerManagementProtocolService {
   rpc increaseContainersResource(IncreaseContainersResourceRequestProto) returns (IncreaseContainersResourceResponseProto);
   rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
   rpc localize(ResourceLocalizationRequestProto) returns (ResourceLocalizationResponseProto);
+
+  rpc reInitializeContainer(ReInitializeContainerRequestProto) returns (ReInitializeContainerResponseProto);
+  rpc restartContainer(ContainerIdProto) returns (RestartContainerResponseProto);
+  rpc rollbackLastReInitialization(ContainerIdProto) returns (RollbackResponseProto);
+  rpc commitLastReInitialization(ContainerIdProto) returns (CommitResponseProto);
 }

+ 18 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

@@ -295,6 +295,24 @@ message ResourceLocalizationRequestProto {
 message ResourceLocalizationResponseProto {
 }
 
+message ReInitializeContainerRequestProto {
+  optional ContainerIdProto container_id = 1;
+  optional ContainerLaunchContextProto container_launch_context = 2;
+  optional bool auto_commit = 3 [default = true];
+}
+
+message ReInitializeContainerResponseProto {
+}
+
+message RestartContainerResponseProto {
+}
+
+message RollbackResponseProto {
+}
+
+message CommitResponseProto {
+}
+
 //// bulk API records
 message StartContainersRequestProto {
   repeated StartContainerRequestProto start_container_request = 1;

+ 73 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java

@@ -27,33 +27,50 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CommitResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReInitializeContainerRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReInitializeContainerResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationResponsePBImpl;
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RestartContainerResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .RollbackResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto;
@@ -185,4 +202,60 @@ public class ContainerManagementProtocolPBClientImpl implements ContainerManagem
       return null;
     }
   }
+
+  @Override
+  public ReInitializeContainerResponse reInitializeContainer(
+      ReInitializeContainerRequest request) throws YarnException, IOException {
+    YarnServiceProtos.ReInitializeContainerRequestProto requestProto =
+        ((ReInitializeContainerRequestPBImpl) request).getProto();
+    try {
+      return new ReInitializeContainerResponsePBImpl(
+          proxy.reInitializeContainer(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public RestartContainerResponse restartContainer(ContainerId containerId)
+      throws YarnException, IOException {
+    YarnProtos.ContainerIdProto containerIdProto = ProtoUtils
+        .convertToProtoFormat(containerId);
+    try {
+      return new RestartContainerResponsePBImpl(
+          proxy.restartContainer(null, containerIdProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public RollbackResponse rollbackLastReInitialization(ContainerId containerId)
+      throws YarnException, IOException {
+    YarnProtos.ContainerIdProto containerIdProto = ProtoUtils
+        .convertToProtoFormat(containerId);
+    try {
+      return new RollbackResponsePBImpl(
+          proxy.rollbackLastReInitialization(null, containerIdProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public CommitResponse commitLastReInitialization(ContainerId containerId)
+      throws YarnException, IOException {
+    YarnProtos.ContainerIdProto containerIdProto = ProtoUtils
+        .convertToProtoFormat(containerId);
+    try {
+      return new CommitResponsePBImpl(
+          proxy.commitLastReInitialization(null, containerIdProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
 }

+ 84 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java

@@ -23,37 +23,57 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CommitResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReInitializeContainerRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReInitializeContainerResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationResponsePBImpl;
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RestartContainerResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RollbackResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReInitializeContainerRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReInitializeContainerResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.RestartContainerResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.RollbackResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersResponseProto;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.CommitResponseProto;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
@@ -156,4 +176,66 @@ public class ContainerManagementProtocolPBServiceImpl implements ContainerManage
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public ReInitializeContainerResponseProto reInitializeContainer(
+      RpcController controller, ReInitializeContainerRequestProto proto)
+      throws ServiceException {
+    ReInitializeContainerRequestPBImpl request =
+        new ReInitializeContainerRequestPBImpl(proto);
+    try {
+      ReInitializeContainerResponse response =
+          real.reInitializeContainer(request);
+      return ((ReInitializeContainerResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public RestartContainerResponseProto restartContainer(
+      RpcController controller, ContainerIdProto containerId)
+      throws ServiceException {
+    ContainerId request = ProtoUtils.convertFromProtoFormat(containerId);
+    try {
+      RestartContainerResponse response = real.restartContainer(request);
+      return ((RestartContainerResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public RollbackResponseProto rollbackLastReInitialization(
+      RpcController controller, ContainerIdProto containerId) throws
+      ServiceException {
+    ContainerId request = ProtoUtils.convertFromProtoFormat(containerId);
+    try {
+      RollbackResponse response = real.rollbackLastReInitialization(request);
+      return ((RollbackResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public CommitResponseProto commitLastReInitialization(
+      RpcController controller, ContainerIdProto containerId) throws
+      ServiceException {
+    ContainerId request = ProtoUtils.convertFromProtoFormat(containerId);
+    try {
+      CommitResponse response = real.commitLastReInitialization(request);
+      return ((CommitResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java

@@ -32,12 +32,17 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -206,5 +211,30 @@ public class TestContainerLaunchRPC {
         ResourceLocalizationRequest request) throws YarnException, IOException {
       return null;
     }
+
+    @Override
+    public ReInitializeContainerResponse reInitializeContainer(
+        ReInitializeContainerRequest request) throws YarnException,
+        IOException {
+      return null;
+    }
+
+    @Override
+    public RestartContainerResponse restartContainer(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public RollbackResponse rollbackLastReInitialization(
+        ContainerId containerId) throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public CommitResponse commitLastReInitialization(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
   }
 }

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java

@@ -26,12 +26,17 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@@ -187,5 +192,30 @@ public class TestContainerResourceIncreaseRPC {
         ResourceLocalizationRequest request) throws YarnException, IOException {
       return null;
     }
+
+    @Override
+    public ReInitializeContainerResponse reInitializeContainer(
+        ReInitializeContainerRequest request) throws YarnException,
+        IOException {
+      return null;
+    }
+
+    @Override
+    public RestartContainerResponse restartContainer(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public RollbackResponse rollbackLastReInitialization(
+        ContainerId containerId) throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public CommitResponse commitLastReInitialization(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
   }
 }

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java

@@ -43,6 +43,11 @@ import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRespons
 import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
@@ -520,6 +525,11 @@ public class TestPBImplRecords {
     generateByNewInstance(ResourceAllocationRequest.class);
     generateByNewInstance(ReservationAllocationState.class);
     generateByNewInstance(ResourceUtilization.class);
+    generateByNewInstance(ReInitializeContainerRequest.class);
+    generateByNewInstance(ReInitializeContainerResponse.class);
+    generateByNewInstance(RestartContainerResponse.class);
+    generateByNewInstance(RollbackResponse.class);
+    generateByNewInstance(CommitResponse.class);
   }
 
   private class GetSetPair {

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java

@@ -33,13 +33,18 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -355,6 +360,31 @@ public class TestRPC {
         ResourceLocalizationRequest request) throws YarnException, IOException {
       return null;
     }
+
+    @Override
+    public ReInitializeContainerResponse reInitializeContainer(
+        ReInitializeContainerRequest request) throws YarnException,
+        IOException {
+      return null;
+    }
+
+    @Override
+    public RestartContainerResponse restartContainer(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public RollbackResponse rollbackLastReInitialization(
+        ContainerId containerId) throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public CommitResponse commitLastReInitialization(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
   }
 
   public static ContainerTokenIdentifier newContainerTokenIdentifier(

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMAuditLogger.java

@@ -45,6 +45,10 @@ public class NMAuditLogger {
     // Some commonly used descriptions
     public static final String START_CONTAINER = "Start Container Request";
     public static final String STOP_CONTAINER = "Stop Container Request";
+    public static final String START_CONTAINER_REINIT =
+        "Container ReInitialization - Started";
+    public static final String FINISH_CONTAINER_REINIT =
+        "Container ReInitialization - Finished";
     public static final String FINISH_SUCCESS_CONTAINER = "Container Finished - Succeeded";
     public static final String FINISH_FAILED_CONTAINER = "Container Finished - Failed";
     public static final String FINISH_KILLED_CONTAINER = "Container Finished - Killed";

+ 45 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -38,12 +38,17 @@ import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -1534,7 +1539,7 @@ public class ContainerManagerImpl extends CompositeService implements
       ResourceLocalizationRequest request) throws YarnException, IOException {
 
     ContainerId containerId = request.getContainerId();
-    Container container = preUpgradeOrLocalizeCheck(containerId,
+    Container container = preReInitializeOrLocalizeCheck(containerId,
         ReInitOp.LOCALIZE);
     try {
       Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
@@ -1551,6 +1556,21 @@ public class ContainerManagerImpl extends CompositeService implements
     return ResourceLocalizationResponse.newInstance();
   }
 
+  @Override
+  public ReInitializeContainerResponse reInitializeContainer(
+      ReInitializeContainerRequest request) throws YarnException, IOException {
+    reInitializeContainer(request.getContainerId(),
+        request.getContainerLaunchContext(), request.getAutoCommit());
+    return ReInitializeContainerResponse.newInstance();
+  }
+
+  @Override
+  public RestartContainerResponse restartContainer(ContainerId containerId)
+      throws YarnException, IOException {
+    reInitializeContainer(containerId, null, true);
+    return RestartContainerResponse.newInstance();
+  }
+
   /**
    * ReInitialize a container using a new Launch Context. If the
    * retryFailureContext is not provided, The container is
@@ -1568,11 +1588,13 @@ public class ContainerManagerImpl extends CompositeService implements
   public void reInitializeContainer(ContainerId containerId,
       ContainerLaunchContext reInitLaunchContext, boolean autoCommit)
       throws YarnException {
-    Container container = preUpgradeOrLocalizeCheck(containerId,
+    Container container = preReInitializeOrLocalizeCheck(containerId,
         ReInitOp.RE_INIT);
     ResourceSet resourceSet = new ResourceSet();
     try {
-      resourceSet.addResources(reInitLaunchContext.getLocalResources());
+      if (reInitLaunchContext != null) {
+        resourceSet.addResources(reInitLaunchContext.getLocalResources());
+      }
       dispatcher.getEventHandler().handle(
           new ContainerReInitEvent(containerId, reInitLaunchContext,
               resourceSet, autoCommit));
@@ -1587,38 +1609,53 @@ public class ContainerManagerImpl extends CompositeService implements
   /**
    * Rollback the last reInitialization, if possible.
    * @param containerId Container ID.
+   * @return Rollback Response.
    * @throws YarnException Yarn Exception.
    */
-  public void rollbackReInitialization(ContainerId containerId)
+  @Override
+  public RollbackResponse rollbackLastReInitialization(ContainerId containerId)
       throws YarnException {
-    Container container = preUpgradeOrLocalizeCheck(containerId,
+    Container container = preReInitializeOrLocalizeCheck(containerId,
         ReInitOp.ROLLBACK);
     if (container.canRollback()) {
       dispatcher.getEventHandler().handle(
           new ContainerEvent(containerId, ContainerEventType.ROLLBACK_REINIT));
+      container.setIsReInitializing(true);
     } else {
       throw new YarnException("Nothing to rollback to !!");
     }
+    return RollbackResponse.newInstance();
   }
 
   /**
    * Commit last reInitialization after which no rollback will be possible.
    * @param containerId Container ID.
+   * @return Commit Response.
    * @throws YarnException Yarn Exception.
    */
-  public void commitReInitialization(ContainerId containerId)
+  @Override
+  public CommitResponse commitLastReInitialization(ContainerId containerId)
       throws YarnException {
-    Container container = preUpgradeOrLocalizeCheck(containerId,
+    Container container = preReInitializeOrLocalizeCheck(containerId,
         ReInitOp.COMMIT);
     if (container.canRollback()) {
       container.commitUpgrade();
     } else {
       throw new YarnException("Nothing to Commit !!");
     }
+    return CommitResponse.newInstance();
   }
 
-  private Container preUpgradeOrLocalizeCheck(ContainerId containerId,
+  private Container preReInitializeOrLocalizeCheck(ContainerId containerId,
       ReInitOp op) throws YarnException {
+    UserGroupInformation remoteUgi = getRemoteUgi();
+    NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
+    authorizeUser(remoteUgi, nmTokenIdentifier);
+    if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId()
+        .equals(containerId.getApplicationAttemptId().getApplicationId())) {
+      throw new YarnException("ApplicationMaster not autorized to perform " +
+          "["+ op + "] on Container [" + containerId + "]!!");
+    }
     Container container = context.getContainers().get(containerId);
     if (container == null) {
       throw new YarnException("Specified " + containerId + " does not exist!");

+ 69 - 23
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -99,6 +99,8 @@ public class ContainerImpl implements Container {
     private final ContainerLaunchContext oldLaunchContext;
     private final ResourceSet oldResourceSet;
 
+    private boolean isRollback = false;
+
     private ReInitializationContext(ContainerLaunchContext newLaunchContext,
         ResourceSet newResourceSet,
         ContainerLaunchContext oldLaunchContext,
@@ -113,20 +115,23 @@ public class ContainerImpl implements Container {
       return (oldLaunchContext != null);
     }
 
-    private ResourceSet mergedResourceSet() {
-      if (oldLaunchContext == null) {
+    private ResourceSet mergedResourceSet(ResourceSet current) {
+      if (isRollback) {
+        // No merging should be done for rollback
         return newResourceSet;
       }
-      return ResourceSet.merge(oldResourceSet, newResourceSet);
+      if (current == newResourceSet) {
+        // This happens during a restart
+        return current;
+      }
+      return ResourceSet.merge(current, newResourceSet);
     }
 
     private ReInitializationContext createContextForRollback() {
-      if (oldLaunchContext == null) {
-        return null;
-      } else {
-        return new ReInitializationContext(
-            oldLaunchContext, oldResourceSet, null, null);
-      }
+      ReInitializationContext cntxt = new ReInitializationContext(
+          oldLaunchContext, oldResourceSet, null, null);
+      cntxt.isRollback = true;
+      return cntxt;
     }
   }
 
@@ -918,13 +923,20 @@ public class ContainerImpl implements Container {
     public void transition(ContainerImpl container, ContainerEvent event) {
       container.reInitContext = createReInitContext(container, event);
       try {
-        Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
-            resByVisibility = container.reInitContext.newResourceSet
-            .getAllResourcesByVisibility();
-        if (!resByVisibility.isEmpty()) {
+        // 'reInitContext.newResourceSet' can be
+        // a) current container resourceSet (In case of Restart)
+        // b) previous resourceSet (In case of RollBack)
+        // c) An actual NEW resourceSet (In case of Upgrade/ReInit)
+        //
+        // In cases a) and b) Container can immediately be cleaned up since
+        // we are sure the resources are already available (we check the
+        // pendingResources to verify that nothing more is needed). So we can
+        // kill the container immediately
+        ResourceSet newResourceSet = container.reInitContext.newResourceSet;
+        if (!newResourceSet.getPendingResources().isEmpty()) {
           container.dispatcher.getEventHandler().handle(
               new ContainerLocalizationRequestEvent(
-                  container, resByVisibility));
+                  container, newResourceSet.getAllResourcesByVisibility()));
         } else {
           // We are not waiting on any resources, so...
           // Kill the current container.
@@ -932,6 +944,11 @@ public class ContainerImpl implements Container {
               new ContainersLauncherEvent(container,
                   ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
         }
+        container.metrics.reInitingContainer();
+        NMAuditLogger.logSuccess(container.user,
+            AuditConstants.START_CONTAINER_REINIT, "ContainerImpl",
+            container.containerId.getApplicationAttemptId().getApplicationId(),
+            container.containerId);
       } catch (Exception e) {
         LOG.error("Container [" + container.getContainerId() + "]" +
             " re-initialization failure..", e);
@@ -943,13 +960,26 @@ public class ContainerImpl implements Container {
     protected ReInitializationContext createReInitContext(
         ContainerImpl container, ContainerEvent event) {
       ContainerReInitEvent reInitEvent = (ContainerReInitEvent)event;
-      return new ReInitializationContext(
-          reInitEvent.getReInitLaunchContext(),
-          reInitEvent.getResourceSet(),
-          // If AutoCommit is turned on, then no rollback can happen...
-          // So don't need to store the previous context.
-          (reInitEvent.isAutoCommit() ? null : container.launchContext),
-          (reInitEvent.isAutoCommit() ? null : container.resourceSet));
+      if (reInitEvent.getReInitLaunchContext() == null) {
+        // This is a Restart...
+        // We also need to make sure that if Rollback is possible, the
+        // rollback state should be retained in the
+        // oldLaunchContext and oldResourceSet
+        return new ReInitializationContext(
+            container.launchContext, container.resourceSet,
+            container.canRollback() ?
+                container.reInitContext.oldLaunchContext : null,
+            container.canRollback() ?
+                container.reInitContext.oldResourceSet : null);
+      } else {
+        return new ReInitializationContext(
+            reInitEvent.getReInitLaunchContext(),
+            reInitEvent.getResourceSet(),
+            // If AutoCommit is turned on, then no rollback can happen...
+            // So don't need to store the previous context.
+            (reInitEvent.isAutoCommit() ? null : container.launchContext),
+            (reInitEvent.isAutoCommit() ? null : container.resourceSet));
+      }
     }
   }
 
@@ -1080,6 +1110,12 @@ public class ContainerImpl implements Container {
       container.metrics.runningContainer();
       container.wasLaunched  = true;
 
+      if (container.isReInitializing()) {
+        NMAuditLogger.logSuccess(container.user,
+            AuditConstants.FINISH_CONTAINER_REINIT, "ContainerImpl",
+            container.containerId.getApplicationAttemptId().getApplicationId(),
+            container.containerId);
+      }
       container.setIsReInitializing(false);
       // Check if this launch was due to a re-initialization.
       // If autocommit == true, then wipe the re-init context. This ensures
@@ -1214,6 +1250,12 @@ public class ContainerImpl implements Container {
             container.getContainerId() + "] !!");
         container.reInitContext =
             container.reInitContext.createContextForRollback();
+        container.metrics.rollbackContainerOnFailure();
+        container.metrics.reInitingContainer();
+        NMAuditLogger.logSuccess(container.user,
+            AuditConstants.START_CONTAINER_REINIT, "ContainerImpl",
+            container.containerId.getApplicationAttemptId().getApplicationId(),
+            container.containerId);
         new KilledForReInitializationTransition().transition(container, event);
         return ContainerState.LOCALIZED;
       } else {
@@ -1305,7 +1347,7 @@ public class ContainerImpl implements Container {
     public void transition(ContainerImpl container,
         ContainerEvent event) {
       LOG.info("Relaunching Container [" + container.getContainerId()
-          + "] for upgrade !!");
+          + "] for re-initialization !!");
       container.wasLaunched  = false;
       container.metrics.endRunningContainer();
 
@@ -1319,7 +1361,8 @@ public class ContainerImpl implements Container {
       container.remainingRetryAttempts =
           container.containerRetryContext.getMaxRetries();
 
-      container.resourceSet = container.reInitContext.mergedResourceSet();
+      container.resourceSet =
+          container.reInitContext.mergedResourceSet(container.resourceSet);
 
       container.sendLaunchEvent();
     }
@@ -1655,6 +1698,9 @@ public class ContainerImpl implements Container {
 
   @Override
   public void setIsReInitializing(boolean isReInitializing) {
+    if (this.isReInitializing && !isReInitializing) {
+      metrics.endReInitingContainer();
+    }
     this.isReInitializing = isReInitializing;
   }
 

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java

@@ -31,10 +31,14 @@ import com.google.common.annotations.VisibleForTesting;
 
 @Metrics(about="Metrics for node manager", context="yarn")
 public class NodeManagerMetrics {
+  // CHECKSTYLE:OFF:VisibilityModifier
   @Metric MutableCounterInt containersLaunched;
   @Metric MutableCounterInt containersCompleted;
   @Metric MutableCounterInt containersFailed;
   @Metric MutableCounterInt containersKilled;
+  @Metric MutableCounterInt containersRolledBackOnFailure;
+  @Metric("# of reInitializing containers")
+      MutableGaugeInt containersReIniting;
   @Metric("# of initializing containers")
       MutableGaugeInt containersIniting;
   @Metric MutableGaugeInt containersRunning;
@@ -56,6 +60,7 @@ public class NodeManagerMetrics {
       MutableGaugeInt goodLocalDirsDiskUtilizationPerc;
   @Metric("Disk utilization % on good log dirs")
       MutableGaugeInt goodLogDirsDiskUtilizationPerc;
+  // CHECKSTYLE:ON:VisibilityModifier
 
   private JvmMetrics jvmMetrics = null;
 
@@ -89,6 +94,10 @@ public class NodeManagerMetrics {
     containersCompleted.incr();
   }
 
+  public void rollbackContainerOnFailure() {
+    containersRolledBackOnFailure.incr();
+  }
+
   public void failedContainer() {
     containersFailed.incr();
   }
@@ -113,6 +122,14 @@ public class NodeManagerMetrics {
     containersRunning.decr();
   }
 
+  public void reInitingContainer() {
+    containersReIniting.incr();
+  }
+
+  public void endReInitingContainer() {
+    containersReIniting.decr();
+  }
+
   public void allocateContainer(Resource res) {
     allocatedContainers.incr();
     allocatedMB = allocatedMB + res.getMemorySize();
@@ -211,4 +228,13 @@ public class NodeManagerMetrics {
     return goodLocalDirsDiskUtilizationPerc.value();
   }
 
+  @VisibleForTesting
+  public int getReInitializingContainer() {
+    return containersReIniting.value();
+  }
+
+  @VisibleForTesting
+  public int getContainersRolledbackOnFailure() {
+    return containersRolledBackOnFailure.value();
+  }
 }

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java

@@ -341,6 +341,18 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
     super.testContainerUpgradeProcessFailure();
   }
 
+  @Override
+  public void testContainerRestart() throws IOException, InterruptedException,
+      YarnException {
+    // Don't run the test if the binary is not available.
+    if (!shouldRunTest()) {
+      LOG.info("LCE binary path is not passed. Not running the test");
+      return;
+    }
+    LOG.info("Running testContainerRestart");
+    super.testContainerRestart();
+  }
+
   private boolean shouldRunTest() {
     return System
         .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;

+ 91 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java

@@ -369,7 +369,75 @@ public class TestContainerManager extends BaseContainerManagerTest {
       DefaultContainerExecutor.containerIsAlive(pid));
   }
 
-  private String[] testContainerUpgradeSuccess(boolean autoCommit)
+  @Test
+  public void testContainerRestart() throws IOException, InterruptedException,
+      YarnException {
+    containerManager.start();
+    // ////// Construct the Container-id
+    ContainerId cId = createContainerId(0);
+    File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
+
+    String pid = prepareInitialContainer(cId, oldStartFile);
+
+    // Test that the container can restart
+    // Also, Since there was no rollback context present before the
+    // restart, rollback should NOT be possible after the restart
+    doRestartTests(cId, oldStartFile, "Hello World!", pid, false);
+  }
+
+  private String doRestartTests(ContainerId cId, File oldStartFile,
+      String testString, String pid, boolean canRollback)
+      throws YarnException, IOException, InterruptedException {
+    int beforeRestart = metrics.getRunningContainers();
+    Container container =
+        containerManager.getContext().getContainers().get(cId);
+    Assert.assertFalse(container.isReInitializing());
+    containerManager.restartContainer(cId);
+    Assert.assertTrue(container.isReInitializing());
+
+    // Wait for original process to die and the new process to restart
+    int timeoutSecs = 0;
+    while (DefaultContainerExecutor.containerIsAlive(pid)
+        && (metrics.getRunningContainers() == beforeRestart)
+        && container.isReInitializing()
+        && timeoutSecs++ < 20) {
+      Thread.sleep(1000);
+      LOG.info("Waiting for Original process to die.." +
+          "and new process to start!!");
+    }
+
+    Assert.assertFalse("Old Process Still alive!!",
+        DefaultContainerExecutor.containerIsAlive(pid));
+
+    String newPid = null;
+    timeoutSecs = 0;
+    while (timeoutSecs++ < 20) {
+      LOG.info("Waiting for New process file to be created!!");
+      // Now verify the contents of the file
+      BufferedReader reader =
+          new BufferedReader(new FileReader(oldStartFile));
+      Assert.assertEquals(testString, reader.readLine());
+      // Get the pid of the process
+      newPid = reader.readLine().trim();
+      // No more lines
+      Assert.assertEquals(null, reader.readLine());
+      reader.close();
+      if (!newPid.equals(pid)) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+
+    // Assert both pids are different
+    Assert.assertNotEquals(pid, newPid);
+
+    // Container cannot rollback from a restart
+    Assert.assertEquals(canRollback, container.canRollback());
+
+    return newPid;
+  }
+
+  private String[] testContainerReInitSuccess(boolean autoCommit)
       throws IOException, InterruptedException, YarnException {
     containerManager.start();
     // ////// Construct the Container-id
@@ -412,10 +480,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
   @Test
   public void testContainerUpgradeSuccessAutoCommit() throws IOException,
       InterruptedException, YarnException {
-    testContainerUpgradeSuccess(true);
+    testContainerReInitSuccess(true);
     // Should not be able to Commit (since already auto committed)
     try {
-      containerManager.commitReInitialization(createContainerId(0));
+      containerManager.commitLastReInitialization(createContainerId(0));
       Assert.fail();
     } catch (Exception e) {
       Assert.assertTrue(e.getMessage().contains("Nothing to Commit"));
@@ -425,12 +493,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
   @Test
   public void testContainerUpgradeSuccessExplicitCommit() throws IOException,
       InterruptedException, YarnException {
-    testContainerUpgradeSuccess(false);
+    testContainerReInitSuccess(false);
     ContainerId cId = createContainerId(0);
-    containerManager.commitReInitialization(cId);
+    containerManager.commitLastReInitialization(cId);
     // Should not be able to Rollback once committed
     try {
-      containerManager.rollbackReInitialization(cId);
+      containerManager.rollbackLastReInitialization(cId);
       Assert.fail();
     } catch (Exception e) {
       Assert.assertTrue(e.getMessage().contains("Nothing to rollback to"));
@@ -440,27 +508,38 @@ public class TestContainerManager extends BaseContainerManagerTest {
   @Test
   public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
       InterruptedException, YarnException {
-    String[] pids = testContainerUpgradeSuccess(false);
+    String[] pids = testContainerReInitSuccess(false);
+
+    // Test that the container can be Restarted after the successful upgrrade.
+    // Also, since there is a rollback context present before the restart, it
+    // should be possible to rollback the container AFTER the restart.
+    pids[1] = doRestartTests(createContainerId(0),
+        new File(tmpDir, "start_file_n.txt").getAbsoluteFile(),
+        "Upgrade World!", pids[1], true);
 
     // Delete the old start File..
     File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
+
     oldStartFile.delete();
 
     ContainerId cId = createContainerId(0);
     // Explicit Rollback
-    containerManager.rollbackReInitialization(cId);
+    containerManager.rollbackLastReInitialization(cId);
 
+    Container container =
+        containerManager.getContext().getContainers().get(cId);
+    Assert.assertTrue(container.isReInitializing());
     // Original should be dead anyway
     Assert.assertFalse("Original Process is still alive!",
         DefaultContainerExecutor.containerIsAlive(pids[0]));
 
-    // Wait for upgraded process to die
+    // Wait for new container to startup
     int timeoutSecs = 0;
-    while (!DefaultContainerExecutor.containerIsAlive(pids[1])
-        && timeoutSecs++ < 20) {
+    while (container.isReInitializing() && timeoutSecs++ < 20) {
       Thread.sleep(1000);
-      LOG.info("Waiting for Upgraded process to die..");
+      LOG.info("Waiting for ReInitialization to complete..");
     }
+    Assert.assertFalse(container.isReInitializing());
 
     timeoutSecs = 0;
     // Wait for new processStartfile to be created

+ 29 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java

@@ -25,10 +25,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
 import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -332,4 +337,28 @@ public class NodeManager implements ContainerManagementProtocol {
       ResourceLocalizationRequest request) throws YarnException, IOException {
     return null;
   }
+
+  @Override
+  public ReInitializeContainerResponse reInitializeContainer(
+      ReInitializeContainerRequest request) throws YarnException, IOException {
+    return null;
+  }
+
+  @Override
+  public RestartContainerResponse restartContainer(ContainerId containerId)
+      throws YarnException, IOException {
+    return null;
+  }
+
+  @Override
+  public RollbackResponse rollbackLastReInitialization(ContainerId containerId)
+      throws YarnException, IOException {
+    return null;
+  }
+
+  @Override
+  public CommitResponse commitLastReInitialization(ContainerId containerId)
+      throws YarnException, IOException {
+    return null;
+  }
 }

+ 31 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java

@@ -41,14 +41,19 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@@ -57,6 +62,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -176,6 +182,31 @@ public class TestAMAuthorization {
         ResourceLocalizationRequest request) throws YarnException, IOException {
       return null;
     }
+
+    @Override
+    public ReInitializeContainerResponse reInitializeContainer(
+        ReInitializeContainerRequest request) throws YarnException,
+        IOException {
+      return null;
+    }
+
+    @Override
+    public RestartContainerResponse restartContainer(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public RollbackResponse rollbackLastReInitialization(
+        ContainerId containerId) throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public CommitResponse commitLastReInitialization(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
   }
 
   public static class MockRMWithAMS extends MockRMWithCustomAMLauncher {

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java

@@ -33,12 +33,17 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -164,6 +169,31 @@ public class TestApplicationMasterLauncher {
         ResourceLocalizationRequest request) throws YarnException, IOException {
       return null;
     }
+
+    @Override
+    public ReInitializeContainerResponse reInitializeContainer(
+        ReInitializeContainerRequest request) throws YarnException,
+        IOException {
+      return null;
+    }
+
+    @Override
+    public RestartContainerResponse restartContainer(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public RollbackResponse rollbackLastReInitialization(
+        ContainerId containerId) throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public CommitResponse commitLastReInitialization(ContainerId containerId)
+        throws YarnException, IOException {
+      return null;
+    }
   }
 
   @Test