
HDFS-5330. fix readdir and readdirplus for large directories. Contributed by Brandon Li

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1532539 13f79535-47bb-0310-9956-ffa450edef68
Brandon Li 11 years ago
parent commit a9befa6f0a

+ 9 - 1
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIR3Response.java

@@ -26,6 +26,8 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.Verifier;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * READDIR3 Response
  */
@@ -49,7 +51,8 @@ public class READDIR3Response extends NFS3Response {
       return fileId;
     }
 
-    String getName() {
+    @VisibleForTesting
+    public String getName() {
       return name;
     }
 
@@ -66,6 +69,11 @@ public class READDIR3Response extends NFS3Response {
       this.entries = Collections.unmodifiableList(Arrays.asList(entries));
       this.eof = eof;
     }
+    
+    @VisibleForTesting
+    public List<Entry3> getEntries() {
+      return this.entries;
+    }
   }
 
   public READDIR3Response(int status) {

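These accessors are widened from package-private to public and tagged @VisibleForTesting purely so the new TestReaddir added later in this commit can assert on response contents directly, for example (condensed from that test):

    List<Entry3> dirents = response.getDirList().getEntries();
    assertTrue(dirents.size() == 1);
    assertTrue(dirents.get(0).getName().equals("f3"));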
+ 14 - 1
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIRPLUS3Response.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.Verifier;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * READDIRPLUS3 Response
  */
@@ -51,6 +53,11 @@ public class READDIRPLUS3Response  extends NFS3Response {
       this.objFileHandle = objFileHandle;
     }
 
+    @VisibleForTesting
+    public String getName() {
+      return name;
+    }
+    
     void seralize(XDR xdr) {
       xdr.writeLongAsHyper(fileId);
       xdr.writeString(name);
@@ -71,7 +78,8 @@ public class READDIRPLUS3Response  extends NFS3Response {
       this.eof = eof;
     }
 
-    List<EntryPlus3> getEntries() {
+    @VisibleForTesting
+    public List<EntryPlus3> getEntries() {
       return entries;
     }
     
@@ -80,6 +88,11 @@ public class READDIRPLUS3Response  extends NFS3Response {
     }
   }
 
+  @VisibleForTesting
+  public DirListPlus3 getDirListPlus() {
+    return dirListPlus;
+  }
+  
   public READDIRPLUS3Response(int status) {
     this(status, null, 0, null);
   }

+ 37 - 8
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java

@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FileUtil;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.nfs.AccessPrivilege;
 import org.apache.hadoop.nfs.NfsExports;
 import org.apache.hadoop.nfs.NfsFileType;
@@ -1258,6 +1260,29 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
 
+  /**
+   * Used by readdir and readdirplus to get dirents. It retries the listing if
+   * the startAfter can't be found anymore.
+   */
+  private DirectoryListing listPaths(DFSClient dfsClient, String dirFileIdPath,
+      byte[] startAfter) throws IOException {
+    DirectoryListing dlisting = null;
+    try {
+      dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
+    } catch (RemoteException e) {
+      IOException io = e.unwrapRemoteException();
+      if (!(io instanceof DirectoryListingStartAfterNotFoundException)) {
+        throw io;
+      }
+      // This happens when startAfter was just deleted
+      LOG.info("Cookie cound't be found: " + new String(startAfter)
+          + ", do listing from beginning");
+      dlisting = dfsClient
+          .listPaths(dirFileIdPath, HdfsFileStatus.EMPTY_NAME);
+    }
+    return dlisting;
+  }
+  
   @Override
   public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
@@ -1298,7 +1323,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
           + cookie + " count: " + count);
     }
 
-    HdfsFileStatus dirStatus;
+    HdfsFileStatus dirStatus = null;
     DirectoryListing dlisting = null;
     Nfs3FileAttributes postOpAttr = null;
     long dotdotFileId = 0;
@@ -1342,8 +1367,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
         String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
         startAfter = inodeIdPath.getBytes();
       }
-      dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
-
+      
+      dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
       postOpAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (postOpAttr == null) {
         LOG.error("Can't get path for fileId:" + handle.getFileId());
@@ -1426,11 +1451,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     }
     long dirCount = request.getDirCount();
     if (dirCount <= 0) {
-      LOG.info("Nonpositive count in invalid READDIRPLUS request:" + dirCount);
-      return new READDIRPLUS3Response(Nfs3Status.NFS3_OK);
+      LOG.info("Nonpositive dircount in invalid READDIRPLUS request:" + dirCount);
+      return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL);
     }
     int maxCount = request.getMaxCount();
-
+    if (maxCount <= 0) {
+      LOG.info("Nonpositive maxcount in invalid READDIRPLUS request:" + maxCount);
+      return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL);
+    }
+    
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS READDIRPLUS fileId: " + handle.getFileId() + " cookie: "
           + cookie + " dirCount: " + dirCount + " maxCount: " + maxCount);
@@ -1480,8 +1509,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
         String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
         startAfter = inodeIdPath.getBytes();
       }
-      dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
-
+      
+      dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
       postOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (postOpDirAttr == null) {
         LOG.info("Can't get path for fileId:" + handle.getFileId());

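The heart of the fix is the new listPaths() wrapper above. READDIR/READDIRPLUS cookies are HDFS inode IDs: a nonzero cookie is resolved to an inode path and passed to DFSClient.listPaths() as startAfter. If the entry the cookie points at is deleted between two calls, the NameNode throws DirectoryListingStartAfterNotFoundException (carried inside a RemoteException), and the wrapper restarts the listing from the beginning instead of failing the whole readdir. A minimal, self-contained sketch of the same control flow, using hypothetical stand-in types rather than the real DFSClient API:

    import java.io.IOException;
    import java.util.List;

    // Hypothetical stand-in for DFSClient.listPaths(); only the control
    // flow below mirrors the actual fix.
    interface DirLister {
      // Returns entries strictly after startAfter; a null startAfter means
      // "list from the beginning". Throws StaleCookieException when the
      // startAfter entry no longer exists.
      List<String> list(String dir, String startAfter) throws IOException;
    }

    class StaleCookieException extends IOException {
    }

    class ReaddirRetry {
      static List<String> listWithRestart(DirLister lister, String dir,
          String startAfter) throws IOException {
        try {
          return lister.list(dir, startAfter);
        } catch (StaleCookieException e) {
          // The entry the cookie pointed at was deleted between READDIR
          // calls; restart from the beginning rather than erroring out.
          return lister.list(dir, null);
        }
      }
    }

The changes to this file also tighten request validation: a nonpositive dircount or maxcount in a READDIRPLUS request now returns NFS3ERR_INVAL instead of an empty NFS3_OK reply.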
+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java

@@ -195,6 +195,7 @@ public class WriteManager {
       COMMIT_STATUS ret = openFileCtx.checkCommit(dfsClient, commitOffset,
           channel, xid, preOpAttr);
       switch (ret) {
+      case COMMIT_DO_SYNC:
       case COMMIT_FINISHED:
       case COMMIT_INACTIVE_CTX:
         status = Nfs3Status.NFS3_OK;
@@ -207,7 +208,8 @@ public class WriteManager {
         // Do nothing. Commit is async now.
         return;
       default:
-        throw new RuntimeException("Wring error code:" + ret.name());
+        throw new RuntimeException("Should not get commit return code:"
+            + ret.name());
       }
     }
     

+ 195 - 0
hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java

@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.nfs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3;
+import org.apache.hadoop.hdfs.nfs.nfs3.RpcProgramNfs3;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.nfs.nfs3.FileHandle;
+import org.apache.hadoop.nfs.nfs3.response.READDIR3Response;
+import org.apache.hadoop.nfs.nfs3.response.READDIR3Response.Entry3;
+import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response;
+import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response.EntryPlus3;
+import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.SecurityHandler;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test READDIR and READDIRPLUS requests with zero and nonzero cookies
+ */
+public class TestReaddir {
+
+  static Configuration config = new Configuration();
+  static MiniDFSCluster cluster = null;
+  static DistributedFileSystem hdfs;
+  static NameNode nn;
+  static RpcProgramNfs3 nfsd;
+  static String testdir = "/tmp";
+  static SecurityHandler securityHandler;
+  
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+    cluster.waitActive();
+    hdfs = cluster.getFileSystem();
+    nn = cluster.getNameNode();
+
+    // Start nfs
+    List<String> exports = new ArrayList<String>();
+    exports.add("/");
+    Nfs3 nfs3 = new Nfs3(exports, config);
+    nfs3.start(false);
+
+    nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
+
+    securityHandler = Mockito.mock(SecurityHandler.class);
+    Mockito.when(securityHandler.getUser()).thenReturn(
+        System.getProperty("user.name"));
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void createFiles() throws IllegalArgumentException, IOException {
+    hdfs.delete(new Path(testdir), true);
+    hdfs.mkdirs(new Path(testdir));
+    DFSTestUtil.createFile(hdfs, new Path(testdir + "/f1"), 0, (short) 1, 0);
+    DFSTestUtil.createFile(hdfs, new Path(testdir + "/f2"), 0, (short) 1, 0);
+    DFSTestUtil.createFile(hdfs, new Path(testdir + "/f3"), 0, (short) 1, 0);
+  }
+  
+  @Test
+  public void testReaddirBasic() throws IOException {
+    // Get inodeId of /tmp
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+
+    // Create related part of the XDR request
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeLongAsHyper(0); // cookie
+    xdr_req.writeLongAsHyper(0); // verifier
+    xdr_req.writeInt(100); // count
+
+    READDIR3Response response = nfsd.readdir(xdr_req.asReadOnlyWrap(),
+        securityHandler, InetAddress.getLocalHost());
+    List<Entry3> dirents = response.getDirList().getEntries();
+    assertTrue(dirents.size() == 5); // including dot, dotdot
+
+    // Test start listing from f2
+    status = nn.getRpcServer().getFileInfo(testdir + "/f2");
+    long f2Id = status.getFileId();
+
+    // Create related part of the XDR request
+    xdr_req = new XDR();
+    handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeLongAsHyper(f2Id); // cookie
+    xdr_req.writeLongAsHyper(0); // verifier
+    xdr_req.writeInt(100); // count
+
+    response = nfsd.readdir(xdr_req.asReadOnlyWrap(), securityHandler,
+        InetAddress.getLocalHost());
+    dirents = response.getDirList().getEntries();
+    assertTrue(dirents.size() == 1);
+    Entry3 entry = dirents.get(0);
+    assertTrue(entry.getName().equals("f3"));
+
+    // When the cookie is deleted, the listing starts over, not including dot, dotdot
+    hdfs.delete(new Path(testdir + "/f2"), false);
+
+    response = nfsd.readdir(xdr_req.asReadOnlyWrap(), securityHandler,
+        InetAddress.getLocalHost());
+    dirents = response.getDirList().getEntries();
+    assertTrue(dirents.size() == 2); // No dot, dotdot
+  }
+  
+  @Test
+  // Test readdirplus
+  public void testReaddirPlus() throws IOException {
+    // Get inodeId of /tmp
+    HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
+    long dirId = status.getFileId();
+    
+    // Create related part of the XDR request
+    XDR xdr_req = new XDR();
+    FileHandle handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeLongAsHyper(0); // cookie
+    xdr_req.writeLongAsHyper(0); // verifier
+    xdr_req.writeInt(100); // dirCount
+    xdr_req.writeInt(1000); // maxCount
+
+    READDIRPLUS3Response responsePlus = nfsd.readdirplus(
+        xdr_req.asReadOnlyWrap(), securityHandler, InetAddress.getLocalHost());
+    List<EntryPlus3> direntPlus = responsePlus.getDirListPlus().getEntries();
+    assertTrue(direntPlus.size() == 5); // including dot, dotdot
+
+    // Test start listing from f2
+    status = nn.getRpcServer().getFileInfo(testdir + "/f2");
+    long f2Id = status.getFileId();
+
+    // Create related part of the XDR request
+    xdr_req = new XDR();
+    handle = new FileHandle(dirId);
+    handle.serialize(xdr_req);
+    xdr_req.writeLongAsHyper(f2Id); // cookie
+    xdr_req.writeLongAsHyper(0); // verifier
+    xdr_req.writeInt(100); // dirCount
+    xdr_req.writeInt(1000); // maxCount
+
+    responsePlus = nfsd.readdirplus(xdr_req.asReadOnlyWrap(), securityHandler,
+        InetAddress.getLocalHost());
+    direntPlus = responsePlus.getDirListPlus().getEntries();
+    assertTrue(direntPlus.size() == 1);
+    EntryPlus3 entryPlus = direntPlus.get(0);
+    assertTrue(entryPlus.getName().equals("f3"));
+
+    // When the cookie is deleted, the listing starts over, not including dot, dotdot
+    hdfs.delete(new Path(testdir + "/f2"), false);
+
+    responsePlus = nfsd.readdirplus(xdr_req.asReadOnlyWrap(), securityHandler,
+        InetAddress.getLocalHost());
+    direntPlus = responsePlus.getDirListPlus().getEntries();
+    assertTrue(direntPlus.size() == 2); // No dot, dotdot
+  }
+}

+ 0 - 2
hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java

@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentNavigableMap;
 
 import junit.framework.Assert;
@@ -33,7 +32,6 @@ import org.apache.hadoop.nfs.nfs3.IdUserGroup;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
-import org.jboss.netty.channel.Channel;
 import org.junit.Test;
 import org.mockito.Mockito;
 

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -384,6 +384,8 @@ Release 2.2.1 - UNRELEASED
     HDFS-5329. Update FSNamesystem#getListing() to handle inode path in startAfter
     token. (brandonli)
 
+    HDFS-5330. fix readdir and readdirplus for large directories (brandonli)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES