|
@@ -0,0 +1,417 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.hdfs.qjournal.server;
|
|
|
+
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+import java.io.ByteArrayInputStream;
|
|
|
+import java.io.ByteArrayOutputStream;
|
|
|
+import java.io.DataInputStream;
|
|
|
+import java.io.DataOutputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.NavigableMap;
|
|
|
+import java.util.TreeMap;
|
|
|
+import java.util.concurrent.locks.ReadWriteLock;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
+
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
+import org.apache.hadoop.classification.InterfaceStability;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
|
|
+import org.apache.hadoop.util.AutoCloseableLock;
|
|
|
+
|
|
|
+/**
|
|
|
+ * An in-memory cache of edits in their serialized form. This is used to serve
|
|
|
+ * the {@link Journal#getJournaledEdits(long, int)} call, used by the
|
|
|
+ * QJM when {@value DFSConfigKeys#DFS_HA_TAILEDITS_INPROGRESS_KEY} is
|
|
|
+ * enabled.
|
|
|
+ *
|
|
|
+ * <p>When a batch of edits is received by the JournalNode, it is put into this
|
|
|
+ * cache via {@link #storeEdits(byte[], long, long, int)}. Edits must be
|
|
|
+ * stored contiguously; if a batch of edits is stored that does not align with
|
|
|
+ * the previously stored edits, the cache will be cleared before storing new
|
|
|
+ * edits to avoid gaps. This decision is made because gaps are only handled
|
|
|
+ * when in recovery mode, which the cache is not intended to be used for.
|
|
|
+ *
|
|
|
+ * <p>Batches of edits are stored in a {@link TreeMap} mapping the starting
|
|
|
+ * transaction ID of the batch to the data buffer. Upon retrieval, the
|
|
|
+ * relevant data buffers are concatenated together and a header is added
|
|
|
+ * to construct a fully-formed edit data stream.
|
|
|
+ *
|
|
|
+ * <p>The cache is of a limited size capacity determined by
|
|
|
+ * {@value DFSConfigKeys#DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY}. If the capacity
|
|
|
+ * is exceeded after adding a new batch of edits, batches of edits are removed
|
|
|
+ * until the total size is less than the capacity, starting from the ones
|
|
|
+ * containing the oldest transactions. Transactions range in size, but a
|
|
|
+ * decent rule of thumb is that 200 bytes are needed per transaction. Monitoring
|
|
|
+ * the {@link JournalMetrics#rpcRequestCacheMissAmount} metric is recommended
|
|
|
+ * to determine if the cache is too small; it will indicate both how many
|
|
|
+ * cache misses occurred, and how many more transactions would have been
|
|
|
+ * needed in the cache to serve the request.
|
|
|
+ */
|
|
|
+@InterfaceAudience.Private
|
|
|
+@InterfaceStability.Evolving
|
|
|
+class JournaledEditsCache {
|
|
|
+
|
|
|
  /** Sentinel for {@link #layoutVersion} before any header has been cached. */
  private static final int INVALID_LAYOUT_VERSION = 0;
  /** Sentinel txn ID used when the cache holds no transactions. */
  private static final long INVALID_TXN_ID = -1;

  /** The capacity, in bytes, of this cache. */
  private final int capacity;

  /**
   * Read/write lock pair wrapped in AutoCloseable; these refer to the same
   * underlying lock.
   */
  private final AutoCloseableLock readLock;
  private final AutoCloseableLock writeLock;

  // ** Start lock-protected fields **

  /**
   * Stores the actual data as a mapping of the StartTxnId of a batch of edits
   * to the serialized batch of edits. Stores only contiguous ranges; that is,
   * the last transaction ID in one batch is always one less than the first
   * transaction ID in the next batch. Though the map is protected by the lock,
   * individual data buffers are immutable and can be accessed without locking.
   */
  private final NavigableMap<Long, byte[]> dataMap = new TreeMap<>();
  /** Stores the layout version currently present in the cache. */
  private int layoutVersion = INVALID_LAYOUT_VERSION;
  /** Stores the serialized version of the header for the current version. */
  private ByteBuffer layoutHeader;

  /**
   * The lowest/highest transaction IDs present in the cache.
   * {@value INVALID_TXN_ID} if there are no transactions in the cache.
   */
  private long lowestTxnId;
  private long highestTxnId;
  /**
   * The lowest transaction ID that was ever present in the cache since last
   * being reset (i.e. since initialization or since reset due to being out of
   * sync with the Journal). Until the cache size goes above capacity, this is
   * equal to lowestTxnId.
   */
  private long initialTxnId;
  /** The current total size of all buffers in this cache. */
  private int totalSize;

  // ** End lock-protected fields **
|
|
|
+
|
|
|
+ JournaledEditsCache(Configuration conf) {
|
|
|
+ capacity = conf.getInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
|
|
|
+ DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_DEFAULT);
|
|
|
+ if (capacity > 0.9 * Runtime.getRuntime().maxMemory()) {
|
|
|
+ Journal.LOG.warn(String.format("Cache capacity is set at %d bytes but " +
|
|
|
+ "maximum JVM memory is only %d bytes. It is recommended that you " +
|
|
|
+ "decrease the cache size or increase the heap size.",
|
|
|
+ capacity, Runtime.getRuntime().maxMemory()));
|
|
|
+ }
|
|
|
+ Journal.LOG.info("Enabling the journaled edits cache with a capacity " +
|
|
|
+ "of bytes: " + capacity);
|
|
|
+ ReadWriteLock lock = new ReentrantReadWriteLock(true);
|
|
|
+ readLock = new AutoCloseableLock(lock.readLock());
|
|
|
+ writeLock = new AutoCloseableLock(lock.writeLock());
|
|
|
+ initialize(INVALID_TXN_ID);
|
|
|
+ }
|
|
|
+
|
|
|
  /**
   * Fetch the data for edits starting at the specific transaction ID, fetching
   * up to {@code maxTxns} transactions. Populates a list of output buffers
   * which contains a serialized version of the edits, and returns the count of
   * edits contained within the serialized buffers. The serialized edits are
   * prefixed with a standard edit log header containing information about the
   * layout version. The transactions returned are guaranteed to have contiguous
   * transaction IDs.
   *
   * If {@code requestedStartTxn} is higher than the highest transaction which
   * has been added to this cache, a response with an empty buffer and a
   * transaction count of 0 will be returned. If {@code requestedStartTxn} is
   * lower than the lowest transaction currently contained in this cache, or no
   * transactions have yet been added to the cache, an exception will be thrown.
   *
   * @param requestedStartTxn The ID of the first transaction to return. If any
   *                          transactions are returned, it is guaranteed that
   *                          the first one will have this ID.
   * @param maxTxns The maximum number of transactions to return.
   * @param outputBuffers A list to populate with output buffers. When
   *                      concatenated, these form a full response.
   * @return The number of transactions contained within the set of output
   *         buffers.
   * @throws IOException If transactions are requested which cannot be served
   *                     by this cache.
   */
  int retrieveEdits(long requestedStartTxn, int maxTxns,
      List<ByteBuffer> outputBuffers) throws IOException {
    int txnCount = 0;

    try (AutoCloseableLock l = readLock.acquire()) {
      if (lowestTxnId == INVALID_TXN_ID || requestedStartTxn < lowestTxnId) {
        throw getCacheMissException(requestedStartTxn);
      } else if (requestedStartTxn > highestTxnId) {
        return 0;
      }
      // NOTE(review): layoutHeader is the shared per-version header buffer;
      // this assumes consumers read it without advancing its position —
      // confirm against callers.
      outputBuffers.add(layoutHeader);
      // floorKey is non-null here: lowestTxnId is the first key in dataMap
      // and we verified requestedStartTxn >= lowestTxnId above.
      Iterator<Map.Entry<Long, byte[]>> incrBuffIter =
          dataMap.tailMap(dataMap.floorKey(requestedStartTxn), true)
              .entrySet().iterator();
      // prevTxn/prevBuf carry each batch for one iteration so that its txn
      // count can be computed from the NEXT batch's start txn ID.
      long prevTxn = requestedStartTxn;
      byte[] prevBuf = null;
      // Stop when maximum transactions reached...
      while ((txnCount < maxTxns) &&
          // ... or there are no more entries ...
          (incrBuffIter.hasNext() || prevBuf != null)) {
        long currTxn;
        byte[] currBuf;
        if (incrBuffIter.hasNext()) {
          Map.Entry<Long, byte[]> ent = incrBuffIter.next();
          currTxn = ent.getKey();
          currBuf = ent.getValue();
        } else {
          // This accounts for the trailing entry
          currTxn = highestTxnId + 1;
          currBuf = null;
        }
        if (prevBuf != null) { // True except for the first loop iteration
          outputBuffers.add(ByteBuffer.wrap(prevBuf));
          // if prevTxn < requestedStartTxn, the extra transactions will get
          // removed after the loop, so don't include them in the txn count
          txnCount += currTxn - Math.max(requestedStartTxn, prevTxn);
        }
        prevTxn = currTxn;
        prevBuf = currBuf;
      }
      // Release the lock before doing operations on the buffers (deserializing
      // to find transaction boundaries, and copying into an output buffer)
    }
    // Remove extra leading transactions in the first buffer
    ByteBuffer firstBuf = outputBuffers.get(1); // 0th is the header
    firstBuf.position(
        findTransactionPosition(firstBuf.array(), requestedStartTxn));
    // Remove trailing transactions in the last buffer if necessary
    if (txnCount > maxTxns) {
      ByteBuffer lastBuf = outputBuffers.get(outputBuffers.size() - 1);
      int limit =
          findTransactionPosition(lastBuf.array(), requestedStartTxn + maxTxns);
      lastBuf.limit(limit);
      txnCount = maxTxns;
    }

    return txnCount;
  }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Store a batch of serialized edits into this cache. Removes old batches
|
|
|
+ * as necessary to keep the total size of the cache below the capacity.
|
|
|
+ * See the class Javadoc for more info.
|
|
|
+ *
|
|
|
+ * This attempts to always handle malformed inputs gracefully rather than
|
|
|
+ * throwing an exception, to allow the rest of the Journal's operations
|
|
|
+ * to proceed normally.
|
|
|
+ *
|
|
|
+ * @param inputData A buffer containing edits in serialized form
|
|
|
+ * @param newStartTxn The txn ID of the first edit in {@code inputData}
|
|
|
+ * @param newEndTxn The txn ID of the last edit in {@code inputData}
|
|
|
+ * @param newLayoutVersion The version of the layout used to serialize
|
|
|
+ * the edits
|
|
|
+ */
|
|
|
+ void storeEdits(byte[] inputData, long newStartTxn, long newEndTxn,
|
|
|
+ int newLayoutVersion) {
|
|
|
+ if (newStartTxn < 0 || newEndTxn < newStartTxn) {
|
|
|
+ Journal.LOG.error(String.format("Attempted to cache data of length %d " +
|
|
|
+ "with newStartTxn %d and newEndTxn %d",
|
|
|
+ inputData.length, newStartTxn, newEndTxn));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ try (AutoCloseableLock l = writeLock.acquire()) {
|
|
|
+ if (newLayoutVersion != layoutVersion) {
|
|
|
+ try {
|
|
|
+ updateLayoutVersion(newLayoutVersion, newStartTxn);
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ Journal.LOG.error(String.format("Unable to save new edits [%d, %d] " +
|
|
|
+ "due to exception when updating to new layout version %d",
|
|
|
+ newStartTxn, newEndTxn, newLayoutVersion), ioe);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } else if (lowestTxnId == INVALID_TXN_ID) {
|
|
|
+ Journal.LOG.info("Initializing edits cache starting from txn ID " +
|
|
|
+ newStartTxn);
|
|
|
+ initialize(newStartTxn);
|
|
|
+ } else if (highestTxnId + 1 != newStartTxn) {
|
|
|
+ // Cache is out of sync; clear to avoid storing noncontiguous regions
|
|
|
+ Journal.LOG.error(String.format("Edits cache is out of sync; " +
|
|
|
+ "looked for next txn id at %d but got start txn id for " +
|
|
|
+ "cache put request at %d. Reinitializing at new request.",
|
|
|
+ highestTxnId + 1, newStartTxn));
|
|
|
+ initialize(newStartTxn);
|
|
|
+ }
|
|
|
+
|
|
|
+ while ((totalSize + inputData.length) > capacity && !dataMap.isEmpty()) {
|
|
|
+ Map.Entry<Long, byte[]> lowest = dataMap.firstEntry();
|
|
|
+ dataMap.remove(lowest.getKey());
|
|
|
+ totalSize -= lowest.getValue().length;
|
|
|
+ }
|
|
|
+ if (inputData.length > capacity) {
|
|
|
+ initialize(INVALID_TXN_ID);
|
|
|
+ Journal.LOG.warn(String.format("A single batch of edits was too " +
|
|
|
+ "large to fit into the cache: startTxn = %d, endTxn = %d, " +
|
|
|
+ "input length = %d. The capacity of the cache (%s) must be " +
|
|
|
+ "increased for it to work properly (current capacity %d)." +
|
|
|
+ "Cache is now empty.",
|
|
|
+ newStartTxn, newEndTxn, inputData.length,
|
|
|
+ DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (dataMap.isEmpty()) {
|
|
|
+ lowestTxnId = newStartTxn;
|
|
|
+ } else {
|
|
|
+ lowestTxnId = dataMap.firstKey();
|
|
|
+ }
|
|
|
+
|
|
|
+ dataMap.put(newStartTxn, inputData);
|
|
|
+ highestTxnId = newEndTxn;
|
|
|
+ totalSize += inputData.length;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Skip through a given stream of edits until the given transaction ID is
|
|
|
+ * found. Return the number of bytes that appear prior to the given
|
|
|
+ * transaction.
|
|
|
+ *
|
|
|
+ * @param buf A buffer containing a stream of serialized edits
|
|
|
+ * @param txnId The transaction ID to search for
|
|
|
+ * @return The number of bytes appearing in {@code buf} <i>before</i>
|
|
|
+ * the start of the transaction with ID {@code txnId}.
|
|
|
+ */
|
|
|
+ private int findTransactionPosition(byte[] buf, long txnId)
|
|
|
+ throws IOException {
|
|
|
+ ByteArrayInputStream bais = new ByteArrayInputStream(buf);
|
|
|
+ FSEditLogLoader.PositionTrackingInputStream tracker =
|
|
|
+ new FSEditLogLoader.PositionTrackingInputStream(bais);
|
|
|
+ FSEditLogOp.Reader reader = FSEditLogOp.Reader.create(
|
|
|
+ new DataInputStream(tracker), tracker, layoutVersion);
|
|
|
+ long previousPos = 0;
|
|
|
+ while (reader.scanOp() < txnId) {
|
|
|
+ previousPos = tracker.getPos();
|
|
|
+ }
|
|
|
+ // tracker is backed by a byte[]; position cannot go above an integer
|
|
|
+ return (int) previousPos;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Update the layout version of the cache. This clears out all existing
|
|
|
+ * entries, and populates the new layout version and header for that version.
|
|
|
+ *
|
|
|
+ * @param newLayoutVersion The new layout version to be stored in the cache
|
|
|
+ * @param newStartTxn The new lowest transaction in the cache
|
|
|
+ */
|
|
|
+ private void updateLayoutVersion(int newLayoutVersion, long newStartTxn)
|
|
|
+ throws IOException {
|
|
|
+ StringBuilder logMsg = new StringBuilder()
|
|
|
+ .append("Updating edits cache to use layout version ")
|
|
|
+ .append(newLayoutVersion)
|
|
|
+ .append(" starting from txn ID ")
|
|
|
+ .append(newStartTxn);
|
|
|
+ if (layoutVersion != INVALID_LAYOUT_VERSION) {
|
|
|
+ logMsg.append("; previous version was ").append(layoutVersion)
|
|
|
+ .append("; old entries will be cleared.");
|
|
|
+ }
|
|
|
+ Journal.LOG.info(logMsg.toString());
|
|
|
+ initialize(newStartTxn);
|
|
|
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
+ EditLogFileOutputStream.writeHeader(newLayoutVersion,
|
|
|
+ new DataOutputStream(baos));
|
|
|
+ layoutVersion = newLayoutVersion;
|
|
|
+ layoutHeader = ByteBuffer.wrap(baos.toByteArray());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Initialize the cache back to a clear state.
|
|
|
+ *
|
|
|
+ * @param newInitialTxnId The new lowest transaction ID stored in the cache.
|
|
|
+ * This should be {@value INVALID_TXN_ID} if the cache
|
|
|
+ * is to remain empty at this time.
|
|
|
+ */
|
|
|
+ private void initialize(long newInitialTxnId) {
|
|
|
+ dataMap.clear();
|
|
|
+ totalSize = 0;
|
|
|
+ initialTxnId = newInitialTxnId;
|
|
|
+ lowestTxnId = initialTxnId;
|
|
|
+ highestTxnId = INVALID_TXN_ID; // this will be set later
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return the underlying data buffer used to store information about the
|
|
|
+ * given transaction ID.
|
|
|
+ *
|
|
|
+ * @param txnId Transaction ID whose containing buffer should be fetched.
|
|
|
+ * @return The data buffer for the transaction
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ byte[] getRawDataForTests(long txnId) {
|
|
|
+ try (AutoCloseableLock l = readLock.acquire()) {
|
|
|
+ return dataMap.floorEntry(txnId).getValue();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private CacheMissException getCacheMissException(long requestedTxnId) {
|
|
|
+ if (lowestTxnId == INVALID_TXN_ID) {
|
|
|
+ return new CacheMissException(0, "Cache is empty; either it was never " +
|
|
|
+ "written to or the last write overflowed the cache capacity.");
|
|
|
+ } else if (requestedTxnId < initialTxnId) {
|
|
|
+ return new CacheMissException(initialTxnId - requestedTxnId,
|
|
|
+ "Cache started at txn ID %d but requested txns starting at %d.",
|
|
|
+ initialTxnId, requestedTxnId);
|
|
|
+ } else {
|
|
|
+ return new CacheMissException(lowestTxnId - requestedTxnId,
|
|
|
+ "Oldest txn ID available in the cache is %d, but requested txns " +
|
|
|
+ "starting at %d. The cache size (%s) may need to be increased " +
|
|
|
+ "to hold more transactions (currently %d bytes containing %d " +
|
|
|
+ "transactions)", lowestTxnId, requestedTxnId,
|
|
|
+ DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity,
|
|
|
+ highestTxnId - lowestTxnId + 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static class CacheMissException extends IOException {
|
|
|
+
|
|
|
+ private static final long serialVersionUID = 0L;
|
|
|
+
|
|
|
+ private final long cacheMissAmount;
|
|
|
+
|
|
|
+ CacheMissException(long cacheMissAmount, String msgFormat,
|
|
|
+ Object... msgArgs) {
|
|
|
+ super(String.format(msgFormat, msgArgs));
|
|
|
+ this.cacheMissAmount = cacheMissAmount;
|
|
|
+ }
|
|
|
+
|
|
|
+ long getCacheMissAmount() {
|
|
|
+ return cacheMissAmount;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+}
|