|
@@ -16,7 +16,7 @@
|
|
|
* limitations under the License.
|
|
|
*/
|
|
|
|
|
|
-package org.apache.hadoop.yarn.client;
|
|
|
+package org.apache.hadoop.yarn.client.api.async.impl;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.nio.ByteBuffer;
|
|
@@ -39,16 +39,17 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.service.AbstractService;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
-import org.apache.hadoop.yarn.api.records.Token;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
+import org.apache.hadoop.yarn.api.records.Token;
|
|
|
+import org.apache.hadoop.yarn.client.api.NMClient;
|
|
|
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
|
|
|
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.AbstractEvent;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
@@ -63,75 +64,11 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
|
|
|
-/**
|
|
|
- * <code>NMClientAsync</code> handles communication with all the NodeManagers
|
|
|
- * and provides asynchronous updates on getting responses from them. It
|
|
|
- * maintains a thread pool to communicate with individual NMs where a number of
|
|
|
- * worker threads process requests to NMs by using {@link NMClientImpl}. The max
|
|
|
- * size of the thread pool is configurable through
|
|
|
- * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
|
|
|
- *
|
|
|
- * It should be used in conjunction with a CallbackHandler. For example
|
|
|
- *
|
|
|
- * <pre>
|
|
|
- * {@code
|
|
|
- * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
|
|
|
- * public void onContainerStarted(ContainerId containerId,
|
|
|
- * Map<String, ByteBuffer> allServiceResponse) {
|
|
|
- * [post process after the container is started, process the response]
|
|
|
- * }
|
|
|
- *
|
|
|
- * public void onContainerStatusReceived(ContainerId containerId,
|
|
|
- * ContainerStatus containerStatus) {
|
|
|
- * [make use of the status of the container]
|
|
|
- * }
|
|
|
- *
|
|
|
- * public void onContainerStopped(ContainerId containerId) {
|
|
|
- * [post process after the container is stopped]
|
|
|
- * }
|
|
|
- *
|
|
|
- * public void onStartContainerError(
|
|
|
- * ContainerId containerId, Throwable t) {
|
|
|
- * [handle the raised exception]
|
|
|
- * }
|
|
|
- *
|
|
|
- * public void onGetContainerStatusError(
|
|
|
- * ContainerId containerId, Throwable t) {
|
|
|
- * [handle the raised exception]
|
|
|
- * }
|
|
|
- *
|
|
|
- * public void onStopContainerError(
|
|
|
- * ContainerId containerId, Throwable t) {
|
|
|
- * [handle the raised exception]
|
|
|
- * }
|
|
|
- * }
|
|
|
- * }
|
|
|
- * </pre>
|
|
|
- *
|
|
|
- * The client's life-cycle should be managed like the following:
|
|
|
- *
|
|
|
- * <pre>
|
|
|
- * {@code
|
|
|
- * NMClientAsync asyncClient = new NMClientAsync(new MyCallbackhandler());
|
|
|
- * asyncClient.init(conf);
|
|
|
- * asyncClient.start();
|
|
|
- * asyncClient.startContainer(container, containerLaunchContext);
|
|
|
- * [... wait for container being started]
|
|
|
- * asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
|
|
|
- * container.getContainerToken());
|
|
|
- * [... handle the status in the callback instance]
|
|
|
- * asyncClient.stopContainer(container.getId(), container.getNodeId(),
|
|
|
- * container.getContainerToken());
|
|
|
- * [... wait for container being stopped]
|
|
|
- * asyncClient.stop();
|
|
|
- * }
|
|
|
- * </pre>
|
|
|
- */
|
|
|
+@Private
|
|
|
@Unstable
|
|
|
-@Evolving
|
|
|
-public class NMClientAsync extends AbstractService {
|
|
|
+public class NMClientAsyncImpl extends NMClientAsync {
|
|
|
|
|
|
- private static final Log LOG = LogFactory.getLog(NMClientAsync.class);
|
|
|
+ private static final Log LOG = LogFactory.getLog(NMClientAsyncImpl.class);
|
|
|
|
|
|
protected static final int INITIAL_THREAD_POOL_SIZE = 10;
|
|
|
|
|
@@ -142,25 +79,22 @@ public class NMClientAsync extends AbstractService {
|
|
|
protected BlockingQueue<ContainerEvent> events =
|
|
|
new LinkedBlockingQueue<ContainerEvent>();
|
|
|
|
|
|
- protected NMClient client;
|
|
|
- protected CallbackHandler callbackHandler;
|
|
|
-
|
|
|
protected ConcurrentMap<ContainerId, StatefulContainer> containers =
|
|
|
new ConcurrentHashMap<ContainerId, StatefulContainer>();
|
|
|
|
|
|
- public NMClientAsync(CallbackHandler callbackHandler) {
|
|
|
- this (NMClientAsync.class.getName(), callbackHandler);
|
|
|
+ public NMClientAsyncImpl(CallbackHandler callbackHandler) {
|
|
|
+ this (NMClientAsyncImpl.class.getName(), callbackHandler);
|
|
|
}
|
|
|
|
|
|
- public NMClientAsync(String name, CallbackHandler callbackHandler) {
|
|
|
+ public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
|
|
|
this (name, new NMClientImpl(), callbackHandler);
|
|
|
}
|
|
|
|
|
|
@Private
|
|
|
@VisibleForTesting
|
|
|
- protected NMClientAsync(String name, NMClient client,
|
|
|
+ protected NMClientAsyncImpl(String name, NMClient client,
|
|
|
CallbackHandler callbackHandler) {
|
|
|
- super(name);
|
|
|
+ super(name, client, callbackHandler);
|
|
|
this.client = client;
|
|
|
this.callbackHandler = callbackHandler;
|
|
|
}
|
|
@@ -268,7 +202,7 @@ public class NMClientAsync extends AbstractService {
|
|
|
// If NMClientImpl doesn't stop running containers, the states doesn't
|
|
|
// need to be cleared.
|
|
|
if (!(client instanceof NMClientImpl) ||
|
|
|
- ((NMClientImpl) client).cleanupRunningContainers.get()) {
|
|
|
+ ((NMClientImpl) client).getCleanupRunningContainers().get()) {
|
|
|
if (containers != null) {
|
|
|
containers.clear();
|
|
|
}
|
|
@@ -278,7 +212,7 @@ public class NMClientAsync extends AbstractService {
|
|
|
super.serviceStop();
|
|
|
}
|
|
|
|
|
|
- public void startContainer(
|
|
|
+ public void startContainerAsync(
|
|
|
Container container, ContainerLaunchContext containerLaunchContext) {
|
|
|
if (containers.putIfAbsent(container.getId(),
|
|
|
new StatefulContainer(this, container.getId())) != null) {
|
|
@@ -295,7 +229,7 @@ public class NMClientAsync extends AbstractService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void stopContainer(ContainerId containerId, NodeId nodeId,
|
|
|
+ public void stopContainerAsync(ContainerId containerId, NodeId nodeId,
|
|
|
Token containerToken) {
|
|
|
if (containers.get(containerId) == null) {
|
|
|
callbackHandler.onStopContainerError(containerId,
|
|
@@ -312,7 +246,7 @@ public class NMClientAsync extends AbstractService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void getContainerStatus(ContainerId containerId, NodeId nodeId,
|
|
|
+ public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId,
|
|
|
Token containerToken) {
|
|
|
try {
|
|
|
events.put(new ContainerEvent(containerId, nodeId, containerToken,
|
|
@@ -443,10 +377,10 @@ public class NMClientAsync extends AbstractService {
|
|
|
}
|
|
|
assert scEvent != null;
|
|
|
Map<String, ByteBuffer> allServiceResponse =
|
|
|
- container.nmClientAsync.client.startContainer(
|
|
|
+ container.nmClientAsync.getClient().startContainer(
|
|
|
scEvent.getContainer(), scEvent.getContainerLaunchContext());
|
|
|
try {
|
|
|
- container.nmClientAsync.callbackHandler.onContainerStarted(
|
|
|
+ container.nmClientAsync.getCallbackHandler().onContainerStarted(
|
|
|
containerId, allServiceResponse);
|
|
|
} catch (Throwable thr) {
|
|
|
// Don't process user created unchecked exception
|
|
@@ -466,7 +400,7 @@ public class NMClientAsync extends AbstractService {
|
|
|
private ContainerState onExceptionRaised(StatefulContainer container,
|
|
|
ContainerEvent event, Throwable t) {
|
|
|
try {
|
|
|
- container.nmClientAsync.callbackHandler.onStartContainerError(
|
|
|
+ container.nmClientAsync.getCallbackHandler().onStartContainerError(
|
|
|
event.getContainerId(), t);
|
|
|
} catch (Throwable thr) {
|
|
|
// Don't process user created unchecked exception
|
|
@@ -487,10 +421,10 @@ public class NMClientAsync extends AbstractService {
|
|
|
StatefulContainer container, ContainerEvent event) {
|
|
|
ContainerId containerId = event.getContainerId();
|
|
|
try {
|
|
|
- container.nmClientAsync.client.stopContainer(
|
|
|
+ container.nmClientAsync.getClient().stopContainer(
|
|
|
containerId, event.getNodeId(), event.getContainerToken());
|
|
|
try {
|
|
|
- container.nmClientAsync.callbackHandler.onContainerStopped(
|
|
|
+ container.nmClientAsync.getCallbackHandler().onContainerStopped(
|
|
|
event.getContainerId());
|
|
|
} catch (Throwable thr) {
|
|
|
// Don't process user created unchecked exception
|
|
@@ -510,7 +444,7 @@ public class NMClientAsync extends AbstractService {
|
|
|
private ContainerState onExceptionRaised(StatefulContainer container,
|
|
|
ContainerEvent event, Throwable t) {
|
|
|
try {
|
|
|
- container.nmClientAsync.callbackHandler.onStopContainerError(
|
|
|
+ container.nmClientAsync.getCallbackHandler().onStopContainerError(
|
|
|
event.getContainerId(), t);
|
|
|
} catch (Throwable thr) {
|
|
|
// Don't process user created unchecked exception
|
|
@@ -530,7 +464,7 @@ public class NMClientAsync extends AbstractService {
|
|
|
@Override
|
|
|
public void transition(StatefulContainer container, ContainerEvent event) {
|
|
|
try {
|
|
|
- container.nmClientAsync.callbackHandler.onStartContainerError(
|
|
|
+ container.nmClientAsync.getCallbackHandler().onStartContainerError(
|
|
|
event.getContainerId(),
|
|
|
RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG));
|
|
|
} catch (Throwable thr) {
|
|
@@ -641,80 +575,4 @@ public class NMClientAsync extends AbstractService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * <p>
|
|
|
- * The callback interface needs to be implemented by {@link NMClientAsync}
|
|
|
- * users. The APIs are called when responses from <code>NodeManager</code> are
|
|
|
- * available.
|
|
|
- * </p>
|
|
|
- *
|
|
|
- * <p>
|
|
|
- * Once a callback happens, the users can chose to act on it in blocking or
|
|
|
- * non-blocking manner. If the action on callback is done in a blocking
|
|
|
- * manner, some of the threads performing requests on NodeManagers may get
|
|
|
- * blocked depending on how many threads in the pool are busy.
|
|
|
- * </p>
|
|
|
- *
|
|
|
- * <p>
|
|
|
- * The implementation of the callback function should not throw the
|
|
|
- * unexpected exception. Otherwise, {@link NMClientAsync} will just
|
|
|
- * catch, log and then ignore it.
|
|
|
- * </p>
|
|
|
- */
|
|
|
- public static interface CallbackHandler {
|
|
|
- /**
|
|
|
- * The API is called when <code>NodeManager</code> responds to indicate its
|
|
|
- * acceptance of the starting container request
|
|
|
- * @param containerId the Id of the container
|
|
|
- * @param allServiceResponse a Map between the auxiliary service names and
|
|
|
- * their outputs
|
|
|
- */
|
|
|
- void onContainerStarted(ContainerId containerId,
|
|
|
- Map<String, ByteBuffer> allServiceResponse);
|
|
|
-
|
|
|
- /**
|
|
|
- * The API is called when <code>NodeManager</code> responds with the status
|
|
|
- * of the container
|
|
|
- * @param containerId the Id of the container
|
|
|
- * @param containerStatus the status of the container
|
|
|
- */
|
|
|
- void onContainerStatusReceived(ContainerId containerId,
|
|
|
- ContainerStatus containerStatus);
|
|
|
-
|
|
|
- /**
|
|
|
- * The API is called when <code>NodeManager</code> responds to indicate the
|
|
|
- * container is stopped.
|
|
|
- * @param containerId the Id of the container
|
|
|
- */
|
|
|
- void onContainerStopped(ContainerId containerId);
|
|
|
-
|
|
|
- /**
|
|
|
- * The API is called when an exception is raised in the process of
|
|
|
- * starting a container
|
|
|
- *
|
|
|
- * @param containerId the Id of the container
|
|
|
- * @param t the raised exception
|
|
|
- */
|
|
|
- void onStartContainerError(ContainerId containerId, Throwable t);
|
|
|
-
|
|
|
- /**
|
|
|
- * The API is called when an exception is raised in the process of
|
|
|
- * querying the status of a container
|
|
|
- *
|
|
|
- * @param containerId the Id of the container
|
|
|
- * @param t the raised exception
|
|
|
- */
|
|
|
- void onGetContainerStatusError(ContainerId containerId, Throwable t);
|
|
|
-
|
|
|
- /**
|
|
|
- * The API is called when an exception is raised in the process of
|
|
|
- * stopping a container
|
|
|
- *
|
|
|
- * @param containerId the Id of the container
|
|
|
- * @param t the raised exception
|
|
|
- */
|
|
|
- void onStopContainerError(ContainerId containerId, Throwable t);
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
}
|