|
@@ -0,0 +1,290 @@
|
|
|
|
+/*
|
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
|
+ * distributed with this work for additional information
|
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
|
+ *
|
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
+ *
|
|
|
|
+ * Unless required by applicable law or agreed to in writing,
|
|
|
|
+ * software distributed under the License is distributed on an
|
|
|
|
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
|
+ * KIND, either express or implied. See the License for the
|
|
|
|
+ * specific language governing permissions and limitations
|
|
|
|
+ * under the License.
|
|
|
|
+ */
|
|
|
|
+
|
|
|
|
+package org.apache.hadoop.fs.impl;
|
|
|
|
+
|
|
|
|
+import java.nio.ByteBuffer;
|
|
|
|
+import java.util.IdentityHashMap;
|
|
|
|
+import java.util.Map;
|
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
+
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
+
|
|
|
|
+import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
|
+import org.apache.hadoop.io.ByteBufferPool;
|
|
|
|
+
|
|
|
|
+import static java.lang.System.identityHashCode;
|
|
|
|
+import static java.util.Objects.requireNonNull;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * A wrapper {@link ByteBufferPool} implementation that tracks whether all allocated buffers
|
|
|
|
+ * are released.
|
|
|
|
+ * <p>
|
|
|
|
+ * It throws the related exception at {@link #close()} if any buffer remains un-released.
|
|
|
|
+ * It also clears the buffers at release so if they continued being used it'll generate errors.
|
|
|
|
+ * <p>
|
|
|
|
+ * To be used for testing..
|
|
|
|
+ * <p>
|
|
|
|
+ * The stacktraces of the allocation are not stored by default because
|
|
|
|
+ * it can significantly decrease the unit test performance.
|
|
|
|
+ * Configuring this class to log at DEBUG will trigger their collection.
|
|
|
|
+ * @see ByteBufferAllocationStacktraceException
|
|
|
|
+ * <p>
|
|
|
|
+ * Adapted from Parquet class {@code org.apache.parquet.bytes.TrackingByteBufferAllocator}.
|
|
|
|
+ */
|
|
|
|
+@VisibleForTesting
|
|
|
|
+public final class TrackingByteBufferPool implements ByteBufferPool, AutoCloseable {
|
|
|
|
+
|
|
|
|
+ private static final Logger LOG = LoggerFactory.getLogger(TrackingByteBufferPool.class);
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Wrap an existing allocator with this tracking allocator.
|
|
|
|
+ * @param allocator allocator to wrap.
|
|
|
|
+ * @return a new allocator.
|
|
|
|
+ */
|
|
|
|
+ public static TrackingByteBufferPool wrap(ByteBufferPool allocator) {
|
|
|
|
+ return new TrackingByteBufferPool(allocator);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public static class LeakDetectorHeapByteBufferPoolException
|
|
|
|
+ extends RuntimeException {
|
|
|
|
+
|
|
|
|
+ private LeakDetectorHeapByteBufferPoolException(String msg) {
|
|
|
|
+ super(msg);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private LeakDetectorHeapByteBufferPoolException(String msg, Throwable cause) {
|
|
|
|
+ super(msg, cause);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private LeakDetectorHeapByteBufferPoolException(
|
|
|
|
+ String message,
|
|
|
|
+ Throwable cause,
|
|
|
|
+ boolean enableSuppression,
|
|
|
|
+ boolean writableStackTrace) {
|
|
|
|
+ super(message, cause, enableSuppression, writableStackTrace);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Strack trace of allocation as saved in the tracking map.
|
|
|
|
+ */
|
|
|
|
+ public static final class ByteBufferAllocationStacktraceException
|
|
|
|
+ extends LeakDetectorHeapByteBufferPoolException {
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Single stack trace instance to use when DEBUG is not enabled.
|
|
|
|
+ */
|
|
|
|
+ private static final ByteBufferAllocationStacktraceException WITHOUT_STACKTRACE =
|
|
|
|
+ new ByteBufferAllocationStacktraceException(false);
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Create a stack trace for the map, either using the shared static one
|
|
|
|
+ * or a dynamically created one.
|
|
|
|
+ * @return a stack
|
|
|
|
+ */
|
|
|
|
+ private static ByteBufferAllocationStacktraceException create() {
|
|
|
|
+ return LOG.isDebugEnabled()
|
|
|
|
+ ? new ByteBufferAllocationStacktraceException()
|
|
|
|
+ : WITHOUT_STACKTRACE;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private ByteBufferAllocationStacktraceException() {
|
|
|
|
+ super("Allocation stacktrace of the first ByteBuffer:");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Private constructor to for the singleton {@link #WITHOUT_STACKTRACE},
|
|
|
|
+ * telling develoers how to see a trace per buffer.
|
|
|
|
+ */
|
|
|
|
+ private ByteBufferAllocationStacktraceException(boolean unused) {
|
|
|
|
+ super("Log org.apache.hadoop.fs.impl.TrackingByteBufferPool at DEBUG for stack traces",
|
|
|
|
+ null,
|
|
|
|
+ false,
|
|
|
|
+ false);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Exception raised in {@link TrackingByteBufferPool#putBuffer(ByteBuffer)} if the
|
|
|
|
+ * buffer to release was not in the hash map.
|
|
|
|
+ */
|
|
|
|
+ public static final class ReleasingUnallocatedByteBufferException
|
|
|
|
+ extends LeakDetectorHeapByteBufferPoolException {
|
|
|
|
+
|
|
|
|
+ private ReleasingUnallocatedByteBufferException(final ByteBuffer b) {
|
|
|
|
+ super(String.format("Releasing a ByteBuffer instance that is not allocated"
|
|
|
|
+ + " by this buffer pool or already been released: %s size %d; hash code %s",
|
|
|
|
+ b, b.capacity(), identityHashCode(b)));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Exception raised in {@link TrackingByteBufferPool#close()} if there
|
|
|
|
+ * was an unreleased buffer.
|
|
|
|
+ */
|
|
|
|
+ public static final class LeakedByteBufferException
|
|
|
|
+ extends LeakDetectorHeapByteBufferPoolException {
|
|
|
|
+
|
|
|
|
+ private final int count;
|
|
|
|
+
|
|
|
|
+ private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceException e) {
|
|
|
|
+ super(count + " ByteBuffer object(s) is/are remained unreleased"
|
|
|
|
+ + " after closing this buffer pool.", e);
|
|
|
|
+ this.count = count;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get the number of unreleased buffers.
|
|
|
|
+ * @return number of unreleased buffers
|
|
|
|
+ */
|
|
|
|
+ public int getCount() {
|
|
|
|
+ return count;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Tracker of allocations.
|
|
|
|
+ * <p>
|
|
|
|
+ * The key maps by the object id of the buffer, and refers to either a common stack trace
|
|
|
|
+ * or one dynamically created for each allocation.
|
|
|
|
+ */
|
|
|
|
+ private final Map<ByteBuffer, ByteBufferAllocationStacktraceException> allocated =
|
|
|
|
+ new IdentityHashMap<>();
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Wrapped buffer pool.
|
|
|
|
+ */
|
|
|
|
+ private final ByteBufferPool allocator;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Number of buffer allocations.
|
|
|
|
+ * <p>
|
|
|
|
+ * This is incremented in {@link #getBuffer(boolean, int)}.
|
|
|
|
+ */
|
|
|
|
+ private final AtomicInteger bufferAllocations = new AtomicInteger();
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Number of buffer releases.
|
|
|
|
+ * <p>
|
|
|
|
+ * This is incremented in {@link #putBuffer(ByteBuffer)}.
|
|
|
|
+ */
|
|
|
|
+ private final AtomicInteger bufferReleases = new AtomicInteger();
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * private constructor.
|
|
|
|
+ * @param allocator pool allocator.
|
|
|
|
+ */
|
|
|
|
+ private TrackingByteBufferPool(ByteBufferPool allocator) {
|
|
|
|
+ this.allocator = allocator;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public int getBufferAllocations() {
|
|
|
|
+ return bufferAllocations.get();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public int getBufferReleases() {
|
|
|
|
+ return bufferReleases.get();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get a buffer from the pool.
|
|
|
|
+ * <p>
|
|
|
|
+ * This increments the {@link #bufferAllocations} counter and stores the
|
|
|
|
+ * singleron or local allocation stack trace in the {@link #allocated} map.
|
|
|
|
+ * @param direct whether to allocate a direct buffer or not
|
|
|
|
+ * @param size size of the buffer to allocate
|
|
|
|
+ * @return a ByteBuffer instance
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public synchronized ByteBuffer getBuffer(final boolean direct, final int size) {
|
|
|
|
+ bufferAllocations.incrementAndGet();
|
|
|
|
+ ByteBuffer buffer = allocator.getBuffer(direct, size);
|
|
|
|
+ final ByteBufferAllocationStacktraceException ex =
|
|
|
|
+ ByteBufferAllocationStacktraceException.create();
|
|
|
|
+ allocated.put(buffer, ex);
|
|
|
|
+ LOG.debug("Creating ByteBuffer:{} size {} {}",
|
|
|
|
+ identityHashCode(buffer), size, buffer, ex);
|
|
|
|
+ return buffer;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Release a buffer back to the pool.
|
|
|
|
+ * <p>
|
|
|
|
+ * This increments the {@link #bufferReleases} counter and removes the
|
|
|
|
+ * buffer from the {@link #allocated} map.
|
|
|
|
+ * <p>
|
|
|
|
+ * If the buffer was not allocated by this pool, it throws
|
|
|
|
+ * {@link ReleasingUnallocatedByteBufferException}.
|
|
|
|
+ *
|
|
|
|
+ * @param buffer buffer to release
|
|
|
|
+ * @throws ReleasingUnallocatedByteBufferException if the buffer was not allocated by this pool
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public synchronized void putBuffer(ByteBuffer buffer)
|
|
|
|
+ throws ReleasingUnallocatedByteBufferException {
|
|
|
|
+
|
|
|
|
+ bufferReleases.incrementAndGet();
|
|
|
|
+ requireNonNull(buffer);
|
|
|
|
+ LOG.debug("Releasing ByteBuffer: {}: {}", identityHashCode(buffer), buffer);
|
|
|
|
+ if (allocated.remove(buffer) == null) {
|
|
|
|
+ throw new ReleasingUnallocatedByteBufferException(buffer);
|
|
|
|
+ }
|
|
|
|
+ allocator.putBuffer(buffer);
|
|
|
|
+ // Clearing the buffer so subsequent access would probably generate errors
|
|
|
|
+ buffer.clear();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Check if the buffer is in the pool.
|
|
|
|
+ * @param buffer buffer
|
|
|
|
+ * @return true if the buffer is in the pool
|
|
|
|
+ */
|
|
|
|
+ public boolean containsBuffer(ByteBuffer buffer) {
|
|
|
|
+ return allocated.containsKey(requireNonNull(buffer));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get the number of allocated buffers.
|
|
|
|
+ * @return number of allocated buffers
|
|
|
|
+ */
|
|
|
|
+ public int size() {
|
|
|
|
+ return allocated.size();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Expect all buffers to be released -if not, log unreleased ones
|
|
|
|
+ * and then raise an exception with the stack trace of the first
|
|
|
|
+ * unreleased buffer.
|
|
|
|
+ * @throws LeakedByteBufferException if at least one buffer was not released
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public void close() throws LeakedByteBufferException {
|
|
|
|
+ if (!allocated.isEmpty()) {
|
|
|
|
+ allocated.keySet().forEach(buffer ->
|
|
|
|
+ LOG.warn("Unreleased ByteBuffer {}; {}", identityHashCode(buffer), buffer));
|
|
|
|
+ LeakedByteBufferException ex = new LeakedByteBufferException(
|
|
|
|
+ allocated.size(),
|
|
|
|
+ allocated.values().iterator().next());
|
|
|
|
+ allocated.clear(); // Drop the references to the ByteBuffers, so they can be gc'd
|
|
|
|
+ throw ex;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|