|
@@ -100,9 +100,6 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|
// Map to track commit index at every server
|
|
// Map to track commit index at every server
|
|
private final ConcurrentHashMap<UUID, Long> commitInfoMap;
|
|
private final ConcurrentHashMap<UUID, Long> commitInfoMap;
|
|
|
|
|
|
- // create a separate RaftClient for watchForCommit API
|
|
|
|
- private RaftClient watchClient;
|
|
|
|
-
|
|
|
|
private XceiverClientMetrics metrics;
|
|
private XceiverClientMetrics metrics;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -117,7 +114,6 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|
this.maxOutstandingRequests = maxOutStandingChunks;
|
|
this.maxOutstandingRequests = maxOutStandingChunks;
|
|
this.retryPolicy = retryPolicy;
|
|
this.retryPolicy = retryPolicy;
|
|
commitInfoMap = new ConcurrentHashMap<>();
|
|
commitInfoMap = new ConcurrentHashMap<>();
|
|
- watchClient = null;
|
|
|
|
this.tlsConfig = tlsConfig;
|
|
this.tlsConfig = tlsConfig;
|
|
this.clientRequestTimeout = timeout;
|
|
this.clientRequestTimeout = timeout;
|
|
metrics = XceiverClientManager.getXceiverClientMetrics();
|
|
metrics = XceiverClientManager.getXceiverClientMetrics();
|
|
@@ -187,9 +183,6 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|
if (c != null) {
|
|
if (c != null) {
|
|
closeRaftClient(c);
|
|
closeRaftClient(c);
|
|
}
|
|
}
|
|
- if (watchClient != null) {
|
|
|
|
- closeRaftClient(watchClient);
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
private void closeRaftClient(RaftClient raftClient) {
|
|
private void closeRaftClient(RaftClient raftClient) {
|
|
@@ -255,31 +248,14 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|
return clientReply;
|
|
return clientReply;
|
|
}
|
|
}
|
|
LOG.debug("commit index : {} watch timeout : {}", index, timeout);
|
|
LOG.debug("commit index : {} watch timeout : {}", index, timeout);
|
|
- // create a new RaftClient instance for watch request
|
|
|
|
- if (watchClient == null) {
|
|
|
|
- watchClient =
|
|
|
|
- RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
|
|
|
|
- maxOutstandingRequests, tlsConfig, clientRequestTimeout);
|
|
|
|
- }
|
|
|
|
- CompletableFuture<RaftClientReply> replyFuture = watchClient
|
|
|
|
|
|
+ CompletableFuture<RaftClientReply> replyFuture = getClient()
|
|
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
|
|
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
|
|
RaftClientReply reply;
|
|
RaftClientReply reply;
|
|
try {
|
|
try {
|
|
replyFuture.get(timeout, TimeUnit.MILLISECONDS);
|
|
replyFuture.get(timeout, TimeUnit.MILLISECONDS);
|
|
} catch (TimeoutException toe) {
|
|
} catch (TimeoutException toe) {
|
|
LOG.warn("3 way commit failed ", toe);
|
|
LOG.warn("3 way commit failed ", toe);
|
|
-
|
|
|
|
- closeRaftClient(watchClient);
|
|
|
|
- // generate a new raft client instance again so that next watch request
|
|
|
|
- // does not get blocked for the previous one
|
|
|
|
-
|
|
|
|
- // TODO : need to remove the code to create the new RaftClient instance
|
|
|
|
- // here once the watch request bypassing sliding window in Raft Client
|
|
|
|
- // gets fixed.
|
|
|
|
- watchClient =
|
|
|
|
- RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
|
|
|
|
- maxOutstandingRequests, tlsConfig, clientRequestTimeout);
|
|
|
|
- reply = watchClient
|
|
|
|
|
|
+ reply = getClient()
|
|
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
|
|
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
|
|
.get(timeout, TimeUnit.MILLISECONDS);
|
|
.get(timeout, TimeUnit.MILLISECONDS);
|
|
List<RaftProtos.CommitInfoProto> commitInfoProtoList =
|
|
List<RaftProtos.CommitInfoProto> commitInfoProtoList =
|