|
@@ -32,11 +32,13 @@ import org.apache.hadoop.util.Time;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
+import java.util.Iterator;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.Future;
|
|
import java.util.concurrent.Future;
|
|
|
|
+import java.util.concurrent.Semaphore;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Netty client handler.
|
|
* Netty client handler.
|
|
@@ -51,15 +53,17 @@ public class XceiverClientHandler extends
|
|
private final Pipeline pipeline;
|
|
private final Pipeline pipeline;
|
|
private volatile Channel channel;
|
|
private volatile Channel channel;
|
|
private XceiverClientMetrics metrics;
|
|
private XceiverClientMetrics metrics;
|
|
|
|
+ private final Semaphore semaphore;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Constructs a client that can communicate to a container server.
|
|
* Constructs a client that can communicate to a container server.
|
|
*/
|
|
*/
|
|
- public XceiverClientHandler(Pipeline pipeline) {
|
|
|
|
|
|
+ public XceiverClientHandler(Pipeline pipeline, Semaphore semaphore) {
|
|
super(false);
|
|
super(false);
|
|
Preconditions.checkNotNull(pipeline);
|
|
Preconditions.checkNotNull(pipeline);
|
|
this.pipeline = pipeline;
|
|
this.pipeline = pipeline;
|
|
this.metrics = XceiverClientManager.getXceiverClientMetrics();
|
|
this.metrics = XceiverClientManager.getXceiverClientMetrics();
|
|
|
|
+ this.semaphore = semaphore;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -83,6 +87,7 @@ public class XceiverClientHandler extends
|
|
|
|
|
|
String key = msg.getTraceID();
|
|
String key = msg.getTraceID();
|
|
ResponseFuture response = responses.remove(key);
|
|
ResponseFuture response = responses.remove(key);
|
|
|
|
+ semaphore.release();
|
|
|
|
|
|
if (response != null) {
|
|
if (response != null) {
|
|
response.getFuture().complete(msg);
|
|
response.getFuture().complete(msg);
|
|
@@ -105,6 +110,12 @@ public class XceiverClientHandler extends
|
|
@Override
|
|
@Override
|
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
|
LOG.info("Exception in client " + cause.toString());
|
|
LOG.info("Exception in client " + cause.toString());
|
|
|
|
+ Iterator<String> keyIterator = responses.keySet().iterator();
|
|
|
|
+ while (keyIterator.hasNext()) {
|
|
|
|
+ ResponseFuture response = responses.remove(keyIterator.next());
|
|
|
|
+ response.getFuture().completeExceptionally(cause);
|
|
|
|
+ semaphore.release();
|
|
|
|
+ }
|
|
ctx.close();
|
|
ctx.close();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -133,7 +144,8 @@ public class XceiverClientHandler extends
|
|
* @return CompletableFuture
|
|
* @return CompletableFuture
|
|
*/
|
|
*/
|
|
public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
|
|
public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
|
|
- ContainerProtos.ContainerCommandRequestProto request) {
|
|
|
|
|
|
+ ContainerProtos.ContainerCommandRequestProto request)
|
|
|
|
+ throws InterruptedException {
|
|
|
|
|
|
// Throw an exception of request doesn't have traceId
|
|
// Throw an exception of request doesn't have traceId
|
|
if (StringUtils.isEmpty(request.getTraceID())) {
|
|
if (StringUtils.isEmpty(request.getTraceID())) {
|
|
@@ -152,6 +164,7 @@ public class XceiverClientHandler extends
|
|
= new CompletableFuture<>();
|
|
= new CompletableFuture<>();
|
|
ResponseFuture response = new ResponseFuture(future,
|
|
ResponseFuture response = new ResponseFuture(future,
|
|
Time.monotonicNowNanos());
|
|
Time.monotonicNowNanos());
|
|
|
|
+ semaphore.acquire();
|
|
ResponseFuture previous = responses.putIfAbsent(
|
|
ResponseFuture previous = responses.putIfAbsent(
|
|
request.getTraceID(), response);
|
|
request.getTraceID(), response);
|
|
if (previous != null) {
|
|
if (previous != null) {
|