
Fix HADOOP-170. Permit FileSystem clients to examine and modify the replication count of individual files. Also fix a few replication-related bugs. Contributed by Konstantin Shvachko.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@397330 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting · 19 years ago · commit 82b8f6de56

+ 4 - 0
CHANGES.txt

@@ -108,6 +108,10 @@ Trunk (unreleased)
 28. Fix HADOOP-169.  Don't fail a reduce task if a call to the
     jobtracker to locate map outputs fails.  (omalley via cutting)
 
+29. Fix HADOOP-170.  Permit FileSystem clients to examine and modify
+    the replication count of individual files.  Also fix a few
+    replication-related bugs. (Konstantin Shvachko via cutting)
+
 
 Release 0.1.1 - 2006-04-08
 

+ 19 - 0
src/java/org/apache/hadoop/dfs/ClientProtocol.java

@@ -58,6 +58,25 @@ interface ClientProtocol {
                                 short replication
                               ) throws IOException;
 
+    /**
+     * Set replication for an existing file.
+     * 
+     * The NameNode sets replication to the new value and returns.
+     * The actual block replication is not expected to be performed during
+     * this method call. The blocks will be populated or removed in the
+     * background as a result of routine block maintenance.
+     * 
+     * @param src file name
+     * @param replication new replication
+     * @throws IOException
+     * @return true if successful;
+     *         false if file does not exist or is a directory
+     * @author shv
+     */
+    public boolean setReplication( String src, 
+                                short replication
+                              ) throws IOException;
+
     /**
      * A client that has written a block of data can report completion
      * back to the NameNode with reportWrittenBlock().  Clients cannot

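A point worth noting in the javadoc above: setReplication is a metadata-only call, so the boolean comes back before any block is copied or deleted. A minimal caller-side sketch (hypothetical helper, not part of this patch; it lives in org.apache.hadoop.dfs because ClientProtocol is package-private, and obtaining the RPC proxy is assumed to happen elsewhere, as DFSClient does internally):

    package org.apache.hadoop.dfs;

    import java.io.IOException;

    // Hypothetical sketch, not part of this patch.
    class SetReplicationSketch {
        static void raise(ClientProtocol namenode) throws IOException {
            // The NameNode records the new factor and returns at once;
            // replica counts converge later via block maintenance.
            boolean ok = namenode.setReplication("/user/shv/part-00000", (short)5);
            if (!ok)
                System.err.println("no such file, or path is a directory");
        }
    }
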
+ 15 - 1
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -182,7 +182,6 @@ class DFSClient implements FSConstants {
       return create( src, overwrite, (short)conf.getInt("dfs.replication", 3));
     }
     
-    /**
     /**
      * Create a new dfs file with the specified block replication 
      * and return an output stream for writing into the file.  
@@ -205,6 +204,21 @@ class DFSClient implements FSConstants {
       return result;
     }
 
+    /**
+     * Set replication for an existing file.
+     * 
+     * @see ClientProtocol#setReplication(String, short)
+     * @param replication
+     * @throws IOException
+     * @return true if successful or false if file does not exist
+     * @author shv
+     */
+    public boolean setReplication(UTF8 src, 
+                                  short replication
+                                ) throws IOException {
+      return namenode.setReplication(src.toString(), replication);
+    }
+
     /**
      * Make a direct connection to namenode and manipulate structures
      * there.

+ 5 - 1
src/java/org/apache/hadoop/dfs/DFSShell.java

@@ -91,7 +91,11 @@ public class DFSShell {
             }
             for (int i = 0; i < items.length; i++) {
                 Path cur = items[i];
-                System.out.println(cur + "\t" + (fs.isDirectory(cur) ? "<dir>" : ("" + fs.getLength(cur))));
+                System.out.println(cur + "\t" 
+                                    + (fs.isDirectory(cur) ? 
+                                        "<dir>" : 
+                                        ("<r " + fs.getReplication(cur) 
+                                            + ">\t" + fs.getLength(cur))));
                 if(recursive && fs.isDirectory(cur)) {
                   ls(cur.toString(), recursive);
                 }

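With this change, ls prints a replication column for plain files. Given the format string above, tab-separated output would look something like this (paths, factor, and size invented for illustration):

    /user/shv	<dir>
    /user/shv/part-00000	<r 3>	12345
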
+ 4 - 1
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -234,7 +234,10 @@ public class DataNode implements FSConstants, Runnable {
 				break;
 			    } else {
 				if (xferTargets[i].length > 0) {
-				    LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
+				    LOG.info("Starting thread to transfer block " + blocks[i] 
+                   + " to " + xferTargets[i][0].getName()
+                   + (xferTargets[i].length > 1 ? " and " 
+                   + (xferTargets[i].length-1) + " more destination(s)" : "" ));
 				    new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
 				}
 			    }

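The old message concatenated the DatanodeInfo array itself, which Java renders as an object reference like [Lorg.apache.hadoop.dfs.DatanodeInfo;@1a2b3c rather than anything readable. With the rewritten format, a transfer to three targets would log along these lines (block name and host:port invented for illustration):

    Starting thread to transfer block blk_4197 to crawler001:50010 and 2 more destination(s)
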
+ 3 - 2
src/java/org/apache/hadoop/dfs/DfsPath.java

@@ -15,8 +15,6 @@
  */
 package org.apache.hadoop.dfs;
 
-import java.io.*;
-
 import org.apache.hadoop.fs.Path;
 
 
@@ -41,4 +39,7 @@ class DfsPath extends Path {
     public long getContentsLength() {
         return info.getContentsLen();
     }
+    public short getReplication() {
+      return info.getReplication();
+    }
 }

+ 15 - 0
src/java/org/apache/hadoop/dfs/DistributedFileSystem.java

@@ -84,6 +84,12 @@ public class DistributedFileSystem extends FileSystem {
       return dfs.create(getPath(f), overwrite, replication);
     }
 
+    public boolean setReplicationRaw( Path src, 
+                                      short replication
+                                    ) throws IOException {
+      return dfs.setReplication(getPath(src), replication);
+    }
+    
     /**
      * Rename files/dirs
      */
@@ -118,6 +124,15 @@ public class DistributedFileSystem extends FileSystem {
         return info[0].getLen();
     }
 
+    public short getReplication(Path f) throws IOException {
+      if (f instanceof DfsPath) {
+        return ((DfsPath)f).getReplication();
+      }
+
+      DFSFileInfo info[] = dfs.listPaths(getPath(f));
+      return info[0].getReplication();
+    }
+
     public Path[] listPathsRaw(Path f) throws IOException {
         DFSFileInfo info[] = dfs.listPaths(getPath(f));
         if (info == null) {

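getReplication pairs with the DfsPath change above: paths returned by a directory listing are DfsPath objects carrying a cached DFSFileInfo, so reading their replication needs no extra NameNode round trip, while a bare Path falls back to a fresh listPaths call. A hedged sketch (hypothetical helper, file names invented):

    package org.apache.hadoop.dfs;

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;

    // Hypothetical sketch, not part of this patch.
    class GetReplicationSketch {
        static void show(DistributedFileSystem fs) throws IOException {
            // A freshly constructed Path: forces a listPaths RPC.
            short r1 = fs.getReplication(new Path("/user/shv/part-00000"));

            // Paths returned by listPathsRaw are DfsPath instances with
            // cached DFSFileInfo, so this answers without another RPC.
            Path[] listing = fs.listPathsRaw(new Path("/user/shv"));
            short r2 = fs.getReplication(listing[0]);

            System.out.println(r1 + " " + r2);
        }
    }
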
+ 1 - 0
src/java/org/apache/hadoop/dfs/FSDataset.java

@@ -401,6 +401,7 @@ class FSDataset implements FSConstants {
             if (!f.delete()) {
                 throw new IOException("Unexpected error trying to delete block " + invalidBlks[i] + " at file " + f);
             }
+            DataNode.LOG.info("Deleting block " + invalidBlks[i]);
         }
     }
 

+ 71 - 6
src/java/org/apache/hadoop/dfs/FSDirectory.java

@@ -35,8 +35,9 @@ import org.apache.hadoop.fs.Path;
  * @author Mike Cafarella
  *************************************************/
 class FSDirectory implements FSConstants {
-    // Versions are negative.
-    // Decrement DFS_CURRENT_VERSION to define new version.
+    // Version is reflected in the dfs image and edit log files.
+    // Versions are negative. 
+    // Decrement DFS_CURRENT_VERSION to define a new version.
     private static final int DFS_CURRENT_VERSION = -1;
     private static final String FS_IMAGE = "fsimage";
     private static final String NEW_FS_IMAGE = "fsimage.new";
@@ -46,6 +47,7 @@ class FSDirectory implements FSConstants {
     private static final byte OP_RENAME = 1;
     private static final byte OP_DELETE = 2;
     private static final byte OP_MKDIR = 3;
+    private static final byte OP_SET_REPLICATION = 4;
 
     /******************************************************
      * We keep an in-memory representation of the file/block
@@ -78,7 +80,6 @@ class FSDirectory implements FSConstants {
 
         /**
          * Check whether it's a directory
-         * @return
          */
         synchronized public boolean isDir() {
           return (blocks == null);
@@ -494,6 +495,16 @@ class FSDirectory implements FSConstants {
                         unprotectedAddFile(name, 
                             new INode( name.toString(), blocks, replication ));
                         break;
+                    }
+                    case OP_SET_REPLICATION: {
+                        UTF8 src = new UTF8();
+                        UTF8 repl = new UTF8();
+                        src.readFields(in);
+                        repl.readFields(in);
+                        unprotectedSetReplication(src.toString(), 
+                                                  fromLogReplication(repl),
+                                                  null);
+                        break;
                     } 
                     case OP_RENAME: {
                         UTF8 src = new UTF8();
@@ -599,13 +610,21 @@ class FSDirectory implements FSConstants {
         // add create file record to log
         UTF8 nameReplicationPair[] = new UTF8[] { 
                               path, 
-                              new UTF8( Short.toString(replication))};
+                              toLogReplication( replication )};
         logEdit(OP_ADD,
                 new ArrayWritable( UTF8.class, nameReplicationPair ), 
                 new ArrayWritable( Block.class, newNode.blocks ));
         return true;
     }
     
+    private static UTF8 toLogReplication( short replication ) {
+      return new UTF8( Short.toString(replication));
+    }
+    
+    private static short fromLogReplication( UTF8 replication ) {
+      return Short.parseShort(replication.toString());
+    }    
+    
     /**
      */
     boolean unprotectedAddFile(UTF8 path, INode newNode) {
@@ -654,13 +673,59 @@ class FSDirectory implements FSConstants {
         }
     }
 
+    /**
+     * Set file replication
+     * 
+     * @param src file name
+     * @param replication new replication
+     * @param oldReplication old replication - output parameter
+     * @return array of file blocks
+     * @throws IOException
+     */
+    public Block[] setReplication(String src, 
+                              short replication,
+                              Vector oldReplication
+                             ) throws IOException {
+      waitForReady();
+      Block[] fileBlocks = unprotectedSetReplication(src, replication, oldReplication );
+      if( fileBlocks != null )  // log the change only when the file exists
+        logEdit(OP_SET_REPLICATION, 
+                new UTF8(src), 
+                toLogReplication( replication ));
+      return fileBlocks;
+    }
+
+    private Block[] unprotectedSetReplication( String src, 
+                                          short replication,
+                                          Vector oldReplication
+                                        ) throws IOException {
+      if( oldReplication == null )
+        oldReplication = new Vector();
+      oldReplication.setSize(1);
+      oldReplication.set( 0, new Integer(-1) );
+      Block[] fileBlocks = null;
+      synchronized(rootDir) {
+        INode fileNode = rootDir.getNode(src);
+        if (fileNode == null)
+          return null;
+        if( fileNode.isDir() )
+          return null;
+        oldReplication.set( 0, new Integer( fileNode.blockReplication ));
+        fileNode.blockReplication = replication;
+        fileBlocks = fileNode.blocks;
+      }
+      return fileBlocks;
+    }
+                                 
     /**
      * Remove the file from management, return blocks
      */
     public Block[] delete(UTF8 src) {
         waitForReady();
-        logEdit(OP_DELETE, src, null);
-        return unprotectedDelete(src);
+        Block[] blocks = unprotectedDelete(src); 
+        if( blocks != null )
+          logEdit(OP_DELETE, src, null);
+        return blocks;
     }
 
     /**

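Two things happen in this file: a new edit-log opcode, OP_SET_REPLICATION = 4, whose payload stores the replication factor as a decimal string in a UTF8 (the toLogReplication/fromLogReplication pair), and a reordering of delete() so the OP_DELETE record is written only after the in-memory removal succeeds, keeping failed deletes out of the log. A round-trip sketch of the serialization (the real helpers are private to FSDirectory, so they are mirrored here for illustration):

    import org.apache.hadoop.io.UTF8;

    // Hypothetical mirror of FSDirectory's private helpers.
    class ReplLogSketch {
        static UTF8 toLog(short replication) {
            return new UTF8(Short.toString(replication));   // e.g. "3"
        }
        static short fromLog(UTF8 replication) {
            return Short.parseShort(replication.toString());
        }
        public static void main(String[] args) {
            short r = 3;
            // loadFSEdits recovers exactly what setReplication logged.
            System.out.println(fromLog(toLog(r)) == r);     // true
        }
    }
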
+ 100 - 30
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -164,7 +164,7 @@ class FSNamesystem implements FSConstants {
           throw new IOException(
               "Unexpected configuration parameters: dfs.replication.min = " 
               + minReplication
-              + " must be at less than dfs.replication.max = " 
+              + " must be less than dfs.replication.max = " 
               + maxReplication );
 
         this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
@@ -230,6 +230,73 @@ class FSNamesystem implements FSConstants {
         return results;
     }
 
+    /**
+     * Set replication for an existing file.
+     * 
+     * The NameNode sets new replication and schedules either replication of 
+     * under-replicated data blocks or removal of excess block copies
+     * if the blocks are over-replicated.
+     * 
+     * @see ClientProtocol#setReplication(String, short)
+     * @param src file name
+     * @param replication new replication
+     * @return true if successful; 
+     *         false if file does not exist or is a directory
+     * @author shv
+     */
+    public boolean setReplication(String src, 
+                                  short replication
+                                ) throws IOException {
+      verifyReplication(src, replication, null );
+
+      Vector oldReplication = new Vector();
+      Block[] fileBlocks;
+      fileBlocks = dir.setReplication( src, replication, oldReplication );
+      if( fileBlocks == null )  // file not found or is a directory
+        return false;
+      int oldRepl = ((Integer)oldReplication.elementAt(0)).intValue();
+      if( oldRepl == replication ) // the same replication
+        return true;
+
+      synchronized( neededReplications ) {
+        if( oldRepl < replication ) { 
+          // old replication < the new one; need to replicate
+          LOG.info("Increasing replication for file " + src 
+                    + ". New replication is " + replication );
+          for( int idx = 0; idx < fileBlocks.length; idx++ )
+            neededReplications.add( fileBlocks[idx] );
+        } else {  
+          // old replication > the new one; need to remove copies
+          LOG.info("Reducing replication for file " + src 
+                    + ". New replication is " + replication );
+          for( int idx = 0; idx < fileBlocks.length; idx++ )
+            proccessOverReplicatedBlock( fileBlocks[idx], replication );
+        }
+      }
+      return true;
+    }
+    
+    /**
+     * Check whether the replication parameter is within the range
+     * determined by system configuration.
+     */
+    private void verifyReplication( String src, 
+                                    short replication, 
+                                    UTF8 clientName 
+                                  ) throws IOException {
+      String text = "File " + src 
+              + ((clientName != null) ? " on client " + clientName : "")
+              + ".\n"
+              + "Requested replication " + replication;
+
+      if( replication > maxReplication )
+        throw new IOException( text + " exceeds maximum " + maxReplication );
+      
+      if( replication < minReplication )
+        throw new IOException(  
+            text + " is less than the required minimum " + minReplication );
+    }
+    
     /**
      * The client would like to create a new block for the indicated
      * filename.  Return an array that consists of the block, plus a set 
@@ -255,17 +322,7 @@ class FSNamesystem implements FSConstants {
           throw new NameNode.AlreadyBeingCreatedException(msg);
         }
 
-        if( replication > maxReplication )
-          throw new IOException(
-            "Cannot create file " + src + " on client " + clientMachine + ".\n"
-            + "Requested replication " + replication
-            + " exceeds maximum " + maxReplication );
-        
-        if( replication < minReplication )
-          throw new IOException(
-            "Cannot create file " + src + " on client " + clientMachine + ".\n"
-            + "Requested replication " + replication
-            + " is less than the required minimum " + minReplication );
+        verifyReplication(src.toString(), replication, clientMachine );
         
         if (!dir.isValidToCreate(src)) {
           if (overwrite) {
@@ -1061,24 +1118,27 @@ class FSNamesystem implements FSConstants {
             } else // containingNodes.size() < fileReplication
                 neededReplications.add(block);
 
-            //
-            // Find how many of the containing nodes are "extra", if any.
-            // If there are any extras, call chooseExcessReplicates() to
-            // mark them in the excessReplicateMap.
-            //
-            Vector nonExcess = new Vector();
-            for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
-                DatanodeInfo cur = (DatanodeInfo) it.next();
-                TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());
-                if (excessBlocks == null || ! excessBlocks.contains(block)) {
-                    nonExcess.add(cur);
-                }
-            }
-            if (nonExcess.size() > fileReplication) {
-                chooseExcessReplicates(nonExcess, block, fileReplication);    
-            }
+            proccessOverReplicatedBlock( block, fileReplication );
         }
     }
+    
+    /**
+     * Find how many of the containing nodes are "extra", if any.
+     * If there are any extras, call chooseExcessReplicates() to
+     * mark them in the excessReplicateMap.
+     */
+    private void proccessOverReplicatedBlock( Block block, short replication ) {
+      TreeSet containingNodes = (TreeSet) blocksMap.get(block);
+      Vector nonExcess = new Vector();
+      for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
+          DatanodeInfo cur = (DatanodeInfo) it.next();
+          TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());
+          if (excessBlocks == null || ! excessBlocks.contains(block)) {
+              nonExcess.add(cur);
+          }
+      }
+      chooseExcessReplicates(nonExcess, block, replication);    
+    }
 
     /**
      * We want "replication" replicates for the block, but we now have too many.  
@@ -1263,7 +1323,12 @@ class FSNamesystem implements FSConstants {
                         it.remove();
                     } else {
                         TreeSet containingNodes = (TreeSet) blocksMap.get(block);
-                        if (containingNodes.contains(srcNode)) {
+                        TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(srcNode.getName());
+                        // srcNode must contain the block, and the block must 
+                        // not be scheduled for removal on that node
+                        if (containingNodes.contains(srcNode) 
+                              && ( excessBlocks == null 
+                               || ! excessBlocks.contains(block))) {
                             DatanodeInfo targets[] = chooseTargets(Math.min(fileINode.getReplication() - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes, null);
                             if (targets.length > 0) {
                                 // Build items to return
@@ -1293,7 +1358,12 @@ class FSNamesystem implements FSConstants {
                             pendingReplications.add(block);
                         }
 
-			LOG.info("Pending transfer (block " + block.getBlockName() + ") from " + srcNode.getName() + " to " + targets.length + " destinations");
+                        LOG.info("Pending transfer (block " 
+                            + block.getBlockName() 
+                            + ") from " + srcNode.getName() 
+                            + " to " + targets[0].getName() 
+                            + (targets.length > 1 ? " and " + (targets.length-1) 
+                                + " more destination(s)" : "" ));
                     }
 
                     //

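The extracted verifyReplication gives file creation and the new setReplication one shared bounds check against dfs.replication.min and dfs.replication.max. A sketch of what an out-of-range request looks like to a caller (hypothetical helper in the dfs package, since FSNamesystem is package-private; path and factor are invented):

    package org.apache.hadoop.dfs;

    import java.io.IOException;

    // Hypothetical sketch, not part of this patch.
    class VerifyReplicationSketch {
        static void demo(FSNamesystem namesystem) {
            try {
                // A factor above dfs.replication.max is rejected before
                // any metadata changes.
                namesystem.setReplication("/user/shv/part-00000", (short)1000);
            } catch (IOException e) {
                // e.g. "File /user/shv/part-00000.
                //       Requested replication 1000 exceeds maximum ..."
                System.err.println(e.getMessage());
            }
        }
    }
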
+ 6 - 0
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -188,6 +188,12 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
         return new LocatedBlock(b, targets);
     }
 
+    public boolean setReplication( String src, 
+                                short replication
+                              ) throws IOException {
+      return namesystem.setReplication( src, replication );
+    }
+    
     /**
      */
     public LocatedBlock addBlock(String src, 

+ 41 - 0
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -242,6 +242,47 @@ public abstract class FileSystem extends Configured {
         }
     }
 
+    /**
+     * Set replication for an existing file.
+     * 
+     * @param src file name
+     * @param replication new replication
+     * @throws IOException
+     * @return true if successful;
+     *         false if file does not exist or is a directory
+     */
+    public boolean setReplication(Path src, short replication) throws IOException {
+      boolean value = setReplicationRaw(src, replication);
+      if( ! value )
+        return false;
+
+      Path checkFile = getChecksumFile(src);
+      if (exists(checkFile))
+        setReplicationRaw(checkFile, replication);
+
+      return true;
+    }
+
+    /**
+     * Get replication.
+     * 
+     * @param src file name
+     * @return file replication
+     * @throws IOException
+     */
+    public abstract short getReplication(Path src) throws IOException;
+
+    /**
+     * Set replication for an existing file.
+     * 
+     * @param src file name
+     * @param replication new replication
+     * @throws IOException
+     * @return true if successful;
+     *         false if file does not exist or is a directory
+     */
+    public abstract boolean setReplicationRaw(Path src, short replication) throws IOException;
+
     /** @deprecated Call {@link #rename(Path, Path)} instead. */
     public boolean rename(File src, File dst) throws IOException {
       return rename(new Path(src.toString()), new Path(dst.toString()));

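The concrete setReplication wrapper is the call most clients should use: it changes the file itself and, when a checksum companion file exists, mirrors the change there so data and checksums keep the same durability. A hedged usage sketch against the generic API (path and factor invented):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical usage sketch, not part of this patch.
    class FsReplicationDemo {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);   // dfs or local, per config

            Path f = new Path("/user/shv/part-00000");
            // Changes the file's factor; the matching checksum file, if
            // present, is set to the same factor.
            if (fs.setReplication(f, (short)5))
                System.out.println("replication now " + fs.getReplication(f));
        }
    }
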
+ 14 - 0
src/java/org/apache/hadoop/fs/LocalFileSystem.java

@@ -21,6 +21,7 @@ import java.util.*;
 import java.nio.channels.*;
 
 import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 
 /****************************************************************
@@ -172,6 +173,19 @@ public class LocalFileSystem extends FileSystem {
         return new LocalFSFileOutputStream(f);
     }
 
+    /**
+     * Replication is not supported for the local file system.
+     */
+    public short getReplication(Path f) throws IOException {
+      return 1;
+    }
+    
+    public boolean setReplicationRaw( Path src, 
+                                      short replication
+                                    ) throws IOException {
+      return true;
+    }
+
     public boolean renameRaw(Path src, Path dst) throws IOException {
         if (useCopyForRename) {
           return FileUtil.copy(this, src, this, dst, true, getConf());