Browse Source

HDFS-14234. Limit WebHDFS to specifc user, host, directory triples.
Contributed by Clay B.

Anu Engineer 5 years ago
parent
commit
101d5b5f86

+ 8 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -154,7 +154,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       600000;
 
   public static final String  DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
-    "dfs.namenode.path.based.cache.block.map.allocation.percent";
+      "dfs.namenode.path.based.cache.block.map.allocation.percent";
   public static final float    DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
 
   public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT =
@@ -185,7 +185,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT to populate
   // needed replication queues before exiting safe mode
   public static final String  DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY =
-    "dfs.namenode.replqueue.threshold-pct";
+      "dfs.namenode.replqueue.threshold-pct";
   public static final String  DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY = "dfs.namenode.safemode.min.datanodes";
   public static final int     DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT = 0;
   public static final String  DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY =
@@ -786,6 +786,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
   public static final String  DFS_HTTP_POLICY_KEY = "dfs.http.policy";
   public static final String  DFS_HTTP_POLICY_DEFAULT =  HttpConfig.Policy.HTTP_ONLY.name();
+  public static final String  DFS_DATANODE_HTTPSERVER_FILTER_HANDLERS = "dfs.datanode.httpserver.filter.handlers";
+  public static final String  DFS_DATANODE_HTTPSERVER_FILTER_HANDLERS_DEFAULT = "org.apache.hadoop.hdfs.server.datanode.web.RestCsrfPreventionFilterHandler";
   public static final String  DFS_DEFAULT_CHUNK_VIEW_SIZE_KEY = "dfs.default.chunk.view.size";
   public static final int     DFS_DEFAULT_CHUNK_VIEW_SIZE_DEFAULT = 32*1024;
   public static final String  DFS_DATANODE_HTTPS_ADDRESS_KEY = "dfs.datanode.https.address";
@@ -1192,7 +1194,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 
   // Slow io warning log threshold settings for dfsclient and datanode.
   public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY =
-    "dfs.datanode.slow.io.warning.threshold.ms";
+      "dfs.datanode.slow.io.warning.threshold.ms";
   public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300;
 
   // Number of parallel threads to load multiple datanode volumes
@@ -1224,15 +1226,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // comma separated list of nntop reporting periods in minutes
   public static final String NNTOP_WINDOWS_MINUTES_KEY =
       "dfs.namenode.top.windows.minutes";
-  public static final String[] NNTOP_WINDOWS_MINUTES_DEFAULT = {"1","5","25"};
+  public static final String[] NNTOP_WINDOWS_MINUTES_DEFAULT = {"1", "5", "25"};
   public static final String DFS_PIPELINE_ECN_ENABLED = "dfs.pipeline.ecn";
   public static final boolean DFS_PIPELINE_ECN_ENABLED_DEFAULT = false;
 
   // Key Provider Cache Expiry
   public static final String DFS_DATANODE_BLOCK_PINNING_ENABLED =
-    "dfs.datanode.block-pinning.enabled";
+      "dfs.datanode.block-pinning.enabled";
   public static final boolean DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT =
-    false;
+      false;
 
   public static final String
       DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY =

+ 443 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HostRestrictingAuthorizationFilter.java

@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.net.util.SubnetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An HTTP filter that can filter requests based on Hosts.
+ */
+public class HostRestrictingAuthorizationFilter implements Filter {
+  public static final String HDFS_CONFIG_PREFIX = "dfs.web.authentication.";
+  public static final String RESTRICTION_CONFIG = "host.allow.rules";
+  // A Java Predicate for query string parameters on which to filter requests
+  public static final Predicate<String> RESTRICTED_OPERATIONS =
+      qStr -> (qStr.trim().equalsIgnoreCase("op=OPEN") ||
+      qStr.trim().equalsIgnoreCase("op=GETDELEGATIONTOKEN"));
+  private final Map<String, CopyOnWriteArrayList<Rule>> rulemap =
+      new ConcurrentHashMap<>();
+  private static final Logger LOG =
+      LoggerFactory.getLogger(HostRestrictingAuthorizationFilter.class);
+
+  /*
+   * Constructs a mapping of configuration properties to be used for filter
+   * initialization.  The mapping includes all properties that start with the
+   * specified configuration prefix.  Property names in the mapping are trimmed
+   * to remove the configuration prefix.
+   *
+   * @param conf configuration to read
+   * @param confPrefix configuration prefix
+   * @return mapping of configuration properties to be used for filter
+   *     initialization
+   */
+  public static Map<String, String> getFilterParams(Configuration conf,
+      String confPrefix) {
+    return conf.getPropsWithPrefix(confPrefix);
+  }
+
+  /*
+   * Check all rules for this user to see if one matches for this host/path pair
+   *
+   * @param: user - user to check rules for
+   * @param: host - IP address (e.g. "192.168.0.1")
+   * @param: path - file path with no scheme (e.g. /path/foo)
+   * @returns: true if a rule matches this user, host, path tuple false if an
+   * error occurs or no match
+   */
+  private boolean matchRule(String user, String remoteIp, String path) {
+    // allow lookups for blank in the rules for user and path
+    user = (user != null ? user : "");
+    path = (path != null ? path : "");
+
+    LOG.trace("Got user: {}, remoteIp: {}, path: {}", user, remoteIp, path);
+
+    // isInRange fails for null/blank IPs, require an IP to approve
+    if (remoteIp == null) {
+      LOG.trace("Returned false due to null rempteIp");
+      return false;
+    }
+
+    List<Rule> userRules = ((userRules = rulemap.get(user)) != null) ?
+        userRules : new ArrayList<Rule>();
+    List<Rule> anyRules = ((anyRules = rulemap.get("*")) != null) ?
+        anyRules : new ArrayList<Rule>();
+
+    List<Rule> rules = Stream.of(userRules, anyRules)
+        .flatMap(l -> l.stream()).collect(Collectors.toList());
+
+    for (Rule rule : rules) {
+      SubnetUtils.SubnetInfo subnet = rule.getSubnet();
+      String rulePath = rule.getPath();
+      LOG.trace("Evaluating rule, subnet: {}, path: {}",
+          subnet != null ? subnet.getCidrSignature() : "*", rulePath);
+      try {
+        if ((subnet == null || subnet.isInRange(remoteIp))
+            && FilenameUtils.directoryContains(rulePath, path)) {
+          LOG.debug("Found matching rule, subnet: {}, path: {}; returned true",
+              rule.getSubnet() != null ? subnet.getCidrSignature() : null,
+              rulePath);
+          return true;
+        }
+      } catch (IOException e) {
+        LOG.warn("Got IOException {}; returned false", e);
+        return false;
+      }
+    }
+
+    LOG.trace("Found no rules for user");
+    return false;
+  }
+
+  @Override
+  public void destroy() {
+  }
+
+  @Override
+  public void init(FilterConfig config) throws ServletException {
+    // Process dropbox rules
+    String dropboxRules = config.getInitParameter(RESTRICTION_CONFIG);
+    loadRuleMap(dropboxRules);
+  }
+
+  /*
+   * Initializes the rule map state for the filter
+   *
+   * @param ruleString - a string of newline delineated, comma separated
+   * three field records
+   * @throws IllegalArgumentException - when a rule can not be properly parsed
+   * Postconditions:
+   * <ul>
+   * <li>The {@rulemap} hash will be populated with all parsed rules.</li>
+   * </ul>
+   */
+  private void loadRuleMap(String ruleString) throws IllegalArgumentException {
+    if (ruleString == null || ruleString.equals("")) {
+      LOG.debug("Got no rules - will disallow anyone access");
+    } else {
+      // value: user1,network/bits1,path_glob1|user2,network/bits2,path_glob2...
+      Pattern comma_split = Pattern.compile(",");
+      Pattern rule_split = Pattern.compile("\\||\n");
+      // split all rule lines
+      Map<Integer, List<String[]>> splits = rule_split.splitAsStream(ruleString)
+          .map(x -> comma_split.split(x, 3))
+          .collect(Collectors.groupingBy(x -> x.length));
+      // verify all rules have three parts
+      if (!splits.keySet().equals(Collections.singleton(3))) {
+        // instead of re-joining parts, re-materialize lines which do not split
+        // correctly for the exception
+        String bad_lines = rule_split.splitAsStream(ruleString)
+            .filter(x -> comma_split.split(x, 3).length != 3)
+            .collect(Collectors.joining("\n"));
+        throw new IllegalArgumentException("Bad rule definition: " + bad_lines);
+      }
+      // create a list of Rules
+      int user = 0;
+      int cidr = 1;
+      int path = 2;
+      BiFunction<CopyOnWriteArrayList<Rule>, CopyOnWriteArrayList<Rule>,
+          CopyOnWriteArrayList<Rule>> arrayListMerge = (v1, v2) -> {
+        v1.addAll(v2);
+        return v1;
+      };
+      for (String[] split : splits.get(3)) {
+        LOG.debug("Loaded rule: user: {}, network/bits: {} path: {}",
+            split[user], split[cidr], split[path]);
+        Rule rule = (split[cidr].trim().equals("*") ? new Rule(null,
+            split[path]) : new Rule(new SubnetUtils(split[cidr]).getInfo(),
+            split[path]));
+        // Rule map is {"user": [rule1, rule2, ...]}, update the user's array
+        CopyOnWriteArrayList<Rule> arrayListRule =
+            new CopyOnWriteArrayList<Rule>() {
+          {
+            add(rule);
+          }
+        };
+        rulemap.merge(split[user], arrayListRule, arrayListMerge);
+      }
+    }
+  }
+
+  /*
+   * doFilter() is a shim to create an HttpInteraction object and pass that to
+   * the actual processing logic
+   */
+  @Override
+  public void doFilter(ServletRequest request, ServletResponse response,
+      FilterChain filterChain)
+      throws IOException, ServletException {
+    final HttpServletRequest httpRequest = (HttpServletRequest) request;
+    HttpServletResponse httpResponse = (HttpServletResponse) response;
+
+    handleInteraction(new ServletFilterHttpInteraction(httpRequest,
+        httpResponse, filterChain));
+  }
+
+  /*
+   * The actual processing logic of the Filter
+   * Uses our {@HttpInteraction} shim which can be called from a variety of
+   * incoming request sources
+   * @param interaction - An HttpInteraction object from any of our callers
+   */
+  public void handleInteraction(HttpInteraction interaction)
+      throws IOException, ServletException {
+    final String address = interaction.getRemoteAddr();
+    final String query = interaction.getQueryString();
+    final String path =
+        interaction.getRequestURI()
+            .substring(WebHdfsFileSystem.PATH_PREFIX.length());
+    String user = interaction.getRemoteUser();
+
+    LOG.trace("Got request user: {}, remoteIp: {}, query: {}, path: {}",
+        user, address, query, path);
+    boolean authenticatedQuery =
+        Arrays.stream(Optional.ofNullable(query).orElse("")
+            .trim()
+            .split("&"))
+            .anyMatch(RESTRICTED_OPERATIONS);
+    if (!interaction.isCommitted() && authenticatedQuery) {
+      // loop over all query parts
+      String[] queryParts = query.split("&");
+
+      if (user == null) {
+        LOG.trace("Looking for delegation token to identify user");
+        for (String part : queryParts) {
+          if (part.trim().startsWith("delegation=")) {
+            Token t = new Token();
+            t.decodeFromUrlString(part.split("=", 2)[1]);
+            ByteArrayInputStream buf =
+                new ByteArrayInputStream(t.getIdentifier());
+            DelegationTokenIdentifier identifier =
+                new DelegationTokenIdentifier();
+            identifier.readFields(new DataInputStream(buf));
+            user = identifier.getUser().getUserName();
+            LOG.trace("Updated request user: {}, remoteIp: {}, query: {}, " +
+                "path: {}", user, address, query, path);
+          }
+        }
+      }
+
+      if (authenticatedQuery && !(matchRule("*", address,
+          path) || matchRule(user, address, path))) {
+        LOG.trace("Rejecting interaction; no rule found");
+        interaction.sendError(HttpServletResponse.SC_FORBIDDEN,
+            "WebHDFS is configured write-only for " + user + "@" + address +
+                " for file: " + path);
+        return;
+      }
+    }
+
+    LOG.trace("Proceeding with interaction");
+    interaction.proceed();
+  }
+
+  /*
+   * Defines the minimal API requirements for the filter to execute its
+   * filtering logic.  This interface exists to facilitate integration in
+   * components that do not run within a servlet container and therefore cannot
+   * rely on a servlet container to dispatch to the {@link #doFilter} method.
+   * Applications that do run inside a servlet container will not need to write
+   * code that uses this interface.  Instead, they can use typical servlet
+   * container configuration mechanisms to insert the filter.
+   */
+  public interface HttpInteraction {
+
+    /*
+     * Returns if the request has been committed.
+     *
+     * @return boolean
+     */
+    boolean isCommitted();
+
+    /*
+     * Returns the value of the requesting client address.
+     *
+     * @return the remote address
+     */
+    String getRemoteAddr();
+
+    /*
+     * Returns the user ID making the request.
+     *
+     * @return the user
+     */
+    String getRemoteUser();
+
+    /*
+     * Returns the value of the request URI.
+     *
+     * @return the request URI
+     */
+    String getRequestURI();
+
+    /*
+     * Returns the value of the query string.
+     *
+     * @return an optional contianing the URL query string
+     */
+    String getQueryString();
+
+    /*
+     * Returns the method.
+     *
+     * @return method
+     */
+    String getMethod();
+
+    /*
+     * Called by the filter after it decides that the request may proceed.
+     *
+     * @throws IOException if there is an I/O error
+     * @throws ServletException if the implementation relies on the servlet API
+     *     and a servlet API call has failed
+     */
+    void proceed() throws IOException, ServletException;
+
+    /*
+     * Called by the filter after it decides that the request is an
+     * unauthorized request and therefore must be rejected.
+     *
+     * @param code status code to send
+     * @param message response message
+     * @throws IOException if there is an I/O error
+     */
+    void sendError(int code, String message) throws IOException;
+  }
+
+  private static class Rule {
+    private final SubnetUtils.SubnetInfo subnet;
+    private final String path;
+
+    /*
+     * A class for holding dropbox filter rules
+     *
+     * @param subnet - the IPv4 subnet for which this rule is valid (pass
+     * null for any network location)
+     * @param path - the HDFS path for which this rule is valid
+     */
+    Rule(SubnetUtils.SubnetInfo subnet, String path) {
+      this.subnet = subnet;
+      this.path = path;
+    }
+
+    public SubnetUtils.SubnetInfo getSubnet() {
+      return (subnet);
+    }
+
+    public String getPath() {
+      return (path);
+    }
+  }
+
+  /*
+   * {@link HttpInteraction} implementation for use in the servlet filter.
+   */
+  private static final class ServletFilterHttpInteraction
+      implements HttpInteraction {
+
+    private final FilterChain chain;
+    private final HttpServletRequest httpRequest;
+    private final HttpServletResponse httpResponse;
+
+    /*
+     * Creates a new ServletFilterHttpInteraction.
+     *
+     * @param httpRequest request to process
+     * @param httpResponse response to process
+     * @param chain filter chain to forward to if HTTP interaction is allowed
+     */
+    public ServletFilterHttpInteraction(HttpServletRequest httpRequest,
+        HttpServletResponse httpResponse, FilterChain chain) {
+      this.httpRequest = httpRequest;
+      this.httpResponse = httpResponse;
+      this.chain = chain;
+    }
+
+    @Override
+    public boolean isCommitted() {
+      return (httpResponse.isCommitted());
+    }
+
+    @Override
+    public String getRemoteAddr() {
+      return (httpRequest.getRemoteAddr());
+    }
+
+    @Override
+    public String getRemoteUser() {
+      return (httpRequest.getRemoteUser());
+    }
+
+    @Override
+    public String getRequestURI() {
+      return (httpRequest.getRequestURI());
+    }
+
+    @Override
+    public String getQueryString() {
+      return (httpRequest.getQueryString());
+    }
+
+    @Override
+    public String getMethod() {
+      return httpRequest.getMethod();
+    }
+
+    @Override
+    public void proceed() throws IOException, ServletException {
+      chain.doFilter(httpRequest, httpResponse);
+    }
+
+    @Override
+    public void sendError(int code, String message) throws IOException {
+      httpResponse.sendError(code, message);
+    }
+
+  }
+}

+ 139 - 107
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,18 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.web;
 
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_ENABLED_DEFAULT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_ENABLED_KEY;
-
-import java.util.Enumeration;
-import java.util.Map;
-import javax.servlet.FilterConfig;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-
 import io.netty.bootstrap.ChannelFactory;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
@@ -40,9 +32,6 @@ import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpResponseEncoder;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedWriteHandler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -55,11 +44,17 @@ import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.security.http.RestCsrfPreventionFilter;
 import org.apache.hadoop.security.ssl.SSLFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -67,6 +62,9 @@ import java.net.SocketException;
 import java.net.URI;
 import java.nio.channels.ServerSocketChannel;
 import java.security.GeneralSecurityException;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT;
@@ -74,7 +72,19 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KE
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_INTERNAL_PROXY_PORT;
 
+/**
+ * Data node HTTP Server Class.
+ */
 public class DatanodeHttpServer implements Closeable {
+  static final Logger LOG = LoggerFactory.getLogger(DatanodeHttpServer.class);
+  private static final ConcurrentHashMap<Class<?>, Object> HANDLER_STATE
+      = new ConcurrentHashMap<Class<?>, Object>() {};
+  // HttpServer threads are only used for the web UI and basic servlets, so
+  // set them to the minimum possible
+  private static final int HTTP_SELECTOR_THREADS = 1;
+  private static final int HTTP_ACCEPTOR_THREADS = 1;
+  private static final int HTTP_MAX_THREADS =
+      HTTP_SELECTOR_THREADS + HTTP_ACCEPTOR_THREADS + 1;
   private final HttpServer2 infoServer;
   private final EventLoopGroup bossGroup;
   private final EventLoopGroup workerGroup;
@@ -84,23 +94,13 @@ public class DatanodeHttpServer implements Closeable {
   private final ServerBootstrap httpsServer;
   private final Configuration conf;
   private final Configuration confForCreate;
-  private final RestCsrfPreventionFilter restCsrfPreventionFilter;
   private InetSocketAddress httpAddress;
   private InetSocketAddress httpsAddress;
-  static final Logger LOG = LoggerFactory.getLogger(DatanodeHttpServer.class);
-
-  // HttpServer threads are only used for the web UI and basic servlets, so
-  // set them to the minimum possible
-  private static final int HTTP_SELECTOR_THREADS = 1;
-  private static final int HTTP_ACCEPTOR_THREADS = 1;
-  private static final int HTTP_MAX_THREADS =
-      HTTP_SELECTOR_THREADS + HTTP_ACCEPTOR_THREADS + 1;
 
   public DatanodeHttpServer(final Configuration conf,
-      final DataNode datanode,
-      final ServerSocketChannel externalHttpChannel)
-    throws IOException {
-    this.restCsrfPreventionFilter = createRestCsrfPreventionFilter(conf);
+        final DataNode datanode,
+        final ServerSocketChannel externalHttpChannel)
+        throws IOException {
     this.conf = conf;
 
     Configuration confForInfoServer = new Configuration(conf);
@@ -136,7 +136,7 @@ public class DatanodeHttpServer implements Closeable {
     this.infoServer.setAttribute("datanode", datanode);
     this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
     this.infoServer.addServlet(null, "/blockScannerReport",
-                               BlockScanner.Servlet.class);
+        BlockScanner.Servlet.class);
     DataNodeUGIProvider.init(conf);
     this.infoServer.start();
     final InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
@@ -148,24 +148,26 @@ public class DatanodeHttpServer implements Closeable {
     this.workerGroup = new NioEventLoopGroup();
     this.externalHttpChannel = externalHttpChannel;
     HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
+    final ChannelHandler[] handlers = getFilterHandlers(conf);
 
     if (policy.isHttpEnabled()) {
       this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup)
-        .childHandler(new ChannelInitializer<SocketChannel>() {
-        @Override
-        protected void initChannel(SocketChannel ch) throws Exception {
-          ChannelPipeline p = ch.pipeline();
-          p.addLast(new HttpRequestDecoder(),
-            new HttpResponseEncoder());
-          if (restCsrfPreventionFilter != null) {
-            p.addLast(new RestCsrfPreventionFilterHandler(
-                restCsrfPreventionFilter));
-          }
-          p.addLast(
-              new ChunkedWriteHandler(),
-              new URLDispatcher(jettyAddr, conf, confForCreate));
-        }
-      });
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+              @Override
+              protected void initChannel(SocketChannel ch) throws Exception {
+                ChannelPipeline p = ch.pipeline();
+                p.addLast(new HttpRequestDecoder(),
+                    new HttpResponseEncoder());
+                if (handlers != null) {
+                  for (ChannelHandler c : handlers) {
+                    p.addLast(c);
+                  }
+                }
+                p.addLast(
+                    new ChunkedWriteHandler(),
+                    new URLDispatcher(jettyAddr, conf, confForCreate));
+              }
+            });
 
       this.httpServer.childOption(
           ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK,
@@ -188,7 +190,9 @@ public class DatanodeHttpServer implements Closeable {
               // The channel has been bounded externally via JSVC,
               // thus bind() becomes a no-op.
               @Override
-              protected void doBind(SocketAddress localAddress) throws Exception {}
+              protected void doBind(SocketAddress localAddress)
+                  throws Exception {
+              }
             };
           }
         });
@@ -205,30 +209,92 @@ public class DatanodeHttpServer implements Closeable {
         throw new IOException(e);
       }
       this.httpsServer = new ServerBootstrap().group(bossGroup, workerGroup)
-        .channel(NioServerSocketChannel.class)
-        .childHandler(new ChannelInitializer<SocketChannel>() {
-          @Override
-          protected void initChannel(SocketChannel ch) throws Exception {
-            ChannelPipeline p = ch.pipeline();
-            p.addLast(
-                new SslHandler(sslFactory.createSSLEngine()),
-                new HttpRequestDecoder(),
-                new HttpResponseEncoder());
-            if (restCsrfPreventionFilter != null) {
-              p.addLast(new RestCsrfPreventionFilterHandler(
-                  restCsrfPreventionFilter));
+          .channel(NioServerSocketChannel.class)
+          .childHandler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            protected void initChannel(SocketChannel ch) throws Exception {
+              ChannelPipeline p = ch.pipeline();
+              p.addLast(
+                  new SslHandler(sslFactory.createSSLEngine()),
+                  new HttpRequestDecoder(),
+                  new HttpResponseEncoder());
+              if (handlers != null) {
+                for (ChannelHandler c : handlers) {
+                  p.addLast(c);
+                }
+              }
+              p.addLast(
+                  new ChunkedWriteHandler(),
+                  new URLDispatcher(jettyAddr, conf, confForCreate));
             }
-            p.addLast(
-                new ChunkedWriteHandler(),
-                new URLDispatcher(jettyAddr, conf, confForCreate));
-          }
-        });
+          });
     } else {
       this.httpsServer = null;
       this.sslFactory = null;
     }
   }
 
+  private static String getHostnameForSpnegoPrincipal(Configuration conf) {
+    String addr = conf.getTrimmed(DFS_DATANODE_HTTP_ADDRESS_KEY, null);
+    if (addr == null) {
+      addr = conf.getTrimmed(DFS_DATANODE_HTTPS_ADDRESS_KEY,
+          DFS_DATANODE_HTTPS_ADDRESS_DEFAULT);
+    }
+    InetSocketAddress inetSocker = NetUtils.createSocketAddr(addr);
+    return inetSocker.getHostString();
+  }
+
+  /* Get an array of ChannelHandlers specified in the conf
+   * @param conf configuration to read and pass
+   * @return array of ChannelHandlers ready to be used
+   * @throws NoSuchMethodException if the handler does not implement a method
+   *  initializeState(conf)
+   * @throws InvocationTargetException if the handler's initalizeState method
+   *  raises an exception
+   */
+  private ChannelHandler[] getFilterHandlers(Configuration configuration) {
+    if (configuration == null) {
+      return null;
+    }
+    // If the hdfs-site.xml has the proper configs for filter classes, use them.
+    Class<?>[] classes =
+        configuration.getClasses(
+            DFSConfigKeys.DFS_DATANODE_HTTPSERVER_FILTER_HANDLERS);
+
+    // else use the hard coded class from the default configuration.
+    if (classes == null) {
+      classes =
+          configuration.getClasses(
+              DFSConfigKeys.DFS_DATANODE_HTTPSERVER_FILTER_HANDLERS_DEFAULT);
+    }
+
+    // if we are not able to find any handlers, let us fail since running
+    // with Csrf will is a security hole. Let us abort the startup.
+    if(classes == null)  {
+      return null;
+    }
+
+    ChannelHandler[] handlers = new ChannelHandler[classes.length];
+    for (int i = 0; i < classes.length; i++) {
+      LOG.debug("Loading filter handler {}", classes[i].getName());
+      try {
+        Method initializeState = classes[i].getDeclaredMethod("initializeState",
+            Configuration.class);
+        Constructor constructor =
+            classes[i].getDeclaredConstructor(initializeState.getReturnType());
+        handlers[i] = (ChannelHandler) constructor.newInstance(
+            HANDLER_STATE.getOrDefault(classes[i],
+            initializeState.invoke(null, configuration)));
+      } catch (NoSuchMethodException | InvocationTargetException
+          | IllegalAccessException | InstantiationException
+          | IllegalArgumentException e) {
+        LOG.error("Failed to initialize handler {}", classes[i].toString());
+        throw new RuntimeException(e);
+      }
+    }
+    return (handlers);
+  }
+
   public InetSocketAddress getHttpAddress() {
     return httpAddress;
   }
@@ -294,55 +360,21 @@ public class DatanodeHttpServer implements Closeable {
     }
   }
 
-  private static String getHostnameForSpnegoPrincipal(Configuration conf) {
-    String addr = conf.getTrimmed(DFS_DATANODE_HTTP_ADDRESS_KEY, null);
-    if (addr == null) {
-      addr = conf.getTrimmed(DFS_DATANODE_HTTPS_ADDRESS_KEY,
-                             DFS_DATANODE_HTTPS_ADDRESS_DEFAULT);
-    }
-    InetSocketAddress inetSocker = NetUtils.createSocketAddr(addr);
-    return inetSocker.getHostString();
-  }
-
-  /**
-   * Creates the {@link RestCsrfPreventionFilter} for the DataNode.  Since the
-   * DataNode HTTP server is not implemented in terms of the servlet API, it
-   * takes some extra effort to obtain an instance of the filter.  This method
-   * takes care of configuration and implementing just enough of the servlet API
-   * and related interfaces so that the DataNode can get a fully initialized
-   * instance of the filter.
-   *
-   * @param conf configuration to read
-   * @return initialized filter, or null if CSRF protection not enabled
-   */
-  private static RestCsrfPreventionFilter createRestCsrfPreventionFilter(
-      Configuration conf) {
-    if (!conf.getBoolean(DFS_WEBHDFS_REST_CSRF_ENABLED_KEY,
-        DFS_WEBHDFS_REST_CSRF_ENABLED_DEFAULT)) {
-      return null;
-    }
-    String restCsrfClassName = RestCsrfPreventionFilter.class.getName();
-    Map<String, String> restCsrfParams = RestCsrfPreventionFilter
-        .getFilterParams(conf, "dfs.webhdfs.rest-csrf.");
-    RestCsrfPreventionFilter filter = new RestCsrfPreventionFilter();
-    try {
-      filter.init(new MapBasedFilterConfig(restCsrfClassName, restCsrfParams));
-    } catch (ServletException e) {
-      throw new IllegalStateException(
-          "Failed to initialize RestCsrfPreventionFilter.", e);
-    }
-    return filter;
-  }
-
   /**
-   * A minimal {@link FilterConfig} implementation backed by a {@link Map}.
+   * Since the DataNode HTTP server is not implemented in terms of the
+   * servlet API, it
+   * takes some extra effort to obtain an instance of the filter.  This
+   * method provides
+   * a minimal {@link FilterConfig} implementation backed by a {@link Map}.
+   * Call this from
+   * your filter handler to initialize a servlet filter.
    */
-  private static final class MapBasedFilterConfig implements FilterConfig {
+  public static final class MapBasedFilterConfig implements FilterConfig {
 
     private final String filterName;
     private final Map<String, String> parameters;
 
-    /**
+    /*
      * Creates a new MapBasedFilterConfig.
      *
      * @param filterName filter name
@@ -374,10 +406,10 @@ public class DatanodeHttpServer implements Closeable {
       throw this.notImplemented();
     }
 
-    /**
+    /*
      * Creates an exception indicating that an interface method is not
-     * implemented.  These should never be seen in practice, because it is only
-     * used for methods that are not called by {@link RestCsrfPreventionFilter}.
+     * implemented. If you are building a handler it is possible you will
+     * need to make this interface more extensive.
      *
      * @return exception indicating method not implemented
      */

+ 240 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/HostRestrictingAuthorizationFilterHandler.java

@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.web;
+
+import com.google.common.collect.ImmutableMap;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.HostRestrictingAuthorizationFilter;
+import org.apache.hadoop.hdfs.server.common.HostRestrictingAuthorizationFilter.HttpInteraction;
+import org.apache.hadoop.hdfs.web.resources.UserParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/*
+ * Netty handler that integrates with the {@link
+ * HostRestrictingAuthorizationFilter}.  If
+ * the filter determines that the request is allowed, then this handler forwards
+ * the request to the next handler in the Netty pipeline.  Otherwise, this
+ * handler drops the request and sends an HTTP 403 response.
+ */
+@InterfaceAudience.Private
+@Sharable
+final class HostRestrictingAuthorizationFilterHandler
+    extends SimpleChannelInboundHandler<HttpRequest> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(HostRestrictingAuthorizationFilterHandler.class);
+  private final
+  HostRestrictingAuthorizationFilter hostRestrictingAuthorizationFilter;
+
+  /*
+   * Creates a new HostRestrictingAuthorizationFilterHandler.  There will be
+   * a new instance created for each new Netty channel/pipeline serving a new
+   * request.
+   *
+   * To prevent the cost of repeated initialization of the filter, this
+   * constructor requires the caller to pass in a pre-built, fully initialized
+   * filter instance.  The filter is stateless after initialization, so it can
+   * be shared across multiple Netty channels/pipelines.
+   *
+   * @param hostRestrictingAuthorizationFilter initialized filter
+   */
+  public HostRestrictingAuthorizationFilterHandler(
+      HostRestrictingAuthorizationFilter hostRestrictingAuthorizationFilter) {
+    this.hostRestrictingAuthorizationFilter =
+        hostRestrictingAuthorizationFilter;
+  }
+
+  /*
+   * Creates a new HostRestrictingAuthorizationFilterHandler.  There will be
+   * a new instance created for each new Netty channel/pipeline serving a new
+   * request.
+   * To prevent the cost of repeated initialization of the filter, this
+   * constructor requires the caller to pass in a pre-built, fully initialized
+   * filter instance.  The filter is stateless after initialization, so it can
+   * be shared across multiple Netty channels/pipelines.
+   */
+  public HostRestrictingAuthorizationFilterHandler() {
+    Configuration conf = new Configuration();
+    this.hostRestrictingAuthorizationFilter = initializeState(conf);
+  }
+
+  /*
+   * Creates a {@link HostRestrictingAuthorizationFilter} for the
+   * {@DatanodeHttpServer}.
+   * This method takes care of configuration and implementing just enough of the
+   * servlet API and related interfaces so that the DataNode can get a fully
+   * initialized
+   * instance of the filter.
+   *
+   * @param conf configuration to read
+   * @return initialized filter, or null if CSRF protection not enabled
+   * @throws IllegalStateException if filter fails initialization
+   */
+  public static HostRestrictingAuthorizationFilter
+  initializeState(Configuration conf) {
+    String confName = HostRestrictingAuthorizationFilter.HDFS_CONFIG_PREFIX +
+        HostRestrictingAuthorizationFilter.RESTRICTION_CONFIG;
+    String confValue = conf.get(confName);
+    // simply pass a blank value if we do not have one set
+    confValue = (confValue == null ? "" : confValue);
+
+    Map<String, String> confMap =
+        ImmutableMap.of(HostRestrictingAuthorizationFilter.RESTRICTION_CONFIG
+            , confValue);
+    FilterConfig fc =
+        new DatanodeHttpServer.MapBasedFilterConfig(
+            HostRestrictingAuthorizationFilter.class.getName(), confMap);
+    HostRestrictingAuthorizationFilter hostRestrictingAuthorizationFilter =
+        new HostRestrictingAuthorizationFilter();
+    try {
+      hostRestrictingAuthorizationFilter.init(fc);
+    } catch (ServletException e) {
+      throw new IllegalStateException(
+          "Failed to initialize HostRestrictingAuthorizationFilter.", e);
+    }
+    return hostRestrictingAuthorizationFilter;
+  }
+
+  /*
+   * Finish handling this pipeline by writing a response with the
+   * "Connection: close" header, flushing, and scheduling a close of the
+   * connection.
+   *
+   * @param ctx context to receive the response
+   * @param resp response to send
+   */
+  private static void sendResponseAndClose(ChannelHandlerContext ctx,
+      DefaultHttpResponse resp) {
+    resp.headers().set(CONNECTION, CLOSE);
+    ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
+  }
+
+  @Override
+  protected void channelRead0(final ChannelHandlerContext ctx,
+      final HttpRequest req) throws Exception {
+    hostRestrictingAuthorizationFilter
+        .handleInteraction(new NettyHttpInteraction(ctx, req));
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    LOG.error("Exception in " + this.getClass().getSimpleName(), cause);
+    sendResponseAndClose(ctx,
+        new DefaultHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+  }
+
+  /*
+   * {@link HttpInteraction} implementation for use in a Netty pipeline.
+   */
+  private static final class NettyHttpInteraction implements HttpInteraction {
+
+    private final ChannelHandlerContext ctx;
+    private final HttpRequest req;
+    private boolean committed;
+
+    /*
+     * Creates a new NettyHttpInteraction.
+     *
+     * @param ctx context to receive the response
+     * @param req request to process
+     */
+    public NettyHttpInteraction(ChannelHandlerContext ctx, HttpRequest req) {
+      this.committed = false;
+      this.ctx = ctx;
+      this.req = req;
+    }
+
+    @Override
+    public boolean isCommitted() {
+      return committed;
+    }
+
+    @Override
+    public String getRemoteAddr() {
+      return ((InetSocketAddress) ctx.channel().remoteAddress()).
+          getAddress().getHostAddress();
+    }
+
+    @Override
+    public String getQueryString() {
+      try {
+        return (new URI(req.getUri()).getQuery());
+      } catch (URISyntaxException e) {
+        return null;
+      }
+    }
+
+    @Override
+    public String getRequestURI() {
+      String uri = req.getUri();
+      // Netty's getUri includes the query string, while Servlet's does not
+      return (uri.substring(0, uri.indexOf("?") >= 0 ? uri.indexOf("?") :
+          uri.length()));
+    }
+
+    @Override
+    public String getRemoteUser() {
+      QueryStringDecoder queryString = new QueryStringDecoder(req.getUri());
+      List<String> p = queryString.parameters().get(UserParam.NAME);
+      String user = (p == null ? null : p.get(0));
+      return (new UserParam(user).getValue());
+    }
+
+    @Override
+    public String getMethod() {
+      return req.getMethod().name();
+    }
+
+    @Override
+    public void proceed() {
+      ReferenceCountUtil.retain(req);
+      ctx.fireChannelRead(req);
+    }
+
+    @Override
+    public void sendError(int code, String message) {
+      HttpResponseStatus status = new HttpResponseStatus(code, message);
+      sendResponseAndClose(ctx, new DefaultHttpResponse(HTTP_1_1, status));
+      this.committed = true;
+    }
+  }
+}

+ 55 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/RestCsrfPreventionFilterHandler.java

@@ -21,8 +21,21 @@ import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
 import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
 import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_ENABLED_KEY;
+
+import java.util.Map;
+
+import javax.servlet.ServletException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.http.RestCsrfPreventionFilter;
+import org.apache.hadoop.security.http.RestCsrfPreventionFilter.HttpInteraction;
+import org.slf4j.Logger;
 
 import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultHttpResponse;
@@ -30,11 +43,6 @@ import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.util.ReferenceCountUtil;
 
-import org.slf4j.Logger;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.security.http.RestCsrfPreventionFilter;
-import org.apache.hadoop.security.http.RestCsrfPreventionFilter.HttpInteraction;
 
 /**
  * Netty handler that integrates with the {@link RestCsrfPreventionFilter}.  If
@@ -43,6 +51,7 @@ import org.apache.hadoop.security.http.RestCsrfPreventionFilter.HttpInteraction;
  * handler drops the request and immediately sends an HTTP 400 response.
  */
 @InterfaceAudience.Private
+@Sharable
 final class RestCsrfPreventionFilterHandler
     extends SimpleChannelInboundHandler<HttpRequest> {
 
@@ -60,16 +69,24 @@ final class RestCsrfPreventionFilterHandler
    *
    * @param restCsrfPreventionFilter initialized filter
    */
-  public RestCsrfPreventionFilterHandler(
+  RestCsrfPreventionFilterHandler(
       RestCsrfPreventionFilter restCsrfPreventionFilter) {
+    if(restCsrfPreventionFilter == null) {
+      LOG.warn("Got null for restCsrfPreventionFilter - will not do any filtering.");
+    }
     this.restCsrfPreventionFilter = restCsrfPreventionFilter;
   }
 
   @Override
   protected void channelRead0(final ChannelHandlerContext ctx,
       final HttpRequest req) throws Exception {
-    restCsrfPreventionFilter.handleHttpInteraction(new NettyHttpInteraction(
-        ctx, req));
+    if(restCsrfPreventionFilter != null) {
+      restCsrfPreventionFilter.handleHttpInteraction(new NettyHttpInteraction(
+          ctx, req));
+    } else {
+      // we do not have a valid filter simply pass requests
+      new NettyHttpInteraction(ctx, req).proceed();
+    }
   }
 
   @Override
@@ -107,7 +124,7 @@ final class RestCsrfPreventionFilterHandler
      * @param ctx context to receive the response
      * @param req request to process
      */
-    public NettyHttpInteraction(ChannelHandlerContext ctx, HttpRequest req) {
+    NettyHttpInteraction(ChannelHandlerContext ctx, HttpRequest req) {
       this.ctx = ctx;
       this.req = req;
     }
@@ -134,4 +151,33 @@ final class RestCsrfPreventionFilterHandler
       sendResponseAndClose(ctx, new DefaultHttpResponse(HTTP_1_1, status));
     }
   }
+
+  /**
+   * Creates a {@link RestCsrfPreventionFilter} for the {@DatanodeHttpServer}.
+   * This method takes care of configuration and implementing just enough of the
+   * servlet API and related interfaces so that the DataNode can get a fully
+   * initialized instance of the filter.
+   *
+   * @param conf configuration to read
+   * @return initialized filter, or null if CSRF protection not enabled
+   */
+  public static RestCsrfPreventionFilter initializeState(
+      Configuration conf) {
+    if (!conf.getBoolean(DFS_WEBHDFS_REST_CSRF_ENABLED_KEY,
+        DFS_WEBHDFS_REST_CSRF_ENABLED_DEFAULT)) {
+      return null;
+    }
+    String restCsrfClassName = RestCsrfPreventionFilter.class.getName();
+    Map<String, String> restCsrfParams = RestCsrfPreventionFilter
+        .getFilterParams(conf, "dfs.webhdfs.rest-csrf.");
+    RestCsrfPreventionFilter filter = new RestCsrfPreventionFilter();
+    try {
+      filter.init(new DatanodeHttpServer
+          .MapBasedFilterConfig(restCsrfClassName, restCsrfParams));
+    } catch (ServletException e) {
+      throw new IllegalStateException(
+          "Failed to initialize RestCsrfPreventionFilter.", e);
+    }
+    return(filter);
+  }
 }

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Data node HTTP classes.
+ */
+package org.apache.hadoop.hdfs.server.datanode.web;

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -319,6 +319,13 @@
     </description>
   </property>
 
+<property>
+  <name>dfs.datanode.httpserver.filter.handlers</name>
+  <value>org.apache.hadoop.hdfs.server.datanode.web.RestCsrfPreventionFilterHandler</value>
+  <description>Comma separated list of Netty servlet-style filter handlers to inject into the Datanode WebHDFS I/O path
+  </description>
+</property>
+
 <property>
   <name>dfs.default.chunk.view.size</name>
   <value>32768</value>

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md

@@ -210,6 +210,7 @@ The following properties control CSRF prevention.
 | `dfs.webhdfs.rest-csrf.custom-header` | The name of a custom header that HTTP requests must send when protection against cross-site request forgery (CSRF) is enabled for WebHDFS by setting dfs.webhdfs.rest-csrf.enabled to true.  The WebHDFS client also uses this property to determine whether or not it needs to send the custom CSRF prevention header in its HTTP requests. | `X-XSRF-HEADER` |
 | `dfs.webhdfs.rest-csrf.methods-to-ignore` | A comma-separated list of HTTP methods that do not require HTTP requests to include a custom header when protection against cross-site request forgery (CSRF) is enabled for WebHDFS by setting dfs.webhdfs.rest-csrf.enabled to true.  The WebHDFS client also uses this property to determine whether or not it needs to send the custom CSRF prevention header in its HTTP requests. | `GET,OPTIONS,HEAD,TRACE` |
 | `dfs.webhdfs.rest-csrf.browser-useragents-regex` | A comma-separated list of regular expressions used to match against an HTTP request's User-Agent header when protection against cross-site request forgery (CSRF) is enabled for WebHDFS by setting dfs.webhdfs.reset-csrf.enabled to true.  If the incoming User-Agent matches any of these regular expressions, then the request is considered to be sent by a browser, and therefore CSRF prevention is enforced.  If the request's User-Agent does not match any of these regular expressions, then the request is considered to be sent by something other than a browser, such as scripted automation.  In this case, CSRF is not a potential attack vector, so the prevention is not enforced.  This helps achieve backwards-compatibility with existing automation that has not been updated to send the CSRF prevention header. | `^Mozilla.*,^Opera.*` |
+| `dfs.datanode.httpserver.filter.handlers` | Comma separated list of Netty servlet-style filter handlers to inject into the Datanode WebHDFS I/O path | `org.apache.hadoop.hdfs.server.datanode.web.RestCsrfPreventionFilterHandler` |
 
 The following is an example `curl` call that uses the `-H` option to include the
 custom header in the request.
@@ -233,6 +234,15 @@ The following properties control WebHDFS retry and failover policy.
 | `dfs.http.client.failover.sleep.base.millis` | Specify the base amount of time in milliseconds upon which the exponentially increased sleep time between retries or failovers is calculated for WebHDFS client. | `500` |
 | `dfs.http.client.failover.sleep.max.millis` | Specify the upper bound of sleep time in milliseconds between retries or failovers for WebHDFS client. | `15000` |
 
+WebHDFS Request Filtering
+-------------------------------------
+One may control directionality of data in the WebHDFS protocol allowing only writing data from insecure networks. To enable, one must ensure `dfs.datanode.httpserver.filter.handlers` includes `org.apache.hadoop.hdfs.server.datanode.web.HostRestrictingAuthorizationFilterHandler`.  Configuration of the `HostRestrictingAuthorizationFilter` is controlled via the following properties.
+
+| Property | Description | Default Value |
+|:---- |:---- |:----
+| `dfs.datanode.httpserver.filter.handlers` | Comma separated list of Netty servlet-style filter handlers to inject into the Datanode WebHDFS I/O path | `org.apache.hadoop.hdfs.server.datanode.web.RestCsrfPreventionFilterHandler` |
+| `dfs.web.authentication.host.allow.rules` | Rules allowing users to read files in the format of _user_,_network/bits_,_path glob_ newline or `|`-separated. Use `*` for a wildcard of all _users_ or _network/bits_. | nothing - defaults to no one may read via WebHDFS |
+
 File and Directory Operations
 -----------------------------
 

+ 275 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestHostRestrictingAuthorizationFilter.java

@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
+import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test Host Restriction Filter.
+ */
+public class TestHostRestrictingAuthorizationFilter {
+  private Logger log =
+      LoggerFactory.getLogger(TestHostRestrictingAuthorizationFilter.class);
+
+  /*
+   * Test running in unrestricted mode
+   */
+  @Test
+  public void testAcceptAll() throws Exception {
+    HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+    Mockito.when(request.getRemoteAddr()).thenReturn(null);
+    Mockito.when(request.getMethod()).thenReturn("GET");
+    Mockito.when(request.getRequestURI())
+        .thenReturn(new StringBuffer(WebHdfsFileSystem.PATH_PREFIX + "/user" +
+            "/ubuntu/foo").toString());
+    Mockito.when(request.getQueryString()).thenReturn("op=OPEN");
+    Mockito.when(request.getRemoteAddr()).thenReturn("192.168.1.2");
+
+    HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+    FilterChain chain = new FilterChain() {
+      @Override
+      public void doFilter(ServletRequest servletRequest,
+          ServletResponse servletResponse)
+          throws IOException, ServletException {
+      }
+    };
+
+    Filter filter = new HostRestrictingAuthorizationFilter();
+
+    HashMap<String, String> configs = new HashMap<String, String>() {
+    };
+    String allowRule = "*,*,/";
+    log.trace("Passing configs:\n{}", allowRule);
+    configs.put("host.allow.rules", allowRule);
+    configs.put(AuthenticationFilter.AUTH_TYPE, "simple");
+    FilterConfig fc = new DummyFilterConfig(configs);
+
+    filter.init(fc);
+    filter.doFilter(request, response, chain);
+    Mockito.verify(response, Mockito.times(0)).sendError(Mockito.eq(HttpServletResponse.SC_FORBIDDEN),
+        Mockito.anyString());
+    filter.destroy();
+  }
+
+  /*
+   * Test accepting a GET request for the file checksum when prohibited from
+   * doing
+   * a GET open call
+   */
+  @Test
+  public void testAcceptGETFILECHECKSUM() throws Exception {
+    HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+    Mockito.when(request.getRemoteAddr()).thenReturn(null);
+    Mockito.when(request.getMethod()).thenReturn("GET");
+    Mockito.when(request.getRequestURI())
+        .thenReturn(new StringBuffer(WebHdfsFileSystem.PATH_PREFIX + "/user" +
+            "/ubuntu/").toString());
+    Mockito.when(request.getQueryString()).thenReturn("op=GETFILECHECKSUM ");
+    Mockito.when(request.getRemoteAddr()).thenReturn("192.168.1.2");
+
+    HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+    FilterChain chain = new FilterChain() {
+      @Override
+      public void doFilter(ServletRequest servletRequest,
+          ServletResponse servletResponse)
+          throws IOException, ServletException {
+      }
+    };
+
+    Filter filter = new HostRestrictingAuthorizationFilter();
+
+    HashMap<String, String> configs = new HashMap<String, String>() {
+    };
+    configs.put(AuthenticationFilter.AUTH_TYPE, "simple");
+    FilterConfig fc = new DummyFilterConfig(configs);
+
+    filter.init(fc);
+    filter.doFilter(request, response, chain);
+    Mockito.verify(response, Mockito.times(0)).sendError(Mockito.eq(HttpServletResponse.SC_FORBIDDEN),
+        Mockito.anyString());
+    filter.destroy();
+  }
+
+  /*
+   * Test accepting a GET request for reading a file via an open call
+   */
+  @Test
+  public void testRuleAllowedGet() throws Exception {
+    HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+    Mockito.when(request.getRemoteAddr()).thenReturn(null);
+    Mockito.when(request.getMethod()).thenReturn("GET");
+    String queryString = "op=OPEN";
+    Mockito.when(request.getRequestURI())
+        .thenReturn(new StringBuffer(WebHdfsFileSystem.PATH_PREFIX + "/user" +
+            "/ubuntu/foo?" + queryString).toString());
+    Mockito.when(request.getQueryString()).thenReturn(queryString);
+    Mockito.when(request.getRemoteAddr()).thenReturn("192.168.1.2");
+
+    HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+    FilterChain chain = new FilterChain() {
+      @Override
+      public void doFilter(ServletRequest servletRequest,
+          ServletResponse servletResponse)
+          throws IOException, ServletException {
+      }
+    };
+
+    Filter filter = new HostRestrictingAuthorizationFilter();
+
+    HashMap<String, String> configs = new HashMap<String, String>() {
+    };
+    String allowRule = "ubuntu,127.0.0.1/32,/localbits/*|*,192.168.0.1/22," +
+        "/user/ubuntu/*";
+    log.trace("Passing configs:\n{}", allowRule);
+    configs.put("host.allow.rules", allowRule);
+    configs.put(AuthenticationFilter.AUTH_TYPE, "simple");
+    FilterConfig fc = new DummyFilterConfig(configs);
+
+    filter.init(fc);
+    filter.doFilter(request, response, chain);
+    filter.destroy();
+  }
+
+  /*
+   * Test by default we deny an open call GET request
+   */
+  @Test
+  public void testRejectsGETs() throws Exception {
+    HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+    Mockito.when(request.getRemoteAddr()).thenReturn(null);
+    Mockito.when(request.getMethod()).thenReturn("GET");
+    String queryString = "bar=foo&delegationToken=dt&op=OPEN";
+    Mockito.when(request.getRequestURI())
+        .thenReturn(new StringBuffer(WebHdfsFileSystem.PATH_PREFIX + "/user" +
+            "/ubuntu/?" + queryString).toString());
+    Mockito.when(request.getQueryString()).thenReturn(queryString);
+    Mockito.when(request.getRemoteAddr()).thenReturn("192.168.1.2");
+
+    HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+    FilterChain chain = new FilterChain() {
+      @Override
+      public void doFilter(ServletRequest servletRequest,
+          ServletResponse servletResponse)
+          throws IOException, ServletException {
+      }
+    };
+
+    Filter filter = new HostRestrictingAuthorizationFilter();
+
+    HashMap<String, String> configs = new HashMap<String, String>() {
+    };
+    configs.put(AuthenticationFilter.AUTH_TYPE, "simple");
+    FilterConfig fc = new DummyFilterConfig(configs);
+
+    filter.init(fc);
+    filter.doFilter(request, response, chain);
+    filter.destroy();
+  }
+
+  /*
+   * Test acceptable behavior to malformed requests
+   * Case: no operation (op parameter) specified
+   */
+  @Test
+  public void testUnexpectedInputMissingOpParameter() throws Exception {
+    HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+    Mockito.when(request.getRemoteAddr()).thenReturn(null);
+    Mockito.when(request.getMethod()).thenReturn("GET");
+    Mockito.when(request.getRequestURI())
+        .thenReturn(new StringBuffer(WebHdfsFileSystem.PATH_PREFIX +
+            "/IAmARandomRequest/").toString());
+    Mockito.when(request.getQueryString()).thenReturn(null);
+    Mockito.when(request.getRemoteAddr()).thenReturn("192.168.1.2");
+
+    HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+    FilterChain chain = new FilterChain() {
+      @Override
+      public void doFilter(ServletRequest servletRequest,
+          ServletResponse servletResponse)
+          throws IOException, ServletException {
+      }
+    };
+
+    Filter filter = new HostRestrictingAuthorizationFilter();
+
+    HashMap<String, String> configs = new HashMap<String, String>() {
+    };
+    configs.put(AuthenticationFilter.AUTH_TYPE, "simple");
+    FilterConfig fc = new DummyFilterConfig(configs);
+
+    filter.init(fc);
+    filter.doFilter(request, response, chain);
+    log.error("XXX {}", response.getStatus());
+    filter.destroy();
+  }
+
+  private static class DummyFilterConfig implements FilterConfig {
+    final Map<String, String> map;
+
+    DummyFilterConfig(Map<String, String> map) {
+      this.map = map;
+    }
+
+    @Override
+    public String getFilterName() {
+      return "dummy";
+    }
+
+    @Override
+    public String getInitParameter(String arg0) {
+      return map.get(arg0);
+    }
+
+    @Override
+    public Enumeration<String> getInitParameterNames() {
+      return Collections.enumeration(map.keySet());
+    }
+
+    @Override
+    public ServletContext getServletContext() {
+      ServletContext context = Mockito.mock(ServletContext.class);
+      Mockito.when(context.getAttribute(AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)).thenReturn(null);
+      return context;
+    }
+  }
+}

+ 178 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/TestHostRestrictingAuthorizationFilterHandler.java

@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.web;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.HostRestrictingAuthorizationFilter;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestHostRestrictingAuthorizationFilterHandler {
+
+  final static String CONFNAME =
+      HostRestrictingAuthorizationFilter.HDFS_CONFIG_PREFIX +
+          HostRestrictingAuthorizationFilter.RESTRICTION_CONFIG;
+
+  /*
+   * Test running in with no ACL rules (restrict all)
+   */
+  @Test
+  public void testRejectAll() throws Exception {
+    EmbeddedChannel channel = new CustomEmbeddedChannel("127.0.0.1", 1006,
+        new HostRestrictingAuthorizationFilterHandler());
+    FullHttpRequest httpRequest =
+        new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+            HttpMethod.GET,
+            WebHdfsFileSystem.PATH_PREFIX + "/user/myName/fooFile?op=OPEN");
+    // we will send back an error so ensure our write returns false
+    assertFalse("Should get error back from handler for rejected request",
+        channel.writeInbound(httpRequest));
+    DefaultHttpResponse channelResponse =
+        (DefaultHttpResponse) channel.outboundMessages().poll();
+    assertNotNull("Expected response to exist.", channelResponse);
+    assertEquals(HttpResponseStatus.FORBIDDEN, channelResponse.getStatus());
+    assertFalse(channel.isOpen());
+  }
+
+  /*
+   * Test accepting multiple allowed GET requests to ensure channel can be
+   * reused
+   */
+  @Test
+  public void testMultipleAcceptedGETsOneChannel() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(CONFNAME, "*,*,/allowed");
+    HostRestrictingAuthorizationFilter filter =
+        HostRestrictingAuthorizationFilterHandler.initializeState(conf);
+    EmbeddedChannel channel = new CustomEmbeddedChannel("127.0.0.1", 1006,
+        new HostRestrictingAuthorizationFilterHandler(filter));
+    FullHttpRequest allowedHttpRequest =
+        new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+            HttpMethod.GET,
+            WebHdfsFileSystem.PATH_PREFIX + "/allowed/file_one?op=OPEN");
+    FullHttpRequest allowedHttpRequest2 =
+        new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+            HttpMethod.GET,
+            WebHdfsFileSystem.PATH_PREFIX + "/allowed/file_two?op=OPEN");
+    FullHttpRequest allowedHttpRequest3 =
+        new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+            HttpMethod.GET,
+            WebHdfsFileSystem.PATH_PREFIX + "/allowed/file_three?op=OPEN");
+    assertTrue("Should successfully accept request",
+        channel.writeInbound(allowedHttpRequest));
+    assertTrue("Should successfully accept request, second time",
+        channel.writeInbound(allowedHttpRequest2));
+    assertTrue("Should successfully accept request, third time",
+        channel.writeInbound(allowedHttpRequest3));
+  }
+
+  /*
+   * Test accepting multiple allowed GET requests in different channels to a
+   * single filter instance
+   */
+  @Test
+  public void testMultipleChannels() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(CONFNAME, "*,*,/allowed");
+    HostRestrictingAuthorizationFilter filter =
+        HostRestrictingAuthorizationFilterHandler.initializeState(conf);
+    EmbeddedChannel channel1 = new CustomEmbeddedChannel("127.0.0.1", 1006,
+        new HostRestrictingAuthorizationFilterHandler(filter));
+    EmbeddedChannel channel2 = new CustomEmbeddedChannel("127.0.0.2", 1006,
+        new HostRestrictingAuthorizationFilterHandler(filter));
+    EmbeddedChannel channel3 = new CustomEmbeddedChannel("127.0.0.3", 1006,
+        new HostRestrictingAuthorizationFilterHandler(filter));
+    FullHttpRequest allowedHttpRequest =
+        new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+            HttpMethod.GET,
+            WebHdfsFileSystem.PATH_PREFIX + "/allowed/file_one?op=OPEN");
+    FullHttpRequest allowedHttpRequest2 =
+        new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+            HttpMethod.GET,
+            WebHdfsFileSystem.PATH_PREFIX + "/allowed/file_two?op=OPEN");
+    FullHttpRequest allowedHttpRequest3 =
+        new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+            HttpMethod.GET,
+            WebHdfsFileSystem.PATH_PREFIX + "/allowed/file_three?op=OPEN");
+    assertTrue("Should successfully accept request",
+        channel1.writeInbound(allowedHttpRequest));
+    assertTrue("Should successfully accept request, second time",
+        channel2.writeInbound(allowedHttpRequest2));
+
+    // verify closing one channel does not affect remaining channels
+    channel1.close();
+    assertTrue("Should successfully accept request, third time",
+        channel3.writeInbound(allowedHttpRequest3));
+  }
+
+  /*
+   * Test accepting a GET request for the file checksum
+   */
+  @Test
+  public void testAcceptGETFILECHECKSUM() throws Exception {
+    EmbeddedChannel channel = new CustomEmbeddedChannel("127.0.0.1", 1006,
+        new HostRestrictingAuthorizationFilterHandler());
+    FullHttpRequest httpRequest =
+        new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+            HttpMethod.GET,
+            WebHdfsFileSystem.PATH_PREFIX + "/user/myName/fooFile?op" +
+                "=GETFILECHECKSUM");
+    assertTrue("Should successfully accept request",
+        channel.writeInbound(httpRequest));
+  }
+
+  /*
+   * Custom channel implementation which allows for mocking a client's remote
+   * address
+   */
+  protected static class CustomEmbeddedChannel extends EmbeddedChannel {
+
+    private InetSocketAddress socketAddress;
+
+    /*
+     * A normal @{EmbeddedChannel} constructor which takes the remote client
+     * host and port to mock
+     */
+    public CustomEmbeddedChannel(String host, int port,
+        final ChannelHandler... handlers) {
+      super(handlers);
+      socketAddress = new InetSocketAddress(host, port);
+    }
+
+    @Override
+    protected SocketAddress remoteAddress0() {
+      return this.socketAddress;
+    }
+  }
+}

+ 12 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsWithRestCsrfPreventionFilter.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.web;
 
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_BROWSER_USERAGENTS_REGEX_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPSERVER_FILTER_HANDLERS;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -76,14 +77,14 @@ public class TestWebHdfsWithRestCsrfPreventionFilter {
   @Parameters
   public static Iterable<Object[]> data() {
     return Arrays.asList(new Object[][] {
-        { false, false, false },
-        { true, true, true },
-        { true, true, false },
-        { true, false, true },
-        { true, false, false },
-        { false, true, true },
-        { false, true, false },
-        { false, false, true }});
+        {false, false, false},
+        {true, true, true},
+        {true, true, false},
+        {true, false, true},
+        {true, false, false},
+        {false, true, true},
+        {false, true, false},
+        {false, false, true}});
   }
 
   @Before
@@ -97,6 +98,9 @@ public class TestWebHdfsWithRestCsrfPreventionFilter {
 
     Configuration dnConf = new Configuration(nnConf);
     dnConf.setBoolean(DFS_WEBHDFS_REST_CSRF_ENABLED_KEY, dnRestCsrf);
+    // By default the datanode loads the CSRF filter handler
+    dnConf.set(DFS_DATANODE_HTTPSERVER_FILTER_HANDLERS,
+        "org.apache.hadoop.hdfs.server.datanode.web.RestCsrfPreventionFilterHandler");
     cluster.startDataNodes(dnConf, 1, true, null, null, null, null, false);
 
     cluster.waitActive();