瀏覽代碼

HDFS-10630. Federation State Store FS Implementation. Contributed by Jason Kace and Inigo Goiri.

(cherry picked from commit c6e0bd640cdaf83a660fa050809cad6f1d4c6f4d)
Inigo Goiri 8 年之前
父節點
當前提交
4bf877b03f
共有 19 個文件被更改,包括 2329 次插入46 次删除
  1. 14 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  2. 198 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java
  3. 67 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java
  4. 138 14
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
  5. 44 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
  6. 23 8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
  7. 8 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
  8. 24 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
  9. 429 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
  10. 161 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java
  11. 178 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java
  12. 77 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
  13. 19 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java
  14. 66 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java
  15. 16 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  16. 232 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
  17. 483 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
  18. 64 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java
  19. 88 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdfs;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -25,6 +27,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
 import org.apache.hadoop.http.HttpConfig;
 
@@ -1140,6 +1144,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT =
           StateStoreSerializerPBImpl.class;
 
+  public static final String FEDERATION_STORE_DRIVER_CLASS =
+      FEDERATION_STORE_PREFIX + "driver.class";
+  public static final Class<? extends StateStoreDriver>
+      FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreFileImpl.class;
+
+  public static final String FEDERATION_STORE_CONNECTION_TEST_MS =
+      FEDERATION_STORE_PREFIX + "connection.test";
+  public static final long FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT =
+      TimeUnit.MINUTES.toMillis(1);
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry 
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

+ 198 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java

@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Service to periodically execute a runnable.
+ */
+public abstract class PeriodicService extends AbstractService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PeriodicService.class);
+
+  /** Default interval in milliseconds for the periodic service. */
+  private static final long DEFAULT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);
+
+
+  /** Interval for running the periodic service in milliseconds. */
+  private long intervalMs;
+  /** Name of the service. */
+  private final String serviceName;
+
+  /** Scheduler for the periodic service. */
+  private final ScheduledExecutorService scheduler;
+
+  /** If the service is running. */
+  private volatile boolean isRunning = false;
+
+  /** How many times we run. */
+  private long runCount;
+  /** How many errors we got. */
+  private long errorCount;
+  /** When was the last time we executed this service successfully. */
+  private long lastRun;
+
+  /**
+   * Create a new periodic update service.
+   *
+   * @param name Name of the service.
+   */
+  public PeriodicService(String name) {
+    this(name, DEFAULT_INTERVAL_MS);
+  }
+
+  /**
+   * Create a new periodic update service.
+   *
+   * @param name Name of the service.
+   * @param interval Interval for the periodic service in milliseconds.
+   */
+  public PeriodicService(String name, long interval) {
+    super(name);
+    this.serviceName = name;
+    this.intervalMs = interval;
+
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat(this.getName() + "-%d")
+        .build();
+    this.scheduler = Executors.newScheduledThreadPool(1, threadFactory);
+  }
+
+  /**
+   * Set the interval for the periodic service.
+   *
+   * @param interval Interval in milliseconds.
+   */
+  protected void setIntervalMs(long interval) {
+    if (getServiceState() == STATE.STARTED) {
+      throw new ServiceStateException("Periodic service already started");
+    } else {
+      this.intervalMs = interval;
+    }
+  }
+
+  /**
+   * Get the interval for the periodic service.
+   *
+   * @return Interval in milliseconds.
+   */
+  protected long getIntervalMs() {
+    return this.intervalMs;
+  }
+
+  /**
+   * Get how many times we failed to run the periodic service.
+   *
+   * @return Times we failed to run the periodic service.
+   */
+  protected long getErrorCount() {
+    return this.errorCount;
+  }
+
+  /**
+   * Get how many times we run the periodic service.
+   *
+   * @return Times we run the periodic service.
+   */
+  protected long getRunCount() {
+    return this.runCount;
+  }
+
+  /**
+   * Get the last time the periodic service was executed.
+   *
+   * @return Last time the periodic service was executed.
+   */
+  protected long getLastUpdate() {
+    return this.lastRun;
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    LOG.info("Starting periodic service {}", this.serviceName);
+    startPeriodic();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    stopPeriodic();
+    LOG.info("Stopping periodic service {}", this.serviceName);
+    super.serviceStop();
+  }
+
+  /**
+   * Stop the periodic task.
+   */
+  protected synchronized void stopPeriodic() {
+    if (this.isRunning) {
+      LOG.info("{} is shutting down", this.serviceName);
+      this.isRunning = false;
+      this.scheduler.shutdownNow();
+    }
+  }
+
+  /**
+   * Start the periodic execution.
+   */
+  protected synchronized void startPeriodic() {
+    stopPeriodic();
+
+    // Create the runnable service
+    Runnable updateRunnable = new Runnable() {
+      @Override
+      public void run() {
+        LOG.debug("Running {} update task", serviceName);
+        try {
+          if (!isRunning) {
+            return;
+          }
+          periodicInvoke();
+          runCount++;
+          lastRun = Time.now();
+        } catch (Exception ex) {
+          errorCount++;
+          LOG.warn(serviceName + " service threw an exception", ex);
+        }
+      }
+    };
+
+    // Start the execution of the periodic service
+    this.isRunning = true;
+    this.scheduler.scheduleWithFixedDelay(
+        updateRunnable, 0, this.intervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Method that the service will run periodically.
+   */
+  protected abstract void periodicInvoke();
+}

+ 67 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.PeriodicService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically monitor the connection of the StateStore
+ * {@link StateStoreService} data store and to re-open the connection
+ * to the data store if required.
+ */
+public class StateStoreConnectionMonitorService extends PeriodicService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreConnectionMonitorService.class);
+
+  /** Service that maintains the State Store connection. */
+  private final StateStoreService stateStore;
+
+
+  /**
+   * Create a new service to monitor the connectivity of the state store driver.
+   *
+   * @param store Instance of the state store to be monitored.
+   */
+  public StateStoreConnectionMonitorService(StateStoreService store) {
+    super(StateStoreConnectionMonitorService.class.getSimpleName());
+    this.stateStore = store;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.setIntervalMs(conf.getLong(
+        DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
+        DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT));
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void periodicInvoke() {
+    LOG.debug("Checking state store connection");
+    if (!stateStore.isDriverReady()) {
+      LOG.info("Attempting to open state store driver.");
+      stateStore.loadDriver();
+    }
+  }
+}

+ 138 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java

@@ -15,45 +15,168 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hdfs.server.federation.store;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * A service to initialize a
  * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
- * StateStoreDriver} and maintain the connection to the data store. There
- * are multiple state store driver connections supported:
+ * StateStoreDriver} and maintain the connection to the data store. There are
+ * multiple state store driver connections supported:
  * <ul>
- * <li>File {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
+ * <li>File
+ * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
  * StateStoreFileImpl StateStoreFileImpl}
- * <li>ZooKeeper {@link org.apache.hadoop.hdfs.server.federation.store.driver.
- * impl.StateStoreZooKeeperImpl StateStoreZooKeeperImpl}
+ * <li>ZooKeeper
+ * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
+ * StateStoreZooKeeperImpl StateStoreZooKeeperImpl}
  * </ul>
  * <p>
- * The service also supports the dynamic registration of data interfaces such as
- * the following:
+ * The service also supports the dynamic registration of record stores like:
  * <ul>
- * <li>{@link MembershipStateStore}: state of the Namenodes in the
+ * <li>{@link MembershipStore}: state of the Namenodes in the
  * federation.
  * <li>{@link MountTableStore}: Mount table between to subclusters.
  * See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}.
- * <li>{@link RouterStateStore}: State of the routers in the federation.
  * </ul>
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class StateStoreService extends CompositeService {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreService.class);
+
+
+  /** State Store configuration. */
+  private Configuration conf;
+
   /** Identifier for the service. */
   private String identifier;
 
-  // Stub class
-  public StateStoreService(String name) {
-    super(name);
+  /** Driver for the back end connection. */
+  private StateStoreDriver driver;
+
+  /** Service to maintain data store connection. */
+  private StateStoreConnectionMonitorService monitorService;
+
+
+  public StateStoreService() {
+    super(StateStoreService.class.getName());
+  }
+
+  /**
+   * Initialize the State Store and the connection to the backend.
+   *
+   * @param config Configuration for the State Store.
+   * @throws IOException
+   */
+  @Override
+  protected void serviceInit(Configuration config) throws Exception {
+    this.conf = config;
+
+    // Create implementation of State Store
+    Class<? extends StateStoreDriver> driverClass = this.conf.getClass(
+        DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
+        DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT,
+        StateStoreDriver.class);
+    this.driver = ReflectionUtils.newInstance(driverClass, this.conf);
+
+    if (this.driver == null) {
+      throw new IOException("Cannot create driver for the State Store");
+    }
+
+    // Check the connection to the State Store periodically
+    this.monitorService = new StateStoreConnectionMonitorService(this);
+    this.addService(monitorService);
+
+    super.serviceInit(this.conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    loadDriver();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    closeDriver();
+
+    super.serviceStop();
+  }
+
+  /**
+   * List of records supported by this State Store.
+   *
+   * @return List of supported record classes.
+   */
+  public Collection<Class<? extends BaseRecord>> getSupportedRecords() {
+    // TODO add list of records
+    return new LinkedList<>();
+  }
+
+  /**
+   * Load the State Store driver. If successful, refresh cached data tables.
+   */
+  public void loadDriver() {
+    synchronized (this.driver) {
+      if (!isDriverReady()) {
+        String driverName = this.driver.getClass().getSimpleName();
+        if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) {
+          LOG.info("Connection to the State Store driver {} is open and ready",
+              driverName);
+        } else {
+          LOG.error("Cannot initialize State Store driver {}", driverName);
+        }
+      }
+    }
+  }
+
+  /**
+   * Check if the driver is ready to be used.
+   *
+   * @return If the driver is ready.
+   */
+  public boolean isDriverReady() {
+    return this.driver.isDriverReady();
+  }
+
+  /**
+   * Manually shuts down the driver.
+   *
+   * @throws Exception If the driver cannot be closed.
+   */
+  @VisibleForTesting
+  public void closeDriver() throws Exception {
+    if (this.driver != null) {
+      this.driver.close();
+    }
+  }
+
+  /**
+   * Get the state store driver.
+   *
+   * @return State store driver.
+   */
+  public StateStoreDriver getDriver() {
+    return this.driver;
   }
 
   /**
@@ -74,4 +197,5 @@ public class StateStoreService extends CompositeService {
   public void setIdentifier(String id) {
     this.identifier = id;
   }
-}
+
+}

+ 44 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java

@@ -17,17 +17,22 @@
  */
 package org.apache.hadoop.hdfs.server.federation.store;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Set of utility functions used to query, create, update and delete data
- * records in the state store.
+ * Set of utility functions used to work with the State Store.
  */
 public final class StateStoreUtils {
 
-  private static final Log LOG = LogFactory.getLog(StateStoreUtils.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreUtils.class);
+
 
   private StateStoreUtils() {
     // Utility class
@@ -52,7 +57,7 @@ public final class StateStoreUtils {
 
     // Check if we went too far
     if (actualClazz.equals(BaseRecord.class)) {
-      LOG.error("We went too far (" + actualClazz + ") with " + clazz);
+      LOG.error("We went too far ({}) with {}", actualClazz, clazz);
       actualClazz = clazz;
     }
     return actualClazz;
@@ -69,4 +74,36 @@ public final class StateStoreUtils {
       Class<? extends BaseRecord> getRecordClass(final T record) {
     return getRecordClass(record.getClass());
   }
-}
+
+  /**
+   * Get the base class name for a record. If we get an implementation of a
+   * record we will return the real parent record class.
+   *
+   * @param clazz Class of the data record to check.
+   * @return Name of the base class for the record.
+   */
+  public static <T extends BaseRecord> String getRecordName(
+      final Class<T> clazz) {
+    return getRecordClass(clazz).getSimpleName();
+  }
+
+  /**
+   * Filters a list of records to find all records matching the query.
+   *
+   * @param query Map of field names and objects to use to filter results.
+   * @param records List of data records to filter.
+   * @return List of all records matching the query (or empty list if none
+   *         match), null if the data set could not be filtered.
+   */
+  public static <T extends BaseRecord> List<T> filterMultiple(
+      final Query<T> query, final Iterable<T> records) {
+
+    List<T> matchingList = new ArrayList<>();
+    for (T record : records) {
+      if (query.matches(record)) {
+        matchingList.add(record);
+      }
+    }
+    return matchingList;
+  }
+}

+ 23 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java

@@ -18,15 +18,16 @@
 package org.apache.hadoop.hdfs.server.federation.store.driver;
 
 import java.net.InetAddress;
-import java.util.List;
+import java.util.Collection;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Driver class for an implementation of a {@link StateStoreService}
@@ -35,7 +36,8 @@ import org.apache.hadoop.util.Time;
  */
 public abstract class StateStoreDriver implements StateStoreRecordOperations {
 
-  private static final Log LOG = LogFactory.getLog(StateStoreDriver.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreDriver.class);
 
 
   /** State Store configuration. */
@@ -47,13 +49,14 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
 
   /**
    * Initialize the state store connection.
+   *
    * @param config Configuration for the driver.
    * @param id Identifier for the driver.
    * @param records Records that are supported.
    * @return If initialized and ready, false if failed to initialize driver.
    */
   public boolean init(final Configuration config, final String id,
-      final List<Class<? extends BaseRecord>> records) {
+      final Collection<Class<? extends BaseRecord>> records) {
 
     this.conf = config;
     this.identifier = id;
@@ -62,8 +65,20 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
       LOG.warn("The identifier for the State Store connection is not set");
     }
 
-    // TODO stub
-    return false;
+    boolean success = initDriver();
+    if (!success) {
+      LOG.error("Cannot intialize driver for {}", getDriverName());
+      return false;
+    }
+
+    for (Class<? extends BaseRecord> cls : records) {
+      String recordString = StateStoreUtils.getRecordName(cls);
+      if (!initRecordStorage(recordString, cls)) {
+        LOG.error("Cannot initialize record store for {}", cls.getSimpleName());
+        return false;
+      }
+    }
+    return true;
   }
 
   /**
@@ -169,4 +184,4 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
     }
     return hostname;
   }
-}
+}

+ 8 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java

@@ -19,11 +19,11 @@ package org.apache.hadoop.hdfs.server.federation.store.driver;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
 import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
 import org.apache.hadoop.io.retry.AtMostOnce;
 import org.apache.hadoop.io.retry.Idempotent;
@@ -67,14 +67,14 @@ public interface StateStoreRecordOperations {
    * Get a single record from the store that matches the query.
    *
    * @param clazz Class of record to fetch.
-   * @param query Map of field names and objects to filter results.
+   * @param query Query to filter results.
    * @return A single record matching the query. Null if there are no matching
    *         records or more than one matching record in the store.
    * @throws IOException If multiple records match or if the data store cannot
    *           be queried.
    */
   @Idempotent
-  <T extends BaseRecord> T get(Class<T> clazz, Map<String, String> query)
+  <T extends BaseRecord> T get(Class<T> clazz, Query<T> query)
       throws IOException;
 
   /**
@@ -83,14 +83,14 @@ public interface StateStoreRecordOperations {
    * supports filtering it should overwrite this method.
    *
    * @param clazz Class of record to fetch.
-   * @param query Map of field names and objects to filter results.
+   * @param query Query to filter results.
    * @return Records of type clazz that match the query or empty list if none
    *         are found.
    * @throws IOException Throws exception if unable to query the data store.
    */
   @Idempotent
   <T extends BaseRecord> List<T> getMultiple(
-      Class<T> clazz, Map<String, String> query) throws IOException;
+      Class<T> clazz, Query<T> query) throws IOException;
 
   /**
    * Creates a single record. Optionally updates an existing record with same
@@ -152,13 +152,12 @@ public interface StateStoreRecordOperations {
    * Remove multiple records of a specific class that match a query. Requires
    * the getAll implementation to fetch fresh records on each call.
    *
-   * @param clazz Class of record to remove.
-   * @param filter matching filter to remove.
+   * @param query Query to filter what to remove.
    * @return The number of records removed.
    * @throws IOException Throws exception if unable to query the data store.
    */
   @AtMostOnce
-  <T extends BaseRecord> int remove(Class<T> clazz, Map<String, String> filter)
+  <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
       throws IOException;
 
-}
+}

+ 24 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java

@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
 
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
 
 /**
  * Base implementation of a State Store driver. It contains default
@@ -41,7 +44,7 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver {
 
   @Override
   public <T extends BaseRecord> T get(
-      Class<T> clazz, Map<String, String> query) throws IOException {
+      Class<T> clazz, Query<T> query) throws IOException {
     List<T> records = getMultiple(clazz, query);
     if (records.size() > 1) {
       throw new IOException("Found more than one object in collection");
@@ -52,18 +55,32 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver {
     }
   }
 
+  @Override
+  public <T extends BaseRecord> List<T> getMultiple(
+      Class<T> clazz, Query<T> query) throws IOException  {
+    QueryResult<T> result = get(clazz);
+    List<T> records = result.getRecords();
+    List<T> ret = filterMultiple(query, records);
+    if (ret == null) {
+      throw new IOException("Cannot fetch records from the store");
+    }
+    return ret;
+  }
+
   @Override
   public <T extends BaseRecord> boolean put(
       T record, boolean allowUpdate, boolean errorIfExists) throws IOException {
-    List<T> singletonList = new ArrayList<T>();
+    List<T> singletonList = new ArrayList<>();
     singletonList.add(record);
     return putAll(singletonList, allowUpdate, errorIfExists);
   }
 
   @Override
   public <T extends BaseRecord> boolean remove(T record) throws IOException {
-    Map<String, String> primaryKeys = record.getPrimaryKeys();
-    Class<? extends BaseRecord> clazz = StateStoreUtils.getRecordClass(record);
-    return remove(clazz, primaryKeys) == 1;
+    final Query<T> query = new Query<T>(record);
+    Class<? extends BaseRecord> clazz = record.getClass();
+    @SuppressWarnings("unchecked")
+    Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
+    return remove(recordClass, query) == 1;
   }
-}
+}

+ 429 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java

@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordClass;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StateStoreDriver} implementation based on a local file.
+ */
+public abstract class StateStoreFileBaseImpl
+    extends StateStoreSerializableImpl {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreFileBaseImpl.class);
+
+  /** If it is initialized. */
+  private boolean initialized = false;
+
+  /** Name of the file containing the data. */
+  private static final String DATA_FILE_NAME = "records.data";
+
+
+  /**
+   * Lock reading records.
+   *
+   * @param clazz Class of the record.
+   */
+  protected abstract <T extends BaseRecord> void lockRecordRead(Class<T> clazz);
+
+  /**
+   * Unlock reading records.
+   *
+   * @param clazz Class of the record.
+   */
+  protected abstract <T extends BaseRecord> void unlockRecordRead(
+      Class<T> clazz);
+
+  /**
+   * Lock writing records.
+   *
+   * @param clazz Class of the record.
+   */
+  protected abstract <T extends BaseRecord> void lockRecordWrite(
+      Class<T> clazz);
+
+  /**
+   * Unlock writing records.
+   *
+   * @param clazz Class of the record.
+   */
+  protected abstract <T extends BaseRecord> void unlockRecordWrite(
+      Class<T> clazz);
+
+  /**
+   * Get the reader for the file system.
+   *
+   * @param clazz Class of the record.
+   */
+  protected abstract <T extends BaseRecord> BufferedReader getReader(
+      Class<T> clazz, String sub);
+
+  /**
+   * Get the writer for the file system.
+   *
+   * @param clazz Class of the record.
+   */
+  protected abstract <T extends BaseRecord> BufferedWriter getWriter(
+      Class<T> clazz, String sub);
+
+  /**
+   * Check if a path exists.
+   *
+   * @param path Path to check.
+   * @return If the path exists.
+   */
+  protected abstract boolean exists(String path);
+
+  /**
+   * Make a directory.
+   *
+   * @param path Path of the directory to create.
+   * @return If the directory was created.
+   */
+  protected abstract boolean mkdir(String path);
+
+  /**
+   * Get root directory.
+   *
+   * @return Root directory.
+   */
+  protected abstract String getRootDir();
+
+  /**
+   * Set the driver as initialized.
+   *
+   * @param ini If the driver is initialized.
+   */
+  public void setInitialized(boolean ini) {
+    this.initialized = ini;
+  }
+
+  @Override
+  public boolean initDriver() {
+    String rootDir = getRootDir();
+    try {
+      if (rootDir == null) {
+        LOG.error("Invalid root directory, unable to initialize driver.");
+        return false;
+      }
+
+      // Check root path
+      if (!exists(rootDir)) {
+        if (!mkdir(rootDir)) {
+          LOG.error("Cannot create State Store root directory {}", rootDir);
+          return false;
+        }
+      }
+    } catch (Exception ex) {
+      LOG.error(
+          "Cannot initialize filesystem using root directory {}", rootDir, ex);
+      return false;
+    }
+    setInitialized(true);
+    return true;
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean initRecordStorage(
+      String className, Class<T> recordClass) {
+
+    String dataDirPath = getRootDir() + "/" + className;
+    try {
+      // Create data directories for files
+      if (!exists(dataDirPath)) {
+        LOG.info("{} data directory doesn't exist, creating it", dataDirPath);
+        if (!mkdir(dataDirPath)) {
+          LOG.error("Cannot create data directory {}", dataDirPath);
+          return false;
+        }
+        String dataFilePath = dataDirPath + "/" + DATA_FILE_NAME;
+        if (!exists(dataFilePath)) {
+          // Create empty file
+          List<T> emtpyList = new ArrayList<>();
+          if(!writeAll(emtpyList, recordClass)) {
+            LOG.error("Cannot create data file {}", dataFilePath);
+            return false;
+          }
+        }
+      }
+    } catch (Exception ex) {
+      LOG.error("Cannot create data directory {}", dataDirPath, ex);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Read all lines from a file and deserialize into the desired record type.
+   *
+   * @param reader Open handle for the file.
+   * @param recordClass Record class to create.
+   * @param includeDates True if dateModified/dateCreated are serialized.
+   * @return List of records.
+   * @throws IOException
+   */
+  private <T extends BaseRecord> List<T> getAllFile(
+      BufferedReader reader, Class<T> clazz, boolean includeDates)
+          throws IOException {
+
+    List<T> ret = new ArrayList<T>();
+    String line;
+    while ((line = reader.readLine()) != null) {
+      if (!line.startsWith("#") && line.length() > 0) {
+        try {
+          T record = newRecord(line, clazz, includeDates);
+          ret.add(record);
+        } catch (Exception ex) {
+          LOG.error("Cannot parse line in data source file: {}", line, ex);
+        }
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
+      throws IOException {
+    return get(clazz, (String)null);
+  }
+
+  @Override
+  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
+      throws IOException {
+    verifyDriverReady();
+    BufferedReader reader = null;
+    lockRecordRead(clazz);
+    try {
+      reader = getReader(clazz, sub);
+      List<T> data = getAllFile(reader, clazz, true);
+      return new QueryResult<T>(data, getTime());
+    } catch (Exception ex) {
+      LOG.error("Cannot fetch records {}", clazz.getSimpleName());
+      throw new IOException("Cannot read from data store " + ex.getMessage());
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          LOG.error("Failed closing file", e);
+        }
+      }
+      unlockRecordRead(clazz);
+    }
+  }
+
+  /**
+   * Overwrite the existing data with a new data set.
+   *
+   * @param list List of records to write.
+   * @param writer BufferedWriter stream to write to.
+   * @return If the records were succesfully written.
+   */
+  private <T extends BaseRecord> boolean writeAllFile(
+      Collection<T> records, BufferedWriter writer) {
+
+    try {
+      for (BaseRecord record : records) {
+        try {
+          String data = serializeString(record);
+          writer.write(data);
+          writer.newLine();
+        } catch (IllegalArgumentException ex) {
+          LOG.error("Cannot write record {} to file", record, ex);
+        }
+      }
+      writer.flush();
+      return true;
+    } catch (IOException e) {
+      LOG.error("Cannot commit records to file", e);
+      return false;
+    }
+  }
+
+  /**
+   * Overwrite the existing data with a new data set. Replaces all records in
+   * the data store for this record class. If all records in the data store are
+   * not successfully committed, this function must return false and leave the
+   * data store unchanged.
+   *
+   * @param records List of records to write. All records must be of type
+   *                recordClass.
+   * @param recordClass Class of record to replace.
+   * @return true if all operations were successful, false otherwise.
+   * @throws StateStoreUnavailableException
+   */
+  public <T extends BaseRecord> boolean writeAll(
+      Collection<T> records, Class<T> recordClass)
+          throws StateStoreUnavailableException {
+    verifyDriverReady();
+    lockRecordWrite(recordClass);
+    BufferedWriter writer = null;
+    try {
+      writer = getWriter(recordClass, null);
+      return writeAllFile(records, writer);
+    } catch (Exception e) {
+      LOG.error(
+          "Cannot add records to file for {}", recordClass.getSimpleName(), e);
+      return false;
+    } finally {
+      if (writer != null) {
+        try {
+          writer.close();
+        } catch (IOException e) {
+          LOG.error(
+              "Cannot close writer for {}", recordClass.getSimpleName(), e);
+        }
+      }
+      unlockRecordWrite(recordClass);
+    }
+  }
+
+  /**
+   * Get the data file name.
+   *
+   * @return Data file name.
+   */
+  protected String getDataFileName() {
+    return DATA_FILE_NAME;
+  }
+
+  @Override
+  public boolean isDriverReady() {
+    return this.initialized;
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean putAll(
+      List<T> records, boolean allowUpdate, boolean errorIfExists)
+          throws StateStoreUnavailableException {
+    verifyDriverReady();
+
+    if (records.isEmpty()) {
+      return true;
+    }
+
+    @SuppressWarnings("unchecked")
+    Class<T> clazz = (Class<T>) getRecordClass(records.get(0).getClass());
+    QueryResult<T> result;
+    try {
+      result = get(clazz);
+    } catch (IOException e) {
+      return false;
+    }
+    Map<Object, T> writeList = new HashMap<>();
+
+    // Write all of the existing records
+    for (T existingRecord : result.getRecords()) {
+      String key = existingRecord.getPrimaryKey();
+      writeList.put(key, existingRecord);
+    }
+
+    // Add inserts and updates, overwrite any existing values
+    for (T updatedRecord : records) {
+      try {
+        updatedRecord.validate();
+        String key = updatedRecord.getPrimaryKey();
+        if (writeList.containsKey(key) && allowUpdate) {
+          // Update
+          writeList.put(key, updatedRecord);
+          // Update the mod time stamp. Many backends will use their
+          // own timestamp for the mod time.
+          updatedRecord.setDateModified(this.getTime());
+        } else if (!writeList.containsKey(key)) {
+          // Insert
+          // Create/Mod timestamps are already initialized
+          writeList.put(key, updatedRecord);
+        } else if (errorIfExists) {
+          LOG.error("Attempt to insert record {} that already exists",
+              updatedRecord);
+          return false;
+        }
+      } catch (IllegalArgumentException ex) {
+        LOG.error("Cannot write invalid record to State Store", ex);
+        return false;
+      }
+    }
+
+    // Write all
+    boolean status = writeAll(writeList.values(), clazz);
+    return status;
+  }
+
+  @Override
+  public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
+      throws StateStoreUnavailableException {
+    verifyDriverReady();
+
+    if (query == null) {
+      return 0;
+    }
+
+    int removed = 0;
+    // Get the current records
+    try {
+      final QueryResult<T> result = get(clazz);
+      final List<T> existingRecords = result.getRecords();
+      // Write all of the existing records except those to be removed
+      final List<T> recordsToRemove = filterMultiple(query, existingRecords);
+      removed = recordsToRemove.size();
+      final List<T> newRecords = new LinkedList<>();
+      for (T record : existingRecords) {
+        if (!recordsToRemove.contains(record)) {
+          newRecords.add(record);
+        }
+      }
+      if (!writeAll(newRecords, clazz)) {
+        throw new IOException(
+            "Cannot remove record " + clazz + " query " + query);
+      }
+    } catch (IOException e) {
+      LOG.error("Cannot remove records {} query {}", clazz, query, e);
+    }
+
+    return removed;
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
+      throws StateStoreUnavailableException {
+    verifyDriverReady();
+    List<T> emptyList = new ArrayList<>();
+    boolean status = writeAll(emptyList, clazz);
+    return status;
+  }
+}

+ 161 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java

@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+
+/**
+ * StateStoreDriver implementation based on a local file.
+ */
+public class StateStoreFileImpl extends StateStoreFileBaseImpl {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreFileImpl.class);
+
+  /** Configuration keys. */
+  public static final String FEDERATION_STORE_FILE_DIRECTORY =
+      DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory";
+
+  /** Synchronization. */
+  private static final ReadWriteLock READ_WRITE_LOCK =
+      new ReentrantReadWriteLock();
+
+  /** Root directory for the state store. */
+  private String rootDirectory;
+
+
+  @Override
+  protected boolean exists(String path) {
+    File test = new File(path);
+    return test.exists();
+  }
+
+  @Override
+  protected boolean mkdir(String path) {
+    File dir = new File(path);
+    return dir.mkdirs();
+  }
+
+  @Override
+  protected String getRootDir() {
+    if (this.rootDirectory == null) {
+      String dir = getConf().get(FEDERATION_STORE_FILE_DIRECTORY);
+      if (dir == null) {
+        File tempDir = Files.createTempDir();
+        dir = tempDir.getAbsolutePath();
+      }
+      this.rootDirectory = dir;
+    }
+    return this.rootDirectory;
+  }
+
+  @Override
+  protected <T extends BaseRecord> void lockRecordWrite(Class<T> recordClass) {
+    // TODO - Synchronize via FS
+    READ_WRITE_LOCK.writeLock().lock();
+  }
+
+  @Override
+  protected <T extends BaseRecord> void unlockRecordWrite(
+      Class<T> recordClass) {
+    // TODO - Synchronize via FS
+    READ_WRITE_LOCK.writeLock().unlock();
+  }
+
+  @Override
+  protected <T extends BaseRecord> void lockRecordRead(Class<T> recordClass) {
+    // TODO - Synchronize via FS
+    READ_WRITE_LOCK.readLock().lock();
+  }
+
+  @Override
+  protected <T extends BaseRecord> void unlockRecordRead(Class<T> recordClass) {
+    // TODO - Synchronize via FS
+    READ_WRITE_LOCK.readLock().unlock();
+  }
+
+  @Override
+  protected <T extends BaseRecord> BufferedReader getReader(
+      Class<T> clazz, String sub) {
+    String filename = StateStoreUtils.getRecordName(clazz);
+    if (sub != null && sub.length() > 0) {
+      filename += "/" + sub;
+    }
+    filename += "/" + getDataFileName();
+
+    try {
+      LOG.debug("Loading file: {}", filename);
+      File file = new File(getRootDir(), filename);
+      FileInputStream fis = new FileInputStream(file);
+      InputStreamReader isr =
+          new InputStreamReader(fis, StandardCharsets.UTF_8);
+      BufferedReader reader = new BufferedReader(isr);
+      return reader;
+    } catch (Exception ex) {
+      LOG.error(
+          "Cannot open read stream for record {}", clazz.getSimpleName(), ex);
+      return null;
+    }
+  }
+
+  @Override
+  protected <T extends BaseRecord> BufferedWriter getWriter(
+      Class<T> clazz, String sub) {
+    String filename = StateStoreUtils.getRecordName(clazz);
+    if (sub != null && sub.length() > 0) {
+      filename += "/" + sub;
+    }
+    filename += "/" + getDataFileName();
+
+    try {
+      File file = new File(getRootDir(), filename);
+      FileOutputStream fos = new FileOutputStream(file, false);
+      OutputStreamWriter osw =
+          new OutputStreamWriter(fos, StandardCharsets.UTF_8);
+      BufferedWriter writer = new BufferedWriter(osw);
+      return writer;
+    } catch (IOException ex) {
+      LOG.error(
+          "Cannot open read stream for record {}", clazz.getSimpleName(), ex);
+      return null;
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    setInitialized(false);
+  }
+}

+ 178 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java

@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * StateStoreDriver} implementation based on a filesystem. The most common uses
+ * HDFS as a backend.
+ */
+public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreFileSystemImpl.class);
+
+
+  /** Configuration keys. */
+  public static final String FEDERATION_STORE_FS_PATH =
+      DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.fs.path";
+
+  /** File system to back the State Store. */
+  private FileSystem fs;
+  /** Working path in the filesystem. */
+  private String workPath;
+
+  @Override
+  protected boolean exists(String path) {
+    try {
+      return fs.exists(new Path(path));
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  @Override
+  protected boolean mkdir(String path) {
+    try {
+      return fs.mkdirs(new Path(path));
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  @Override
+  protected String getRootDir() {
+    if (this.workPath == null) {
+      String rootPath = getConf().get(FEDERATION_STORE_FS_PATH);
+      URI workUri;
+      try {
+        workUri = new URI(rootPath);
+        fs = FileSystem.get(workUri, getConf());
+      } catch (Exception ex) {
+        return null;
+      }
+      this.workPath = rootPath;
+    }
+    return this.workPath;
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+  }
+
+  /**
+   * Get the folder path for the record class' data.
+   *
+   * @param cls Data record class.
+   * @return Path of the folder containing the record class' data files.
+   */
+  private Path getPathForClass(Class<? extends BaseRecord> clazz) {
+    if (clazz == null) {
+      return null;
+    }
+    // TODO extract table name from class: entry.getTableName()
+    String className = StateStoreUtils.getRecordName(clazz);
+    return new Path(workPath, className);
+  }
+
+  @Override
+  protected <T extends BaseRecord> void lockRecordRead(Class<T> clazz) {
+    // Not required, synced with HDFS leasing
+  }
+
+  @Override
+  protected <T extends BaseRecord> void unlockRecordRead(Class<T> clazz) {
+    // Not required, synced with HDFS leasing
+  }
+
+  @Override
+  protected <T extends BaseRecord> void lockRecordWrite(Class<T> clazz) {
+    // TODO -> wait for lease to be available
+  }
+
+  @Override
+  protected <T extends BaseRecord> void unlockRecordWrite(Class<T> clazz) {
+    // TODO -> ensure lease is closed for the file
+  }
+
+  @Override
+  protected <T extends BaseRecord> BufferedReader getReader(
+      Class<T> clazz, String sub) {
+
+    Path path = getPathForClass(clazz);
+    if (sub != null && sub.length() > 0) {
+      path = Path.mergePaths(path, new Path("/" + sub));
+    }
+    path = Path.mergePaths(path, new Path("/" + getDataFileName()));
+
+    try {
+      FSDataInputStream fdis = fs.open(path);
+      InputStreamReader isr =
+          new InputStreamReader(fdis, StandardCharsets.UTF_8);
+      BufferedReader reader = new BufferedReader(isr);
+      return reader;
+    } catch (IOException ex) {
+      LOG.error("Cannot open write stream for {}  to {}",
+          clazz.getSimpleName(), path);
+      return null;
+    }
+  }
+
+  @Override
+  protected <T extends BaseRecord> BufferedWriter getWriter(
+      Class<T> clazz, String sub) {
+
+    Path path = getPathForClass(clazz);
+    if (sub != null && sub.length() > 0) {
+      path = Path.mergePaths(path, new Path("/" + sub));
+    }
+    path = Path.mergePaths(path, new Path("/" + getDataFileName()));
+
+    try {
+      FSDataOutputStream fdos = fs.create(path, true);
+      OutputStreamWriter osw =
+          new OutputStreamWriter(fdos, StandardCharsets.UTF_8);
+      BufferedWriter writer = new BufferedWriter(osw);
+      return writer;
+    } catch (IOException ex) {
+      LOG.error("Cannot open write stream for {} to {}",
+          clazz.getSimpleName(), path);
+      return null;
+    }
+  }
+}

+ 77 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java

@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+
+/**
+ * State Store driver that stores a serialization of the records. The serializer
+ * is pluggable.
+ */
+public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
+
+  /** Default serializer for this driver. */
+  private StateStoreSerializer serializer;
+
+
+  @Override
+  public boolean init(final Configuration config, final String id,
+      final Collection<Class<? extends BaseRecord>> records) {
+    boolean ret = super.init(config, id, records);
+
+    this.serializer = StateStoreSerializer.getSerializer(config);
+
+    return ret;
+  }
+
+  /**
+   * Serialize a record using the serializer.
+   * @param record Record to serialize.
+   * @return Byte array with the serialization of the record.
+   */
+  protected <T extends BaseRecord> byte[] serialize(T record) {
+    return serializer.serialize(record);
+  }
+
+  /**
+   * Serialize a record using the serializer.
+   * @param record Record to serialize.
+   * @return String with the serialization of the record.
+   */
+  protected <T extends BaseRecord> String serializeString(T record) {
+    return serializer.serializeString(record);
+  }
+
+  /**
+   * Creates a record from an input data string.
+   * @param data Serialized text of the record.
+   * @param clazz Record class.
+   * @param includeDates If dateModified and dateCreated are serialized.
+   * @return The created record.
+   * @throws IOException
+   */
+  protected <T extends BaseRecord> T newRecord(
+      String data, Class<T> clazz, boolean includeDates) throws IOException {
+    return serializer.deserialize(data, clazz);
+  }
+}

+ 19 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java

@@ -122,6 +122,24 @@ public abstract class BaseRecord implements Comparable<BaseRecord> {
     return builder.toString();
   }
 
+  /**
+   * Check if this record matches a partial record.
+   *
+   * @param other Partial record.
+   * @return If this record matches.
+   */
+  public boolean like(BaseRecord other) {
+    if (other == null) {
+      return false;
+    }
+    Map<String, String> thisKeys = this.getPrimaryKeys();
+    Map<String, String> otherKeys = other.getPrimaryKeys();
+    if (thisKeys == null) {
+      return otherKeys == null;
+    }
+    return thisKeys.equals(otherKeys);
+  }
+
   /**
    * Override equals check to use primary key(s) for comparison.
    */
@@ -186,4 +204,4 @@ public abstract class BaseRecord implements Comparable<BaseRecord> {
   public String toString() {
     return getPrimaryKey();
   }
-}
+}

+ 66 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records;
+
+/**
+ * Check if a record matches a query. The query is usually a partial record.
+ *
+ * @param <T> Type of the record to query.
+ */
+public class Query<T extends BaseRecord> {
+
+  /** Partial object to compare against. */
+  private final T partial;
+
+
+  /**
+   * Create a query to search for a partial record.
+   *
+   * @param partial It defines the attributes to search.
+   */
+  public Query(final T part) {
+    this.partial = part;
+  }
+
+  /**
+   * Get the partial record used to query.
+   *
+   * @return The partial record used for the query.
+   */
+  public T getPartial() {
+    return this.partial;
+  }
+
+  /**
+   * Check if a record matches the primary keys or the partial record.
+   *
+   * @param other Record to check.
+   * @return If the record matches. Don't match if there is no partial.
+   */
+  public boolean matches(T other) {
+    if (this.partial == null) {
+      return false;
+    }
+    return this.partial.like(other);
+  }
+
+  @Override
+  public String toString() {
+    return "Checking: " + this.partial;
+  }
+}

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -4674,4 +4674,20 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.store.driver.class</name>
+    <value>org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl</value>
+    <description>
+      Class to implement the State Store. By default it uses the local disk.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.store.connection.test</name>
+    <value>60000</value>
+    <description>
+      How often to check for the connection to the State Store in milliseconds.
+    </description>
+  </property>
+
 </configuration>

+ 232 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java

@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS;
+import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl.FEDERATION_STORE_FILE_DIRECTORY;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Utilities to test the State Store.
+ */
+public final class FederationStateStoreTestUtils {
+
+  private FederationStateStoreTestUtils() {
+    // Utility Class
+  }
+
+  /**
+   * Get the default State Store driver implementation.
+   *
+   * @return Class of the default State Store driver implementation.
+   */
+  public static Class<? extends StateStoreDriver> getDefaultDriver() {
+    return DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT;
+  }
+
+  /**
+   * Create a default State Store configuration.
+   *
+   * @return State Store configuration.
+   */
+  public static Configuration getStateStoreConfiguration() {
+    Class<? extends StateStoreDriver> clazz = getDefaultDriver();
+    return getStateStoreConfiguration(clazz);
+  }
+
+  /**
+   * Create a new State Store configuration for a particular driver.
+   *
+   * @param clazz Class of the driver to create.
+   * @return State Store configuration.
+   */
+  public static Configuration getStateStoreConfiguration(
+      Class<? extends StateStoreDriver> clazz) {
+    Configuration conf = new HdfsConfiguration(false);
+
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs://test");
+
+    conf.setClass(FEDERATION_STORE_DRIVER_CLASS, clazz, StateStoreDriver.class);
+
+    if (clazz.isAssignableFrom(StateStoreFileBaseImpl.class)) {
+      setFileConfiguration(conf);
+    }
+    return conf;
+  }
+
+  /**
+   * Create a new State Store based on a configuration.
+   *
+   * @param configuration Configuration for the State Store.
+   * @return New State Store service.
+   * @throws IOException If it cannot create the State Store.
+   * @throws InterruptedException If we cannot wait for the store to start.
+   */
+  public static StateStoreService getStateStore(
+      Configuration configuration) throws IOException, InterruptedException {
+
+    StateStoreService stateStore = new StateStoreService();
+    assertNotNull(stateStore);
+
+    // Set unique identifier, this is normally the router address
+    String identifier = UUID.randomUUID().toString();
+    stateStore.setIdentifier(identifier);
+
+    stateStore.init(configuration);
+    stateStore.start();
+
+    // Wait for state store to connect
+    waitStateStore(stateStore, TimeUnit.SECONDS.toMillis(10));
+
+    return stateStore;
+  }
+
+  /**
+   * Wait for the State Store to initialize its driver.
+   *
+   * @param stateStore State Store.
+   * @param timeoutMs Time out in milliseconds.
+   * @throws IOException If the State Store cannot be reached.
+   * @throws InterruptedException If the sleep is interrupted.
+   */
+  public static void waitStateStore(StateStoreService stateStore,
+      long timeoutMs) throws IOException, InterruptedException {
+    long startingTime = Time.monotonicNow();
+    while (!stateStore.isDriverReady()) {
+      Thread.sleep(100);
+      if (Time.monotonicNow() - startingTime > timeoutMs) {
+        throw new IOException("Timeout waiting for State Store to connect");
+      }
+    }
+  }
+
+  /**
+   * Delete the default State Store.
+   *
+   * @throws IOException
+   */
+  public static void deleteStateStore() throws IOException {
+    Class<? extends StateStoreDriver> driverClass = getDefaultDriver();
+    deleteStateStore(driverClass);
+  }
+
+  /**
+   * Delete the State Store.
+   * @param driverClass Class of the State Store driver implementation.
+   * @throws IOException If it cannot be deleted.
+   */
+  public static void deleteStateStore(
+      Class<? extends StateStoreDriver> driverClass) throws IOException {
+
+    if (StateStoreFileBaseImpl.class.isAssignableFrom(driverClass)) {
+      String workingDirectory = System.getProperty("user.dir");
+      File dir = new File(workingDirectory + "/statestore");
+      if (dir.exists()) {
+        FileUtils.cleanDirectory(dir);
+      }
+    }
+  }
+
+  /**
+   * Set the default configuration for drivers based on files.
+   *
+   * @param conf Configuration to extend.
+   */
+  public static void setFileConfiguration(Configuration conf) {
+    String workingPath = System.getProperty("user.dir");
+    String stateStorePath = workingPath + "/statestore";
+    conf.set(FEDERATION_STORE_FILE_DIRECTORY, stateStorePath);
+  }
+
+  /**
+   * Clear all the records from the State Store.
+   *
+   * @param store State Store to remove records from.
+   * @return If the State Store was cleared.
+   * @throws IOException If it cannot clear the State Store.
+   */
+  public static boolean clearAllRecords(StateStoreService store)
+      throws IOException {
+    Collection<Class<? extends BaseRecord>> allRecords =
+        store.getSupportedRecords();
+    for (Class<? extends BaseRecord> recordType : allRecords) {
+      if (!clearRecords(store, recordType)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Clear records from a certain type from the State Store.
+   *
+   * @param store State Store to remove records from.
+   * @param recordClass Class of the records to remove.
+   * @return If the State Store was cleared.
+   * @throws IOException If it cannot clear the State Store.
+   */
+  public static <T extends BaseRecord> boolean clearRecords(
+      StateStoreService store, Class<T> recordClass) throws IOException {
+    List<T> emptyList = new ArrayList<>();
+    if (!synchronizeRecords(store, emptyList, recordClass)) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Synchronize a set of records. Remove all and keep the ones specified.
+   *
+   * @param stateStore State Store service managing the driver.
+   * @param records Records to add.
+   * @param clazz Class of the record to synchronize.
+   * @return If the synchronization succeeded.
+   * @throws IOException If it cannot connect to the State Store.
+   */
+  public static <T extends BaseRecord> boolean synchronizeRecords(
+      StateStoreService stateStore, List<T> records, Class<T> clazz)
+          throws IOException {
+    StateStoreDriver driver = stateStore.getDriver();
+    driver.verifyDriverReady();
+    if (driver.removeAll(clazz)) {
+      if (driver.putAll(records, true, false)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

+ 483 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java

@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.junit.AfterClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base tests for the driver. The particular implementations will use this to
+ * test their functionality.
+ */
+public class TestStateStoreDriverBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestStateStoreDriverBase.class);
+
+  private static StateStoreService stateStore;
+  private static Configuration conf;
+
+
+  /**
+   * Get the State Store driver.
+   * @return State Store driver.
+   */
+  protected StateStoreDriver getStateStoreDriver() {
+    return stateStore.getDriver();
+  }
+
+  @AfterClass
+  public static void tearDownCluster() {
+    if (stateStore != null) {
+      stateStore.stop();
+    }
+  }
+
+  /**
+   * Get a new State Store using this configuration.
+   *
+   * @param config Configuration for the State Store.
+   * @throws Exception If we cannot get the State Store.
+   */
+  public static void getStateStore(Configuration config) throws Exception {
+    conf = config;
+    stateStore = FederationStateStoreTestUtils.getStateStore(conf);
+  }
+
+  private <T extends BaseRecord> T generateFakeRecord(Class<T> recordClass)
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+
+    // TODO add record
+    return null;
+  }
+
+  /**
+   * Validate if a record is the same.
+   *
+   * @param original
+   * @param committed
+   * @param assertEquals Assert if the records are equal or just return.
+   * @return
+   * @throws IllegalArgumentException
+   * @throws IllegalAccessException
+   */
+  private boolean validateRecord(
+      BaseRecord original, BaseRecord committed, boolean assertEquals)
+          throws IllegalArgumentException, IllegalAccessException {
+
+    boolean ret = true;
+
+    Map<String, Class<?>> fields = getFields(original);
+    for (String key : fields.keySet()) {
+      if (key.equals("dateModified") ||
+          key.equals("dateCreated") ||
+          key.equals("proto")) {
+        // Fields are updated/set on commit and fetch and may not match
+        // the fields that are initialized in a non-committed object.
+        continue;
+      }
+      Object data1 = getField(original, key);
+      Object data2 = getField(committed, key);
+      if (assertEquals) {
+        assertEquals("Field " + key + " does not match", data1, data2);
+      } else if (!data1.equals(data2)) {
+        ret = false;
+      }
+    }
+
+    long now = stateStore.getDriver().getTime();
+    assertTrue(
+        committed.getDateCreated() <= now && committed.getDateCreated() > 0);
+    assertTrue(committed.getDateModified() >= committed.getDateCreated());
+
+    return ret;
+  }
+
+  public static void removeAll(StateStoreDriver driver) throws IOException {
+    // TODO add records to remove
+  }
+
+  public <T extends BaseRecord> void testInsert(
+      StateStoreDriver driver, Class<T> recordClass)
+          throws IllegalArgumentException, IllegalAccessException, IOException {
+
+    assertTrue(driver.removeAll(recordClass));
+    QueryResult<T> records = driver.get(recordClass);
+    assertTrue(records.getRecords().isEmpty());
+
+    // Insert single
+    BaseRecord record = generateFakeRecord(recordClass);
+    driver.put(record, true, false);
+
+    // Verify
+    records = driver.get(recordClass);
+    assertEquals(1, records.getRecords().size());
+    validateRecord(record, records.getRecords().get(0), true);
+
+    // Insert multiple
+    List<T> insertList = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      T newRecord = generateFakeRecord(recordClass);
+      insertList.add(newRecord);
+    }
+    driver.putAll(insertList, true, false);
+
+    // Verify
+    records = driver.get(recordClass);
+    assertEquals(11, records.getRecords().size());
+  }
+
+  public <T extends BaseRecord> void testFetchErrors(StateStoreDriver driver,
+      Class<T> clazz) throws IllegalAccessException, IOException {
+
+    // Fetch empty list
+    driver.removeAll(clazz);
+    QueryResult<T> result0 = driver.get(clazz);
+    assertNotNull(result0);
+    List<T> records0 = result0.getRecords();
+    assertEquals(records0.size(), 0);
+
+    // Insert single
+    BaseRecord record = generateFakeRecord(clazz);
+    assertTrue(driver.put(record, true, false));
+
+    // Verify
+    QueryResult<T> result1 = driver.get(clazz);
+    List<T> records1 = result1.getRecords();
+    assertEquals(1, records1.size());
+    validateRecord(record, records1.get(0), true);
+
+    // Test fetch single object with a bad query
+    final T fakeRecord = generateFakeRecord(clazz);
+    final Query<T> query = new Query<T>(fakeRecord);
+    T getRecord = driver.get(clazz, query);
+    assertNull(getRecord);
+
+    // Test fetch multiple objects does not exist returns empty list
+    assertEquals(driver.getMultiple(clazz, query).size(), 0);
+  }
+
+  public <T extends BaseRecord> void testPut(
+      StateStoreDriver driver, Class<T> clazz)
+          throws IllegalArgumentException, ReflectiveOperationException,
+          IOException, SecurityException {
+
+    driver.removeAll(clazz);
+    QueryResult<T> records = driver.get(clazz);
+    assertTrue(records.getRecords().isEmpty());
+
+    // Insert multiple
+    List<T> insertList = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      T newRecord = generateFakeRecord(clazz);
+      insertList.add(newRecord);
+    }
+
+    // Verify
+    assertTrue(driver.putAll(insertList, false, true));
+    records = driver.get(clazz);
+    assertEquals(records.getRecords().size(), 10);
+
+    // Generate a new record with the same PK fields as an existing record
+    BaseRecord updatedRecord = generateFakeRecord(clazz);
+    BaseRecord existingRecord = records.getRecords().get(0);
+    Map<String, String> primaryKeys = existingRecord.getPrimaryKeys();
+    for (Entry<String, String> entry : primaryKeys.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      Class<?> fieldType = getFieldType(existingRecord, key);
+      Object field = fromString(value, fieldType);
+      assertTrue(setField(updatedRecord, key, field));
+    }
+
+    // Attempt an update of an existing entry, but it is not allowed.
+    assertFalse(driver.put(updatedRecord, false, true));
+
+    // Verify no update occurred, all original records are unchanged
+    QueryResult<T> newRecords = driver.get(clazz);
+    assertTrue(newRecords.getRecords().size() == 10);
+    assertEquals("A single entry was improperly updated in the store", 10,
+        countMatchingEntries(records.getRecords(), newRecords.getRecords()));
+
+    // Update the entry (allowing updates)
+    assertTrue(driver.put(updatedRecord, true, false));
+
+    // Verify that one entry no longer matches the original set
+    newRecords = driver.get(clazz);
+    assertEquals(10, newRecords.getRecords().size());
+    assertEquals(
+        "Record of type " + clazz + " not updated in the store", 9,
+        countMatchingEntries(records.getRecords(), newRecords.getRecords()));
+  }
+
+  private int countMatchingEntries(
+      Collection<? extends BaseRecord> committedList,
+      Collection<? extends BaseRecord> matchList) {
+
+    int matchingCount = 0;
+    for (BaseRecord committed : committedList) {
+      for (BaseRecord match : matchList) {
+        try {
+          if (match.getPrimaryKey().equals(committed.getPrimaryKey())) {
+            if (validateRecord(match, committed, false)) {
+              matchingCount++;
+            }
+            break;
+          }
+        } catch (Exception ex) {
+        }
+      }
+    }
+    return matchingCount;
+  }
+
+  public <T extends BaseRecord> void testRemove(
+      StateStoreDriver driver, Class<T> clazz)
+          throws IllegalArgumentException, IllegalAccessException, IOException {
+
+    // Remove all
+    assertTrue(driver.removeAll(clazz));
+    QueryResult<T> records = driver.get(clazz);
+    assertTrue(records.getRecords().isEmpty());
+
+    // Insert multiple
+    List<T> insertList = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      T newRecord = generateFakeRecord(clazz);
+      insertList.add(newRecord);
+    }
+
+    // Verify
+    assertTrue(driver.putAll(insertList, false, true));
+    records = driver.get(clazz);
+    assertEquals(records.getRecords().size(), 10);
+
+    // Remove Single
+    assertTrue(driver.remove(records.getRecords().get(0)));
+
+    // Verify
+    records = driver.get(clazz);
+    assertEquals(records.getRecords().size(), 9);
+
+    // Remove with filter
+    final T firstRecord = records.getRecords().get(0);
+    final Query<T> query0 = new Query<T>(firstRecord);
+    assertTrue(driver.remove(clazz, query0) > 0);
+
+    final T secondRecord = records.getRecords().get(1);
+    final Query<T> query1 = new Query<T>(secondRecord);
+    assertTrue(driver.remove(clazz, query1) > 0);
+
+    // Verify
+    records = driver.get(clazz);
+    assertEquals(records.getRecords().size(), 7);
+
+    // Remove all
+    assertTrue(driver.removeAll(clazz));
+
+    // Verify
+    records = driver.get(clazz);
+    assertTrue(records.getRecords().isEmpty());
+  }
+
+  public void testInsert(StateStoreDriver driver)
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    // TODO add records
+  }
+
+  public void testPut(StateStoreDriver driver)
+      throws IllegalArgumentException, ReflectiveOperationException,
+      IOException, SecurityException {
+    // TODO add records
+  }
+
+  public void testRemove(StateStoreDriver driver)
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    // TODO add records
+  }
+
+  public void testFetchErrors(StateStoreDriver driver)
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    // TODO add records
+  }
+
+  /**
+   * Sets the value of a field on the object.
+   *
+   * @param fieldName The string name of the field.
+   * @param data The data to pass to the field's setter.
+   *
+   * @return True if successful, fails if failed.
+   */
+  private static boolean setField(
+      BaseRecord record, String fieldName, Object data) {
+
+    Method m = locateSetter(record, fieldName);
+    if (m != null) {
+      try {
+        m.invoke(record, data);
+      } catch (Exception e) {
+        LOG.error("Cannot set field " + fieldName + " on object "
+            + record.getClass().getName() + " to data " + data + " of type "
+            + data.getClass(), e);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Finds the appropriate setter for a field name.
+   *
+   * @param fieldName The legacy name of the field.
+   * @return The matching setter or null if not found.
+   */
+  private static Method locateSetter(BaseRecord record, String fieldName) {
+    for (Method m : record.getClass().getMethods()) {
+      if (m.getName().equalsIgnoreCase("set" + fieldName)) {
+        return m;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns all serializable fields in the object.
+   *
+   * @return Map with the fields.
+   */
+  private static Map<String, Class<?>> getFields(BaseRecord record) {
+    Map<String, Class<?>> getters = new HashMap<>();
+    for (Method m : record.getClass().getDeclaredMethods()) {
+      if (m.getName().startsWith("get")) {
+        try {
+          Class<?> type = m.getReturnType();
+          char[] c = m.getName().substring(3).toCharArray();
+          c[0] = Character.toLowerCase(c[0]);
+          String key = new String(c);
+          getters.put(key, type);
+        } catch (Exception e) {
+          LOG.error("Cannot execute getter " + m.getName()
+              + " on object " + record);
+        }
+      }
+    }
+    return getters;
+  }
+
+  /**
+   * Get the type of a field.
+   *
+   * @param fieldName
+   * @return Field type
+   */
+  private static Class<?> getFieldType(BaseRecord record, String fieldName) {
+    Method m = locateGetter(record, fieldName);
+    return m.getReturnType();
+  }
+
+  /**
+   * Fetches the value for a field name.
+   *
+   * @param fieldName the legacy name of the field.
+   * @return The field data or null if not found.
+   */
+  private static Object getField(BaseRecord record, String fieldName) {
+    Object result = null;
+    Method m = locateGetter(record, fieldName);
+    if (m != null) {
+      try {
+        result = m.invoke(record);
+      } catch (Exception e) {
+        LOG.error("Cannot get field " + fieldName + " on object " + record);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Finds the appropriate getter for a field name.
+   *
+   * @param fieldName The legacy name of the field.
+   * @return The matching getter or null if not found.
+   */
+  private static Method locateGetter(BaseRecord record, String fieldName) {
+    for (Method m : record.getClass().getMethods()) {
+      if (m.getName().equalsIgnoreCase("get" + fieldName)) {
+        return m;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Expands a data object from the store into an record object. Default store
+   * data type is a String. Override if additional serialization is required.
+   *
+   * @param data Object containing the serialized data. Only string is
+   *          supported.
+   * @param clazz Target object class to hold the deserialized data.
+   * @return An instance of the target data object initialized with the
+   *         deserialized data.
+   */
+  @Deprecated
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private static <T> T fromString(String data, Class<T> clazz) {
+
+    if (data.equals("null")) {
+      return null;
+    } else if (clazz == String.class) {
+      return (T) data;
+    } else if (clazz == Long.class || clazz == long.class) {
+      return (T) Long.valueOf(data);
+    } else if (clazz == Integer.class || clazz == int.class) {
+      return (T) Integer.valueOf(data);
+    } else if (clazz == Double.class || clazz == double.class) {
+      return (T) Double.valueOf(data);
+    } else if (clazz == Float.class || clazz == float.class) {
+      return (T) Float.valueOf(data);
+    } else if (clazz == Boolean.class || clazz == boolean.class) {
+      return (T) Boolean.valueOf(data);
+    } else if (clazz.isEnum()) {
+      return (T) Enum.valueOf((Class<Enum>) clazz, data);
+    }
+    return null;
+  }
+}

+ 64 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java

@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the FileSystem (e.g., HDFS) implementation of the State Store driver.
+ */
+public class TestStateStoreFile extends TestStateStoreDriverBase {
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    Configuration conf = getStateStoreConfiguration(StateStoreFileImpl.class);
+    getStateStore(conf);
+  }
+
+  @Before
+  public void startup() throws IOException {
+    removeAll(getStateStoreDriver());
+  }
+
+  @Test
+  public void testInsert()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testInsert(getStateStoreDriver());
+  }
+
+  @Test
+  public void testUpdate()
+      throws IllegalArgumentException, ReflectiveOperationException,
+      IOException, SecurityException {
+    testPut(getStateStoreDriver());
+  }
+
+  @Test
+  public void testDelete()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testRemove(getStateStoreDriver());
+  }
+}

+ 88 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the FileSystem (e.g., HDFS) implementation of the State Store driver.
+ */
+public class TestStateStoreFileSystem extends TestStateStoreDriverBase {
+
+  private static MiniDFSCluster dfsCluster;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    Configuration conf = FederationStateStoreTestUtils
+        .getStateStoreConfiguration(StateStoreFileSystemImpl.class);
+    conf.set(StateStoreFileSystemImpl.FEDERATION_STORE_FS_PATH,
+        "/hdfs-federation/");
+
+    // Create HDFS cluster to back the state tore
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+    builder.numDataNodes(1);
+    dfsCluster = builder.build();
+    dfsCluster.waitClusterUp();
+    getStateStore(conf);
+  }
+
+  @AfterClass
+  public static void tearDownCluster() {
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+  }
+
+  @Before
+  public void startup() throws IOException {
+    removeAll(getStateStoreDriver());
+  }
+
+  @Test
+  public void testInsert()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testInsert(getStateStoreDriver());
+  }
+
+  @Test
+  public void testUpdate()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testInsert(getStateStoreDriver());
+  }
+
+  @Test
+  public void testDelete()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testInsert(getStateStoreDriver());
+  }
+
+  @Test
+  public void testFetchErrors()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testFetchErrors(getStateStoreDriver());
+  }
+}