@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -67,6 +69,14 @@ import com.google.protobuf.TextFormat;
 public class QuorumJournalManager implements JournalManager {
   static final Logger LOG = LoggerFactory.getLogger(QuorumJournalManager.class);
 
+  // This config is not publicly exposed
+  static final String QJM_RPC_MAX_TXNS_KEY =
+      "dfs.ha.tail-edits.qjm.rpc.max-txns";
+  static final int QJM_RPC_MAX_TXNS_DEFAULT = 5000;
+
+  // Maximum number of transactions to fetch at a time when using the
+  // RPC edit fetch mechanism
+  private final int maxTxnsPerRpc;
   // Timeouts for which the QJM will wait for each of the following actions.
   private final int startSegmentTimeoutMs;
   private final int prepareRecoveryTimeoutMs;
@@ -125,6 +135,10 @@ public class QuorumJournalManager implements JournalManager {
     this.nameServiceId = nameServiceId;
     this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
 
+    this.maxTxnsPerRpc =
+        conf.getInt(QJM_RPC_MAX_TXNS_KEY, QJM_RPC_MAX_TXNS_DEFAULT);
+    Preconditions.checkArgument(maxTxnsPerRpc > 0,
+        "Must specify %s greater than 0!", QJM_RPC_MAX_TXNS_KEY);
     // Configure timeouts.
     this.startSegmentTimeoutMs = conf.getInt(
         DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY,
@@ -478,17 +492,104 @@ public class QuorumJournalManager implements JournalManager {
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxnId, boolean inProgressOk,
       boolean onlyDurableTxns) throws IOException {
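+    // When in-progress edits may be returned, first try the low-latency
+    // RPC-based tailing path; fall back to the streaming mechanism below
+    // if the RPC path fails for any reason.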
+    if (inProgressOk) {
+      LOG.info("Tailing edits starting from txn ID " + fromTxnId +
+          " via RPC mechanism");
+      try {
+        Collection<EditLogInputStream> rpcStreams = new ArrayList<>();
+        selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns);
+        streams.addAll(rpcStreams);
+        return;
+      } catch (IOException ioe) {
+        LOG.warn("Encountered exception while tailing edits >= " + fromTxnId +
+            " via RPC; falling back to streaming.", ioe);
+      }
+    }
+    selectStreamingInputStreams(streams, fromTxnId, inProgressOk,
+        onlyDurableTxns);
+  }
+
+  /**
+   * Select input streams from the journals, specifically using the RPC
+   * mechanism optimized for low latency.
+   *
+   * @param streams The collection to store the return streams into.
+   * @param fromTxnId Select edits starting from this transaction ID
+   * @param onlyDurableTxns Iff true, only include transactions which have been
+   *                        committed to a quorum of the journals.
+   * @throws IOException Upon issues, including cache misses on the journals.
+   */
+  private void selectRpcInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean onlyDurableTxns) throws IOException {
+    QuorumCall<AsyncLogger, GetJournaledEditsResponseProto> q =
+        loggers.getJournaledEdits(fromTxnId, maxTxnsPerRpc);
+    Map<AsyncLogger, GetJournaledEditsResponseProto> responseMap =
+        loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
+            "selectRpcInputStreams");
+    assert responseMap.size() >= loggers.getMajoritySize() :
+        "Quorum call returned without a majority";
+
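+    // Collect the transaction count reported by each logger and sort the
+    // counts so the quorum-visible prefix can be determined below.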
+    List<Integer> responseCounts = new ArrayList<>();
+    for (GetJournaledEditsResponseProto resp : responseMap.values()) {
+      responseCounts.add(resp.getTxnCount());
+    }
+    Collections.sort(responseCounts);
+    int highestTxnCount = responseCounts.get(responseCounts.size() - 1);
+    if (LOG.isDebugEnabled() || highestTxnCount < 0) {
+      StringBuilder msg = new StringBuilder("Requested edits starting from ");
+      msg.append(fromTxnId).append("; got ").append(responseMap.size())
+          .append(" responses: <");
+      for (Map.Entry<AsyncLogger, GetJournaledEditsResponseProto> ent :
+          responseMap.entrySet()) {
+        msg.append("[").append(ent.getKey()).append(", ")
+            .append(ent.getValue().getTxnCount()).append("],");
+      }
+      msg.append(">");
+      if (highestTxnCount < 0) {
+        throw new IOException("Did not get any valid JournaledEdits " +
+            "responses: " + msg);
+      } else {
+        LOG.debug(msg.toString());
+      }
+    }
+
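+    // If only durable transactions were requested, limit the result to the
+    // highest transaction count that a majority of the loggers reported,
+    // i.e. the longest prefix known to be present on a quorum of journals;
+    // otherwise, trust the single highest count returned.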
+    int maxAllowedTxns = !onlyDurableTxns ? highestTxnCount :
+        responseCounts.get(responseCounts.size() - loggers.getMajoritySize());
+    if (maxAllowedTxns == 0) {
+      LOG.debug("No new edits available in logs; requested starting from " +
+          "ID " + fromTxnId);
+      return;
+    }
+    LOG.info("Selected loggers with >= " + maxAllowedTxns +
+        " transactions starting from " + fromTxnId);
+    PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
+        JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
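+    // Build one in-memory stream per response, each capped at maxAllowedTxns
+    // transactions, then chain them, treating streams that cover the same
+    // transactions as redundant copies.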
+    for (GetJournaledEditsResponseProto resp : responseMap.values()) {
+      long endTxnId = fromTxnId - 1 +
+          Math.min(maxAllowedTxns, resp.getTxnCount());
+      allStreams.add(EditLogFileInputStream.fromByteString(
+          resp.getEditLog(), fromTxnId, endTxnId, true));
+    }
+    JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
+  }
+
+  /**
+   * Select input streams from the journals, specifically using the streaming
+   * mechanism optimized for resiliency / bulk load.
+   */
+  private void selectStreamingInputStreams(
+      Collection<EditLogInputStream> streams, long fromTxnId,
+      boolean inProgressOk, boolean onlyDurableTxns) throws IOException {
     QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
         loggers.getEditLogManifest(fromTxnId, inProgressOk);
     Map<AsyncLogger, RemoteEditLogManifest> resps =
         loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
-            "selectInputStreams");
-
-    LOG.debug("selectInputStream manifests:\n" +
+            "selectStreamingInputStreams");
+
+    LOG.debug("selectStreamingInputStream manifests:\n" +
         Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
-
-    final PriorityQueue<EditLogInputStream> allStreams =
+
+    final PriorityQueue<EditLogInputStream> allStreams =
         new PriorityQueue<EditLogInputStream>(64,
             JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {