Browse Source

Fix for HADOOP-51. Support per-file replication counts in DFS. Contributed by Konstantin Shvachko.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@393035 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
5659db7985

+ 3 - 0
CHANGES.txt

@@ -6,6 +6,9 @@ Trunk (unreleased)
  1. Fix HADOOP-126. 'bin/hadoop dfs -cp' now correctly copies .crc
     files.  (Konstantin Shvachko via cutting)
 
+ 2. Fix HADOOP-51. Change DFS to support per-file replication counts.
+    (Konstantin Shvachko via cutting)
+
 
 Release 0.1.1 - 2006-04-08
 

+ 18 - 3
conf/hadoop-default.xml

@@ -92,9 +92,24 @@
 <property>
   <name>dfs.replication</name>
   <value>3</value>
-  <description>How many copies we try to have at all times. The actual
-  number of replications is at max the number of datanodes in the
-  cluster.</description>
+  <description>Default block replication. 
+  The actual number of replications can be specified when the file is created.
+  The default is used if replication is not specified in create time.
+  </description>
+</property>
+
+<property>
+  <name>dfs.replication.max</name>
+  <value>512</value>
+  <description>Maximal block replication. 
+  </description>
+</property>
+
+<property>
+  <name>dfs.replication.min</name>
+  <value>1</value>
+  <description>Minimal block replication. 
+  </description>
 </property>
 
 <property>

+ 5 - 2
src/java/org/apache/hadoop/dfs/Block.java

@@ -15,10 +15,9 @@
  */
 package org.apache.hadoop.dfs;
 
-import org.apache.hadoop.io.*;
-
 import java.io.*;
 import java.util.*;
+import org.apache.hadoop.io.*;
 
 /**************************************************
  * A Block is a Hadoop FS primitive, identified by a 
@@ -113,6 +112,10 @@ class Block implements Writable, Comparable {
     public void readFields(DataInput in) throws IOException {
         this.blkid = in.readLong();
         this.len = in.readLong();
+        if( len < 0 || len > FSConstants.BLOCK_SIZE )
+          throw new IOException("Unexpected block size: " + len 
+                                + ". System block size = " 
+                                + FSConstants.BLOCK_SIZE + ".");
     }
 
     /////////////////////////////////////

+ 6 - 1
src/java/org/apache/hadoop/dfs/ClientProtocol.java

@@ -51,7 +51,12 @@ interface ClientProtocol {
      * create multi-block files must also use reportWrittenBlock()
      * and addBlock().
      */
-    public LocatedBlock create(String src, String clientName, String clientMachine, boolean overwrite) throws IOException;
+    public LocatedBlock create( String src, 
+                                String clientName, 
+                                String clientMachine, 
+                                boolean overwrite, 
+                                short replication
+                              ) throws IOException;
 
     /**
      * A client that has written a block of data can report completion

+ 34 - 4
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -95,8 +95,36 @@ class DFSClient implements FSConstants {
         return new DFSInputStream(src.toString());
     }
 
-    public FSOutputStream create(UTF8 src, boolean overwrite) throws IOException {
-        return new DFSOutputStream(src, overwrite);
+    /**
+     * Create a new dfs file and return an output stream for writing into it. 
+     * 
+     * @param src stream name
+     * @param overwrite do not check for file existence if true
+     * @return output stream
+     * @throws IOException
+     */
+    public FSOutputStream create( UTF8 src, 
+                                  boolean overwrite
+                                ) throws IOException {
+      return create( src, overwrite, (short)conf.getInt("dfs.replication", 3));
+    }
+    
+    /**
+    /**
+     * Create a new dfs file with the specified block replication 
+     * and return an output stream for writing into the file.  
+     * 
+     * @param src stream name
+     * @param overwrite do not check for file existence if true
+     * @param replication block replication
+     * @return output stream
+     * @throws IOException
+     */
+    public FSOutputStream create( UTF8 src, 
+                                  boolean overwrite, 
+                                  short replication
+                                ) throws IOException {
+        return new DFSOutputStream(src, overwrite, replication);
     }
 
     /**
@@ -527,6 +555,7 @@ class DFSClient implements FSConstants {
 
         private UTF8 src;
         private boolean overwrite;
+        private short replication;
         private boolean firstTime = true;
         private DataOutputStream blockStream;
         private DataInputStream blockReplyStream;
@@ -539,9 +568,10 @@ class DFSClient implements FSConstants {
         /**
          * Create a new output stream to the given DataNode.
          */
-        public DFSOutputStream(UTF8 src, boolean overwrite) throws IOException {
+        public DFSOutputStream(UTF8 src, boolean overwrite, short replication) throws IOException {
             this.src = src;
             this.overwrite = overwrite;
+            this.replication = replication;
             this.backupFile = newBackupFile();
             this.backupStream = new FileOutputStream(backupFile);
         }
@@ -570,7 +600,7 @@ class DFSClient implements FSConstants {
                 LocatedBlock lb = null;                
                 while (! blockComplete) {
                     if (firstTime) {
-                        lb = namenode.create(src.toString(), clientName.toString(), localName, overwrite);
+                        lb = namenode.create(src.toString(), clientName.toString(), localName, overwrite, replication);
                     } else {
                         lb = namenode.addBlock(src.toString(), localName);
                     }

+ 10 - 0
src/java/org/apache/hadoop/dfs/DFSFileInfo.java

@@ -38,6 +38,7 @@ class DFSFileInfo implements Writable {
     long len;
     long contentsLen;
     boolean isDir;
+    short blockReplication;
 
     /**
      */
@@ -55,6 +56,7 @@ class DFSFileInfo implements Writable {
           this.contentsLen = node.computeContentsLength();
         } else 
           this.len = this.contentsLen = node.computeFileLength();
+        this.blockReplication = node.getReplication();
     }
 
     /**
@@ -93,6 +95,12 @@ class DFSFileInfo implements Writable {
         return isDir;
     }
 
+    /**
+     */
+    public short getReplication() {
+      return this.blockReplication;
+    }
+
     //////////////////////////////////////////////////
     // Writable
     //////////////////////////////////////////////////
@@ -101,6 +109,7 @@ class DFSFileInfo implements Writable {
         out.writeLong(len);
         out.writeLong(contentsLen);
         out.writeBoolean(isDir);
+        out.writeShort(blockReplication);
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -109,6 +118,7 @@ class DFSFileInfo implements Writable {
         this.len = in.readLong();
         this.contentsLen = in.readLong();
         this.isDir = in.readBoolean();
+        this.blockReplication = in.readShort();
     }
 }
 

+ 3 - 3
src/java/org/apache/hadoop/dfs/DistributedFileSystem.java

@@ -78,9 +78,9 @@ public class DistributedFileSystem extends FileSystem {
       return dfs.open(getPath(f));
     }
 
-    public FSOutputStream createRaw(File f, boolean overwrite)
+    public FSOutputStream createRaw(File f, boolean overwrite, short replication)
       throws IOException {
-      return dfs.create(getPath(f), overwrite);
+      return dfs.create(getPath(f), overwrite, replication);
     }
 
     /**
@@ -287,7 +287,7 @@ public class DistributedFileSystem extends FileSystem {
     }
 
     public long getBlockSize() {
-      return dfs.BLOCK_SIZE;
+      return FSConstants.BLOCK_SIZE;
     }
 
     /** Return the total raw capacity of the filesystem, disregarding

+ 197 - 83
src/java/org/apache/hadoop/dfs/FSDirectory.java

@@ -34,9 +34,12 @@ import org.apache.hadoop.fs.FileUtil;
  * @author Mike Cafarella
  *************************************************/
 class FSDirectory implements FSConstants {
-    static String FS_IMAGE = "fsimage";
-    static String NEW_FS_IMAGE = "fsimage.new";
-    static String OLD_FS_IMAGE = "fsimage.old";
+    // Versions are negative.
+    // Decrement DFS_CURRENT_VERSION to define new version.
+    private static final int DFS_CURRENT_VERSION = -1;
+    private static final String FS_IMAGE = "fsimage";
+    private static final String NEW_FS_IMAGE = "fsimage.new";
+    private static final String OLD_FS_IMAGE = "fsimage.old";
 
     private static final byte OP_ADD = 0;
     private static final byte OP_RENAME = 1;
@@ -48,17 +51,28 @@ class FSDirectory implements FSConstants {
      * hierarchy.
      ******************************************************/
     class INode {
-        public String name;
-        public INode parent;
-        public TreeMap children = new TreeMap();
-        public Block blocks[];
+        private String name;
+        private INode parent;
+        private TreeMap children = new TreeMap();
+        private Block blocks[];
+        private short blockReplication;
 
         /**
          */
-        INode(String name, INode parent, Block blocks[]) {
+        INode(String name, Block blocks[], short replication) {
             this.name = name;
-            this.parent = parent;
+            this.parent = null;
             this.blocks = blocks;
+            this.blockReplication = replication;
+        }
+
+        /**
+         */
+        INode(String name) {
+            this.name = name;
+            this.parent = null;
+            this.blocks = null;
+            this.blockReplication = 0;
         }
 
         /**
@@ -68,12 +82,21 @@ class FSDirectory implements FSConstants {
         synchronized public boolean isDir() {
           return (blocks == null);
         }
+        
+        /**
+         * Get block replication for the file 
+         * @return block replication
+         */
+        public short getReplication() {
+          return this.blockReplication;
+        }
 
         /**
          * This is the external interface
          */
         INode getNode(String target) {
-            if (! target.startsWith("/") || target.length() == 0) {
+            if ( target == null || 
+                ! target.startsWith("/") || target.length() == 0) {
                 return null;
             } else if (parent == null && "/".equals(target)) {
                 return this;
@@ -103,35 +126,44 @@ class FSDirectory implements FSConstants {
             }
 
             // Check with children
-            INode child = (INode) children.get(components.elementAt(index+1));
+            INode child = this.getChild((String)components.elementAt(index+1));
             if (child == null) {
                 return null;
             } else {
                 return child.getNode(components, index+1);
             }
         }
+        
+        INode getChild( String name) {
+          return (INode) children.get( name );
+        }
 
         /**
+         * Add new INode to the file tree.
+         * Find the parent and insert 
+         * 
+         * @param path file path
+         * @param newNode INode to be added
+         * @return null if the node already exists; inserted INode, otherwise
+         * @author shv
          */
-        INode addNode(String target, Block blks[]) {
-            if (getNode(target) != null) {
-                return null;
-            } else {
-                String parentName = DFSFile.getDFSParent(target);
-                if (parentName == null) {
-                    return null;
-                }
-
-                INode parentNode = getNode(parentName);
-                if (parentNode == null) {
-                    return null;
-                } else {
-                    String targetName = new File(target).getName();
-                    INode newItem = new INode(targetName, parentNode, blks);
-                    parentNode.children.put(targetName, newItem);
-                    return newItem;
-                }
-            }
+        INode addNode(String path, INode newNode) {
+          File target = new File( path );
+          // find parent
+          String parentName = DFSFile.getDFSParent(path);
+          if (parentName == null)
+            return null;
+          INode parentNode = getNode(parentName);
+          if (parentNode == null)
+            return null;
+          // check whether the parent already has a node with that name
+          String name = newNode.name = target.getName();
+          if( parentNode.getChild( name ) != null )
+            return null;
+          // insert into the parent children list
+          parentNode.children.put(name, newNode);
+          newNode.parent = parentNode;
+          return newNode;
         }
 
         /**
@@ -226,6 +258,7 @@ class FSDirectory implements FSConstants {
             if (parent != null) {
                 fullName = parentPrefix + "/" + name;
                 new UTF8(fullName).write(out);
+                out.writeShort(blockReplication);
                 if (blocks == null) {
                     out.writeInt(0);
                 } else {
@@ -242,20 +275,21 @@ class FSDirectory implements FSConstants {
         }
     }
 
-    INode rootDir = new INode("", null, null);
-    TreeSet activeBlocks = new TreeSet();
+    
+    INode rootDir = new INode("");
+    TreeMap activeBlocks = new TreeMap();
     TreeMap activeLocks = new TreeMap();
     DataOutputStream editlog = null;
     boolean ready = false;
 
     /** Access an existing dfs name directory. */
-    public FSDirectory(File dir) throws IOException {
+    public FSDirectory(File dir, Configuration conf) throws IOException {
         File fullimage = new File(dir, "image");
         if (! fullimage.exists()) {
           throw new IOException("NameNode not formatted: " + dir);
         }
         File edits = new File(dir, "edits");
-        if (loadFSImage(fullimage, edits)) {
+        if (loadFSImage(fullimage, edits, conf)) {
             saveFSImage(fullimage, edits);
         }
 
@@ -263,6 +297,7 @@ class FSDirectory implements FSConstants {
             this.ready = true;
             this.notifyAll();
             this.editlog = new DataOutputStream(new FileOutputStream(edits));
+            editlog.writeInt( DFS_CURRENT_VERSION );
         }
     }
 
@@ -309,7 +344,10 @@ class FSDirectory implements FSConstants {
      * filenames and blocks.  Return whether we should
      * "re-save" and consolidate the edit-logs
      */
-    boolean loadFSImage(File fsdir, File edits) throws IOException {
+    boolean loadFSImage( File fsdir, 
+                         File edits, 
+                         Configuration conf
+                       ) throws IOException {
         //
         // Atomic move sequence, to recover from interrupted save
         //
@@ -338,28 +376,51 @@ class FSDirectory implements FSConstants {
         if (curFile.exists()) {
             DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(curFile)));
             try {
-                int numFiles = in.readInt();
+                // read image version
+                int imgVersion = in.readInt();
+                // read number of files
+                int numFiles = 0;
+                // version 0 does not store version #
+                // starts directly with the number of files
+                if( imgVersion >= 0 ) {  
+                  numFiles = imgVersion;
+                  imgVersion = 0;
+                } else 
+                  numFiles = in.readInt();
+                  
+                if( imgVersion < DFS_CURRENT_VERSION ) // future version
+                  throw new IOException(
+                              "Unsupported version of the file system image: "
+                              + imgVersion
+                              + ". Current version = " 
+                              + DFS_CURRENT_VERSION + "." );
+                
+                // read file info
+                short replication = (short)conf.getInt("dfs.replication", 3);
                 for (int i = 0; i < numFiles; i++) {
                     UTF8 name = new UTF8();
                     name.readFields(in);
+                    // version 0 does not support per file replication
+                    if( !(imgVersion >= 0) )
+                      replication = in.readShort(); // other versions do
                     int numBlocks = in.readInt();
-                    if (numBlocks == 0) {
-                        unprotectedAddFile(name, null);
-                    } else {
-                        Block blocks[] = new Block[numBlocks];
+                    Block blocks[] = null;
+                    if (numBlocks > 0) {
+                        blocks = new Block[numBlocks];
                         for (int j = 0; j < numBlocks; j++) {
                             blocks[j] = new Block();
                             blocks[j].readFields(in);
                         }
-                        unprotectedAddFile(name, blocks);
                     }
+                    unprotectedAddFile(name, 
+                            new INode( name.toString(), blocks, replication ));
                 }
             } finally {
                 in.close();
             }
         }
 
-        if (edits.exists() && loadFSEdits(edits) > 0) {
+        if (edits.exists() && loadFSEdits(edits, conf) > 0) {
             return true;
         } else {
             return false;
@@ -372,11 +433,32 @@ class FSDirectory implements FSConstants {
      * This is where we apply edits that we've been writing to disk all
      * along.
      */
-    int loadFSEdits(File edits) throws IOException {
+    int loadFSEdits(File edits, Configuration conf) throws IOException {
         int numEdits = 0;
+        int logVersion = 0;
 
         if (edits.exists()) {
-            DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(edits)));
+            DataInputStream in = new DataInputStream(
+                                    new BufferedInputStream(
+                                        new FileInputStream(edits)));
+            // Read log file version. Could be missing. 
+            in.mark( 4 );
+            if( in.available() > 0 ) {
+              logVersion = in.readByte();
+              in.reset();
+              if( logVersion >= 0 )
+                logVersion = 0;
+              else
+                logVersion = in.readInt();
+              if( logVersion < DFS_CURRENT_VERSION ) // future version
+                  throw new IOException(
+                            "Unexpected version of the file system log file: "
+                            + logVersion
+                            + ". Current version = " 
+                            + DFS_CURRENT_VERSION + "." );
+            }
+            
+            short replication = (short)conf.getInt("dfs.replication", 3);
             try {
                 while (in.available() > 0) {
                     byte opcode = in.readByte();
@@ -384,13 +466,32 @@ class FSDirectory implements FSConstants {
                     switch (opcode) {
                     case OP_ADD: {
                         UTF8 name = new UTF8();
-                        name.readFields(in);
-                        ArrayWritable aw = new ArrayWritable(Block.class);
+                        ArrayWritable aw = null;
+                        Writable writables[];
+                        // version 0 does not support per file replication
+                        if( logVersion >= 0 )
+                          name.readFields(in);  // read name only
+                        else {  // other versions do
+                          // get name and replication
+                          aw = new ArrayWritable(UTF8.class);
+                          aw.readFields(in);
+                          writables = aw.get(); 
+                          if( writables.length != 2 )
+                            throw new IOException("Incorrect data fortmat. " 
+                                           + "Name & replication pair expected");
+                          name = (UTF8) writables[0];
+                          replication = Short.parseShort(
+                                              ((UTF8)writables[1]).toString());
+                        }
+                        // get blocks
+                        aw = new ArrayWritable(Block.class);
                         aw.readFields(in);
-                        Writable writables[] = (Writable[]) aw.get();
+                        writables = aw.get();
                         Block blocks[] = new Block[writables.length];
                         System.arraycopy(writables, 0, blocks, 0, blocks.length);
-                        unprotectedAddFile(name, blocks);
+                        // add to the file tree
+                        unprotectedAddFile(name, 
+                            new INode( name.toString(), blocks, replication ));
                         break;
                     } 
                     case OP_RENAME: {
@@ -422,6 +523,9 @@ class FSDirectory implements FSConstants {
                 in.close();
             }
         }
+        
+        if( logVersion != DFS_CURRENT_VERSION ) // other version
+          numEdits++; // save this image asap
         return numEdits;
     }
 
@@ -438,6 +542,7 @@ class FSDirectory implements FSConstants {
         //
         DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(newFile)));
         try {
+            out.writeInt(DFS_CURRENT_VERSION);
             out.writeInt(rootDir.numItemsInTree() - 1);
             rootDir.saveImage("", out);
         } finally {
@@ -481,31 +586,35 @@ class FSDirectory implements FSConstants {
     /**
      * Add the given filename to the fs.
      */
-    public boolean addFile(UTF8 src, Block blocks[]) {
+    public boolean addFile(UTF8 path, Block[] blocks, short replication) {
         waitForReady();
 
         // Always do an implicit mkdirs for parent directory tree
-        mkdirs(DFSFile.getDFSParent(src.toString()));
-        if (unprotectedAddFile(src, blocks)) {
-            logEdit(OP_ADD, src, new ArrayWritable(Block.class, blocks));
-            return true;
-        } else {
-            return false;
-        }
+        String pathString = path.toString();
+        mkdirs(DFSFile.getDFSParent(pathString));
+        INode newNode = new INode( new File(pathString).getName(), blocks, replication);
+        if( ! unprotectedAddFile(path, newNode) )
+          return false;
+        // add create file record to log
+        UTF8 nameReplicationPair[] = new UTF8[] { 
+                              path, 
+                              new UTF8( Short.toString(replication))};
+        logEdit(OP_ADD,
+                new ArrayWritable( UTF8.class, nameReplicationPair ), 
+                new ArrayWritable( Block.class, newNode.blocks ));
+        return true;
     }
     
     /**
      */
-    boolean unprotectedAddFile(UTF8 name, Block blocks[]) {
-        synchronized (rootDir) {
-            if (blocks != null) {
-                // Add file->block mapping
-                for (int i = 0; i < blocks.length; i++) {
-                    activeBlocks.add(blocks[i]);
-                }
-            }
-            return (rootDir.addNode(name.toString(), blocks) != null);
-        }
+    boolean unprotectedAddFile(UTF8 path, INode newNode) {
+      synchronized (rootDir) {
+        int nrBlocks = (newNode.blocks == null) ? 0 : newNode.blocks.length;
+        // Add file->block mapping
+        for (int i = 0; i < nrBlocks; i++)
+            activeBlocks.put(newNode.blocks[i], newNode);
+        return (rootDir.addNode(path.toString(), newNode) != null);
+      }
     }
 
     /**
@@ -525,26 +634,22 @@ class FSDirectory implements FSConstants {
      */
     boolean unprotectedRenameTo(UTF8 src, UTF8 dst) {
         synchronized(rootDir) {
-            INode removedNode = rootDir.getNode(src.toString());
-            if (removedNode == null) {
+          String srcStr = src.toString();
+          String dstStr = dst.toString();
+            INode renamedNode = rootDir.getNode(srcStr);
+            if (renamedNode == null) {
                 return false;
             }
-            removedNode.removeNode();
+            renamedNode.removeNode();
             if (isDir(dst)) {
-                dst = new UTF8(dst.toString() + "/" + new File(src.toString()).getName());
+              dstStr += "/" + new File(srcStr).getName();
             }
-            INode newNode = rootDir.addNode(dst.toString(), removedNode.blocks);
-            if (newNode != null) {
-                newNode.children = removedNode.children;
-                for (Iterator it = newNode.children.values().iterator(); it.hasNext(); ) {
-                    INode child = (INode) it.next();
-                    child.parent = newNode;
-                }
-                return true;
-            } else {
-                rootDir.addNode(src.toString(), removedNode.blocks);
-                return false;
+            // the renamed node can be reused now
+            if( rootDir.addNode(dstStr, renamedNode ) == null ) {
+              rootDir.addNode(srcStr, renamedNode); // put it back
+              return false;
             }
+            return true;
         }
     }
 
@@ -730,7 +835,7 @@ class FSDirectory implements FSConstants {
      */
     INode unprotectedMkdir(String src) {
         synchronized (rootDir) {
-            return rootDir.addNode(src, null);
+            return rootDir.addNode(src, new INode(new File(src).getName()));
         }
     }
 
@@ -749,11 +854,20 @@ class FSDirectory implements FSConstants {
      */
     public boolean isValidBlock(Block b) {
         synchronized (rootDir) {
-            if (activeBlocks.contains(b)) {
+            if (activeBlocks.containsKey(b)) {
                 return true;
             } else {
                 return false;
             }
         }
     }
+
+    /**
+     * Returns whether the given block is one pointed-to by a file.
+     */
+    public INode getFileByBlock(Block b) {
+      synchronized (rootDir) {
+        return (INode)activeBlocks.get(b);
+      }
+    }
 }

+ 204 - 156
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -74,6 +74,8 @@ class FSNamesystem implements FSConstants {
     // Keeps track of files that are being created, plus the
     // blocks that make them up.
     //
+    // Maps file names to FileUnderConstruction objects
+    //
     TreeMap pendingCreates = new TreeMap();
 
     //
@@ -130,10 +132,7 @@ class FSNamesystem implements FSConstants {
     Daemon hbthread = null, lmthread = null;
     boolean fsRunning = true;
     long systemStart = 0;
-    private Configuration conf;
 
-    //  DESIRED_REPLICATION is how many copies we try to have at all times
-    private int desiredReplication;
     //  The maximum number of replicates we should allow for a single block
     private int maxReplication;
     //  How many outgoing replication streams a given node should have at one time
@@ -148,18 +147,23 @@ class FSNamesystem implements FSConstants {
      * is stored
      */
     public FSNamesystem(File dir, Configuration conf) throws IOException {
-        this.dir = new FSDirectory(dir);
+        this.dir = new FSDirectory(dir, conf);
         this.hbthread = new Daemon(new HeartbeatMonitor());
         this.lmthread = new Daemon(new LeaseMonitor());
         hbthread.start();
         lmthread.start();
         this.systemStart = System.currentTimeMillis();
-        this.conf = conf;
-        
-        this.desiredReplication = conf.getInt("dfs.replication", 3);
-        this.maxReplication = desiredReplication;
+
+        this.maxReplication = conf.getInt("dfs.replication.max", 3);
+        this.minReplication = conf.getInt("dfs.replication.min", 1);
+        if( maxReplication < minReplication )
+          throw new IOException(
+              "Unexpected configuration parameters: dfs.replication.min = " 
+              + minReplication
+              + " must be at less than dfs.replication.max = " 
+              + maxReplication );
+
         this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
-        this.minReplication = 1;
         this.heartBeatRecheck= 1000;
     }
 
@@ -232,51 +236,68 @@ class FSNamesystem implements FSConstants {
      * of machines, or null if src is invalid for creation (based on
      * {@link FSDirectory#isValidToCreate(UTF8)}.
      */
-    public synchronized Object[] startFile(UTF8 src, UTF8 holder, UTF8 clientMachine, boolean overwrite) {
-        Object results[] = null;
-        if (pendingCreates.get(src) == null) {
-            boolean fileValid = dir.isValidToCreate(src);
-            if (overwrite && ! fileValid) {
-                delete(src);
-                fileValid = true;
-            }
-
-            if (fileValid) {
-                results = new Object[2];
+    public synchronized Object[] startFile( UTF8 src, 
+                                            UTF8 holder, 
+                                            UTF8 clientMachine, 
+                                            boolean overwrite,
+                                            short replication 
+                                          ) throws IOException {
+        if (pendingCreates.get(src) != null) {
+          LOG.warning("Cannot start file because pendingCreates is non-null. src=" + src);
+          return null;
+        }
 
-                // Get the array of replication targets 
-                DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null, clientMachine);
-                if (targets.length < this.minReplication) {
-                    LOG.warning("Target-length is " + targets.length +
-                        ", below MIN_REPLICATION (" + this.minReplication+ ")");
-                    return null;
-                }
+        if( replication > maxReplication )
+          throw new IOException(
+            "Cannot create file " + src + " on client " + clientMachine + ".\n"
+            + "Requested replication " + replication
+            + " exceeds maximum " + maxReplication );
+        
+        if( replication < minReplication )
+          throw new IOException(
+            "Cannot create file " + src + " on client " + clientMachine + ".\n"
+            + "Requested replication " + replication
+            + " is less than the required minimum " + minReplication );
+        
+        boolean fileValid = dir.isValidToCreate(src);
+        if (overwrite && ! fileValid) {
+            delete(src);
+            fileValid = true;
+        }
+  
+        if ( ! fileValid) {
+          LOG.warning("Cannot start file because it is invalid. src=" + src);
+          return null;
+        }
 
-                // Reserve space for this pending file
-                pendingCreates.put(src, new Vector());
-                synchronized (leases) {
-                    Lease lease = (Lease) leases.get(holder);
-                    if (lease == null) {
-                        lease = new Lease(holder);
-                        leases.put(holder, lease);
-                        sortedLeases.add(lease);
-                    } else {
-                        sortedLeases.remove(lease);
-                        lease.renew();
-                        sortedLeases.add(lease);
-                    }
-                    lease.startedCreate(src);
-                }
+        // Get the array of replication targets 
+        DatanodeInfo targets[] = chooseTargets(replication, null, clientMachine);
+        if (targets.length < this.minReplication) {
+            LOG.warning("Target-length is " + targets.length +
+                ", below MIN_REPLICATION (" + this.minReplication+ ")");
+            return null;
+        }
 
-                // Create next block
-                results[0] = allocateBlock(src);
-                results[1] = targets;
-            } else { // ! fileValid
-              LOG.warning("Cannot start file because it is invalid. src=" + src);
+        // Reserve space for this pending file
+        pendingCreates.put(src, new FileUnderConstruction( replication ));
+        synchronized (leases) {
+            Lease lease = (Lease) leases.get(holder);
+            if (lease == null) {
+                lease = new Lease(holder);
+                leases.put(holder, lease);
+                sortedLeases.add(lease);
+            } else {
+                sortedLeases.remove(lease);
+                lease.renew();
+                sortedLeases.add(lease);
             }
-        } else {
-            LOG.warning("Cannot start file because pendingCreates is non-null. src=" + src);
+            lease.startedCreate(src);
         }
+
+        // Create next block
+        Object results[] = new Object[2];
+        results[0] = allocateBlock(src);
+        results[1] = targets;
         return results;
     }
 
@@ -293,7 +314,8 @@ class FSNamesystem implements FSConstants {
      */
     public synchronized Object[] getAdditionalBlock(UTF8 src, UTF8 clientMachine) {
         Object results[] = null;
-        if (dir.getFile(src) == null && pendingCreates.get(src) != null) {
+        FileUnderConstruction pendingFile = (FileUnderConstruction) pendingCreates.get(src);
+        if (dir.getFile(src) == null && pendingFile != null) {
             results = new Object[2];
 
             //
@@ -301,7 +323,7 @@ class FSNamesystem implements FSConstants {
             //
             if (checkFileProgress(src)) {
                 // Get the array of replication targets 
-                DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null, clientMachine);
+                DatanodeInfo targets[] = chooseTargets(pendingFile.getReplication(), null, clientMachine);
                 if (targets.length < this.minReplication) {
                     return null;
                 }
@@ -350,83 +372,89 @@ class FSNamesystem implements FSConstants {
      */
     public synchronized int completeFile(UTF8 src, UTF8 holder) {
         if (dir.getFile(src) != null || pendingCreates.get(src) == null) {
-	    LOG.info("Failed to complete " + src + "  because dir.getFile()==" + dir.getFile(src) + " and " + pendingCreates.get(src));
+            LOG.info( "Failed to complete " + src + 
+                      "  because dir.getFile()==" + dir.getFile(src) + 
+                      " and " + pendingCreates.get(src));
             return OPERATION_FAILED;
         } else if (! checkFileProgress(src)) {
             return STILL_WAITING;
-        } else {
-            Vector pendingVector = (Vector) pendingCreates.get(src);
-            Block pendingBlocks[] = (Block[]) pendingVector.toArray(new Block[pendingVector.size()]);
+        }
+        
+        FileUnderConstruction pendingFile = (FileUnderConstruction) pendingCreates.get(src);
+        int nrBlocks = pendingFile.size();
+        Block pendingBlocks[] = (Block[]) pendingFile.toArray(new Block[nrBlocks]);
 
-            //
-            // We have the pending blocks, but they won't have
-            // length info in them (as they were allocated before
-            // data-write took place).  So we need to add the correct
-            // length info to each
-            //
-            // REMIND - mjc - this is very inefficient!  We should
-            // improve this!
-            //
-            for (int i = 0; i < pendingBlocks.length; i++) {
-                Block b = pendingBlocks[i];
-                TreeSet containingNodes = (TreeSet) blocksMap.get(b);
-                DatanodeInfo node = (DatanodeInfo) containingNodes.first();
-                for (Iterator it = node.getBlockIterator(); it.hasNext(); ) {
-                    Block cur = (Block) it.next();
-                    if (b.getBlockId() == cur.getBlockId()) {
-                        b.setNumBytes(cur.getNumBytes());
-                        break;
-                    }
+        //
+        // We have the pending blocks, but they won't have
+        // length info in them (as they were allocated before
+        // data-write took place).  So we need to add the correct
+        // length info to each
+        //
+        // REMIND - mjc - this is very inefficient!  We should
+        // improve this!
+        //
+        for (int i = 0; i < nrBlocks; i++) {
+            Block b = (Block)pendingBlocks[i];
+            TreeSet containingNodes = (TreeSet) blocksMap.get(b);
+            DatanodeInfo node = (DatanodeInfo) containingNodes.first();
+            for (Iterator it = node.getBlockIterator(); it.hasNext(); ) {
+                Block cur = (Block) it.next();
+                if (b.getBlockId() == cur.getBlockId()) {
+                    b.setNumBytes(cur.getNumBytes());
+                    break;
                 }
             }
-            
-            //
-            // Now we can add the (name,blocks) tuple to the filesystem
-            //
-            if (dir.addFile(src, pendingBlocks)) {
-                // The file is no longer pending
-                pendingCreates.remove(src);
-                for (int i = 0; i < pendingBlocks.length; i++) {
-                    pendingCreateBlocks.remove(pendingBlocks[i]);
-                }
+        }
+        
+        //
+        // Now we can add the (name,blocks) tuple to the filesystem
+        //
+        if ( ! dir.addFile(src, pendingBlocks, pendingFile.getReplication())) {
+          System.out.println("AddFile() for " + src + " failed");
+          return OPERATION_FAILED;
+        }
 
-                synchronized (leases) {
-                    Lease lease = (Lease) leases.get(holder);
-                    if (lease != null) {
-                        lease.completedCreate(src);
-                        if (! lease.hasLocks()) {
-                            leases.remove(holder);
-                            sortedLeases.remove(lease);
-                        }
-                    }
+        // The file is no longer pending
+        pendingCreates.remove(src);
+        for (int i = 0; i < nrBlocks; i++) {
+            pendingCreateBlocks.remove(pendingBlocks[i]);
+        }
+
+        synchronized (leases) {
+            Lease lease = (Lease) leases.get(holder);
+            if (lease != null) {
+                lease.completedCreate(src);
+                if (! lease.hasLocks()) {
+                    leases.remove(holder);
+                    sortedLeases.remove(lease);
                 }
+            }
+        }
 
-                //
-                // REMIND - mjc - this should be done only after we wait a few secs.
-                // The namenode isn't giving datanodes enough time to report the
-                // replicated blocks that are automatically done as part of a client
-                // write.
-                //
+        //
+        // REMIND - mjc - this should be done only after we wait a few secs.
+        // The namenode isn't giving datanodes enough time to report the
+        // replicated blocks that are automatically done as part of a client
+        // write.
+        //
 
-                // Now that the file is real, we need to be sure to replicate
-                // the blocks.
-                for (int i = 0; i < pendingBlocks.length; i++) {
-                    TreeSet containingNodes = (TreeSet) blocksMap.get(pendingBlocks[i]);
-                    if (containingNodes.size() < this.desiredReplication) {
-                        synchronized (neededReplications) {
-                            LOG.info("Completed file " + src + ", at holder " + holder + ".  There is/are only " + containingNodes.size() + " copies of block " + pendingBlocks[i] + ", so replicating up to " + this.desiredReplication);
-                            neededReplications.add(pendingBlocks[i]);
-                        }
-                    }
+        // Now that the file is real, we need to be sure to replicate
+        // the blocks.
+        for (int i = 0; i < nrBlocks; i++) {
+            TreeSet containingNodes = (TreeSet) blocksMap.get(pendingBlocks[i]);
+            if (containingNodes.size() < pendingFile.getReplication()) {
+                synchronized (neededReplications) {
+                    LOG.info("Completed file " + src 
+                              + ", at holder " + holder 
+                              + ".  There is/are only " + containingNodes.size() 
+                              + " copies of block " + pendingBlocks[i] 
+                              + ", so replicating up to " 
+                              + pendingFile.getReplication());
+                    neededReplications.add(pendingBlocks[i]);
                 }
-                return COMPLETE_SUCCESS;
-            } else {
-                System.out.println("AddFile() for " + src + " failed");
             }
-	    LOG.info("Dropped through on file add....");
         }
-
-        return OPERATION_FAILED;
+        return COMPLETE_SUCCESS;
     }
 
     /**
@@ -434,7 +462,7 @@ class FSNamesystem implements FSConstants {
      */
     synchronized Block allocateBlock(UTF8 src) {
         Block b = new Block();
-        Vector v = (Vector) pendingCreates.get(src);
+        FileUnderConstruction v = (FileUnderConstruction) pendingCreates.get(src);
         v.add(b);
         pendingCreateBlocks.add(b);
         return b;
@@ -445,7 +473,7 @@ class FSNamesystem implements FSConstants {
      * replicated.  If not, return false.
      */
     synchronized boolean checkFileProgress(UTF8 src) {
-        Vector v = (Vector) pendingCreates.get(src);
+        FileUnderConstruction v = (FileUnderConstruction) pendingCreates.get(src);
 
         for (Iterator it = v.iterator(); it.hasNext(); ) {
             Block b = (Block) it.next();
@@ -744,7 +772,7 @@ class FSNamesystem implements FSConstants {
         return dir.releaseLock(src, holder);
     }
     private void internalReleaseCreate(UTF8 src) {
-        Vector v = (Vector) pendingCreates.remove(src);
+        FileUnderConstruction v = (FileUnderConstruction) pendingCreates.remove(src);
         for (Iterator it2 = v.iterator(); it2.hasNext(); ) {
             Block b = (Block) it2.next();
             pendingCreateBlocks.remove(b);
@@ -950,47 +978,46 @@ class FSNamesystem implements FSConstants {
         }
 
         synchronized (neededReplications) {
-            if (dir.isValidBlock(block)) {
-                if (containingNodes.size() >= this.desiredReplication) {
-                    neededReplications.remove(block);
-                    pendingReplications.remove(block);
-                } else if (containingNodes.size() < this.desiredReplication) {
-                    if (! neededReplications.contains(block)) {
-                        neededReplications.add(block);
-                    }
-                }
+            FSDirectory.INode fileINode = dir.getFileByBlock(block);
+            if( fileINode == null )  // block does not belong to any file
+                return;
+            short fileReplication = fileINode.getReplication();
+            if (containingNodes.size() >= fileReplication ) {
+                neededReplications.remove(block);
+                pendingReplications.remove(block);
+            } else // containingNodes.size() < fileReplication
+                neededReplications.add(block);
 
-                //
-                // Find how many of the containing nodes are "extra", if any.
-                // If there are any extras, call chooseExcessReplicates() to
-                // mark them in the excessReplicateMap.
-                //
-                Vector nonExcess = new Vector();
-                for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
-                    DatanodeInfo cur = (DatanodeInfo) it.next();
-                    TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());
-                    if (excessBlocks == null || ! excessBlocks.contains(block)) {
-                        nonExcess.add(cur);
-                    }
-                }
-                if (nonExcess.size() > this.maxReplication) {
-                    chooseExcessReplicates(nonExcess, block, this.maxReplication);    
+            //
+            // Find how many of the containing nodes are "extra", if any.
+            // If there are any extras, call chooseExcessReplicates() to
+            // mark them in the excessReplicateMap.
+            //
+            Vector nonExcess = new Vector();
+            for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
+                DatanodeInfo cur = (DatanodeInfo) it.next();
+                TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());
+                if (excessBlocks == null || ! excessBlocks.contains(block)) {
+                    nonExcess.add(cur);
                 }
             }
+            if (nonExcess.size() > fileReplication) {
+                chooseExcessReplicates(nonExcess, block, fileReplication);    
+            }
         }
     }
 
     /**
-     * We want a max of "maxReps" replicates for any block, but we now have too many.  
+     * We want "replication" replicates for the block, but we now have too many.  
      * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
      *
-     * srcNodes.size() - dstNodes.size() == maxReps
+     * srcNodes.size() - dstNodes.size() == replication
      *
      * For now, we choose nodes randomly.  In the future, we might enforce some
      * kind of policy (like making sure replicates are spread across racks).
      */
-    void chooseExcessReplicates(Vector nonExcess, Block b, int maxReps) {
-        while (nonExcess.size() - maxReps > 0) {
+    void chooseExcessReplicates(Vector nonExcess, Block b, short replication) {
+        while (nonExcess.size() - replication > 0) {
             int chosenNode = r.nextInt(nonExcess.size());
             DatanodeInfo cur = (DatanodeInfo) nonExcess.elementAt(chosenNode);
             nonExcess.removeElementAt(chosenNode);
@@ -1037,7 +1064,8 @@ class FSNamesystem implements FSConstants {
         // necessary.  In that case, put block on a possibly-will-
         // be-replicated list.
         //
-        if (dir.isValidBlock(block) && (containingNodes.size() < this.desiredReplication)) {
+        FSDirectory.INode fileINode = dir.getFileByBlock(block);
+        if( fileINode != null && (containingNodes.size() < fileINode.getReplication())) {
             synchronized (neededReplications) {
                 neededReplications.add(block);
             }
@@ -1157,12 +1185,13 @@ class FSNamesystem implements FSConstants {
                     }
 
                     Block block = (Block) it.next();
-                    if (! dir.isValidBlock(block)) {
+                    FSDirectory.INode fileINode = dir.getFileByBlock(block);
+                    if( fileINode == null ) { // block does not belong to any file
                         it.remove();
                     } else {
                         TreeSet containingNodes = (TreeSet) blocksMap.get(block);
                         if (containingNodes.contains(srcNode)) {
-                            DatanodeInfo targets[] = chooseTargets(Math.min(this.desiredReplication - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes, null);
+                            DatanodeInfo targets[] = chooseTargets(Math.min(fileINode.getReplication() - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes, null);
                             if (targets.length > 0) {
                                 // Build items to return
                                 replicateBlocks.add(block);
@@ -1186,7 +1215,7 @@ class FSNamesystem implements FSConstants {
                         DatanodeInfo targets[] = (DatanodeInfo[]) replicateTargetSets.elementAt(i);
                         TreeSet containingNodes = (TreeSet) blocksMap.get(block);
 
-                        if (containingNodes.size() + targets.length >= this.desiredReplication) {
+                        if (containingNodes.size() + targets.length >= dir.getFileByBlock(block).getReplication()) {
                             neededReplications.remove(block);
                             pendingReplications.add(block);
                         }
@@ -1225,12 +1254,10 @@ class FSNamesystem implements FSConstants {
 
         for (int i = 0; i < desiredReplicates; i++) {
             DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen, clientMachine);
-            if (target != null) {
-                targets.add(target);
-                alreadyChosen.add(target);
-            } else {
-                break; // calling chooseTarget again won't help
-            }
+            if (target == null)
+              break; // calling chooseTarget again won't help
+            targets.add(target);
+            alreadyChosen.add(target);
         }
         return (DatanodeInfo[]) targets.toArray(new DatanodeInfo[targets.size()]);
     }
@@ -1336,4 +1363,25 @@ class FSNamesystem implements FSConstants {
             return null;
         }
     }
+
+    /**
+     * Information about the file while it is being written to.
+     * Note that at that time the file is not visible to the outside.
+     * 
+     * This class contains a <code>Vector</code> of {@link Block}s that has
+     * been written into the file so far, and file replication. 
+     * 
+     * @author shv
+     */
+    private class FileUnderConstruction extends Vector {
+      private short blockReplication; // file replication
+      
+      FileUnderConstruction( short replication ) throws IOException {
+        this.blockReplication = replication;
+      }
+      
+      public short getReplication() {
+        return this.blockReplication;
+      }
+    }
 }

+ 16 - 8
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -141,15 +141,23 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
 
     /**
      */
-    public LocatedBlock create(String src, String clientName, String clientMachine, boolean overwrite) throws IOException {
-        Object results[] = namesystem.startFile(new UTF8(src), new UTF8(clientName), new UTF8(clientMachine), overwrite);
-        if (results == null) {
+    public LocatedBlock create(String src, 
+                               String clientName, 
+                               String clientMachine, 
+                               boolean overwrite,
+                               short replication
+    ) throws IOException {
+        Object results[] = namesystem.startFile(new UTF8(src), 
+                                                new UTF8(clientName), 
+                                                new UTF8(clientMachine), 
+                                                overwrite,
+                                                replication);
+        if (results == null)
             throw new IOException("Cannot create file " + src + " on client " + clientName);
-        } else {
-            Block b = (Block) results[0];
-            DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
-            return new LocatedBlock(b, targets);
-        }
+
+        Block b = (Block) results[0];
+        DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
+        return new LocatedBlock(b, targets);
     }
 
     /**

+ 15 - 9
src/java/org/apache/hadoop/fs/FSDataOutputStream.java

@@ -34,13 +34,17 @@ public class FSDataOutputStream extends DataOutputStream {
     private int inSum;
     private int bytesPerSum;
 
-    public Summer(FileSystem fs, File file, boolean overwrite, Configuration conf)
+    public Summer(FileSystem fs, 
+                  File file, 
+                  boolean overwrite, 
+                  short replication,
+                  Configuration conf)
       throws IOException {
-      super(fs.createRaw(file, overwrite));
+      super(fs.createRaw(file, overwrite, replication));
       this.bytesPerSum = conf.getInt("io.bytes.per.checksum", 512);
-      this.sums =
-        new FSDataOutputStream(fs.createRaw(fs.getChecksumFile(file), true), conf);
-
+      this.sums = new FSDataOutputStream(
+            fs.createRaw(FileSystem.getChecksumFile(file), true, replication), 
+            conf);
       sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
       sums.writeInt(this.bytesPerSum);
     }
@@ -123,10 +127,12 @@ public class FSDataOutputStream extends DataOutputStream {
 
   public FSDataOutputStream(FileSystem fs, File file,
                             boolean overwrite, Configuration conf,
-                            int bufferSize)
-    throws IOException {
-    super(new Buffer(new PositionCache(new Summer(fs, file, overwrite, conf)),
-                     bufferSize));
+                            int bufferSize, short replication )
+  throws IOException {
+    super(new Buffer(
+            new PositionCache(
+                new Summer(fs, file, overwrite, replication, conf)), 
+            bufferSize));
   }
 
   /** Construct without checksums. */

+ 30 - 6
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -165,7 +165,9 @@ public abstract class FileSystem extends Configured {
      * Files are overwritten by default.
      */
     public FSDataOutputStream create(File f) throws IOException {
-      return create(f, true, getConf().getInt("io.file.buffer.size", 4096));
+      return create(f, true, 
+                    getConf().getInt("io.file.buffer.size", 4096),
+                    (short)getConf().getInt("dfs.replication", 3));
     }
 
     /**
@@ -175,17 +177,38 @@ public abstract class FileSystem extends Configured {
      *   the file will be overwritten, and if false an error will be thrown.
      * @param bufferSize the size of the buffer to be used.
      */
-    public FSDataOutputStream create(File f, boolean overwrite,
-                                      int bufferSize) throws IOException {
-      return new FSDataOutputStream(this, f, overwrite, getConf(), bufferSize);
+    public FSDataOutputStream create( File f, 
+                                      boolean overwrite,
+                                      int bufferSize
+                                    ) throws IOException {
+      return create( f, overwrite, bufferSize, 
+                    (short)getConf().getInt("dfs.replication", 3));
+    }
+    
+    /**
+     * Opens an FSDataOutputStream at the indicated File.
+     * @param f the file name to open
+     * @param overwrite if a file with this name already exists, then if true,
+     *   the file will be overwritten, and if false an error will be thrown.
+     * @param bufferSize the size of the buffer to be used.
+     * @param replication required block replication for the file. 
+     */
+    public FSDataOutputStream create( File f, 
+                                      boolean overwrite,
+                                      int bufferSize,
+                                      short replication
+                                    ) throws IOException {
+      return new FSDataOutputStream(this, f, overwrite, getConf(), 
+                                    bufferSize, replication );
     }
 
     /** Opens an OutputStream at the indicated File.
      * @param f the file name to open
      * @param overwrite if a file with this name already exists, then if true,
      *   the file will be overwritten, and if false an error will be thrown.
+     * @param replication required block replication for the file. 
      */
-    public abstract FSOutputStream createRaw(File f, boolean overwrite)
+    public abstract FSOutputStream createRaw(File f, boolean overwrite, short replication)
       throws IOException;
 
     /**
@@ -196,7 +219,8 @@ public abstract class FileSystem extends Configured {
         if (exists(f)) {
             return false;
         } else {
-            OutputStream out = createRaw(f, false);
+            OutputStream out = createRaw(f, false, 
+                                (short)getConf().getInt("dfs.replication", 3));
             try {
             } finally {
               out.close();

+ 2 - 1
src/java/org/apache/hadoop/fs/FileUtil.java

@@ -56,7 +56,8 @@ public class FileUtil {
         if (fs.isFile(src)) {
             FSInputStream in = fs.openRaw(src);
             try {
-                FSOutputStream out = fs.createRaw(dst, true);
+                FSOutputStream out = fs.createRaw(dst, true, 
+                                      (short)conf.getInt("dfs.replication", 3));
                 byte buf[] = new byte[conf.getInt("io.file.buffer.size", 4096)];
                 try {
                     int readBytes = in.read(buf);

+ 1 - 1
src/java/org/apache/hadoop/fs/LocalFileSystem.java

@@ -161,7 +161,7 @@ public class LocalFileSystem extends FileSystem {
       }
     }
     
-    public FSOutputStream createRaw(File f, boolean overwrite)
+    public FSOutputStream createRaw(File f, boolean overwrite, short replication)
       throws IOException {
         f = makeAbsolute(f);
         if (f.exists() && ! overwrite) {