|
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdds.scm;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import org.apache.hadoop.hdds.HddsUtils;
|
|
|
+import org.apache.hadoop.io.MultipleIOException;
|
|
|
+import org.apache.ratis.retry.RetryPolicy;
|
|
|
import org.apache.ratis.shaded.com.google.protobuf
|
|
|
.InvalidProtocolBufferException;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -35,15 +37,17 @@ import org.apache.ratis.RatisHelper;
|
|
|
import org.apache.ratis.client.RaftClient;
|
|
|
import org.apache.ratis.protocol.RaftClientReply;
|
|
|
import org.apache.ratis.protocol.RaftGroup;
|
|
|
-import org.apache.ratis.protocol.RaftGroupId;
|
|
|
import org.apache.ratis.protocol.RaftPeer;
|
|
|
import org.apache.ratis.rpc.RpcType;
|
|
|
import org.apache.ratis.rpc.SupportedRpcType;
|
|
|
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
|
|
|
+import org.apache.ratis.util.CheckedBiConsumer;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.Objects;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
@@ -65,50 +69,48 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|
|
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
|
|
|
final int maxOutstandingRequests =
|
|
|
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
|
|
|
+ final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
|
|
|
return new XceiverClientRatis(pipeline,
|
|
|
- SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests);
|
|
|
+ SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
|
|
|
+ retryPolicy);
|
|
|
}
|
|
|
|
|
|
private final Pipeline pipeline;
|
|
|
private final RpcType rpcType;
|
|
|
private final AtomicReference<RaftClient> client = new AtomicReference<>();
|
|
|
private final int maxOutstandingRequests;
|
|
|
+ private final RetryPolicy retryPolicy;
|
|
|
|
|
|
/**
 * Creates a client bound to one pipeline. The arguments are only stored
 * here; no Raft client is created by this constructor.
 *
 * @param pipeline the pipeline this client operates on
 * @param rpcType the rpc type handed to RatisHelper when Raft clients are
 *                created
 * @param maxOutStandingChunks value kept as {@code maxOutstandingRequests}
 * @param retryPolicy the retry policy handed to RatisHelper when Raft
 *                    clients are created
 */
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
    int maxOutStandingChunks, RetryPolicy retryPolicy) {
  // The implicit super() call runs first, exactly as an explicit one would.
  this.retryPolicy = retryPolicy;
  this.maxOutstandingRequests = maxOutStandingChunks;
  this.rpcType = rpcType;
  this.pipeline = pipeline;
}
|
|
|
|
|
|
/**
|
|
|
* {@inheritDoc}
|
|
|
*/
|
|
|
- public void createPipeline()
|
|
|
- throws IOException {
|
|
|
- RaftGroupId groupId = pipeline.getId().getRaftGroupID();
|
|
|
- RaftGroup group = RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
|
|
|
- LOG.debug("initializing pipeline:{} with nodes:{}",
|
|
|
- pipeline.getId(), group.getPeers());
|
|
|
- reinitialize(pipeline.getMachines(), RatisHelper.emptyRaftGroup(), group);
|
|
|
+ public void createPipeline() throws IOException {
|
|
|
+ final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
|
|
|
+ LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
|
|
|
+ callRatisRpc(pipeline.getMachines(),
|
|
|
+ (raftClient, peer) -> raftClient.groupAdd(group, peer.getId()));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* {@inheritDoc}
|
|
|
*/
|
|
|
- public void destroyPipeline()
|
|
|
- throws IOException {
|
|
|
- RaftGroupId groupId = pipeline.getId().getRaftGroupID();
|
|
|
- RaftGroup currentGroup =
|
|
|
- RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
|
|
|
- LOG.debug("destroying pipeline:{} with nodes:{}",
|
|
|
- pipeline.getId(), currentGroup.getPeers());
|
|
|
- reinitialize(pipeline.getMachines(), currentGroup,
|
|
|
- RatisHelper.emptyRaftGroup());
|
|
|
+ public void destroyPipeline() throws IOException {
|
|
|
+ final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
|
|
|
+ LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
|
|
|
+ callRatisRpc(pipeline.getMachines(), (raftClient, peer) -> raftClient
|
|
|
+ .groupRemove(group.getGroupId(), peer.getId()));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -121,51 +123,28 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|
|
return HddsProtos.ReplicationType.RATIS;
|
|
|
}
|
|
|
|
|
|
- private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup oldGroup,
|
|
|
- RaftGroup newGroup) throws IOException {
|
|
|
+ private void callRatisRpc(List<DatanodeDetails> datanodes,
|
|
|
+ CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc)
|
|
|
+ throws IOException {
|
|
|
if (datanodes.isEmpty()) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- IOException exception = null;
|
|
|
- for (DatanodeDetails d : datanodes) {
|
|
|
- try {
|
|
|
- reinitialize(d, oldGroup, newGroup);
|
|
|
+ final List<IOException> exceptions =
|
|
|
+ Collections.synchronizedList(new ArrayList<>());
|
|
|
+ datanodes.parallelStream().forEach(d -> {
|
|
|
+ final RaftPeer p = RatisHelper.toRaftPeer(d);
|
|
|
+ try (RaftClient client = RatisHelper
|
|
|
+ .newRaftClient(rpcType, p, retryPolicy)) {
|
|
|
+ rpc.accept(client, p);
|
|
|
} catch (IOException ioe) {
|
|
|
- if (exception == null) {
|
|
|
- exception = new IOException(
|
|
|
- "Failed to reinitialize some of the RaftPeer(s)", ioe);
|
|
|
- } else {
|
|
|
- exception.addSuppressed(ioe);
|
|
|
- }
|
|
|
+ exceptions.add(
|
|
|
+ new IOException("Failed invoke Ratis rpc " + rpc + " for " + d,
|
|
|
+ ioe));
|
|
|
}
|
|
|
- }
|
|
|
- if (exception != null) {
|
|
|
- throw exception;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Adds a new peers to the Ratis Ring.
|
|
|
- *
|
|
|
- * @param datanode - new datanode
|
|
|
- * @param oldGroup - previous Raft group
|
|
|
- * @param newGroup - new Raft group
|
|
|
- * @throws IOException - on Failure.
|
|
|
- */
|
|
|
- private void reinitialize(DatanodeDetails datanode, RaftGroup oldGroup,
|
|
|
- RaftGroup newGroup)
|
|
|
- throws IOException {
|
|
|
- final RaftPeer p = RatisHelper.toRaftPeer(datanode);
|
|
|
- try (RaftClient client = oldGroup == RatisHelper.emptyRaftGroup() ?
|
|
|
- RatisHelper.newRaftClient(rpcType, p) :
|
|
|
- RatisHelper.newRaftClient(rpcType, p, oldGroup)) {
|
|
|
- client.reinitialize(newGroup, p.getId());
|
|
|
- } catch (IOException ioe) {
|
|
|
- LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ",
|
|
|
- p, datanode, ioe);
|
|
|
- throw new IOException("Failed to reinitialize RaftPeer " + p
|
|
|
- + "(datanode=" + datanode + ")", ioe);
|
|
|
+ });
|
|
|
+ if (!exceptions.isEmpty()) {
|
|
|
+ throw MultipleIOException.createIOException(exceptions);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -183,7 +162,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|
|
// maxOutstandingRequests so as to set the upper bound on max no of async
|
|
|
// requests to be handled by raft client
|
|
|
if (!client.compareAndSet(null,
|
|
|
- RatisHelper.newRaftClient(rpcType, getPipeline()))) {
|
|
|
+ RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy))) {
|
|
|
throw new IllegalStateException("Client is already connected.");
|
|
|
}
|
|
|
}
|