Browse Source

HDFS-10880. Federation Mount Table State Store internal API. Contributed by Jason Kace and Inigo Goiri.

(cherry picked from commit 58b97df661441150d35abd44b3a8606206b46441)
(cherry picked from commit 6f0de2731806628b5b01bd1350225692147590da)
Inigo Goiri 7 years ago
parent
commit
cc7c12ff18
36 changed files with 3437 additions and 76 deletions
  1. 5 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  2. 80 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java
  3. 544 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
  4. 107 17
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java
  5. 29 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
  6. 29 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java
  7. 5 51
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
  8. 1 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
  9. 49 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
  10. 2 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
  11. 116 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
  12. 47 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java
  13. 42 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryResponse.java
  14. 49 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetMountTableEntriesRequest.java
  15. 53 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetMountTableEntriesResponse.java
  16. 49 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RemoveMountTableEntryRequest.java
  17. 42 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RemoveMountTableEntryResponse.java
  18. 51 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateMountTableEntryRequest.java
  19. 43 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateMountTableEntryResponse.java
  20. 84 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntryRequestPBImpl.java
  21. 76 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntryResponsePBImpl.java
  22. 76 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetMountTableEntriesRequestPBImpl.java
  23. 104 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetMountTableEntriesResponsePBImpl.java
  24. 76 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RemoveMountTableEntryRequestPBImpl.java
  25. 76 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RemoveMountTableEntryResponsePBImpl.java
  26. 96 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/UpdateMountTableEntryRequestPBImpl.java
  27. 76 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/UpdateMountTableEntryResponsePBImpl.java
  28. 301 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
  29. 213 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
  30. 60 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
  31. 6 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
  32. 396 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java
  33. 16 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
  34. 250 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java
  35. 12 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
  36. 176 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -26,6 +26,8 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
@@ -1070,8 +1072,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // HDFS Router State Store connection
   public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
       FEDERATION_ROUTER_PREFIX + "file.resolver.client.class";
-  public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT =
-      "org.apache.hadoop.hdfs.server.federation.MockResolver";
+  public static final Class<? extends FileSubclusterResolver>
+      FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT =
+          MountTableResolver.class;
   public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS =
       FEDERATION_ROUTER_PREFIX + "namenode.resolver.client.class";
   public static final Class<? extends ActiveNamenodeResolver>

+ 80 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+
+/**
+ * Manage a mount table.
+ */
+public interface MountTableManager {
+
+  /**
+   * Add an entry to the mount table.
+   *
+   * @param request Fully populated request object.
+   * @return True if the mount table entry was successfully committed to the
+   *         data store.
+   * @throws IOException Throws exception if the data store is not initialized.
+   */
+  AddMountTableEntryResponse addMountTableEntry(
+      AddMountTableEntryRequest request) throws IOException;
+
+  /**
+   * Updates an existing entry in the mount table.
+   *
+   * @param request Fully populated request object.
+   * @return True if the mount table entry was successfully committed to the
+   *         data store.
+   * @throws IOException Throws exception if the data store is not initialized.
+   */
+  UpdateMountTableEntryResponse updateMountTableEntry(
+      UpdateMountTableEntryRequest request) throws IOException;
+
+  /**
+   * Remove an entry from the mount table.
+   *
+   * @param request Fully populated request object.
+   * @return True the mount table entry was removed from the data store.
+   * @throws IOException Throws exception if the data store is not initialized.
+   */
+  RemoveMountTableEntryResponse removeMountTableEntry(
+      RemoveMountTableEntryRequest request) throws IOException;
+
+  /**
+   * List all mount table entries present at or below the path. Fetches from the
+   * state store.
+   *
+   * @param request Fully populated request object.
+   *
+   * @return List of all mount table entries under the path. Zero-length list if
+   *         none are found.
+   * @throws IOException Throws exception if the data store cannot be queried.
+   */
+  GetMountTableEntriesResponse getMountTableEntries(
+      GetMountTableEntriesRequest request) throws IOException;
+}

+ 544 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java

@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Mount table to map between global paths and remote locations. This allows the
+ * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router} to map
+ * the global HDFS view to the remote namespaces. This is similar to
+ * {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}.
+ * This is implemented as a tree.
+ */
+public class MountTableResolver
+    implements FileSubclusterResolver, StateStoreCache {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MountTableResolver.class);
+
+  /** Reference to Router. */
+  private final Router router;
+  /** Reference to the State Store. */
+  private final StateStoreService stateStore;
+  /** Interface to the mount table store. */
+  private MountTableStore mountTableStore;
+
+  /** If the tree has been initialized. */
+  private boolean init = false;
+  /** Path -> Remote HDFS location. */
+  private final TreeMap<String, MountTable> tree = new TreeMap<>();
+  /** Path -> Remote location. */
+  private final ConcurrentNavigableMap<String, PathLocation> locationCache =
+      new ConcurrentSkipListMap<>();
+
+  /** Default nameservice when no mount matches the math. */
+  private String defaultNameService = "";
+
+  /** Synchronization for both the tree and the cache. */
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+
+  @VisibleForTesting
+  public MountTableResolver(Configuration conf) {
+    this(conf, (StateStoreService)null);
+  }
+
+  public MountTableResolver(Configuration conf, Router routerService) {
+    this.router = routerService;
+    if (this.router != null) {
+      this.stateStore = this.router.getStateStore();
+    } else {
+      this.stateStore = null;
+    }
+
+    registerCacheExternal();
+    initDefaultNameService(conf);
+  }
+
+  public MountTableResolver(Configuration conf, StateStoreService store) {
+    this.router = null;
+    this.stateStore = store;
+
+    registerCacheExternal();
+    initDefaultNameService(conf);
+  }
+
+  /**
+   * Request cache updates from the State Store for this resolver.
+   */
+  private void registerCacheExternal() {
+    if (this.stateStore != null) {
+      this.stateStore.registerCacheExternal(this);
+    }
+  }
+
+  /**
+   * Nameservice for APIs that cannot be resolved to a specific one.
+   *
+   * @param conf Configuration for this resolver.
+   */
+  private void initDefaultNameService(Configuration conf) {
+    try {
+      this.defaultNameService = conf.get(
+          DFS_ROUTER_DEFAULT_NAMESERVICE,
+          DFSUtil.getNamenodeNameServiceId(conf));
+    } catch (HadoopIllegalArgumentException e) {
+      LOG.error("Cannot find default name service, setting it to the first");
+      Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
+      this.defaultNameService = nsIds.iterator().next();
+      LOG.info("Default name service: {}", this.defaultNameService);
+    }
+  }
+
+  /**
+   * Get a reference for the Router for this resolver.
+   *
+   * @return Router for this resolver.
+   */
+  protected Router getRouter() {
+    return this.router;
+  }
+
+  /**
+   * Get the mount table store for this resolver.
+   *
+   * @return Mount table store.
+   * @throws IOException If it cannot connect to the State Store.
+   */
+  protected MountTableStore getMountTableStore() throws IOException {
+    if (this.mountTableStore == null) {
+      this.mountTableStore = this.stateStore.getRegisteredRecordStore(
+          MountTableStore.class);
+      if (this.mountTableStore == null) {
+        throw new IOException("State Store does not have an interface for " +
+            MountTableStore.class);
+      }
+    }
+    return this.mountTableStore;
+  }
+
+  /**
+   * Add a mount entry to the table.
+   *
+   * @param entry The mount table record to add from the state store.
+   */
+  public void addEntry(final MountTable entry) {
+    writeLock.lock();
+    try {
+      String srcPath = entry.getSourcePath();
+      this.tree.put(srcPath, entry);
+      invalidateLocationCache(srcPath);
+    } finally {
+      writeLock.unlock();
+    }
+    this.init = true;
+  }
+
+  /**
+   * Remove a mount table entry.
+   *
+   * @param srcPath Source path for the entry to remove.
+   */
+  public void removeEntry(final String srcPath) {
+    writeLock.lock();
+    try {
+      this.tree.remove(srcPath);
+      invalidateLocationCache(srcPath);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Invalidates all cache entries below this path. It requires the write lock.
+   *
+   * @param src Source path.
+   */
+  private void invalidateLocationCache(final String path) {
+    if (locationCache.isEmpty()) {
+      return;
+    }
+    // Determine next lexicographic entry after source path
+    String nextSrc = path + Character.MAX_VALUE;
+    ConcurrentNavigableMap<String, PathLocation> subMap =
+        locationCache.subMap(path, nextSrc);
+    for (final String key : subMap.keySet()) {
+      locationCache.remove(key);
+    }
+  }
+
+  /**
+   * Updates the mount path tree with a new set of mount table entries. It also
+   * updates the needed caches.
+   *
+   * @param entries Full set of mount table entries to update.
+   */
+  @VisibleForTesting
+  public void refreshEntries(final Collection<MountTable> entries) {
+    // The tree read/write must be atomic
+    writeLock.lock();
+    try {
+      // New entries
+      Map<String, MountTable> newEntries = new ConcurrentHashMap<>();
+      for (MountTable entry : entries) {
+        String srcPath = entry.getSourcePath();
+        newEntries.put(srcPath, entry);
+      }
+
+      // Old entries (reversed to sort from the leaves to the root)
+      Set<String> oldEntries = new TreeSet<>(Collections.reverseOrder());
+      for (MountTable entry : getTreeValues("/")) {
+        String srcPath = entry.getSourcePath();
+        oldEntries.add(srcPath);
+      }
+
+      // Entries that need to be removed
+      for (String srcPath : oldEntries) {
+        if (!newEntries.containsKey(srcPath)) {
+          this.tree.remove(srcPath);
+          invalidateLocationCache(srcPath);
+          LOG.info("Removed stale mount point {} from resolver", srcPath);
+        }
+      }
+
+      // Entries that need to be added
+      for (MountTable entry : entries) {
+        String srcPath = entry.getSourcePath();
+        if (!oldEntries.contains(srcPath)) {
+          // Add node, it does not exist
+          this.tree.put(srcPath, entry);
+          LOG.info("Added new mount point {} to resolver", srcPath);
+        } else {
+          // Node exists, check for updates
+          MountTable existingEntry = this.tree.get(srcPath);
+          if (existingEntry != null && !existingEntry.equals(entry)) {
+            // Entry has changed
+            invalidateLocationCache(srcPath);
+            LOG.info("Updated mount point {} in resolver");
+          }
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+    this.init = true;
+  }
+
+  /**
+   * Replaces the current in-memory cached of the mount table with a new
+   * version fetched from the data store.
+   */
+  @Override
+  public boolean loadCache(boolean force) {
+    try {
+      // Our cache depends on the store, update it first
+      MountTableStore mountTable = this.getMountTableStore();
+      mountTable.loadCache(force);
+
+      GetMountTableEntriesRequest request =
+          GetMountTableEntriesRequest.newInstance("/");
+      GetMountTableEntriesResponse response =
+          mountTable.getMountTableEntries(request);
+      List<MountTable> records = response.getEntries();
+      refreshEntries(records);
+    } catch (IOException e) {
+      LOG.error("Cannot fetch mount table entries from State Store", e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Clears all data.
+   */
+  public void clear() {
+    LOG.info("Clearing all mount location caches");
+    writeLock.lock();
+    try {
+      this.locationCache.clear();
+      this.tree.clear();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public PathLocation getDestinationForPath(final String path)
+      throws IOException {
+    verifyMountTable();
+    readLock.lock();
+    try {
+      return this.locationCache.computeIfAbsent(
+          path, this::lookupLocation);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Build the path location to insert into the cache atomically. It must hold
+   * the read lock.
+   * @param path Path to check/insert.
+   * @return New remote location.
+   */
+  public PathLocation lookupLocation(final String path) {
+    PathLocation ret = null;
+    MountTable entry = findDeepest(path);
+    if (entry != null) {
+      ret = buildLocation(path, entry);
+    } else {
+      // Not found, use default location
+      RemoteLocation remoteLocation =
+          new RemoteLocation(defaultNameService, path);
+      List<RemoteLocation> locations =
+          Collections.singletonList(remoteLocation);
+      ret = new PathLocation(null, locations);
+    }
+    return ret;
+  }
+
+  /**
+   * Get the mount table entry for a path.
+   *
+   * @param path Path to look for.
+   * @return Mount table entry the path belongs.
+   * @throws IOException If the State Store could not be reached.
+   */
+  public MountTable getMountPoint(final String path) throws IOException {
+    verifyMountTable();
+    return findDeepest(path);
+  }
+
+  @Override
+  public List<String> getMountPoints(final String path) throws IOException {
+    verifyMountTable();
+
+    Set<String> children = new TreeSet<>();
+    readLock.lock();
+    try {
+      String from = path;
+      String to = path + Character.MAX_VALUE;
+      SortedMap<String, MountTable> subMap = this.tree.subMap(from, to);
+
+      boolean exists = false;
+      for (String subPath : subMap.keySet()) {
+        String child = subPath;
+
+        // Special case for /
+        if (!path.equals(Path.SEPARATOR)) {
+          // Get the children
+          int ini = path.length();
+          child = subPath.substring(ini);
+        }
+
+        if (child.isEmpty()) {
+          // This is a mount point but without children
+          exists = true;
+        } else if (child.startsWith(Path.SEPARATOR)) {
+          // This is a mount point with children
+          exists = true;
+          child = child.substring(1);
+
+          // We only return immediate children
+          int fin = child.indexOf(Path.SEPARATOR);
+          if (fin > -1) {
+            child = child.substring(0, fin);
+          }
+          if (!child.isEmpty()) {
+            children.add(child);
+          }
+        }
+      }
+      if (!exists) {
+        return null;
+      }
+      return new LinkedList<>(children);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get all the mount records at or beneath a given path.
+   * @param path Path to get the mount points from.
+   * @return List of mount table records under the path or null if the path is
+   *         not found.
+   * @throws IOException If it's not connected to the State Store.
+   */
+  public List<MountTable> getMounts(final String path) throws IOException {
+    verifyMountTable();
+
+    return getTreeValues(path, false);
+  }
+
+  /**
+   * Check if the Mount Table is ready to be used.
+   * @throws StateStoreUnavailableException If it cannot connect to the store.
+   */
+  private void verifyMountTable() throws StateStoreUnavailableException {
+    if (!this.init) {
+      throw new StateStoreUnavailableException("Mount Table not initialized");
+    }
+  }
+
+  @Override
+  public String toString() {
+    readLock.lock();
+    try {
+      return this.tree.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Build a location for this result beneath the discovered mount point.
+   *
+   * @param result Tree node search result.
+   * @return PathLocation containing the namespace, local path.
+   */
+  private static PathLocation buildLocation(
+      final String path, final MountTable entry) {
+
+    String srcPath = entry.getSourcePath();
+    if (!path.startsWith(srcPath)) {
+      LOG.error("Cannot build location, {} not a child of {}", path, srcPath);
+      return null;
+    }
+    String remainingPath = path.substring(srcPath.length());
+    if (remainingPath.startsWith(Path.SEPARATOR)) {
+      remainingPath = remainingPath.substring(1);
+    }
+
+    List<RemoteLocation> locations = new LinkedList<>();
+    for (RemoteLocation oneDst : entry.getDestinations()) {
+      String nsId = oneDst.getNameserviceId();
+      String dest = oneDst.getDest();
+      String newPath = dest;
+      if (!newPath.endsWith(Path.SEPARATOR)) {
+        newPath += Path.SEPARATOR;
+      }
+      newPath += remainingPath;
+      RemoteLocation remoteLocation = new RemoteLocation(nsId, newPath);
+      locations.add(remoteLocation);
+    }
+    DestinationOrder order = entry.getDestOrder();
+    return new PathLocation(srcPath, locations, order);
+  }
+
+  @Override
+  public String getDefaultNamespace() {
+    return this.defaultNameService;
+  }
+
+  /**
+   * Find the deepest mount point for a path.
+   * @param path Path to look for.
+   * @return Mount table entry.
+   */
+  private MountTable findDeepest(final String path) {
+    readLock.lock();
+    try {
+      Entry<String, MountTable> entry = this.tree.floorEntry(path);
+      while (entry != null && !path.startsWith(entry.getKey())) {
+        entry = this.tree.lowerEntry(entry.getKey());
+      }
+      if (entry == null) {
+        return null;
+      }
+      return entry.getValue();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get the mount table entries under a path.
+   * @param path Path to search from.
+   * @return Mount Table entries.
+   */
+  private List<MountTable> getTreeValues(final String path) {
+    return getTreeValues(path, false);
+  }
+
+  /**
+   * Get the mount table entries under a path.
+   * @param path Path to search from.
+   * @param reverse If the order should be reversed.
+   * @return Mount Table entries.
+   */
+  private List<MountTable> getTreeValues(final String path, boolean reverse) {
+    LinkedList<MountTable> ret = new LinkedList<>();
+    readLock.lock();
+    try {
+      String from = path;
+      String to = path + Character.MAX_VALUE;
+      SortedMap<String, MountTable> subMap = this.tree.subMap(from, to);
+      for (MountTable entry : subMap.values()) {
+        if (!reverse) {
+          ret.add(entry);
+        } else {
+          ret.addFirst(entry);
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return ret;
+  }
+}

+ 107 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java

@@ -23,21 +23,27 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A map of the properties and target destinations (name space + path) for
- * a path in the global/federated namespace.
+ * a path in the global/federated name space.
  * This data is generated from the @see MountTable records.
  */
 public class PathLocation {
 
+  private static final Logger LOG = LoggerFactory.getLogger(PathLocation.class);
+
+
   /** Source path in global namespace. */
   private final String sourcePath;
 
-  /** Remote paths in the target namespaces. */
+  /** Remote paths in the target name spaces. */
   private final List<RemoteLocation> destinations;
-
-  /** List of name spaces present. */
-  private final Set<String> namespaces;
+  /** Order for the destinations. */
+  private final DestinationOrder destOrder;
 
 
   /**
@@ -45,14 +51,23 @@ public class PathLocation {
    *
    * @param source Source path in the global name space.
    * @param dest Destinations of the mount table entry.
-   * @param namespaces Unique identifier representing the combination of
-   *          name spaces present in the destination list.
+   * @param order Order of the locations.
    */
   public PathLocation(
-      String source, List<RemoteLocation> dest, Set<String> nss) {
+      String source, List<RemoteLocation> dest, DestinationOrder order) {
     this.sourcePath = source;
-    this.destinations = dest;
-    this.namespaces = nss;
+    this.destinations = Collections.unmodifiableList(dest);
+    this.destOrder = order;
+  }
+
+  /**
+   * Create a new PathLocation with default HASH order.
+   *
+   * @param source Source path in the global name space.
+   * @param dest Destinations of the mount table entry.
+   */
+  public PathLocation(String source, List<RemoteLocation> dest) {
+    this(source, dest, DestinationOrder.HASH);
   }
 
   /**
@@ -60,10 +75,55 @@ public class PathLocation {
    *
    * @param other Other path location to copy from.
    */
-  public PathLocation(PathLocation other) {
+  public PathLocation(final PathLocation other) {
     this.sourcePath = other.sourcePath;
-    this.destinations = new LinkedList<RemoteLocation>(other.destinations);
-    this.namespaces = new HashSet<String>(other.namespaces);
+    this.destinations = Collections.unmodifiableList(other.destinations);
+    this.destOrder = other.destOrder;
+  }
+
+  /**
+   * Create a path location from another path with the destinations sorted.
+   *
+   * @param other Other path location to copy from.
+   * @param firstNsId Identifier of the namespace to place first.
+   */
+  public PathLocation(PathLocation other, String firstNsId) {
+    this.sourcePath = other.sourcePath;
+    this.destOrder = other.destOrder;
+    this.destinations = orderedNamespaces(other.destinations, firstNsId);
+  }
+
+  /**
+   * Prioritize a location/destination by its name space/nameserviceId.
+   * This destination might be used by other threads, so the source is not
+   * modifiable.
+   *
+   * @param original List of destinations to order.
+   * @param nsId The name space/nameserviceID to prioritize.
+   * @return Prioritized list of detinations that cannot be modified.
+   */
+  private static List<RemoteLocation> orderedNamespaces(
+      final List<RemoteLocation> original, final String nsId) {
+    if (original.size() <= 1) {
+      return original;
+    }
+
+    LinkedList<RemoteLocation> newDestinations = new LinkedList<>();
+    boolean found = false;
+    for (RemoteLocation dest : original) {
+      if (dest.getNameserviceId().equals(nsId)) {
+        found = true;
+        newDestinations.addFirst(dest);
+      } else {
+        newDestinations.add(dest);
+      }
+    }
+
+    if (!found) {
+      LOG.debug("Cannot find location with namespace {} in {}",
+          nsId, original);
+    }
+    return Collections.unmodifiableList(newDestinations);
   }
 
   /**
@@ -76,16 +136,37 @@ public class PathLocation {
   }
 
   /**
-   * Get the list of subclusters defined for the destinations.
+   * Get the subclusters defined for the destinations.
+   *
+   * @return Set containing the subclusters.
    */
   public Set<String> getNamespaces() {
-    return Collections.unmodifiableSet(this.namespaces);
+    Set<String> namespaces = new HashSet<>();
+    List<RemoteLocation> locations = this.getDestinations();
+    for (RemoteLocation location : locations) {
+      String nsId = location.getNameserviceId();
+      namespaces.add(nsId);
+    }
+    return namespaces;
   }
 
   @Override
   public String toString() {
-    RemoteLocation loc = getDefaultLocation();
-    return loc.getNameserviceId() + "->" + loc.getDest();
+    StringBuilder sb = new StringBuilder();
+    for (RemoteLocation destination : this.destinations) {
+      String nsId = destination.getNameserviceId();
+      String path = destination.getDest();
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      sb.append(nsId + "->" + path);
+    }
+    if (this.destinations.size() > 1) {
+      sb.append(" [");
+      sb.append(this.destOrder.toString());
+      sb.append("]");
+    }
+    return sb.toString();
   }
 
   /**
@@ -107,6 +188,15 @@ public class PathLocation {
     return Collections.unmodifiableList(this.destinations);
   }
 
+  /**
+   * Get the order for the destinations.
+   *
+   * @return Order for the destinations.
+   */
+  public DestinationOrder getDestinationOrder() {
+    return this.destOrder;
+  }
+
   /**
    * Get the default or highest priority location.
    *

+ 29 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java

@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+/**
+ * Order of the destinations when we have multiple of them. When the resolver
+ * of files to subclusters (FileSubclusterResolver) has multiple destinations,
+ * this determines which location should be checked first.
+ */
+public enum DestinationOrder {
+  HASH, // Follow consistent hashing
+  LOCAL, // Local first
+  RANDOM // Random order
+}

+ 29 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java

@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A federated location can be resolved to multiple subclusters. This package
+ * takes care of the order in which this multiple destinations should be used.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 5 - 51
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java

@@ -135,66 +135,20 @@ public final class FederationUtil {
     }
   }
 
-  /**
-   * Create an instance of an interface with a constructor using a state store
-   * constructor.
-   *
-   * @param conf Configuration
-   * @param context Context object to pass to the instance.
-   * @param contextType Type of the context passed to the constructor.
-   * @param configurationKeyName Configuration key to retrieve the class to load
-   * @param defaultClassName Default class to load if the configuration key is
-   *          not set
-   * @param clazz Class/interface that must be implemented by the instance.
-   * @return New instance of the specified class that implements the desired
-   *         interface and a single parameter constructor containing a
-   *         StateStore reference.
-   */
-  private static <T, R> T newInstance(final Configuration conf,
-      final R context, final Class<R> contextClass,
-      final String configKeyName, final String defaultClassName,
-      final Class<T> clazz) {
-
-    String className = conf.get(configKeyName, defaultClassName);
-    try {
-      Class<?> instance = conf.getClassByName(className);
-      if (clazz.isAssignableFrom(instance)) {
-        if (contextClass == null) {
-          // Default constructor if no context
-          @SuppressWarnings("unchecked")
-          Constructor<T> constructor =
-              (Constructor<T>) instance.getConstructor();
-          return constructor.newInstance();
-        } else {
-          // Constructor with context
-          @SuppressWarnings("unchecked")
-          Constructor<T> constructor = (Constructor<T>) instance.getConstructor(
-              Configuration.class, contextClass);
-          return constructor.newInstance(conf, context);
-        }
-      } else {
-        throw new RuntimeException("Class " + className + " not instance of "
-            + clazz.getCanonicalName());
-      }
-    } catch (ReflectiveOperationException e) {
-      LOG.error("Could not instantiate: " + className, e);
-      return null;
-    }
-  }
-
   /**
    * Creates an instance of a FileSubclusterResolver from the configuration.
    *
    * @param conf Configuration that defines the file resolver class.
-   * @param obj Context object passed to class constructor.
-   * @return FileSubclusterResolver
+   * @param router Router service.
+   * @return New file subcluster resolver.
    */
   public static FileSubclusterResolver newFileSubclusterResolver(
-      Configuration conf, StateStoreService stateStore) {
-    return newInstance(conf, stateStore, StateStoreService.class,
+      Configuration conf, Router router) {
+    Class<? extends FileSubclusterResolver> clazz = conf.getClass(
         DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
         DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
         FileSubclusterResolver.class);
+    return newInstance(conf, router, Router.class, clazz);
   }
 
   /**

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java

@@ -124,8 +124,7 @@ public class Router extends CompositeService {
     }
 
     // Lookup interface to map between the global and subcluster name spaces
-    this.subclusterResolver = newFileSubclusterResolver(
-        this.conf, this.stateStore);
+    this.subclusterResolver = newFileSubclusterResolver(this.conf, this);
     if (this.subclusterResolver == null) {
       throw new IOException("Cannot find subcluster resolver");
     }

+ 49 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+
+/**
+ * Management API for the HDFS mount table information stored in
+ * {@link org.apache.hadoop.hdfs.server.federation.store.records.MountTable
+ * MountTable} records. The mount table contains entries that map a particular
+ * global namespace path one or more HDFS nameservices (NN) + target path. It is
+ * possible to map mount locations for root folders, directories or individual
+ * files.
+ * <p>
+ * Once fetched from the
+ * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
+ * StateStoreDriver}, MountTable records are cached in a tree for faster access.
+ * Each path in the global namespace is mapped to a nameserivce ID and local
+ * path upon request. The cache is periodically updated by the @{link
+ * StateStoreCacheUpdateService}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class MountTableStore extends CachedRecordStore<MountTable>
+    implements MountTableManager {
+
+  public MountTableStore(StateStoreDriver driver) {
+    super(MountTable.class, driver);
+  }
+}

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl;
+import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
 import org.apache.hadoop.service.CompositeService;
@@ -136,6 +137,7 @@ public class StateStoreService extends CompositeService {
 
     // Add supported record stores
     addRecordStore(MembershipStoreImpl.class);
+    addRecordStore(MountTableStoreImpl.class);
 
     // Check the connection to the State Store periodically
     this.monitorService = new StateStoreConnectionMonitorService(this);

+ 116 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java

@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.impl;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Implementation of the {@link MountTableStore} state store API.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MountTableStoreImpl extends MountTableStore {
+
+  public MountTableStoreImpl(StateStoreDriver driver) {
+    super(driver);
+  }
+
+  @Override
+  public AddMountTableEntryResponse addMountTableEntry(
+      AddMountTableEntryRequest request) throws IOException {
+    boolean status = getDriver().put(request.getEntry(), false, true);
+    AddMountTableEntryResponse response =
+        AddMountTableEntryResponse.newInstance();
+    response.setStatus(status);
+    return response;
+  }
+
+  @Override
+  public UpdateMountTableEntryResponse updateMountTableEntry(
+      UpdateMountTableEntryRequest request) throws IOException {
+    MountTable entry = request.getEntry();
+    boolean status = getDriver().put(entry, true, true);
+    UpdateMountTableEntryResponse response =
+        UpdateMountTableEntryResponse.newInstance();
+    response.setStatus(status);
+    return response;
+  }
+
+  @Override
+  public RemoveMountTableEntryResponse removeMountTableEntry(
+      RemoveMountTableEntryRequest request) throws IOException {
+    final String srcPath = request.getSrcPath();
+    final MountTable partial = MountTable.newInstance();
+    partial.setSourcePath(srcPath);
+    final Query<MountTable> query = new Query<>(partial);
+    int removedRecords = getDriver().remove(getRecordClass(), query);
+    boolean status = (removedRecords == 1);
+    RemoveMountTableEntryResponse response =
+        RemoveMountTableEntryResponse.newInstance();
+    response.setStatus(status);
+    return response;
+  }
+
+  @Override
+  public GetMountTableEntriesResponse getMountTableEntries(
+      GetMountTableEntriesRequest request) throws IOException {
+
+    // Get all values from the cache
+    List<MountTable> records = getCachedRecords();
+
+    // Sort and filter
+    Collections.sort(records);
+    String reqSrcPath = request.getSrcPath();
+    if (reqSrcPath != null && !reqSrcPath.isEmpty()) {
+      // Return only entries beneath this path
+      Iterator<MountTable> it = records.iterator();
+      while (it.hasNext()) {
+        MountTable record = it.next();
+        String srcPath = record.getSourcePath();
+        if (!srcPath.startsWith(reqSrcPath)) {
+          it.remove();
+        }
+      }
+    }
+
+    GetMountTableEntriesResponse response =
+        GetMountTableEntriesResponse.newInstance();
+    response.setEntries(records);
+    response.setTimestamp(Time.now());
+    return response;
+  }
+}

+ 47 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+
+/**
+ * API request for adding a mount table entry to the state store.
+ */
+public abstract class AddMountTableEntryRequest {
+
+  public static AddMountTableEntryRequest newInstance() {
+    return StateStoreSerializer.newRecord(AddMountTableEntryRequest.class);
+  }
+
+  public static AddMountTableEntryRequest newInstance(MountTable newEntry) {
+    AddMountTableEntryRequest request = newInstance();
+    request.setEntry(newEntry);
+    return request;
+  }
+
+  @Public
+  @Unstable
+  public abstract MountTable getEntry();
+
+  @Public
+  @Unstable
+  public abstract void setEntry(MountTable mount);
+}

+ 42 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryResponse.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API response for adding a mount table entry to the state store.
+ */
+public abstract class AddMountTableEntryResponse {
+
+  public static AddMountTableEntryResponse newInstance() throws IOException {
+    return StateStoreSerializer.newRecord(AddMountTableEntryResponse.class);
+  }
+
+  @Public
+  @Unstable
+  public abstract boolean getStatus();
+
+  @Public
+  @Unstable
+  public abstract void setStatus(boolean result);
+}

+ 49 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetMountTableEntriesRequest.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API request for listing mount table entries present in the state store.
+ */
+public abstract class GetMountTableEntriesRequest {
+
+  public static GetMountTableEntriesRequest newInstance() throws IOException {
+    return StateStoreSerializer.newRecord(GetMountTableEntriesRequest.class);
+  }
+
+  public static GetMountTableEntriesRequest newInstance(String srcPath)
+      throws IOException {
+    GetMountTableEntriesRequest request = newInstance();
+    request.setSrcPath(srcPath);
+    return request;
+  }
+
+  @Public
+  @Unstable
+  public abstract String getSrcPath();
+
+  @Public
+  @Unstable
+  public abstract void setSrcPath(String path);
+}

+ 53 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetMountTableEntriesResponse.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+
+/**
+ * API response for listing mount table entries present in the state store.
+ */
+public abstract class GetMountTableEntriesResponse {
+
+  public static GetMountTableEntriesResponse newInstance() throws IOException {
+    return StateStoreSerializer.newRecord(GetMountTableEntriesResponse.class);
+  }
+
+  @Public
+  @Unstable
+  public abstract List<MountTable> getEntries() throws IOException;
+
+  @Public
+  @Unstable
+  public abstract void setEntries(List<MountTable> entries)
+      throws IOException;
+
+  @Public
+  @Unstable
+  public abstract long getTimestamp();
+
+  @Public
+  @Unstable
+  public abstract void setTimestamp(long time);
+}

+ 49 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RemoveMountTableEntryRequest.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API request for removing a mount table path present in the state store.
+ */
+public abstract class RemoveMountTableEntryRequest {
+
+  public static RemoveMountTableEntryRequest newInstance() throws IOException {
+    return StateStoreSerializer.newRecord(RemoveMountTableEntryRequest.class);
+  }
+
+  public static RemoveMountTableEntryRequest newInstance(String path)
+      throws IOException {
+    RemoveMountTableEntryRequest request = newInstance();
+    request.setSrcPath(path);
+    return request;
+  }
+
+  @Public
+  @Unstable
+  public abstract String getSrcPath();
+
+  @Public
+  @Unstable
+  public abstract void setSrcPath(String path);
+}

+ 42 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RemoveMountTableEntryResponse.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API response for removing a mount table path present in the state store.
+ */
+public abstract class RemoveMountTableEntryResponse {
+
+  public static RemoveMountTableEntryResponse newInstance() throws IOException {
+    return StateStoreSerializer.newRecord(RemoveMountTableEntryResponse.class);
+  }
+
+  @Public
+  @Unstable
+  public abstract boolean getStatus();
+
+  @Public
+  @Unstable
+  public abstract void setStatus(boolean result);
+}

+ 51 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateMountTableEntryRequest.java

@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+
+/**
+ * API request for updating the destination of an existing mount point in the
+ * state store.
+ */
+public abstract class UpdateMountTableEntryRequest {
+
+  public static UpdateMountTableEntryRequest newInstance() throws IOException {
+    return StateStoreSerializer.newRecord(UpdateMountTableEntryRequest.class);
+  }
+
+  public static UpdateMountTableEntryRequest newInstance(MountTable entry)
+      throws IOException {
+    UpdateMountTableEntryRequest request = newInstance();
+    request.setEntry(entry);
+    return request;
+  }
+
+  @Public
+  @Unstable
+  public abstract MountTable getEntry() throws IOException;
+
+  @Public
+  @Unstable
+  public abstract void setEntry(MountTable mount) throws IOException;
+}

+ 43 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateMountTableEntryResponse.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API response for updating the destination of an existing mount point in the
+ * state store.
+ */
+public abstract class UpdateMountTableEntryResponse {
+
+  public static UpdateMountTableEntryResponse newInstance() throws IOException {
+    return StateStoreSerializer.newRecord(UpdateMountTableEntryResponse.class);
+  }
+
+  @Public
+  @Unstable
+  public abstract boolean getStatus();
+
+  @Public
+  @Unstable
+  public abstract void setStatus(boolean result);
+}

+ 84 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntryRequestPBImpl.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProtoOrBuilder;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MountTablePBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the state store API object
+ * AddMountTableEntryRequest.
+ */
+public class AddMountTableEntryRequestPBImpl
+    extends AddMountTableEntryRequest implements PBRecord {
+
+  private FederationProtocolPBTranslator<AddMountTableEntryRequestProto,
+      AddMountTableEntryRequestProto.Builder,
+      AddMountTableEntryRequestProtoOrBuilder> translator =
+          new FederationProtocolPBTranslator<AddMountTableEntryRequestProto,
+              AddMountTableEntryRequestProto.Builder,
+              AddMountTableEntryRequestProtoOrBuilder>(
+                  AddMountTableEntryRequestProto.class);
+
+  public AddMountTableEntryRequestPBImpl() {
+  }
+
+  public AddMountTableEntryRequestPBImpl(AddMountTableEntryRequestProto proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public AddMountTableEntryRequestProto getProto() {
+    return this.translator.build();
+  }
+
+  @Override
+  public void setProto(Message proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public void readInstance(String base64String) throws IOException {
+    this.translator.readInstance(base64String);
+  }
+
+  @Override
+  public MountTable getEntry() {
+    MountTableRecordProto entryProto =
+        this.translator.getProtoOrBuilder().getEntry();
+    return new MountTablePBImpl(entryProto);
+  }
+
+  @Override
+  public void setEntry(MountTable mount) {
+    if (mount instanceof MountTablePBImpl) {
+      MountTablePBImpl mountPB = (MountTablePBImpl)mount;
+      MountTableRecordProto mountProto = mountPB.getProto();
+      translator.getBuilder().setEntry(mountProto);
+    }
+  }
+}

+ 76 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntryResponsePBImpl.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProtoOrBuilder;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the state store API object
+ * AddMountTableEntryResponse.
+ */
+public class AddMountTableEntryResponsePBImpl
+    extends AddMountTableEntryResponse implements PBRecord {
+
+  private FederationProtocolPBTranslator<AddMountTableEntryResponseProto,
+      AddMountTableEntryResponseProto.Builder,
+      AddMountTableEntryResponseProtoOrBuilder> translator =
+          new FederationProtocolPBTranslator<AddMountTableEntryResponseProto,
+              AddMountTableEntryResponseProto.Builder,
+              AddMountTableEntryResponseProtoOrBuilder>(
+                  AddMountTableEntryResponseProto.class);
+
+  public AddMountTableEntryResponsePBImpl() {
+  }
+
+  public AddMountTableEntryResponsePBImpl(
+      AddMountTableEntryResponseProto proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public AddMountTableEntryResponseProto getProto() {
+    return this.translator.build();
+  }
+
+  @Override
+  public void setProto(Message proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public void readInstance(String base64String) throws IOException {
+    this.translator.readInstance(base64String);
+  }
+
+  @Override
+  public boolean getStatus() {
+    return this.translator.getProtoOrBuilder().getStatus();
+  }
+
+  @Override
+  public void setStatus(boolean result) {
+    this.translator.getBuilder().setStatus(result);
+  }
+}

+ 76 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetMountTableEntriesRequestPBImpl.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProtoOrBuilder;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the state store API object
+ * GetMountTableEntriesRequest.
+ */
+public class GetMountTableEntriesRequestPBImpl
+    extends GetMountTableEntriesRequest implements PBRecord {
+
+  private FederationProtocolPBTranslator<GetMountTableEntriesRequestProto,
+      GetMountTableEntriesRequestProto.Builder,
+      GetMountTableEntriesRequestProtoOrBuilder> translator =
+          new FederationProtocolPBTranslator<GetMountTableEntriesRequestProto,
+              GetMountTableEntriesRequestProto.Builder,
+              GetMountTableEntriesRequestProtoOrBuilder>(
+                  GetMountTableEntriesRequestProto.class);
+
+  public GetMountTableEntriesRequestPBImpl() {
+  }
+
+  public GetMountTableEntriesRequestPBImpl(
+      GetMountTableEntriesRequestProto proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public GetMountTableEntriesRequestProto getProto() {
+    return this.translator.build();
+  }
+
+  @Override
+  public void setProto(Message proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public void readInstance(String base64String) throws IOException {
+    this.translator.readInstance(base64String);
+  }
+
+  @Override
+  public String getSrcPath() {
+    return this.translator.getProtoOrBuilder().getSrcPath();
+  }
+
+  @Override
+  public void setSrcPath(String path) {
+    this.translator.getBuilder().setSrcPath(path);
+  }
+}

+ 104 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetMountTableEntriesResponsePBImpl.java

@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProtoOrBuilder;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MountTablePBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the state store API object
+ * GetMountTableEntriesResponse.
+ */
+public class GetMountTableEntriesResponsePBImpl
+    extends GetMountTableEntriesResponse implements PBRecord {
+
+  private FederationProtocolPBTranslator<GetMountTableEntriesResponseProto,
+      GetMountTableEntriesResponseProto.Builder,
+      GetMountTableEntriesResponseProtoOrBuilder> translator =
+          new FederationProtocolPBTranslator<GetMountTableEntriesResponseProto,
+              GetMountTableEntriesResponseProto.Builder,
+              GetMountTableEntriesResponseProtoOrBuilder>(
+                  GetMountTableEntriesResponseProto.class);
+
+  public GetMountTableEntriesResponsePBImpl() {
+  }
+
+  public GetMountTableEntriesResponsePBImpl(
+      GetMountTableEntriesResponseProto proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public GetMountTableEntriesResponseProto getProto() {
+    return this.translator.build();
+  }
+
+  @Override
+  public void setProto(Message proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public void readInstance(String base64String) throws IOException {
+    this.translator.readInstance(base64String);
+  }
+
+  @Override
+  public List<MountTable> getEntries() throws IOException {
+    List<MountTableRecordProto> entries =
+        this.translator.getProtoOrBuilder().getEntriesList();
+    List<MountTable> ret = new ArrayList<MountTable>();
+    for (MountTableRecordProto entry : entries) {
+      MountTable record = new MountTablePBImpl(entry);
+      ret.add(record);
+    }
+    return ret;
+  }
+
+  @Override
+  public void setEntries(List<MountTable> records) throws IOException {
+    this.translator.getBuilder().clearEntries();
+    for (MountTable entry : records) {
+      if (entry instanceof MountTablePBImpl) {
+        MountTablePBImpl entryPB = (MountTablePBImpl)entry;
+        this.translator.getBuilder().addEntries(entryPB.getProto());
+      }
+    }
+  }
+
+  @Override
+  public long getTimestamp() {
+    return this.translator.getProtoOrBuilder().getTimestamp();
+  }
+
+  @Override
+  public void setTimestamp(long time) {
+    this.translator.getBuilder().setTimestamp(time);
+  }
+}

+ 76 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RemoveMountTableEntryRequestPBImpl.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProtoOrBuilder;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the state store API object
+ * RemoveMountTableEntryRequest.
+ */
+public class RemoveMountTableEntryRequestPBImpl
+    extends RemoveMountTableEntryRequest implements PBRecord {
+
+  private FederationProtocolPBTranslator<RemoveMountTableEntryRequestProto,
+      RemoveMountTableEntryRequestProto.Builder,
+      RemoveMountTableEntryRequestProtoOrBuilder> translator =
+          new FederationProtocolPBTranslator<RemoveMountTableEntryRequestProto,
+              RemoveMountTableEntryRequestProto.Builder,
+              RemoveMountTableEntryRequestProtoOrBuilder>(
+                  RemoveMountTableEntryRequestProto.class);
+
+  public RemoveMountTableEntryRequestPBImpl() {
+  }
+
+  public RemoveMountTableEntryRequestPBImpl(
+      RemoveMountTableEntryRequestProto proto) {
+    this.setProto(proto);
+  }
+
+  @Override
+  public RemoveMountTableEntryRequestProto getProto() {
+    return this.translator.build();
+  }
+
+  @Override
+  public void setProto(Message proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public void readInstance(String base64String) throws IOException {
+    this.translator.readInstance(base64String);
+  }
+
+  @Override
+  public String getSrcPath() {
+    return this.translator.getProtoOrBuilder().getSrcPath();
+  }
+
+  @Override
+  public void setSrcPath(String path) {
+    this.translator.getBuilder().setSrcPath(path);
+  }
+}

+ 76 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RemoveMountTableEntryResponsePBImpl.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto.Builder;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProtoOrBuilder;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the state store API object
+ * RemoveMountTableEntryResponse.
+ */
+public class RemoveMountTableEntryResponsePBImpl
+    extends RemoveMountTableEntryResponse implements PBRecord {
+
+  private FederationProtocolPBTranslator<RemoveMountTableEntryResponseProto,
+      Builder, RemoveMountTableEntryResponseProtoOrBuilder> translator =
+          new FederationProtocolPBTranslator<RemoveMountTableEntryResponseProto,
+              RemoveMountTableEntryResponseProto.Builder,
+              RemoveMountTableEntryResponseProtoOrBuilder>(
+                  RemoveMountTableEntryResponseProto.class);
+
+  public RemoveMountTableEntryResponsePBImpl() {
+  }
+
+  public RemoveMountTableEntryResponsePBImpl(
+      RemoveMountTableEntryResponseProto proto) {
+    this.setProto(proto);
+  }
+
+  @Override
+  public RemoveMountTableEntryResponseProto getProto() {
+    return this.translator.build();
+  }
+
+  @Override
+  public void setProto(Message proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public void readInstance(String base64String) throws IOException {
+    this.translator.readInstance(base64String);
+  }
+
+  @Override
+  public boolean getStatus() {
+    return this.translator.getProtoOrBuilder().getStatus();
+  }
+
+  @Override
+  public void setStatus(boolean result) {
+    this.translator.getBuilder().setStatus(result);
+  }
+}

+ 96 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/UpdateMountTableEntryRequestPBImpl.java

@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProtoOrBuilder;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MountTablePBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the state store API object
+ * UpdateMountTableEntryRequest.
+ */
+public class UpdateMountTableEntryRequestPBImpl
+    extends UpdateMountTableEntryRequest implements PBRecord {
+
+  private FederationProtocolPBTranslator<UpdateMountTableEntryRequestProto,
+      UpdateMountTableEntryRequestProto.Builder,
+      UpdateMountTableEntryRequestProtoOrBuilder> translator =
+          new FederationProtocolPBTranslator<UpdateMountTableEntryRequestProto,
+              UpdateMountTableEntryRequestProto.Builder,
+              UpdateMountTableEntryRequestProtoOrBuilder>(
+                  UpdateMountTableEntryRequestProto.class);
+
+  public UpdateMountTableEntryRequestPBImpl() {
+  }
+
+  public UpdateMountTableEntryRequestPBImpl(
+      UpdateMountTableEntryRequestProto proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public UpdateMountTableEntryRequestProto getProto() {
+    return this.translator.build();
+  }
+
+  @Override
+  public void setProto(Message proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public void readInstance(String base64String) throws IOException {
+    this.translator.readInstance(base64String);
+  }
+
+  @Override
+  public MountTable getEntry() throws IOException {
+    MountTableRecordProto statsProto =
+        this.translator.getProtoOrBuilder().getEntry();
+    MountTable stats = StateStoreSerializer.newRecord(MountTable.class);
+    if (stats instanceof MountTablePBImpl) {
+      MountTablePBImpl entryPB = (MountTablePBImpl)stats;
+      entryPB.setProto(statsProto);
+      return entryPB;
+    } else {
+      throw new IOException("Cannot get stats for the membership");
+    }
+  }
+
+  @Override
+  public void setEntry(MountTable mount) throws IOException {
+    if (mount instanceof MountTablePBImpl) {
+      MountTablePBImpl mountPB = (MountTablePBImpl)mount;
+      MountTableRecordProto mountProto =
+          (MountTableRecordProto)mountPB.getProto();
+      this.translator.getBuilder().setEntry(mountProto);
+    } else {
+      throw new IOException("Cannot set mount table entry");
+    }
+  }
+}

+ 76 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/UpdateMountTableEntryResponsePBImpl.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProtoOrBuilder;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the state store API object
+ * UpdateMountTableEntryResponse.
+ */
+public class UpdateMountTableEntryResponsePBImpl
+    extends UpdateMountTableEntryResponse implements PBRecord {
+
+  private FederationProtocolPBTranslator<UpdateMountTableEntryResponseProto,
+      UpdateMountTableEntryResponseProto.Builder,
+      UpdateMountTableEntryResponseProtoOrBuilder> translator =
+          new FederationProtocolPBTranslator<UpdateMountTableEntryResponseProto,
+          UpdateMountTableEntryResponseProto.Builder,
+          UpdateMountTableEntryResponseProtoOrBuilder>(
+              UpdateMountTableEntryResponseProto.class);
+
+  public UpdateMountTableEntryResponsePBImpl() {
+  }
+
+  public UpdateMountTableEntryResponsePBImpl(
+      UpdateMountTableEntryResponseProto proto) {
+    this.setProto(proto);
+  }
+
+  @Override
+  public UpdateMountTableEntryResponseProto getProto() {
+    return this.translator.build();
+  }
+
+  @Override
+  public void setProto(Message proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public void readInstance(String base64String) throws IOException {
+    this.translator.readInstance(base64String);
+  }
+
+  @Override
+  public boolean getStatus() {
+    return this.translator.getProtoOrBuilder().getStatus();
+  }
+
+  @Override
+  public void setStatus(boolean result) {
+    this.translator.getBuilder().setStatus(result);
+  }
+}

+ 301 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java

@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Data schema for
+ * {@link org.apache.hadoop.hdfs.server.federation.store.
+ * MountTableStore FederationMountTableStore} data stored in the
+ * {@link org.apache.hadoop.hdfs.server.federation.store.
+ * StateStoreService FederationStateStoreService}. Supports string
+ * serialization.
+ */
+public abstract class MountTable extends BaseRecord {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MountTable.class);
+
+
+  /**
+   * Default constructor for a mount table entry.
+   */
+  public MountTable() {
+    super();
+  }
+
+  public static MountTable newInstance() {
+    MountTable record = StateStoreSerializer.newRecord(MountTable.class);
+    record.init();
+    return record;
+  }
+
+  /**
+   * Constructor for a mount table entry with a single destinations.
+   *
+   * @param src Source path in the mount entry.
+   * @param destinations Nameservice destination of the mount point.
+   * @param dateCreated Created date.
+   * @param dateModified Modified date.
+   * @throws IOException
+   */
+  public static MountTable newInstance(final String src,
+      final Map<String, String> destinations,
+      long dateCreated, long dateModified) throws IOException {
+
+    MountTable record = newInstance(src, destinations);
+    record.setDateCreated(dateCreated);
+    record.setDateModified(dateModified);
+    return record;
+  }
+
+  /**
+   * Constructor for a mount table entry with multiple destinations.
+   *
+   * @param src Source path in the mount entry.
+   * @param destinations Nameservice destinations of the mount point.
+   * @throws IOException
+   */
+  public static MountTable newInstance(final String src,
+      final Map<String, String> destinations) throws IOException {
+    MountTable record = newInstance();
+
+    // Normalize the mount path
+    record.setSourcePath(normalizeFileSystemPath(src));
+
+    // Build a list of remote locations
+    final List<RemoteLocation> locations = new LinkedList<>();
+    for (Entry<String, String> entry : destinations.entrySet()) {
+      String nsId = entry.getKey();
+      String path = normalizeFileSystemPath(entry.getValue());
+      RemoteLocation location = new RemoteLocation(nsId, path);
+      locations.add(location);
+    }
+
+    // Set the serialized dest string
+    record.setDestinations(locations);
+
+    // Validate
+    record.validate();
+    return record;
+  }
+
+  /**
+   * Get source path in the federated namespace.
+   *
+   * @return Source path in the federated namespace.
+   */
+  public abstract String getSourcePath();
+
+  /**
+   * Set source path in the federated namespace.
+   *
+   * @param path Source path in the federated namespace.
+   */
+  public abstract void setSourcePath(String path);
+
+  /**
+   * Get a list of destinations (namespace + path) present for this entry.
+   *
+   * @return List of RemoteLocation destinations. Null if no destinations.
+   */
+  public abstract List<RemoteLocation> getDestinations();
+
+  /**
+   * Set the destination paths.
+   *
+   * @param paths Destination paths.
+   */
+  public abstract void setDestinations(List<RemoteLocation> dests);
+
+  /**
+   * Add a new destination to this mount table entry.
+   */
+  public abstract boolean addDestination(String nsId, String path);
+
+  /**
+   * Check if the entry is read only.
+   *
+   * @return If the entry is read only.
+   */
+  public abstract boolean isReadOnly();
+
+  /**
+   * Set an entry to be read only.
+   *
+   * @param ro If the entry is read only.
+   */
+  public abstract void setReadOnly(boolean ro);
+
+  /**
+   * Get the order of the destinations for this mount table entry.
+   *
+   * @return Order of the destinations.
+   */
+  public abstract DestinationOrder getDestOrder();
+
+  /**
+   * Set the order of the destinations for this mount table entry.
+   *
+   * @param order Order of the destinations.
+   */
+  public abstract void setDestOrder(DestinationOrder order);
+
+  /**
+   * Get the default location.
+   * @return The default location.
+   */
+  public RemoteLocation getDefaultLocation() {
+    List<RemoteLocation> dests = this.getDestinations();
+    if (dests == null || dests.isEmpty()) {
+      return null;
+    }
+    return dests.get(0);
+  }
+
+  @Override
+  public boolean like(final BaseRecord o) {
+    if (o instanceof MountTable) {
+      MountTable other = (MountTable)o;
+      if (getSourcePath() != null &&
+          !getSourcePath().equals(other.getSourcePath())) {
+        return false;
+      }
+      if (getDestinations() != null &&
+          !getDestinations().equals(other.getDestinations())) {
+        return false;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(this.getSourcePath());
+    sb.append("->");
+    List<RemoteLocation> destinations = this.getDestinations();
+    sb.append(destinations);
+    if (destinations != null && destinations.size() > 1) {
+      sb.append("[" + this.getDestOrder() + "]");
+    }
+    if (this.isReadOnly()) {
+      sb.append("[RO]");
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public SortedMap<String, String> getPrimaryKeys() {
+    SortedMap<String, String> map = new TreeMap<>();
+    map.put("sourcePath", this.getSourcePath());
+    return map;
+  }
+
+  @Override
+  public boolean validate() {
+    boolean ret = super.validate();
+    if (this.getSourcePath() == null || this.getSourcePath().length() == 0) {
+      LOG.error("Invalid entry, no source path specified ", this);
+      ret = false;
+    }
+    if (!this.getSourcePath().startsWith("/")) {
+      LOG.error("Invalid entry, all mount points must start with / ", this);
+      ret = false;
+    }
+    if (this.getDestinations() == null || this.getDestinations().size() == 0) {
+      LOG.error("Invalid entry, no destination paths specified ", this);
+      ret = false;
+    }
+    for (RemoteLocation loc : getDestinations()) {
+      String nsId = loc.getNameserviceId();
+      if (nsId == null || nsId.length() == 0) {
+        LOG.error("Invalid entry, invalid destination nameservice ", this);
+        ret = false;
+      }
+      if (loc.getDest() == null || loc.getDest().length() == 0) {
+        LOG.error("Invalid entry, invalid destination path ", this);
+        ret = false;
+      }
+      if (!loc.getDest().startsWith("/")) {
+        LOG.error("Invalid entry, all destination must start with / ", this);
+        ret = false;
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public long getExpirationMs() {
+    return 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 31)
+        .append(this.getSourcePath())
+        .append(this.getDestinations())
+        .append(this.isReadOnly())
+        .append(this.getDestOrder())
+        .toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof MountTable) {
+      MountTable other = (MountTable)obj;
+      if (!this.getSourcePath().equals(other.getSourcePath())) {
+        return false;
+      } else if (!this.getDestinations().equals(other.getDestinations())) {
+        return false;
+      } else if (this.isReadOnly() != other.isReadOnly()) {
+        return false;
+      } else if (!this.getDestOrder().equals(other.getDestOrder())) {
+        return false;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Normalize a path for that filesystem.
+   *
+   * @param path Path to normalize.
+   * @return Normalized path.
+   */
+  private static String normalizeFileSystemPath(final String path) {
+    Path normalizedPath = new Path(path);
+    return normalizedPath.toString();
+  }
+}

+ 213 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java

@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records.impl.pb;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto.Builder;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto.DestOrder;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProtoOrBuilder;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoteLocationProto;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.FederationProtocolPBTranslator;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the MountTable record.
+ */
+public class MountTablePBImpl extends MountTable implements PBRecord {
+
+  private FederationProtocolPBTranslator<MountTableRecordProto, Builder,
+      MountTableRecordProtoOrBuilder> translator =
+          new FederationProtocolPBTranslator<MountTableRecordProto, Builder,
+              MountTableRecordProtoOrBuilder>(MountTableRecordProto.class);
+
+  public MountTablePBImpl() {
+  }
+
+  public MountTablePBImpl(MountTableRecordProto proto) {
+    this.setProto(proto);
+  }
+
+  @Override
+  public MountTableRecordProto getProto() {
+    return this.translator.build();
+  }
+
+  @Override
+  public void setProto(Message proto) {
+    this.translator.setProto(proto);
+  }
+
+  @Override
+  public void readInstance(String base64String) throws IOException {
+    this.translator.readInstance(base64String);
+  }
+
+  @Override
+  public String getSourcePath() {
+    MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
+    if (!proto.hasSrcPath()) {
+      return null;
+    }
+    return proto.getSrcPath();
+  }
+
+  @Override
+  public void setSourcePath(String path) {
+    Builder builder = this.translator.getBuilder();
+    if (path == null) {
+      builder.clearSrcPath();
+    } else {
+      builder.setSrcPath(path);
+    }
+  }
+
+  @Override
+  public List<RemoteLocation> getDestinations() {
+    MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
+    if (proto.getDestinationsCount() == 0) {
+      return null;
+    }
+
+    final List<RemoteLocation> ret = new LinkedList<>();
+    final List<RemoteLocationProto> destList = proto.getDestinationsList();
+    for (RemoteLocationProto dest : destList) {
+      String nsId = dest.getNameserviceId();
+      String path = dest.getPath();
+      RemoteLocation loc = new RemoteLocation(nsId, path);
+      ret.add(loc);
+    }
+    return ret;
+  }
+
+  @Override
+  public void setDestinations(final List<RemoteLocation> dests) {
+    Builder builder = this.translator.getBuilder();
+    builder.clearDestinations();
+    for (RemoteLocation dest : dests) {
+      RemoteLocationProto.Builder itemBuilder =
+          RemoteLocationProto.newBuilder();
+      String nsId = dest.getNameserviceId();
+      String path = dest.getDest();
+      itemBuilder.setNameserviceId(nsId);
+      itemBuilder.setPath(path);
+      RemoteLocationProto item = itemBuilder.build();
+      builder.addDestinations(item);
+    }
+  }
+
+  @Override
+  public boolean addDestination(String nsId, String path) {
+    // Check if the location is already there
+    List<RemoteLocation> dests = getDestinations();
+    for (RemoteLocation dest : dests) {
+      if (dest.getNameserviceId().equals(nsId) && dest.getDest().equals(path)) {
+        return false;
+      }
+    }
+
+    // Add it to the existing list
+    Builder builder = this.translator.getBuilder();
+    RemoteLocationProto.Builder itemBuilder =
+        RemoteLocationProto.newBuilder();
+    itemBuilder.setNameserviceId(nsId);
+    itemBuilder.setPath(path);
+    RemoteLocationProto item = itemBuilder.build();
+    builder.addDestinations(item);
+    return true;
+  }
+
+  @Override
+  public void setDateModified(long time) {
+    this.translator.getBuilder().setDateModified(time);
+  }
+
+  @Override
+  public long getDateModified() {
+    return this.translator.getProtoOrBuilder().getDateModified();
+  }
+
+  @Override
+  public void setDateCreated(long time) {
+    this.translator.getBuilder().setDateCreated(time);
+  }
+
+  @Override
+  public long getDateCreated() {
+    return this.translator.getProtoOrBuilder().getDateCreated();
+  }
+
+  @Override
+  public boolean isReadOnly() {
+    MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
+    if (!proto.hasReadOnly()) {
+      return false;
+    }
+    return proto.getReadOnly();
+  }
+
+  @Override
+  public void setReadOnly(boolean ro) {
+    this.translator.getBuilder().setReadOnly(ro);
+  }
+
+  @Override
+  public DestinationOrder getDestOrder() {
+    MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
+    return convert(proto.getDestOrder());
+  }
+
+  @Override
+  public void setDestOrder(DestinationOrder order) {
+    Builder builder = this.translator.getBuilder();
+    if (order == null) {
+      builder.clearDestOrder();
+    } else {
+      builder.setDestOrder(convert(order));
+    }
+  }
+
+  private DestinationOrder convert(DestOrder order) {
+    switch (order) {
+    case LOCAL:
+      return DestinationOrder.LOCAL;
+    case RANDOM:
+      return DestinationOrder.RANDOM;
+    default:
+      return DestinationOrder.HASH;
+    }
+  }
+
+  private DestOrder convert(DestinationOrder order) {
+    switch (order) {
+    case LOCAL:
+      return DestOrder.LOCAL;
+    case RANDOM:
+      return DestOrder.RANDOM;
+    default:
+      return DestOrder.HASH;
+    }
+  }
+}

+ 60 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto

@@ -104,4 +104,63 @@ message NamenodeHeartbeatRequestProto {
 
 message NamenodeHeartbeatResponseProto {
   optional bool status = 1;
-}
+}
+
+
+/////////////////////////////////////////////////
+// Mount table
+/////////////////////////////////////////////////
+
+message RemoteLocationProto {
+  optional string nameserviceId = 1;
+  optional string path = 2;
+}
+
+message MountTableRecordProto {
+  optional string srcPath = 1;
+  repeated RemoteLocationProto destinations = 2;
+  optional uint64 dateCreated = 3;
+  optional uint64 dateModified = 4;
+  optional bool readOnly = 5 [default = false];
+
+  enum DestOrder {
+    HASH = 0;
+    LOCAL = 1;
+    RANDOM = 2;
+  }
+  optional DestOrder destOrder = 6 [default = HASH];
+}
+
+message AddMountTableEntryRequestProto {
+  optional MountTableRecordProto entry = 1;
+}
+
+message AddMountTableEntryResponseProto {
+  optional bool status = 1;
+}
+
+message UpdateMountTableEntryRequestProto {
+  optional MountTableRecordProto entry = 1;
+}
+
+message UpdateMountTableEntryResponseProto {
+  optional bool status = 1;
+}
+
+message RemoveMountTableEntryRequestProto {
+  optional string srcPath = 1;
+}
+
+message RemoveMountTableEntryResponseProto{
+  optional bool status = 1;
+}
+
+message GetMountTableEntriesRequestProto {
+  optional string srcPath = 1;
+}
+
+message GetMountTableEntriesResponseProto {
+  repeated MountTableRecordProto entries = 1;
+  optional uint64 timestamp = 2;
+}
+

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.NamenodePriorityCompara
 import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
 import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.util.Time;
 
@@ -68,6 +69,10 @@ public class MockResolver
     this();
   }
 
+  public MockResolver(Configuration conf, Router router) {
+    this();
+  }
+
   public void addLocation(String mount, String nsId, String location) {
     List<RemoteLocation> locationsList = this.locations.get(mount);
     if (locationsList == null) {
@@ -258,7 +263,6 @@ public class MockResolver
 
   @Override
   public PathLocation getDestinationForPath(String path) throws IOException {
-    Set<String> namespaceSet = new HashSet<>();
     List<RemoteLocation> remoteLocations = new LinkedList<>();
     for (String key : this.locations.keySet()) {
       if (path.startsWith(key)) {
@@ -268,7 +272,6 @@ public class MockResolver
           RemoteLocation remoteLocation =
               new RemoteLocation(nameservice, finalPath);
           remoteLocations.add(remoteLocation);
-          namespaceSet.add(nameservice);
         }
         break;
       }
@@ -277,7 +280,7 @@ public class MockResolver
       // Path isn't supported, mimic resolver behavior.
       return null;
     }
-    return new PathLocation(path, remoteLocations, namespaceSet);
+    return new PathLocation(path, remoteLocations);
   }
 
   @Override

+ 396 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java

@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the {@link MountTableStore} from the {@link Router}.
+ */
+public class TestMountTableResolver {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestMountTableResolver.class);
+
+  private MountTableResolver mountTable;
+
+  private Map<String, String> getMountTableEntry(
+      String subcluster, String path) {
+    Map<String, String> ret = new HashMap<>();
+    ret.put(subcluster, path);
+    return ret;
+  }
+
+  /**
+   * Setup the mount table.
+   * / -> 1:/
+   * __tmp -> 2:/tmp
+   * __user -> 3:/user
+   * ____a -> 2:/user/test
+   * ______demo
+   * ________test
+   * __________a -> 1:/user/test
+   * __________b -> 3:/user/test
+   * ____b
+   * ______file1.txt -> 4:/user/file1.txt
+   * __usr
+   * ____bin -> 2:/bin
+   *
+   * @throws IOException If it cannot set the mount table.
+   */
+  private void setupMountTable() throws IOException {
+    Configuration conf = new Configuration();
+    mountTable = new MountTableResolver(conf);
+
+    // Root mount point
+    Map<String, String> map = getMountTableEntry("1", "/");
+    mountTable.addEntry(MountTable.newInstance("/", map));
+
+    // /tmp
+    map = getMountTableEntry("2", "/");
+    mountTable.addEntry(MountTable.newInstance("/tmp", map));
+
+    // /user
+    map = getMountTableEntry("3", "/user");
+    mountTable.addEntry(MountTable.newInstance("/user", map));
+
+    // /usr/bin
+    map = getMountTableEntry("2", "/bin");
+    mountTable.addEntry(MountTable.newInstance("/usr/bin", map));
+
+    // /user/a
+    map = getMountTableEntry("2", "/user/test");
+    mountTable.addEntry(MountTable.newInstance("/user/a", map));
+
+    // /user/b/file1.txt
+    map = getMountTableEntry("4", "/user/file1.txt");
+    mountTable.addEntry(MountTable.newInstance("/user/b/file1.txt", map));
+
+    // /user/a/demo/test/a
+    map = getMountTableEntry("1", "/user/test");
+    mountTable.addEntry(MountTable.newInstance("/user/a/demo/test/a", map));
+
+    // /user/a/demo/test/b
+    map = getMountTableEntry("3", "/user/test");
+    mountTable.addEntry(MountTable.newInstance("/user/a/demo/test/b", map));
+  }
+
+  @Before
+  public void setup() throws IOException {
+    setupMountTable();
+  }
+
+  @Test
+  public void testDestination() throws IOException {
+
+    // Check files
+    assertEquals("1->/tesfile1.txt",
+        mountTable.getDestinationForPath("/tesfile1.txt").toString());
+
+    assertEquals("3->/user/testfile2.txt",
+        mountTable.getDestinationForPath("/user/testfile2.txt").toString());
+
+    assertEquals("2->/user/test/testfile3.txt",
+        mountTable.getDestinationForPath("/user/a/testfile3.txt").toString());
+
+    assertEquals("3->/user/b/testfile4.txt",
+        mountTable.getDestinationForPath("/user/b/testfile4.txt").toString());
+
+    assertEquals("1->/share/file5.txt",
+        mountTable.getDestinationForPath("/share/file5.txt").toString());
+
+    assertEquals("2->/bin/file7.txt",
+        mountTable.getDestinationForPath("/usr/bin/file7.txt").toString());
+
+    assertEquals("1->/usr/file8.txt",
+        mountTable.getDestinationForPath("/usr/file8.txt").toString());
+
+    assertEquals("2->/user/test/demo/file9.txt",
+        mountTable.getDestinationForPath("/user/a/demo/file9.txt").toString());
+
+    // Check folders
+    assertEquals("3->/user/testfolder",
+        mountTable.getDestinationForPath("/user/testfolder").toString());
+
+    assertEquals("2->/user/test/b",
+        mountTable.getDestinationForPath("/user/a/b").toString());
+
+    assertEquals("3->/user/test/a",
+        mountTable.getDestinationForPath("/user/test/a").toString());
+
+  }
+
+  private void compareLists(List<String> list1, String[] list2) {
+    assertEquals(list1.size(), list2.length);
+    for (String item : list2) {
+      assertTrue(list1.contains(item));
+    }
+  }
+
+  @Test
+  public void testGetMountPoints() throws IOException {
+
+    // Check getting all mount points (virtual and real) beneath a path
+    List<String> mounts = mountTable.getMountPoints("/");
+    assertEquals(3, mounts.size());
+    compareLists(mounts, new String[] {"tmp", "user", "usr"});
+
+    mounts = mountTable.getMountPoints("/user");
+    assertEquals(2, mounts.size());
+    compareLists(mounts, new String[] {"a", "b"});
+
+    mounts = mountTable.getMountPoints("/user/a");
+    assertEquals(1, mounts.size());
+    compareLists(mounts, new String[] {"demo"});
+
+    mounts = mountTable.getMountPoints("/user/a/demo");
+    assertEquals(1, mounts.size());
+    compareLists(mounts, new String[] {"test"});
+
+    mounts = mountTable.getMountPoints("/user/a/demo/test");
+    assertEquals(2, mounts.size());
+    compareLists(mounts, new String[] {"a", "b"});
+
+    mounts = mountTable.getMountPoints("/tmp");
+    assertEquals(0, mounts.size());
+
+    mounts = mountTable.getMountPoints("/t");
+    assertNull(mounts);
+
+    mounts = mountTable.getMountPoints("/unknownpath");
+    assertNull(mounts);
+  }
+
+  private void compareRecords(List<MountTable> list1, String[] list2) {
+    assertEquals(list1.size(), list2.length);
+    for (String item : list2) {
+      for (MountTable record : list1) {
+        if (record.getSourcePath().equals(item)) {
+          return;
+        }
+      }
+    }
+    fail();
+  }
+
+  @Test
+  public void testGetMounts() throws IOException {
+
+    // Check listing the mount table records at or beneath a path
+    List<MountTable> records = mountTable.getMounts("/");
+    assertEquals(8, records.size());
+    compareRecords(records, new String[] {"/", "/tmp", "/user", "/usr/bin",
+        "user/a", "/user/a/demo/a", "/user/a/demo/b", "/user/b/file1.txt"});
+
+    records = mountTable.getMounts("/user");
+    assertEquals(5, records.size());
+    compareRecords(records, new String[] {"/user", "/user/a/demo/a",
+        "/user/a/demo/b", "user/a", "/user/b/file1.txt"});
+
+    records = mountTable.getMounts("/user/a");
+    assertEquals(3, records.size());
+    compareRecords(records,
+        new String[] {"/user/a/demo/a", "/user/a/demo/b", "/user/a"});
+
+    records = mountTable.getMounts("/tmp");
+    assertEquals(1, records.size());
+    compareRecords(records, new String[] {"/tmp"});
+  }
+
+  @Test
+  public void testRemoveSubTree()
+      throws UnsupportedOperationException, IOException {
+
+    // 3 mount points are present /tmp, /user, /usr
+    compareLists(mountTable.getMountPoints("/"),
+        new String[] {"user", "usr", "tmp"});
+
+    // /tmp currently points to namespace 2
+    assertEquals("2", mountTable.getDestinationForPath("/tmp/testfile.txt")
+        .getDefaultLocation().getNameserviceId());
+
+    // Remove tmp
+    mountTable.removeEntry("/tmp");
+
+    // Now 2 mount points are present /user, /usr
+    compareLists(mountTable.getMountPoints("/"),
+        new String[] {"user", "usr"});
+
+    // /tmp no longer exists, uses default namespace for mapping /
+    assertEquals("1", mountTable.getDestinationForPath("/tmp/testfile.txt")
+        .getDefaultLocation().getNameserviceId());
+  }
+
+  @Test
+  public void testRemoveVirtualNode()
+      throws UnsupportedOperationException, IOException {
+
+    // 3 mount points are present /tmp, /user, /usr
+    compareLists(mountTable.getMountPoints("/"),
+        new String[] {"user", "usr", "tmp"});
+
+    // /usr is virtual, uses namespace 1->/
+    assertEquals("1", mountTable.getDestinationForPath("/usr/testfile.txt")
+        .getDefaultLocation().getNameserviceId());
+
+    // Attempt to remove /usr
+    mountTable.removeEntry("/usr");
+
+    // Verify the remove failed
+    compareLists(mountTable.getMountPoints("/"),
+        new String[] {"user", "usr", "tmp"});
+  }
+
+  @Test
+  public void testRemoveLeafNode()
+      throws UnsupportedOperationException, IOException {
+
+    // /user/a/demo/test/a currently points to namespace 1
+    assertEquals("1", mountTable.getDestinationForPath("/user/a/demo/test/a")
+        .getDefaultLocation().getNameserviceId());
+
+    // Remove /user/a/demo/test/a
+    mountTable.removeEntry("/user/a/demo/test/a");
+
+    // Now /user/a/demo/test/a points to namespace 2 using the entry for /user/a
+    assertEquals("2", mountTable.getDestinationForPath("/user/a/demo/test/a")
+        .getDefaultLocation().getNameserviceId());
+
+    // Verify the virtual node at /user/a/demo still exists and was not deleted
+    compareLists(mountTable.getMountPoints("/user/a"), new String[] {"demo"});
+
+    // Verify the sibling node was unaffected and still points to ns 3
+    assertEquals("3", mountTable.getDestinationForPath("/user/a/demo/test/b")
+        .getDefaultLocation().getNameserviceId());
+  }
+
+  @Test
+  public void testRefreshEntries()
+      throws UnsupportedOperationException, IOException {
+
+    // Initial table loaded
+    testDestination();
+    assertEquals(8, mountTable.getMounts("/").size());
+
+    // Replace table with /1 and /2
+    List<MountTable> records = new ArrayList<>();
+    Map<String, String> map1 = getMountTableEntry("1", "/");
+    records.add(MountTable.newInstance("/1", map1));
+    Map<String, String> map2 = getMountTableEntry("2", "/");
+    records.add(MountTable.newInstance("/2", map2));
+    mountTable.refreshEntries(records);
+
+    // Verify addition
+    PathLocation destination1 = mountTable.getDestinationForPath("/1");
+    RemoteLocation defaultLoc1 = destination1.getDefaultLocation();
+    assertEquals("1", defaultLoc1.getNameserviceId());
+
+    PathLocation destination2 = mountTable.getDestinationForPath("/2");
+    RemoteLocation defaultLoc2 = destination2.getDefaultLocation();
+    assertEquals("2", defaultLoc2.getNameserviceId());
+
+    // Verify existing entries were removed
+    assertEquals(2, mountTable.getMounts("/").size());
+    boolean assertionThrown = false;
+    try {
+      testDestination();
+      fail();
+    } catch (AssertionError e) {
+      // The / entry was removed, so it triggers an exception
+      assertionThrown = true;
+    }
+    assertTrue(assertionThrown);
+  }
+
+  @Test
+  public void testMountTableScalability() throws IOException {
+
+    List<MountTable> emptyList = new ArrayList<>();
+    mountTable.refreshEntries(emptyList);
+
+    // Add 100,000 entries in flat list
+    for (int i = 0; i < 100000; i++) {
+      Map<String, String> map = getMountTableEntry("1", "/" + i);
+      MountTable record = MountTable.newInstance("/" + i, map);
+      mountTable.addEntry(record);
+      if (i % 10000 == 0) {
+        LOG.info("Adding flat mount record {}: {}", i, record);
+      }
+    }
+
+    assertEquals(100000, mountTable.getMountPoints("/").size());
+    assertEquals(100000, mountTable.getMounts("/").size());
+
+    // Add 1000 entries in deep list
+    mountTable.refreshEntries(emptyList);
+    String parent = "/";
+    for (int i = 0; i < 1000; i++) {
+      final int index = i;
+      Map<String, String> map = getMountTableEntry("1", "/" + index);
+      if (i > 0) {
+        parent = parent + "/";
+      }
+      parent = parent + i;
+      MountTable record = MountTable.newInstance(parent, map);
+      mountTable.addEntry(record);
+    }
+
+    assertEquals(1, mountTable.getMountPoints("/").size());
+    assertEquals(1000, mountTable.getMounts("/").size());
+
+    // Add 100,000 entries in deep and wide tree
+    mountTable.refreshEntries(emptyList);
+    Random rand = new Random();
+    parent = "/" + Integer.toString(rand.nextInt());
+    int numRootTrees = 1;
+    for (int i = 0; i < 100000; i++) {
+      final int index = i;
+      Map<String, String> map = getMountTableEntry("1", "/" + index);
+      parent = parent + "/" + i;
+      if (parent.length() > 2000) {
+        // Start new tree
+        parent = "/" + Integer.toString(rand.nextInt());
+        numRootTrees++;
+      }
+      MountTable record = MountTable.newInstance(parent, map);
+      mountTable.addEntry(record);
+    }
+
+    assertEquals(numRootTrees, mountTable.getMountPoints("/").size());
+    assertEquals(100000, mountTable.getMounts("/").size());
+  }
+}

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java

@@ -25,7 +25,9 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFile
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.util.Time;
 
 /**
@@ -234,6 +237,19 @@ public final class FederationStateStoreTestUtils {
     return false;
   }
 
+  public static List<MountTable> createMockMountTable(
+      List<String> nameservices) throws IOException {
+    // create table entries
+    List<MountTable> entries = new ArrayList<>();
+    for (String ns : nameservices) {
+      Map<String, String> destMap = new HashMap<>();
+      destMap.put(ns, "/target-" + ns);
+      MountTable entry = MountTable.newInstance("/" + ns, destMap);
+      entries.add(entry);
+    }
+    return entries;
+  }
+
   public static MembershipState createMockRegistrationForNamenode(
       String nameserviceId, String namenodeId,
       FederationNamenodeServiceState state) throws IOException {

+ 250 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java

@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockMountTable;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the basic {@link StateStoreService}
+ * {@link MountTableStore} functionality.
+ */
+public class TestStateStoreMountTable extends TestStateStoreBase {
+
+  private static List<String> nameservices;
+  private static MountTableStore mountStore;
+
+  @BeforeClass
+  public static void create() throws IOException {
+    nameservices = new ArrayList<>();
+    nameservices.add(NAMESERVICES[0]);
+    nameservices.add(NAMESERVICES[1]);
+  }
+
+  @Before
+  public void setup() throws IOException, InterruptedException {
+    mountStore =
+        getStateStore().getRegisteredRecordStore(MountTableStore.class);
+    // Clear Mount table registrations
+    assertTrue(clearRecords(getStateStore(), MountTable.class));
+  }
+
+  @Test
+  public void testStateStoreDisconnected() throws Exception {
+
+    // Close the data store driver
+    getStateStore().closeDriver();
+    assertFalse(getStateStore().isDriverReady());
+
+    // Test APIs that access the store to check they throw the correct exception
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance();
+    verifyException(mountStore, "addMountTableEntry",
+        StateStoreUnavailableException.class,
+        new Class[] {AddMountTableEntryRequest.class},
+        new Object[] {addRequest});
+
+    UpdateMountTableEntryRequest updateRequest =
+        UpdateMountTableEntryRequest.newInstance();
+    verifyException(mountStore, "updateMountTableEntry",
+        StateStoreUnavailableException.class,
+        new Class[] {UpdateMountTableEntryRequest.class},
+        new Object[] {updateRequest});
+
+    RemoveMountTableEntryRequest removeRequest =
+        RemoveMountTableEntryRequest.newInstance();
+    verifyException(mountStore, "removeMountTableEntry",
+        StateStoreUnavailableException.class,
+        new Class[] {RemoveMountTableEntryRequest.class},
+        new Object[] {removeRequest});
+
+    GetMountTableEntriesRequest getRequest =
+        GetMountTableEntriesRequest.newInstance();
+    mountStore.loadCache(true);
+    verifyException(mountStore, "getMountTableEntries",
+        StateStoreUnavailableException.class,
+        new Class[] {GetMountTableEntriesRequest.class},
+        new Object[] {getRequest});
+  }
+
+  @Test
+  public void testSynchronizeMountTable() throws IOException {
+    // Synchronize and get mount table entries
+    List<MountTable> entries = createMockMountTable(nameservices);
+    assertTrue(synchronizeRecords(getStateStore(), entries, MountTable.class));
+    for (MountTable e : entries) {
+      mountStore.loadCache(true);
+      MountTable entry = getMountTableEntry(e.getSourcePath());
+      assertNotNull(entry);
+      assertEquals(e.getDefaultLocation().getDest(),
+          entry.getDefaultLocation().getDest());
+    }
+  }
+
+  @Test
+  public void testAddMountTableEntry() throws IOException {
+
+    // Add 1
+    List<MountTable> entries = createMockMountTable(nameservices);
+    List<MountTable> entries1 = getMountTableEntries("/").getRecords();
+    assertEquals(0, entries1.size());
+    MountTable entry0 = entries.get(0);
+    AddMountTableEntryRequest request =
+        AddMountTableEntryRequest.newInstance(entry0);
+    AddMountTableEntryResponse response =
+        mountStore.addMountTableEntry(request);
+    assertTrue(response.getStatus());
+
+    mountStore.loadCache(true);
+    List<MountTable> entries2 = getMountTableEntries("/").getRecords();
+    assertEquals(1, entries2.size());
+  }
+
+  @Test
+  public void testRemoveMountTableEntry() throws IOException {
+
+    // Add many
+    List<MountTable> entries = createMockMountTable(nameservices);
+    synchronizeRecords(getStateStore(), entries, MountTable.class);
+    mountStore.loadCache(true);
+    List<MountTable> entries1 = getMountTableEntries("/").getRecords();
+    assertEquals(entries.size(), entries1.size());
+
+    // Remove 1
+    RemoveMountTableEntryRequest request =
+        RemoveMountTableEntryRequest.newInstance();
+    request.setSrcPath(entries.get(0).getSourcePath());
+    assertTrue(mountStore.removeMountTableEntry(request).getStatus());
+
+    // Verify remove
+    mountStore.loadCache(true);
+    List<MountTable> entries2 = getMountTableEntries("/").getRecords();
+    assertEquals(entries.size() - 1, entries2.size());
+  }
+
+  @Test
+  public void testUpdateMountTableEntry() throws IOException {
+
+    // Add 1
+    List<MountTable> entries = createMockMountTable(nameservices);
+    MountTable entry0 = entries.get(0);
+    String srcPath = entry0.getSourcePath();
+    String nsId = entry0.getDefaultLocation().getNameserviceId();
+    AddMountTableEntryRequest request =
+        AddMountTableEntryRequest.newInstance(entry0);
+    AddMountTableEntryResponse response =
+        mountStore.addMountTableEntry(request);
+    assertTrue(response.getStatus());
+
+    // Verify
+    mountStore.loadCache(true);
+    MountTable matchingEntry0 = getMountTableEntry(srcPath);
+    assertNotNull(matchingEntry0);
+    assertEquals(nsId, matchingEntry0.getDefaultLocation().getNameserviceId());
+
+    // Edit destination nameservice for source path
+    Map<String, String> destMap =
+        Collections.singletonMap("testnameservice", "/");
+    MountTable replacement =
+        MountTable.newInstance(srcPath, destMap);
+    UpdateMountTableEntryRequest updateRequest =
+        UpdateMountTableEntryRequest.newInstance(replacement);
+    UpdateMountTableEntryResponse updateResponse =
+        mountStore.updateMountTableEntry(updateRequest);
+    assertTrue(updateResponse.getStatus());
+
+    // Verify
+    mountStore.loadCache(true);
+    MountTable matchingEntry1 = getMountTableEntry(srcPath);
+    assertNotNull(matchingEntry1);
+    assertEquals("testnameservice",
+        matchingEntry1.getDefaultLocation().getNameserviceId());
+  }
+
+  /**
+   * Gets an existing mount table record in the state store.
+   *
+   * @param mount The mount point of the record to remove.
+   * @return The matching record if found, null if it is not found.
+   * @throws IOException If the state store could not be accessed.
+   */
+  private MountTable getMountTableEntry(String mount) throws IOException {
+    GetMountTableEntriesRequest request =
+        GetMountTableEntriesRequest.newInstance(mount);
+    GetMountTableEntriesResponse response =
+        mountStore.getMountTableEntries(request);
+    List<MountTable> results = response.getEntries();
+    if (results.size() > 0) {
+      // First result is sorted to have the shortest mount string length
+      return results.get(0);
+    }
+    return null;
+  }
+
+  /**
+   * Fetch all mount table records beneath a root path.
+   *
+   * @param store FederationMountTableStore instance to commit the data.
+   * @param mount The root search path, enter "/" to return all mount table
+   *          records.
+   *
+   * @return A list of all mount table records found below the root mount.
+   *
+   * @throws IOException If the state store could not be accessed.
+   */
+  private QueryResult<MountTable> getMountTableEntries(String mount)
+      throws IOException {
+    if (mount == null) {
+      throw new IOException("Please specify a root search path");
+    }
+    GetMountTableEntriesRequest request =
+        GetMountTableEntriesRequest.newInstance();
+    request.setSrcPath(mount);
+    GetMountTableEntriesResponse response =
+        mountStore.getMountTableEntries(request);
+    List<MountTable> records = response.getEntries();
+    long timestamp = response.getTimestamp();
+    return new QueryResult<MountTable>(records, timestamp);
+  }
+}

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java

@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUt
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.hdfs.server.federation.store.records.Query;
 import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
 import org.junit.AfterClass;
@@ -109,6 +111,11 @@ public class TestStateStoreDriverBase {
           generateRandomString(), generateRandomString(),
           generateRandomString(), generateRandomString(),
           generateRandomEnum(FederationNamenodeServiceState.class), false);
+    } else if (recordClass == MountTable.class) {
+      String src = "/" + generateRandomString();
+      Map<String, String> destMap = Collections.singletonMap(
+          generateRandomString(), "/" + generateRandomString());
+      return (T) MountTable.newInstance(src, destMap);
     }
 
     return null;
@@ -155,6 +162,7 @@ public class TestStateStoreDriverBase {
 
   public static void removeAll(StateStoreDriver driver) throws IOException {
     driver.removeAll(MembershipState.class);
+    driver.removeAll(MountTable.class);
   }
 
   public <T extends BaseRecord> void testInsert(
@@ -347,22 +355,26 @@ public class TestStateStoreDriverBase {
   public void testInsert(StateStoreDriver driver)
       throws IllegalArgumentException, IllegalAccessException, IOException {
     testInsert(driver, MembershipState.class);
+    testInsert(driver, MountTable.class);
   }
 
   public void testPut(StateStoreDriver driver)
       throws IllegalArgumentException, ReflectiveOperationException,
       IOException, SecurityException {
     testPut(driver, MembershipState.class);
+    testPut(driver, MountTable.class);
   }
 
   public void testRemove(StateStoreDriver driver)
       throws IllegalArgumentException, IllegalAccessException, IOException {
     testRemove(driver, MembershipState.class);
+    testRemove(driver, MountTable.class);
   }
 
   public void testFetchErrors(StateStoreDriver driver)
       throws IllegalArgumentException, IllegalAccessException, IOException {
     testFetchErrors(driver, MembershipState.class);
+    testFetchErrors(driver, MountTable.class);
   }
 
   /**

+ 176 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java

@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.junit.Test;
+
+/**
+ * Test the Mount Table entry in the State Store.
+ */
+public class TestMountTable {
+
+  private static final String SRC = "/test";
+  private static final String DST_NS_0 = "ns0";
+  private static final String DST_NS_1 = "ns1";
+  private static final String DST_PATH_0 = "/path1";
+  private static final String DST_PATH_1 = "/path/path2";
+  private static final List<RemoteLocation> DST = new LinkedList<>();
+  static {
+    DST.add(new RemoteLocation(DST_NS_0, DST_PATH_0));
+    DST.add(new RemoteLocation(DST_NS_1, DST_PATH_1));
+  }
+  private static final Map<String, String> DST_MAP = new LinkedHashMap<>();
+  static {
+    DST_MAP.put(DST_NS_0, DST_PATH_0);
+    DST_MAP.put(DST_NS_1, DST_PATH_1);
+  }
+
+  private static final long DATE_CREATED = 100;
+  private static final long DATE_MOD = 200;
+
+
+  @Test
+  public void testGetterSetter() throws IOException {
+
+    MountTable record = MountTable.newInstance(SRC, DST_MAP);
+
+    validateDestinations(record);
+    assertEquals(SRC, record.getSourcePath());
+    assertEquals(DST, record.getDestinations());
+    assertTrue(DATE_CREATED > 0);
+    assertTrue(DATE_MOD > 0);
+
+    MountTable record2 =
+        MountTable.newInstance(SRC, DST_MAP, DATE_CREATED, DATE_MOD);
+
+    validateDestinations(record2);
+    assertEquals(SRC, record2.getSourcePath());
+    assertEquals(DST, record2.getDestinations());
+    assertEquals(DATE_CREATED, record2.getDateCreated());
+    assertEquals(DATE_MOD, record2.getDateModified());
+    assertFalse(record.isReadOnly());
+    assertEquals(DestinationOrder.HASH, record.getDestOrder());
+  }
+
+  @Test
+  public void testSerialization() throws IOException {
+    testSerialization(DestinationOrder.RANDOM);
+    testSerialization(DestinationOrder.HASH);
+    testSerialization(DestinationOrder.LOCAL);
+  }
+
+  private void testSerialization(final DestinationOrder order)
+      throws IOException {
+
+    MountTable record = MountTable.newInstance(
+        SRC, DST_MAP, DATE_CREATED, DATE_MOD);
+    record.setReadOnly(true);
+    record.setDestOrder(order);
+
+    StateStoreSerializer serializer = StateStoreSerializer.getSerializer();
+    String serializedString = serializer.serializeString(record);
+    MountTable record2 =
+        serializer.deserialize(serializedString, MountTable.class);
+
+    validateDestinations(record2);
+    assertEquals(SRC, record2.getSourcePath());
+    assertEquals(DST, record2.getDestinations());
+    assertEquals(DATE_CREATED, record2.getDateCreated());
+    assertEquals(DATE_MOD, record2.getDateModified());
+    assertTrue(record2.isReadOnly());
+    assertEquals(order, record2.getDestOrder());
+  }
+
+  @Test
+  public void testReadOnly() throws IOException {
+
+    Map<String, String> dest = new HashMap<>();
+    dest.put(DST_NS_0, DST_PATH_0);
+    dest.put(DST_NS_1, DST_PATH_1);
+    MountTable record1 = MountTable.newInstance(SRC, dest);
+    record1.setReadOnly(true);
+
+    validateDestinations(record1);
+    assertEquals(SRC, record1.getSourcePath());
+    assertEquals(DST, record1.getDestinations());
+    assertTrue(DATE_CREATED > 0);
+    assertTrue(DATE_MOD > 0);
+    assertTrue(record1.isReadOnly());
+
+    MountTable record2 = MountTable.newInstance(
+        SRC, DST_MAP, DATE_CREATED, DATE_MOD);
+    record2.setReadOnly(true);
+
+    validateDestinations(record2);
+    assertEquals(SRC, record2.getSourcePath());
+    assertEquals(DST, record2.getDestinations());
+    assertEquals(DATE_CREATED, record2.getDateCreated());
+    assertEquals(DATE_MOD, record2.getDateModified());
+    assertTrue(record2.isReadOnly());
+  }
+
+  @Test
+  public void testOrder() throws IOException {
+    testOrder(DestinationOrder.HASH);
+    testOrder(DestinationOrder.LOCAL);
+    testOrder(DestinationOrder.RANDOM);
+  }
+
+  private void testOrder(final DestinationOrder order)
+      throws IOException {
+
+    MountTable record = MountTable.newInstance(
+        SRC, DST_MAP, DATE_CREATED, DATE_MOD);
+    record.setDestOrder(order);
+
+    validateDestinations(record);
+    assertEquals(SRC, record.getSourcePath());
+    assertEquals(DST, record.getDestinations());
+    assertEquals(DATE_CREATED, record.getDateCreated());
+    assertEquals(DATE_MOD, record.getDateModified());
+    assertEquals(order, record.getDestOrder());
+  }
+
+  private void validateDestinations(MountTable record) {
+
+    assertEquals(SRC, record.getSourcePath());
+    assertEquals(2, record.getDestinations().size());
+
+    RemoteLocation location1 = record.getDestinations().get(0);
+    assertEquals(DST_NS_0, location1.getNameserviceId());
+    assertEquals(DST_PATH_0, location1.getDest());
+
+    RemoteLocation location2 = record.getDestinations().get(1);
+    assertEquals(DST_NS_1, location2.getNameserviceId());
+    assertEquals(DST_PATH_1, location2.getDest());
+  }
+}