HADOOP-999. An HDFS client immediately informs the NameNode of a new
file creation. ClientProtocol version changed from 14 to 15.
(Tsz Wo (Nicholas), SZE via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@561603 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 18 years ago
parent
commit
62c90338e1
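
In short, the RPC contract changes: create() used to allocate the file's first block and return it as a LocatedBlock; it now only records the new file, and the client obtains every block, the first included, through a follow-up request. A before/after sketch of the interface method (assembled from the ClientProtocol.java diff below; the throws clause is assumed from code of that era):

    // ClientProtocol version 14: create() also allocated the first block.
    public LocatedBlock create(String src, String clientName, boolean overwrite,
                               short replication, long blockSize) throws IOException;

    // ClientProtocol version 15: create() only registers the file with the
    // NameNode; blocks are allocated later, one request at a time.
    public void create(String src, String clientName, boolean overwrite,
                       short replication, long blockSize) throws IOException;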

+ 4 - 0
CHANGES.txt

@@ -5,6 +5,10 @@ Trunk (unreleased changes)
 
   INCOMPATIBLE CHANGES
 
+    HADOOP-999.  An HDFS client immediately informs the NameNode of a new
+    file creation.  ClientProtocol version changed from 14 to 15.
+    (Tsz Wo (Nicholas), SZE via dhruba)
+
   NEW FEATURES
 
     HADOOP-1636.  Allow configuration of the number of jobs kept in

+ 3 - 3
src/java/org/apache/hadoop/dfs/ClientProtocol.java

@@ -31,9 +31,9 @@ interface ClientProtocol extends VersionedProtocol {
 
   /**
    * Compared to the previous version the following changes have been introduced:
-   * 14: distributedUpgradeProgress() added.
+   * 15: create(...) should only create a file but not return block.
    */
-  public static final long versionID = 14L;
+  public static final long versionID = 15L;
   
   ///////////////////////////////////////
   // File contents
@@ -90,7 +90,7 @@ interface ClientProtocol extends VersionedProtocol {
    * create multi-block files must also use reportWrittenBlock()
    * and addBlock().
    */
-  public LocatedBlock create(String src, 
+  public void create(String src, 
                              String clientName, 
                              boolean overwrite, 
                              short replication,

+ 4 - 22
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -1396,9 +1396,7 @@ class DFSClient implements FSConstants {
     boolean closed = false;
 
     private UTF8 src;
-    private boolean overwrite;
     private short replication;
-    private boolean firstTime = true;
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;
     private File backupFile;
@@ -1421,7 +1419,6 @@ class DFSClient implements FSConstants {
                            ) throws IOException {
       super(new CRC32(), conf.getInt("io.bytes.per.checksum", 512), 4);
       this.src = src;
-      this.overwrite = overwrite;
       this.replication = replication;
       this.blockSize = blockSize;
       this.buffersize = buffersize;
@@ -1441,6 +1438,8 @@ class DFSClient implements FSConstants {
       
       checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
                                               bytesPerChecksum);
+      namenode.create(
+          src.toString(), clientName, overwrite, replication, blockSize);
     }
 
     private void openBackupStream() throws IOException {
@@ -1494,13 +1493,7 @@ class DFSClient implements FSConstants {
       do {
         retry = false;
                 
-        LocatedBlock lb;
-        if (firstTime) {
-          lb = locateNewBlock();
-        } else {
-          lb = locateFollowingBlock(startTime);
-        }
-
+        LocatedBlock lb = locateFollowingBlock(startTime);
         block = lb.getBlock();
         if (block.getNumBytes() < bytesWrittenToBlock) {
           block.setNumBytes(bytesWrittenToBlock);
@@ -1524,12 +1517,7 @@ class DFSClient implements FSConstants {
             Thread.sleep(6000);
           } catch (InterruptedException iex) {
           }
-          if (firstTime) {
-            namenode.abandonFileInProgress(src.toString(), 
-                                           clientName);
-          } else {
-            namenode.abandonBlock(block, src.toString());
-          }
+          namenode.abandonBlock(block, src.toString());
           retry = true;
           continue;
         }
@@ -1549,14 +1537,8 @@ class DFSClient implements FSConstants {
         blockStream = out;
         blockReplyStream = new DataInputStream(s.getInputStream());
       } while (retry);
-      firstTime = false;
     }
 
-    private LocatedBlock locateNewBlock() throws IOException {     
-      return namenode.create(src.toString(), clientName,
-          overwrite, replication, blockSize);
-    }
-        
     private LocatedBlock locateFollowingBlock(long start
                                               ) throws IOException {     
       int retries = 5;
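
The client-side consequence is visible above: the output stream's constructor now performs the create() RPC immediately, and nextBlockOutputStream() no longer special-cases the first block, so locateNewBlock(), the firstTime flag, and abandonFileInProgress() all disappear. A condensed sketch of the new write path (constructor parameters abbreviated; the retry loop and error handling omitted):

    // In the constructor: the file is registered with the NameNode before
    // any data is written; no block is allocated yet.
    namenode.create(src.toString(), clientName, overwrite, replication, blockSize);

    // Later, for every block including the first:
    LocatedBlock lb = locateFollowingBlock(startTime); // ask the NameNode
    // ... connect to lb's first target datanode and stream the block ...
    // On failure the block is simply abandoned and the request retried:
    namenode.abandonBlock(block, src.toString());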

+ 1 - 51
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -709,54 +709,7 @@ class FSNamesystem implements FSConstants {
    * @throws IOException if the filename is invalid
    *         {@link FSDirectory#isValidToCreate(String)}.
    */
-  public LocatedBlock startFile(String src, 
-                                String holder, 
-                                String clientMachine, 
-                                boolean overwrite,
-                                short replication,
-                                long blockSize
-                                ) throws IOException {
-
-    //
-    // Create file into pendingCreates and get the first blockId
-    //
-    Block newBlock = startFileInternal(src, holder, clientMachine,
-                                       overwrite, replication,
-                                       blockSize);
-
-    //
-    // Get the array of replication targets
-    //
-    try {
-      DatanodeDescriptor clientNode = 
-        host2DataNodeMap.getDatanodeByHost(clientMachine);
-      DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
-                                                             clientNode, null, blockSize);
-      if (targets.length < this.minReplication) {
-        if (clusterMap.getNumOfLeaves() == 0) {
-          throw new IOException("Failed to create file " + src
-                                + " on client " + clientMachine
-                                + " because this cluster has no datanodes.");
-        }
-        throw new IOException("Failed to create file " + src
-                              + " on client " + clientMachine
-                              + " because there were not enough datanodes available. "
-                              + "Found " + targets.length
-                              + " datanodes but MIN_REPLICATION for the cluster is "
-                              + "configured to be "
-                              + this.minReplication
-                              + ".");
-      }
-      return new LocatedBlock(newBlock, targets, 0L);
-
-    } catch (IOException ie) {
-      NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
-                                   + ie.getMessage());
-      throw ie;
-    }
-  }
-
-  public synchronized Block startFileInternal(String src, 
+  synchronized void startFile(String src, 
                                               String holder, 
                                               String clientMachine, 
                                               boolean overwrite,
@@ -861,9 +814,6 @@ class FSNamesystem implements FSConstants {
         }
         lease.startedCreate(src);
       }
-      
-      // Create first block
-      return allocateBlock(src);
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
                                    +ie.getMessage());
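
On the NameNode side, startFile() shrinks to the bookkeeping half of its old job: it validates the path, records the pending create, and takes out the lease, but no longer chooses replication targets or allocates a block. That work now happens when the client requests its first block, through the same path used for every later block. A hedged sketch of the resulting division of labor (the addBlock() signature is assumed from the era's ClientProtocol and is not shown in this diff):

    // At file creation: registration only; no block, no targets.
    namenode.create(src, clientName, overwrite, replication, blockSize);

    // Per block, first one included: allocate a block and pick datanodes.
    LocatedBlock blk = namenode.addBlock(src, clientName);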

+ 3 - 8
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -291,7 +291,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
 
   /**
    */
-  public LocatedBlock create(String src, 
+  public void create(String src, 
                              String clientName, 
                              boolean overwrite,
                              short replication,
@@ -304,14 +304,9 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
       throw new IOException("create: Pathname too long.  Limit " 
                             + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
-    LocatedBlock result =  namesystem.startFile(src, 
-                                                clientName, 
-                                                clientMachine, 
-                                                overwrite,
-                                                replication,
-                                                blockSize);
+    namesystem.startFile(
+        src, clientName, clientMachine, overwrite, replication, blockSize);
     myMetrics.createFile();
-    return result;
   }
 
   public boolean setReplication(String src, 

+ 22 - 2
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -249,7 +249,15 @@ public abstract class FileSystem extends Configured {
    * Files are overwritten by default.
    */
   public FSDataOutputStream create(Path f) throws IOException {
-    return create(f, true, 
+    return create(f, true);
+  }
+
+  /**
+   * Opens an FSDataOutputStream at the indicated Path.
+   */
+  public FSDataOutputStream create(Path f, boolean overwrite)
+    throws IOException {
+    return create(f, overwrite, 
                   getConf().getInt("io.file.buffer.size", 4096),
                   getDefaultReplication(),
                   getDefaultBlockSize());
@@ -773,7 +781,19 @@ public abstract class FileSystem extends Configured {
    */
   public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
     throws IOException {
-    FileUtil.copy(getLocal(getConf()), src, this, dst, delSrc, getConf());
+    copyFromLocalFile(delSrc, true, src, dst);
+  }
+
+  /**
+   * The src file is on the local disk.  Add it to FS at
+   * the given dst name.
+   * delSrc indicates if the source should be removed
+   */
+  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
+                                Path src, Path dst)
+    throws IOException {
+    Configuration conf = getConf();
+    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
   }
     
   /**
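
The two new public overloads in FileSystem make the overwrite decision explicit for callers. A short usage sketch (configuration and paths hypothetical):

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path dst = new Path("/user/demo/data.txt");

    // Unlike create(f), which overwrites by default, this fails with an
    // IOException if dst already exists.
    FSDataOutputStream out = fs.create(dst, false);
    out.writeBytes("hello");
    out.close();

    // Copy from local disk, keeping the source and refusing to overwrite.
    fs.copyFromLocalFile(false, false, new Path("/tmp/data.txt"),
                         new Path("/user/demo/copy.txt"));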

+ 11 - 2
src/java/org/apache/hadoop/fs/FileUtil.java

@@ -114,6 +114,15 @@ public class FileUtil {
                              FileSystem dstFS, Path dst, 
                              boolean deleteSource,
                              Configuration conf) throws IOException {
+    return copy(srcFS, src, dstFS, dst, deleteSource, true, conf);
+  }
+  
+  /** Copy files between FileSystems. */
+  public static boolean copy(FileSystem srcFS, Path src, 
+                             FileSystem dstFS, Path dst, 
+                             boolean deleteSource,
+                             boolean overwrite,
+                             Configuration conf) throws IOException {
     dst = checkDest(src.getName(), dstFS, dst);
 
     if (srcFS.isDirectory(src)) {
@@ -124,12 +133,12 @@ public class FileUtil {
       Path contents[] = srcFS.listPaths(src);
       for (int i = 0; i < contents.length; i++) {
         copy(srcFS, contents[i], dstFS, new Path(dst, contents[i].getName()),
-             deleteSource, conf);
+             deleteSource, overwrite, conf);
       }
     } else if (srcFS.isFile(src)) {
       InputStream in = srcFS.open(src);
       try {
-        OutputStream out = dstFS.create(dst);
+        OutputStream out = dstFS.create(dst, overwrite);
         copyContent(in, out, conf);
       } finally {
         in.close();
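
FileUtil.copy() threads the new overwrite flag through its directory recursion, so a tree copy now fails on the first destination file that already exists rather than silently replacing it. Calling the new overload directly (filesystems and paths hypothetical):

    // deleteSource = false, overwrite = false: throws an IOException on the
    // first pre-existing destination file.
    boolean done = FileUtil.copy(localFs, new Path("/tmp/src"),
                                 dfs, new Path("/dest"),
                                 false, false, conf);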

+ 1 - 1
src/java/org/apache/hadoop/fs/FsShell.java

@@ -109,7 +109,7 @@ public class FsShell extends ToolBase {
     if (src.toString().equals("-")) {
       copyFromStdin(new Path(dstf));
     } else {
-      fs.copyFromLocalFile(src, new Path(dstf));
+      fs.copyFromLocalFile(false, false, src, new Path(dstf));
     }
   }
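
With the shell now passing overwrite = false, put (and copyFromLocal, which shares this code path) refuses to clobber an existing destination: a second bin/hadoop dfs -put of the same file fails with an IOException instead of silently replacing the target. This is the behavior testPut below exercises by racing two copies to the same remote path.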
 

+ 147 - 36
src/test/org/apache/hadoop/dfs/TestDFSShell.java

@@ -19,13 +19,13 @@ package org.apache.hadoop.dfs;
 
 import junit.framework.TestCase;
 import java.io.*;
+import java.security.*;
+import java.util.*;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FsShell;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.util.StringUtils;
 
-
 /**
  * This class tests commands from DFSShell.
  */
@@ -33,11 +33,141 @@ public class TestDFSShell extends TestCase {
   private static String TEST_ROOT_DIR =
     new Path(System.getProperty("test.build.data","/tmp"))
     .toString().replace(' ', '+');
+
+  static private Path writeFile(FileSystem fs, Path f) throws IOException {
+    DataOutputStream out = fs.create(f);
+    out.writeBytes("dhruba: " + f);
+    out.close();
+    assertTrue(fs.exists(f));
+    return f;
+  }
+
+  static private Path mkdir(FileSystem fs, Path p) throws IOException {
+    assertTrue(fs.mkdirs(p));
+    assertTrue(fs.exists(p));
+    assertTrue(fs.getFileStatus(p).isDir());
+    return p;
+  }
+
+  static private File createLocalFile(File f) throws IOException {
+    assertTrue(!f.exists());
+    PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(f)));
+    out.println(f.getAbsolutePath());
+    out.close();
+    assertTrue(f.exists());
+    assertTrue(f.isFile());
+    return f;
+  }
+
+  static void show(String s) {
+    System.out.println(Thread.currentThread().getStackTrace()[2] + " " + s);
+  }
+
+  public void testZeroSizeFile() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    assertTrue("Not a HDFS: "+fs.getUri(),
+               fs instanceof DistributedFileSystem);
+    final DistributedFileSystem dfs = (DistributedFileSystem)fs;
+
+    try {
+      //create a zero size file
+      final File f1 = new File(TEST_ROOT_DIR, "f1");
+      assertTrue(!f1.exists());
+      assertTrue(f1.createNewFile());
+      assertTrue(f1.exists());
+      assertTrue(f1.isFile());
+      assertEquals(0L, f1.length());
+      
+      //copy to remote
+      final Path root = mkdir(dfs, new Path("/test/zeroSizeFile"));
+      final Path remotef = new Path(root, "dst");
+      show("copy local " + f1 + " to remote " + remotef);
+      dfs.copyFromLocalFile(false, false, new Path(f1.getPath()), remotef);
+      
+      //getBlockSize() should not throw exception
+      show("Block size = " + dfs.getFileStatus(remotef).getBlockSize());
+
+      //copy back
+      final File f2 = new File(TEST_ROOT_DIR, "f2");
+      assertTrue(!f2.exists());
+      dfs.copyToLocalFile(remotef, new Path(f2.getPath()));
+      assertTrue(f2.exists());
+      assertTrue(f2.isFile());
+      assertEquals(0L, f2.length());
   
-  private void writeFile(FileSystem fileSys, Path name) throws IOException {
-    DataOutputStream stm = fileSys.create(name);
-    stm.writeBytes("dhruba: " + name);
-    stm.close();
+      f1.delete();
+      f2.delete();
+    } finally {
+      try {dfs.close();} catch (Exception e) {}
+      cluster.shutdown();
+    }
+  }
+
+  public void testPut() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    assertTrue("Not a HDFS: "+fs.getUri(),
+               fs instanceof DistributedFileSystem);
+    final DistributedFileSystem dfs = (DistributedFileSystem)fs;
+
+    try {
+      final File f1 = createLocalFile(new File(TEST_ROOT_DIR, "f1"));
+      final File f2 = createLocalFile(new File(TEST_ROOT_DIR, "f2"));
+  
+      final Path root = mkdir(dfs, new Path("/test/put"));
+      final Path dst = new Path(root, "dst");
+  
+      show("begin");
+      
+      final Thread copy2ndFileThread = new Thread() {
+        public void run() {
+          try {
+            show("copy local " + f2 + " to remote " + dst);
+            dfs.copyFromLocalFile(false, false, new Path(f2.getPath()), dst);
+          } catch (IOException ioe) {
+            show("good " + StringUtils.stringifyException(ioe));
+            return;
+          }
+          // should not reach here; the copy must fail with an IOException
+          assertTrue(false);
+        }
+      };
+      
+      //use SecurityManager to pause the copying of f1 and begin copying f2
+      System.setSecurityManager(new SecurityManager() {
+        private boolean firstTime = true;
+  
+        public void checkPermission(Permission perm) {
+          if (firstTime) {
+            Thread t = Thread.currentThread();
+            if (!t.toString().contains("DataNode")) {
+              String s = "" + Arrays.asList(t.getStackTrace());
+              if (s.contains("FileUtil.copyContent")) {
+                //pause at FileUtil.copyContent
+  
+                firstTime = false;
+                copy2ndFileThread.start();
+                try {Thread.sleep(5000);} catch (InterruptedException e) {}
+              }
+            }
+          }
+        }
+      });
+      show("copy local " + f1 + " to remote " + dst);
+      dfs.copyFromLocalFile(false, false, new Path(f1.getPath()), dst);
+      show("done");
+  
+      try {copy2ndFileThread.join();} catch (InterruptedException e) { }
+      System.setSecurityManager(null);
+      f1.delete();
+      f2.delete();
+    } finally {
+      try {dfs.close();} catch (Exception e) {}
+      cluster.shutdown();
+    }
   }
 
   public void testCopyToLocal() throws IOException {
@@ -63,33 +193,14 @@ public class TestDFSShell extends TestCase {
         //   + sub
         //      |- f3
         //      |- f4
-        Path root = new Path("/test/copyToLocal");
-        assertTrue(dfs.mkdirs(root));
-        assertTrue(dfs.exists(root));
-        assertTrue(dfs.isDirectory(root));
-
-        Path sub = new Path(root, "sub");
-        assertTrue(dfs.mkdirs(sub));
-        assertTrue(dfs.exists(sub));
-        assertTrue(dfs.isDirectory(sub));
-
-        Path f1 = new Path(root, "f1");
-        writeFile(dfs, f1);
-        assertTrue(dfs.exists(f1));
-
-        Path f2 = new Path(root, "f2");
-        writeFile(dfs, f2);
-        assertTrue(dfs.exists(f2));
-
-        Path f3 = new Path(sub, "f3");
-        writeFile(dfs, f3);
-        assertTrue(dfs.exists(f3));
-
-        Path f4 = new Path(sub, "f4");
-        writeFile(dfs, f4);
-        assertTrue(dfs.exists(f4));
-      }
+        Path root = mkdir(dfs, new Path("/test/copyToLocal"));
+        Path sub = mkdir(dfs, new Path(root, "sub"));
 
+        writeFile(fs, new Path(root, "f1"));
+        writeFile(fs, new Path(root, "f2"));
+        writeFile(fs, new Path(sub, "f3"));
+        writeFile(fs, new Path(sub, "f4"));
+      }
 
       // Verify copying the tree
       {
@@ -111,10 +222,10 @@ public class TestDFSShell extends TestCase {
         assertTrue("Copying failed.", sub.isDirectory());
 
         File f3 = new File(sub, "f3");
-        assertTrue("Copying failed.", f3.exists());
+        assertTrue("Copying failed.", f3.isFile());
 
         File f4 = new File(sub, "f4");
-        assertTrue("Copying failed.", f4.exists());
+        assertTrue("Copying failed.", f4.isFile());
 
         f1.delete();
         f2.delete();