Selaa lähdekoodia

HDFS-13909. RBF: Add Cache pools and directives related ClientProtocol APIs. Contributed by Ayush Saxena.

Ayush Saxena 6 vuotta sitten
vanhempi
commit
395312b821

+ 11 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java

@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.Arrays;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -198,9 +200,16 @@ public class RemoteMethod {
     for (int i = 0; i < this.params.length; i++) {
       Object currentObj = this.params[i];
       if (currentObj instanceof RemoteParam) {
-        // Map the parameter using the context
         RemoteParam paramGetter = (RemoteParam) currentObj;
-        objList[i] = paramGetter.getParameterForContext(context);
+        // Map the parameter using the context
+        if (this.types[i] == CacheDirectiveInfo.class) {
+          CacheDirectiveInfo path =
+              (CacheDirectiveInfo) paramGetter.getParameterForContext(context);
+          objList[i] = new CacheDirectiveInfo.Builder(path)
+              .setPath(new Path(context.getDest())).build();
+        } else {
+          objList[i] = paramGetter.getParameterForContext(context);
+        }
       } else {
         objList[i] = currentObj;
       }

+ 173 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterCacheAdmin.java

@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+/**
+ * Module that implements all the RPC calls in
+ * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to Cache Admin
+ * in the {@link RouterRpcServer}.
+ */
+public class RouterCacheAdmin {
+
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+  /** Interface to identify the active NN for a nameservice or blockpool ID. */
+  private final ActiveNamenodeResolver namenodeResolver;
+
+  public RouterCacheAdmin(RouterRpcServer server) {
+    this.rpcServer = server;
+    this.rpcClient = this.rpcServer.getRPCClient();
+    this.namenodeResolver = this.rpcClient.getNamenodeResolver();
+  }
+
+  public long addCacheDirective(CacheDirectiveInfo path,
+      EnumSet<CacheFlag> flags) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+    final List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(path.getPath().toString(), true, false);
+    RemoteMethod method = new RemoteMethod("addCacheDirective",
+        new Class<?>[] {CacheDirectiveInfo.class, EnumSet.class},
+        new RemoteParam(getRemoteMap(path, locations)), flags);
+    Map<RemoteLocation, Long> response =
+        rpcClient.invokeConcurrent(locations, method, false, false, long.class);
+    return response.values().iterator().next();
+  }
+
+  public void modifyCacheDirective(CacheDirectiveInfo directive,
+      EnumSet<CacheFlag> flags) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+    Path p = directive.getPath();
+    if (p != null) {
+      final List<RemoteLocation> locations = rpcServer
+          .getLocationsForPath(directive.getPath().toString(), true, false);
+      RemoteMethod method = new RemoteMethod("modifyCacheDirective",
+          new Class<?>[] {CacheDirectiveInfo.class, EnumSet.class},
+          new RemoteParam(getRemoteMap(directive, locations)), flags);
+      rpcClient.invokeConcurrent(locations, method);
+      return;
+    }
+    RemoteMethod method = new RemoteMethod("modifyCacheDirective",
+        new Class<?>[] {CacheDirectiveInfo.class, EnumSet.class}, directive,
+        flags);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, false, false);
+  }
+
+  public void removeCacheDirective(long id) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+    RemoteMethod method = new RemoteMethod("removeCacheDirective",
+        new Class<?>[] {long.class}, id);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, false, false);
+  }
+
+  public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId,
+      CacheDirectiveInfo filter) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+    if (filter.getPath() != null) {
+      final List<RemoteLocation> locations = rpcServer
+          .getLocationsForPath(filter.getPath().toString(), true, false);
+      RemoteMethod method = new RemoteMethod("listCacheDirectives",
+          new Class<?>[] {long.class, CacheDirectiveInfo.class}, prevId,
+          new RemoteParam(getRemoteMap(filter, locations)));
+      Map<RemoteLocation, BatchedEntries> response = rpcClient.invokeConcurrent(
+          locations, method, false, false, BatchedEntries.class);
+      return response.values().iterator().next();
+    }
+    RemoteMethod method = new RemoteMethod("listCacheDirectives",
+        new Class<?>[] {long.class, CacheDirectiveInfo.class}, prevId,
+        filter);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, BatchedEntries> results = rpcClient
+        .invokeConcurrent(nss, method, true, false, BatchedEntries.class);
+    return results.values().iterator().next();
+  }
+
+  public void addCachePool(CachePoolInfo info) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+    RemoteMethod method = new RemoteMethod("addCachePool",
+        new Class<?>[] {CachePoolInfo.class}, info);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, true, false);
+  }
+
+  public void modifyCachePool(CachePoolInfo info) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+    RemoteMethod method = new RemoteMethod("modifyCachePool",
+        new Class<?>[] {CachePoolInfo.class}, info);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, true, false);
+  }
+
+  public void removeCachePool(String cachePoolName) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+    RemoteMethod method = new RemoteMethod("removeCachePool",
+        new Class<?>[] {String.class}, cachePoolName);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, true, false);
+  }
+
+  public BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+    RemoteMethod method = new RemoteMethod("listCachePools",
+        new Class<?>[] {String.class}, prevKey);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, BatchedEntries> results = rpcClient
+        .invokeConcurrent(nss, method, true, false, BatchedEntries.class);
+    return results.values().iterator().next();
+  }
+
+  /**
+   * Returns a map with the CacheDirectiveInfo mapped to each location.
+   * @param path CacheDirectiveInfo to be mapped to the locations.
+   * @param locations the locations to map.
+   * @return map with CacheDirectiveInfo mapped to the locations.
+   */
+  private Map<RemoteLocation, CacheDirectiveInfo> getRemoteMap(
+      CacheDirectiveInfo path, final List<RemoteLocation> locations) {
+    final Map<RemoteLocation, CacheDirectiveInfo> dstMap = new HashMap<>();
+    Iterator<RemoteLocation> iterator = locations.iterator();
+    while (iterator.hasNext()) {
+      dstMap.put(iterator.next(), path);
+    }
+    return dstMap;
+  }
+}

+ 13 - 13
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

@@ -132,6 +132,8 @@ public class RouterClientProtocol implements ClientProtocol {
   private final String superGroup;
   /** Erasure coding calls. */
   private final ErasureCoding erasureCoding;
+  /** Cache Admin calls. */
+  private final RouterCacheAdmin routerCacheAdmin;
   /** StoragePolicy calls. **/
   private final RouterStoragePolicy storagePolicy;
   /** Router security manager to handle token operations. */
@@ -164,6 +166,7 @@ public class RouterClientProtocol implements ClientProtocol {
         DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
     this.erasureCoding = new ErasureCoding(rpcServer);
     this.storagePolicy = new RouterStoragePolicy(rpcServer);
+    this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
     this.securityManager = rpcServer.getRouterSecurityManager();
   }
 
@@ -1259,48 +1262,45 @@ public class RouterClientProtocol implements ClientProtocol {
   @Override
   public long addCacheDirective(CacheDirectiveInfo path,
       EnumSet<CacheFlag> flags) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
-    return 0;
+    return routerCacheAdmin.addCacheDirective(path, flags);
   }
 
   @Override
   public void modifyCacheDirective(CacheDirectiveInfo directive,
       EnumSet<CacheFlag> flags) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+    routerCacheAdmin.modifyCacheDirective(directive, flags);
   }
 
   @Override
   public void removeCacheDirective(long id) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+    routerCacheAdmin.removeCacheDirective(id);
   }
 
   @Override
-  public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
-      long prevId, CacheDirectiveInfo filter) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
-    return null;
+  public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId,
+      CacheDirectiveInfo filter) throws IOException {
+    return routerCacheAdmin.listCacheDirectives(prevId, filter);
   }
 
   @Override
   public void addCachePool(CachePoolInfo info) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+    routerCacheAdmin.addCachePool(info);
   }
 
   @Override
   public void modifyCachePool(CachePoolInfo info) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+    routerCacheAdmin.modifyCachePool(info);
   }
 
   @Override
   public void removeCachePool(String cachePoolName) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+    routerCacheAdmin.removeCachePool(cachePoolName);
   }
 
   @Override
   public BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
       throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
-    return null;
+    return routerCacheAdmin.listCachePools(prevKey);
   }
 
   @Override

+ 62 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java

@@ -61,13 +61,18 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -1439,6 +1444,63 @@ public class TestRouterRpc {
     cluster.waitNamenodeRegistration();
   }
 
+  @Test
+  public void testCacheAdmin() throws Exception {
+    DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS;
+    // Verify cache directive commands.
+    CachePoolInfo cpInfo = new CachePoolInfo("Check");
+    cpInfo.setOwnerName("Owner");
+
+    // Add a cache pool.
+    routerProtocol.addCachePool(cpInfo);
+    RemoteIterator<CachePoolEntry> iter = routerDFS.listCachePools();
+    assertTrue(iter.hasNext());
+
+    // Modify a cache pool.
+    CachePoolInfo info = iter.next().getInfo();
+    assertEquals("Owner", info.getOwnerName());
+    cpInfo.setOwnerName("new Owner");
+    routerProtocol.modifyCachePool(cpInfo);
+    iter = routerDFS.listCachePools();
+    assertTrue(iter.hasNext());
+    info = iter.next().getInfo();
+    assertEquals("new Owner", info.getOwnerName());
+
+    // Remove a cache pool.
+    routerProtocol.removeCachePool("Check");
+    iter = routerDFS.listCachePools();
+    assertFalse(iter.hasNext());
+
+    // Verify cache directive commands.
+    cpInfo.setOwnerName("Owner");
+    routerProtocol.addCachePool(cpInfo);
+    routerDFS.mkdirs(new Path("/ns1/dir"));
+
+    // Add a cache directive.
+    CacheDirectiveInfo cacheDir = new CacheDirectiveInfo.Builder()
+        .setPath(new Path("/ns1/dir"))
+        .setReplication((short) 1)
+        .setPool("Check")
+        .build();
+    long id = routerDFS.addCacheDirective(cacheDir);
+    CacheDirectiveInfo filter =
+        new CacheDirectiveInfo.Builder().setPath(new Path("/ns1/dir")).build();
+    assertTrue(routerDFS.listCacheDirectives(filter).hasNext());
+
+    // List cache directive.
+    assertEquals("Check",
+        routerDFS.listCacheDirectives(filter).next().getInfo().getPool());
+    cacheDir = new CacheDirectiveInfo.Builder().setReplication((short) 2)
+        .setId(id).setPath(new Path("/ns1/dir")).build();
+
+    // Modify cache directive.
+    routerDFS.modifyCacheDirective(cacheDir);
+    assertEquals((short) 2, (short) routerDFS.listCacheDirectives(filter).next()
+        .getInfo().getReplication());
+    routerDFS.removeCacheDirective(id);
+    assertFalse(routerDFS.listCacheDirectives(filter).hasNext());
+  }
+
   /**
    * Check the erasure coding policies in the Router and the Namenode.
    * @return The erasure coding policies.