Selaa lähdekoodia

HADOOP-3176. Change lease record when a open-for-write-file
gets renamed. (dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@648513 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 17 vuotta sitten
vanhempi
commit
9455f06285

+ 3 - 0
CHANGES.txt

@@ -31,6 +31,9 @@ Trunk (unreleased changes)
     heartbeats to use well-defined BlockCommand object(s) instead of 
     using the base java Object. (Tsz Wo (Nicholas), SZE via dhruba)
 
+    HADOOP-3176.  Change lease record when a open-for-write-file 
+    gets renamed. (dhruba)
+
 Release 0.17.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 4 - 0
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -2675,6 +2675,10 @@ class DFSClient implements FSConstants {
       chunksPerPacket = Math.min(chunksPerPacket, value);
       packetSize = chunkSize * chunksPerPacket;
     }
+
+    synchronized void setTestFilename(String newname) {
+      src = newname;
+    }
   }
 
   void reportChecksumFailure(String file, Block blk, DatanodeInfo dn) {

+ 4 - 1
src/java/org/apache/hadoop/dfs/FSEditLog.java

@@ -584,7 +584,10 @@ class FSEditLog {
               dst = (UTF8) writables[1];
               timestamp = Long.parseLong(((UTF8)writables[2]).toString());
             }
-            fsDir.unprotectedRenameTo(src.toString(), dst.toString(), timestamp);
+            String s = src.toString();
+            String d = dst.toString();
+            fsDir.unprotectedRenameTo(s, d, timestamp);
+            fsNamesys.changeLease(s, d);
             break;
           }
           case OP_DELETE: {

+ 127 - 4
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -170,6 +170,13 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
   // Set of: Lease
   private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
 
+  // 
+  // Map path names to leases. It is protected by the sortedLeases lock.
+  // The map stores pathnames in lexicographical order.
+  //
+  private TreeMap<String, Lease> sortedLeasesByPath = 
+                        new TreeMap<String, Lease>();
+
   //
   // Threaded object that checks to see if we have been
   // getting heartbeats from all clients. 
@@ -1464,7 +1471,11 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       checkAncestorAccess(dst, FsAction.WRITE);
     }
 
-    return dir.renameTo(src, dst);
+    if (dir.renameTo(src, dst)) {
+      changeLease(src, dst);     // update lease with new filename
+      return true;
+    }
+    return false;
   }
 
   /**
@@ -1753,6 +1764,34 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     Collection<StringBytesWritable> getPaths() throws IOException {
       return creates;
     }
+
+    // If a file with the specified prefix exists, then replace 
+    // it with the new prefix.
+    //
+    void replacePrefix(String src, String overwrite, 
+                       String replaceBy) throws IOException {
+      List<StringBytesWritable> toAdd = new ArrayList<StringBytesWritable>();
+      for (Iterator<StringBytesWritable> f = creates.iterator(); 
+           f.hasNext();){
+        String path = f.next().getString();
+        if (!path.startsWith(src)) {
+          continue;
+        }
+        // remove this filename from this lease.
+        f.remove();
+
+        // remember new filename
+        String newPath = path.replaceFirst(overwrite, replaceBy);
+        toAdd.add(new StringBytesWritable(newPath));
+        LOG.info("Modified Lease for file " + path +
+                 " to new path " + newPath);
+      }
+      // add modified filenames back into lease.
+      for (Iterator<StringBytesWritable> f = toAdd.iterator(); 
+           f.hasNext();) {
+        creates.add(f.next());
+      }
+    }
   }
   
   /******************************************************
@@ -4290,6 +4329,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
         if (!lease.hasLocks()) {
           removeLease(holder);
           sortedLeases.remove(lease);
+          sortedLeasesByPath.remove(src);
         }
       }
     }
@@ -4310,10 +4350,69 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
         lease.renew();
         sortedLeases.add(lease);
       }
+      sortedLeasesByPath.put(src, lease);
       lease.startedCreate(src);
     }
   }
 
+  // rename was successful. If any part of the renamed subtree had
+  // files that were being written to, update with new filename.
+  //
+  void changeLease(String src, String dst) throws IOException {
+    Map<String, Lease> addTo = new TreeMap<String, Lease>();
+    String overwrite;
+    String replaceBy;
+
+    DFSFileInfo dinfo = getFileInfo(dst);
+    if (dinfo.isDir()) {
+      Path spath = new Path(src);
+      overwrite = spath.getParent().toString() + Path.SEPARATOR;
+      replaceBy = dst + Path.SEPARATOR;
+    } else {
+      overwrite = src;
+      replaceBy = dst;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("changelease " +
+               " src " + src + " dest " + dst + 
+               " overwrite " + overwrite +
+               " replaceBy " + replaceBy);
+    }
+
+    synchronized (sortedLeases) {
+      SortedMap<String, Lease> myset = sortedLeasesByPath.tailMap(src);
+      for (Iterator<Map.Entry<String, Lease>> iter = myset.entrySet().iterator(); 
+           iter.hasNext();) {
+        Map.Entry<String, Lease> value = iter.next();
+        String path = (String)value.getKey();
+        if (!path.startsWith(src)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("changelease comparing " + path +
+                     " with " + src + " and terminating.");
+          }
+          break;
+        }
+        Lease lease = (Lease)value.getValue();
+
+        // Fix up all the pathnames in this lease.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("changelease comparing " + path +
+                    " with " + src + " and replacing ");
+        }
+        lease.replacePrefix(src, overwrite, replaceBy);
+
+        // Remove this lease from sortedLeasesByPath because the 
+        // pathname has changed.
+        String newPath = path.replaceFirst(overwrite, replaceBy);
+        addTo.put(newPath, lease);
+        iter.remove();
+      }
+      // re-add entries back in sortedLeasesByPath
+      sortedLeasesByPath.putAll(addTo);
+    }
+  }
+           
   /**
    * Returns the number of leases currently in the system
    */
@@ -4324,10 +4423,34 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
   }
 
   /**
-   * Serializes leases. This current code does not save leases but will do
-   * so in the future.
+   * Serializes leases. 
    */
   void saveFilesUnderConstruction(DataOutputStream out) throws IOException {
-    out.writeInt(0);      // the number of leases
+    synchronized (sortedLeases) {
+      int count = 0;
+      for (Lease lease : sortedLeases) {
+        count += lease.getPaths().size();
+      }
+      out.writeInt(count); // write the size
+      for (Lease lease : sortedLeases) {
+        Collection<StringBytesWritable> files = lease.getPaths();
+        for (Iterator<StringBytesWritable> i = files.iterator(); i.hasNext();){
+          String path = i.next().getString();
+
+          // verify that path exists in namespace
+          INode node = dir.getFileINode(path);
+          if (node == null) {
+            throw new IOException("saveLeases found path " + path +
+                                  " but no matching entry in namespace.");
+          }
+          if (!node.isUnderConstruction()) {
+            throw new IOException("saveLeases found path " + path +
+                                  " but is not under construction.");
+          }
+          INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
+          FSImage.writeINodeUnderConstruction(out, cons, path);
+        }
+      }
+    }
   }
 }

+ 46 - 5
src/test/org/apache/hadoop/dfs/TestFileCreation.java

@@ -356,7 +356,7 @@ public class TestFileCreation extends TestCase {
    * This test is currently not triggered because more HDFS work is 
    * is needed to handle persistent leases.
    */
-  public void XXXtestFileCreationNamenodeRestart() throws IOException {
+  public void testFileCreationNamenodeRestart() throws IOException {
     Configuration conf = new Configuration();
     final int MAX_IDLE_TIME = 2000; // 2s
     conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
@@ -380,20 +380,49 @@ public class TestFileCreation extends TestCase {
       Path file1 = new Path("/filestatus.dat");
       FSDataOutputStream stm = createFile(fs, file1, 1);
       System.out.println("testFileCreationNamenodeRestart: "
-                         + "Created file filestatus.dat with one "
-                         + " replicas.");
+                         + "Created file " + file1);
 
       // write two full blocks.
       writeFile(stm, numBlocks * blockSize);
       flushFile(stm);
 
+      // rename file wile keeping it open.
+      Path fileRenamed = new Path("/filestatusRenamed.dat");
+      fs.rename(file1, fileRenamed);
+      System.out.println("testFileCreationNamenodeRestart: "
+                         + "Renamed file " + file1 + " to " +
+                         fileRenamed);
+      file1 = fileRenamed;
+
       // create another new file.
       //
       Path file2 = new Path("/filestatus2.dat");
       FSDataOutputStream stm2 = createFile(fs, file2, 1);
       System.out.println("testFileCreationNamenodeRestart: "
-                         + "Created file filestatus2.dat with one "
-                         + " replicas.");
+                         + "Created file " + file2);
+
+      // create yet another new file with full path name. 
+      // rename it while open
+      //
+      Path file3 = new Path("/user/home/fullpath.dat");
+      FSDataOutputStream stm3 = createFile(fs, file3, 1);
+      System.out.println("testFileCreationNamenodeRestart: "
+                         + "Created file " + file3);
+      Path file4 = new Path("/user/home/fullpath4.dat");
+      FSDataOutputStream stm4 = createFile(fs, file4, 1);
+      System.out.println("testFileCreationNamenodeRestart: "
+                         + "Created file " + file4);
+
+      fs.mkdirs(new Path("/bin"));
+      fs.rename(new Path("/user/home"), new Path("/bin"));
+      Path file3new = new Path("/bin/home/fullpath.dat");
+      System.out.println("testFileCreationNamenodeRestart: "
+                         + "Renamed file " + file3 + " to " +
+                         file3new);
+      Path file4new = new Path("/bin/home/fullpath4.dat");
+      System.out.println("testFileCreationNamenodeRestart: "
+                         + "Renamed file " + file4 + " to " +
+                         file4new);
 
       // restart cluster with the same namenode port as before.
       // This ensures that leases are persisted in fsimage.
@@ -417,6 +446,16 @@ public class TestFileCreation extends TestCase {
                                    null, null, null);
       cluster.waitActive();
 
+      // instruct the dfsclient to use a new filename when it requests
+      // new blocks for files that were renamed.
+      DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream)
+                                                 (stm.getWrappedStream());
+      dfstream.setTestFilename(file1.toString());
+      dfstream = (DFSClient.DFSOutputStream) (stm3.getWrappedStream());
+      dfstream.setTestFilename(file3new.toString());
+      dfstream = (DFSClient.DFSOutputStream) (stm4.getWrappedStream());
+      dfstream.setTestFilename(file4new.toString());
+
       // write 1 byte to file.  This should succeed because the 
       // namenode should have persisted leases.
       byte[] buffer = new byte[1];
@@ -426,6 +465,8 @@ public class TestFileCreation extends TestCase {
       stm.close();
       stm2.write(buffer);
       stm2.close();
+      stm3.close();
+      stm4.close();
 
       // verify that new block is associated with this file
       client = new DFSClient(addr, conf);