|
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
|
|
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
|
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
|
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
|
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
|
|
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
|
|
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
|
|
@@ -40,6 +41,9 @@ import org.slf4j.LoggerFactory;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.ExecutionException;
|
|
|
|
+import java.util.UUID;
|
|
|
|
+import java.util.Map;
|
|
|
|
+import java.util.HashMap;
|
|
import java.util.concurrent.Semaphore;
|
|
import java.util.concurrent.Semaphore;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
@@ -50,9 +54,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
|
|
static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
|
|
static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
|
|
private final Pipeline pipeline;
|
|
private final Pipeline pipeline;
|
|
private final Configuration config;
|
|
private final Configuration config;
|
|
- private XceiverClientProtocolServiceStub asyncStub;
|
|
|
|
|
|
+ private Map<UUID, XceiverClientProtocolServiceStub> asyncStubs;
|
|
private XceiverClientMetrics metrics;
|
|
private XceiverClientMetrics metrics;
|
|
- private ManagedChannel channel;
|
|
|
|
|
|
+ private Map<UUID, ManagedChannel> channels;
|
|
private final Semaphore semaphore;
|
|
private final Semaphore semaphore;
|
|
private boolean closed = false;
|
|
private boolean closed = false;
|
|
|
|
|
|
@@ -72,46 +76,62 @@ public class XceiverClientGrpc extends XceiverClientSpi {
|
|
this.semaphore =
|
|
this.semaphore =
|
|
new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
|
|
new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
|
|
this.metrics = XceiverClientManager.getXceiverClientMetrics();
|
|
this.metrics = XceiverClientManager.getXceiverClientMetrics();
|
|
|
|
+ this.channels = new HashMap<>();
|
|
|
|
+ this.asyncStubs = new HashMap<>();
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void connect() throws Exception {
|
|
public void connect() throws Exception {
|
|
|
|
+
|
|
|
|
+ // leader by default is the 1st datanode in the datanode list of pipleline
|
|
DatanodeDetails leader = this.pipeline.getLeader();
|
|
DatanodeDetails leader = this.pipeline.getLeader();
|
|
|
|
+ // just make a connection to the 1st datanode at the beginning
|
|
|
|
+ connectToDatanode(leader);
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ private void connectToDatanode(DatanodeDetails dn) {
|
|
// read port from the data node, on failure use default configured
|
|
// read port from the data node, on failure use default configured
|
|
// port.
|
|
// port.
|
|
- int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
|
|
|
|
|
|
+ int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
|
|
if (port == 0) {
|
|
if (port == 0) {
|
|
port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
|
|
port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
|
|
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
|
|
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
|
|
}
|
|
}
|
|
- LOG.debug("Connecting to server Port : " + leader.getIpAddress());
|
|
|
|
- channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
|
|
|
|
- .usePlaintext()
|
|
|
|
- .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
|
|
|
|
- .build();
|
|
|
|
- asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
|
|
|
|
|
|
+ LOG.debug("Connecting to server Port : " + dn.getIpAddress());
|
|
|
|
+ ManagedChannel channel =
|
|
|
|
+ NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
|
|
|
|
+ .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
|
|
|
|
+ .build();
|
|
|
|
+ XceiverClientProtocolServiceStub asyncStub =
|
|
|
|
+ XceiverClientProtocolServiceGrpc.newStub(channel);
|
|
|
|
+ asyncStubs.put(dn.getUuid(), asyncStub);
|
|
|
|
+ channels.put(dn.getUuid(), channel);
|
|
}
|
|
}
|
|
-
|
|
|
|
/**
|
|
/**
|
|
- * Returns if the xceiver client connects to a server.
|
|
|
|
|
|
+ * Returns if the xceiver client connects to all servers in the pipeline.
|
|
*
|
|
*
|
|
* @return True if the connection is alive, false otherwise.
|
|
* @return True if the connection is alive, false otherwise.
|
|
*/
|
|
*/
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
- public boolean isConnected() {
|
|
|
|
- return !channel.isTerminated() && !channel.isShutdown();
|
|
|
|
|
|
+ public boolean isConnected(DatanodeDetails details) {
|
|
|
|
+ return isConnected(channels.get(details.getUuid()));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private boolean isConnected(ManagedChannel channel) {
|
|
|
|
+ return channel != null && !channel.isTerminated() && !channel.isShutdown();
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void close() {
|
|
public void close() {
|
|
closed = true;
|
|
closed = true;
|
|
- channel.shutdownNow();
|
|
|
|
- try {
|
|
|
|
- channel.awaitTermination(60, TimeUnit.MINUTES);
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- LOG.error("Unexpected exception while waiting for channel termination",
|
|
|
|
- e);
|
|
|
|
|
|
+ for (ManagedChannel channel : channels.values()) {
|
|
|
|
+ channel.shutdownNow();
|
|
|
|
+ try {
|
|
|
|
+ channel.awaitTermination(60, TimeUnit.MINUTES);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ LOG.error("Unexpected exception while waiting for channel termination",
|
|
|
|
+ e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -120,6 +140,51 @@ public class XceiverClientGrpc extends XceiverClientSpi {
|
|
return pipeline;
|
|
return pipeline;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public ContainerCommandResponseProto sendCommand(
|
|
|
|
+ ContainerCommandRequestProto request) throws IOException {
|
|
|
|
+ int size = pipeline.getMachines().size();
|
|
|
|
+ ContainerCommandResponseProto responseProto = null;
|
|
|
|
+ int dnIndex = 0;
|
|
|
|
+
|
|
|
|
+ // In case of an exception or an error, we will try to read from the
|
|
|
|
+ // datanodes in the pipeline in a round robin fashion.
|
|
|
|
+
|
|
|
|
+ // TODO: cache the correct leader info in here, so that any subsequent calls
|
|
|
|
+ // should first go to leader
|
|
|
|
+ for (DatanodeDetails dn : pipeline.getMachines()) {
|
|
|
|
+ try {
|
|
|
|
+
|
|
|
|
+ // In case the command gets retried on a 2nd datanode,
|
|
|
|
+ // sendCommandAsyncCall will create a new channel and async stub
|
|
|
|
+ // in case these don't exist for the specific datanode.
|
|
|
|
+ responseProto =
|
|
|
|
+ sendCommandAsync(request, dn).get();
|
|
|
|
+ dnIndex++;
|
|
|
|
+ if (responseProto.getResult() == ContainerProtos.Result.SUCCESS
|
|
|
|
+ || dnIndex == size) {
|
|
|
|
+ return responseProto;
|
|
|
|
+ }
|
|
|
|
+ } catch (ExecutionException | InterruptedException e) {
|
|
|
|
+ if (dnIndex < size) {
|
|
|
|
+ LOG.warn(
|
|
|
|
+ "Failed to execute command " + request + " on datanode " + dn
|
|
|
|
+ .getUuidString() +". Retrying", e);
|
|
|
|
+ } else {
|
|
|
|
+ throw new IOException("Failed to execute command " + request, e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return responseProto;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // TODO: for a true async API, once the waitable future while executing
|
|
|
|
+ // the command on one channel fails, it should be retried asynchronously
|
|
|
|
+ // on the future Task for all the remaining datanodes.
|
|
|
|
+
|
|
|
|
+ // Note: this Async api is not used currently used in any active I/O path.
|
|
|
|
+ // In case it gets used, the asynchronous retry logic needs to be plugged
|
|
|
|
+ // in here.
|
|
/**
|
|
/**
|
|
* Sends a given command to server gets a waitable future back.
|
|
* Sends a given command to server gets a waitable future back.
|
|
*
|
|
*
|
|
@@ -128,15 +193,25 @@ public class XceiverClientGrpc extends XceiverClientSpi {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
- public CompletableFuture<ContainerCommandResponseProto>
|
|
|
|
- sendCommandAsync(ContainerCommandRequestProto request)
|
|
|
|
|
|
+ public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
|
|
|
|
+ ContainerCommandRequestProto request)
|
|
|
|
+ throws IOException, ExecutionException, InterruptedException {
|
|
|
|
+ return sendCommandAsync(request, pipeline.getLeader());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
|
|
|
|
+ ContainerCommandRequestProto request, DatanodeDetails dn)
|
|
throws IOException, ExecutionException, InterruptedException {
|
|
throws IOException, ExecutionException, InterruptedException {
|
|
- if(closed){
|
|
|
|
|
|
+ if (closed) {
|
|
throw new IOException("This channel is not connected.");
|
|
throw new IOException("This channel is not connected.");
|
|
}
|
|
}
|
|
|
|
|
|
- if(channel == null || !isConnected()) {
|
|
|
|
- reconnect();
|
|
|
|
|
|
+ UUID dnId = dn.getUuid();
|
|
|
|
+ ManagedChannel channel = channels.get(dnId);
|
|
|
|
+ // If the channel doesn't exist for this specific datanode or the channel
|
|
|
|
+ // is closed, just reconnect
|
|
|
|
+ if (!isConnected(channel)) {
|
|
|
|
+ reconnect(dn);
|
|
}
|
|
}
|
|
|
|
|
|
final CompletableFuture<ContainerCommandResponseProto> replyFuture =
|
|
final CompletableFuture<ContainerCommandResponseProto> replyFuture =
|
|
@@ -145,48 +220,54 @@ public class XceiverClientGrpc extends XceiverClientSpi {
|
|
long requestTime = Time.monotonicNowNanos();
|
|
long requestTime = Time.monotonicNowNanos();
|
|
metrics.incrPendingContainerOpsMetrics(request.getCmdType());
|
|
metrics.incrPendingContainerOpsMetrics(request.getCmdType());
|
|
// create a new grpc stream for each non-async call.
|
|
// create a new grpc stream for each non-async call.
|
|
|
|
+
|
|
|
|
+ // TODO: for async calls, we should reuse StreamObserver resources.
|
|
final StreamObserver<ContainerCommandRequestProto> requestObserver =
|
|
final StreamObserver<ContainerCommandRequestProto> requestObserver =
|
|
- asyncStub.send(new StreamObserver<ContainerCommandResponseProto>() {
|
|
|
|
- @Override
|
|
|
|
- public void onNext(ContainerCommandResponseProto value) {
|
|
|
|
- replyFuture.complete(value);
|
|
|
|
- metrics.decrPendingContainerOpsMetrics(request.getCmdType());
|
|
|
|
- metrics.addContainerOpsLatency(request.getCmdType(),
|
|
|
|
- Time.monotonicNowNanos() - requestTime);
|
|
|
|
- semaphore.release();
|
|
|
|
- }
|
|
|
|
- @Override
|
|
|
|
- public void onError(Throwable t) {
|
|
|
|
- replyFuture.completeExceptionally(t);
|
|
|
|
- metrics.decrPendingContainerOpsMetrics(request.getCmdType());
|
|
|
|
- metrics.addContainerOpsLatency(request.getCmdType(),
|
|
|
|
- Time.monotonicNowNanos() - requestTime);
|
|
|
|
- semaphore.release();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void onCompleted() {
|
|
|
|
- if (!replyFuture.isDone()) {
|
|
|
|
- replyFuture.completeExceptionally(
|
|
|
|
- new IOException("Stream completed but no reply for request "
|
|
|
|
- + request));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
|
|
+ asyncStubs.get(dnId)
|
|
|
|
+ .send(new StreamObserver<ContainerCommandResponseProto>() {
|
|
|
|
+ @Override
|
|
|
|
+ public void onNext(ContainerCommandResponseProto value) {
|
|
|
|
+ replyFuture.complete(value);
|
|
|
|
+ metrics.decrPendingContainerOpsMetrics(request.getCmdType());
|
|
|
|
+ metrics.addContainerOpsLatency(request.getCmdType(),
|
|
|
|
+ Time.monotonicNowNanos() - requestTime);
|
|
|
|
+ semaphore.release();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onError(Throwable t) {
|
|
|
|
+ replyFuture.completeExceptionally(t);
|
|
|
|
+ metrics.decrPendingContainerOpsMetrics(request.getCmdType());
|
|
|
|
+ metrics.addContainerOpsLatency(request.getCmdType(),
|
|
|
|
+ Time.monotonicNowNanos() - requestTime);
|
|
|
|
+ semaphore.release();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onCompleted() {
|
|
|
|
+ if (!replyFuture.isDone()) {
|
|
|
|
+ replyFuture.completeExceptionally(new IOException(
|
|
|
|
+ "Stream completed but no reply for request " + request));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ });
|
|
requestObserver.onNext(request);
|
|
requestObserver.onNext(request);
|
|
requestObserver.onCompleted();
|
|
requestObserver.onCompleted();
|
|
return replyFuture;
|
|
return replyFuture;
|
|
}
|
|
}
|
|
|
|
|
|
- private void reconnect() throws IOException {
|
|
|
|
|
|
+ private void reconnect(DatanodeDetails dn)
|
|
|
|
+ throws IOException {
|
|
|
|
+ ManagedChannel channel;
|
|
try {
|
|
try {
|
|
- connect();
|
|
|
|
|
|
+ connectToDatanode(dn);
|
|
|
|
+ channel = channels.get(dn.getUuid());
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
LOG.error("Error while connecting: ", e);
|
|
LOG.error("Error while connecting: ", e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
|
|
|
|
- if (channel == null || !isConnected()) {
|
|
|
|
|
|
+ if (channel == null || !isConnected(channel)) {
|
|
throw new IOException("This channel is not connected.");
|
|
throw new IOException("This channel is not connected.");
|
|
}
|
|
}
|
|
}
|
|
}
|