Browse Source

add missing file for HDFS-5950

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1573434 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe 11 years ago
parent
commit
6465b0b55f

+ 332 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java

@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT;
+
+import java.io.Closeable;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.ShortCircuitShm;
+import org.apache.hadoop.hdfs.ShortCircuitShm.ShmId;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+
+/*
+ * Manages client short-circuit memory segments on the DataNode.
+ *
+ * DFSClients request shared memory segments from the DataNode.  The 
+ * ShortCircuitRegistry generates and manages these segments.  Each segment
+ * has a randomly generated 128-bit ID which uniquely identifies it.  The
+ * segments each contain several "slots."
+ *
+ * Before performing a short-circuit read, DFSClients must request a pair of
+ * file descriptors from the DataNode via the REQUEST_SHORT_CIRCUIT_FDS
+ * operation.  As part of this operation, DFSClients pass the ID of the shared
+ * memory segment they would like to use to communicate information about this
+ * replica, as well as the slot number within that segment they would like to
+ * use.  Slot allocation is always done by the client.
+ *
+ * Slots are used to track the state of the block on the both the client and
+ * datanode. When this DataNode mlocks a block, the corresponding slots for the
+ * replicas are marked as "anchorable".  Anchorable blocks can be safely read
+ * without verifying the checksum.  This means that BlockReaderLocal objects
+ * using these replicas can skip checksumming.  It also means that we can do
+ * zero-copy reads on these replicas (the ZCR interface has no way of
+ * verifying checksums.)
+ * 
+ * When a DN needs to munlock a block, it needs to first wait for the block to
+ * be unanchored by clients doing a no-checksum read or a zero-copy read. The 
+ * DN also marks the block's slots as "unanchorable" to prevent additional 
+ * clients from initiating these operations in the future.
+ * 
+ * The counterpart fo this class on the client is {@link DfsClientShmManager}.
+ */
+public class ShortCircuitRegistry {
+  public static final Log LOG = LogFactory.getLog(ShortCircuitRegistry.class);
+
+  private static final int SHM_LENGTH = 8192;
+
+  private static class RegisteredShm extends ShortCircuitShm
+      implements DomainSocketWatcher.Handler {
+    private final ShortCircuitRegistry registry;
+
+    RegisteredShm(ShmId shmId, FileInputStream stream,
+        ShortCircuitRegistry registry) throws IOException {
+      super(shmId, stream);
+      this.registry = registry;
+    }
+
+    @Override
+    public boolean handle(DomainSocket sock) {
+      synchronized (registry) {
+        synchronized (this) {
+          registry.removeShm(this);
+        }
+      }
+      return true;
+    }
+  }
+
+  public synchronized void removeShm(ShortCircuitShm shm) {
+    if (LOG.isTraceEnabled()) {
+      LOG.debug("removing shm " + shm);
+    }
+    // Stop tracking the shmId.
+    RegisteredShm removedShm = segments.remove(shm.getShmId());
+    Preconditions.checkState(removedShm == shm,
+        "failed to remove " + shm.getShmId());
+    // Stop tracking the slots.
+    for (Iterator<Slot> iter = shm.slotIterator(); iter.hasNext(); ) {
+      Slot slot = iter.next();
+      boolean removed = slots.remove(slot.getBlockId(), slot);
+      Preconditions.checkState(removed);
+      slot.makeInvalid();
+    }
+    // De-allocate the memory map and close the shared file. 
+    shm.free();
+  }
+
+  /**
+   * Whether or not the registry is enabled.
+   */
+  private boolean enabled;
+
+  /**
+   * The factory which creates shared file descriptors.
+   */
+  private final SharedFileDescriptorFactory shmFactory;
+  
+  /**
+   * A watcher which sends out callbacks when the UNIX domain socket
+   * associated with a shared memory segment closes.
+   */
+  private final DomainSocketWatcher watcher;
+
+  private final HashMap<ShmId, RegisteredShm> segments =
+      new HashMap<ShmId, RegisteredShm>(0);
+  
+  private final HashMultimap<ExtendedBlockId, Slot> slots =
+      HashMultimap.create(0, 1);
+  
+  public ShortCircuitRegistry(Configuration conf) throws IOException {
+    boolean enabled = false;
+    SharedFileDescriptorFactory shmFactory = null;
+    DomainSocketWatcher watcher = null;
+    try {
+      if (!NativeIO.isAvailable()) {
+        LOG.debug("Disabling ShortCircuitRegistry because NativeIO is " +
+            "not available.");
+        return;
+      }
+      String shmPath = conf.get(DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH,
+          DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH_DEFAULT);
+      if (shmPath.isEmpty()) {
+        LOG.info("Disabling ShortCircuitRegistry because shmPath was not set.");
+        return;
+      }
+      int interruptCheck = conf.getInt(
+          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
+          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
+      if (interruptCheck <= 0) {
+        LOG.info("Disabling ShortCircuitRegistry because interruptCheckMs " +
+            "was set to " + interruptCheck);
+        return;
+      }
+      shmFactory = 
+          new SharedFileDescriptorFactory("HadoopShortCircuitShm_", shmPath);
+      watcher = new DomainSocketWatcher(interruptCheck);
+      enabled = true;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("created new ShortCircuitRegistry with interruptCheck=" +
+          interruptCheck + ", shmPath=" + shmPath);
+      }
+    } finally {
+      this.enabled = enabled;
+      this.shmFactory = shmFactory;
+      this.watcher = watcher;
+    }
+  }
+
+  /**
+   * Process a block mlock event from the FsDatasetCache.
+   *
+   * @param blockId    The block that was mlocked.
+   */
+  public synchronized void processBlockMlockEvent(ExtendedBlockId blockId) {
+    if (!enabled) return;
+    Set<Slot> affectedSlots = slots.get(blockId);
+    for (Slot slot : affectedSlots) {
+      slot.makeAnchorable();
+    }
+  }
+
+  /**
+   * Mark any slots associated with this blockId as unanchorable.
+   *
+   * @param blockId        The block ID.
+   * @return               True if we should allow the munlock request.
+   */
+  public synchronized boolean processBlockMunlockRequest(
+      ExtendedBlockId blockId) {
+    if (!enabled) return true;
+    boolean allowMunlock = true;
+    Set<Slot> affectedSlots = slots.get(blockId);
+    for (Slot slot : affectedSlots) {
+      slot.makeUnanchorable();
+      if (slot.isAnchored()) {
+        allowMunlock = false;
+      }
+    }
+    return allowMunlock;
+  }
+  
+  public static class NewShmInfo implements Closeable {
+    public final ShmId shmId;
+    public final FileInputStream stream;
+
+    NewShmInfo(ShmId shmId, FileInputStream stream) {
+      this.shmId = shmId;
+      this.stream = stream;
+    }
+
+    @Override
+    public void close() throws IOException {
+      stream.close();
+    }
+  }
+
+  /**
+   * Handle a DFSClient request to create a new memory segment.
+   *
+   * @param clientName    Client name as reported by the client.
+   * @param sock          The DomainSocket to associate with this memory
+   *                        segment.  When this socket is closed, or the
+   *                        other side writes anything to the socket, the
+   *                        segment will be closed.  This can happen at any
+   *                        time, including right after this function returns.
+   * @return              A NewShmInfo object.  The caller must close the
+   *                        NewShmInfo object once they are done with it.
+   * @throws IOException  If the new memory segment could not be created.
+   */
+  public NewShmInfo createNewMemorySegment(String clientName,
+      DomainSocket sock) throws IOException {
+    NewShmInfo info = null;
+    RegisteredShm shm = null;
+    ShmId shmId = null;
+    synchronized (this) {
+      if (!enabled) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("createNewMemorySegment: ShortCircuitRegistry is " +
+              "not enabled.");
+        }
+        throw new UnsupportedOperationException();
+      }
+      FileInputStream fis = null;
+      try {
+        do {
+          shmId = ShmId.createRandom();
+        } while (segments.containsKey(shmId));
+        fis = shmFactory.createDescriptor(clientName, SHM_LENGTH);
+        shm = new RegisteredShm(shmId, fis, this);
+      } finally {
+        if (shm == null) {
+          IOUtils.closeQuietly(fis);
+        }
+      }
+      info = new NewShmInfo(shmId, fis);
+      segments.put(shmId, shm);
+    }
+    // Drop the registry lock to prevent deadlock.
+    // After this point, RegisteredShm#handle may be called at any time.
+    watcher.add(sock, shm);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("createNewMemorySegment: created " + info.shmId);
+    }
+    return info;
+  }
+  
+  public synchronized void registerSlot(ExtendedBlockId blockId, SlotId slotId)
+      throws InvalidRequestException {
+    if (!enabled) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("registerSlot: ShortCircuitRegistry is " +
+            "not enabled.");
+      }
+      throw new UnsupportedOperationException();
+    }
+    ShmId shmId = slotId.getShmId();
+    RegisteredShm shm = segments.get(shmId);
+    if (shm == null) {
+      throw new InvalidRequestException("there is no shared memory segment " +
+          "registered with shmId " + shmId);
+    }
+    Slot slot = shm.registerSlot(slotId.getSlotIdx(), blockId);
+    boolean added = slots.put(blockId, slot);
+    Preconditions.checkState(added);
+  }
+  
+  public synchronized void unregisterSlot(SlotId slotId)
+      throws InvalidRequestException {
+    if (!enabled) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("unregisterSlot: ShortCircuitRegistry is " +
+            "not enabled.");
+      }
+      throw new UnsupportedOperationException();
+    }
+    ShmId shmId = slotId.getShmId();
+    RegisteredShm shm = segments.get(shmId);
+    if (shm == null) {
+      throw new InvalidRequestException("there is no shared memory segment " +
+          "registered with shmId " + shmId);
+    }
+    Slot slot = shm.getSlot(slotId.getSlotIdx());
+    slot.makeInvalid();
+    shm.unregisterSlot(slotId.getSlotIdx());
+    slots.remove(slot.getBlockId(), slot);
+  }
+  
+  public void shutdown() {
+    synchronized (this) {
+      if (!enabled) return;
+      enabled = false;
+    }
+    IOUtils.closeQuietly(watcher);
+  }
+}