
HADOOP-1989. Support simulated DataNodes. This helps create large virtual
clusters for testing purposes. (Sanjay Radia via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@595509 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 17 years ago
Parent
Commit
2faa80f482

+ 3 - 0
CHANGES.txt

@@ -48,6 +48,9 @@ Trunk (unreleased changes)
     keep it end-user focussed, specifically sections related to subversion and
     building Hadoop. (Jim Kellerman via acmurthy)
 
+    HADOOP-1989. Support simulated DataNodes. This helps create large virtual
+    clusters for testing purposes.  (Sanjay Radia via dhruba)
+    
   OPTIMIZATIONS
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack

+ 3 - 0
bin/hadoop

@@ -36,6 +36,7 @@ if [ $# = 0 ]; then
   echo "  secondarynamenode    run the DFS secondary namenode"
   echo "  namenode             run the DFS namenode"
   echo "  datanode             run a DFS datanode"
+  echo "  datanodecluster      run a DFS datanode cluster"
   echo "  dfsadmin             run a DFS admin client"
   echo "  fsck                 run a DFS filesystem checking utility"
   echo "  fs                   run a generic filesystem user client"
@@ -169,6 +170,8 @@ elif [ "$COMMAND" = "secondarynamenode" ] ; then
   CLASS='org.apache.hadoop.dfs.SecondaryNameNode'
 elif [ "$COMMAND" = "datanode" ] ; then
   CLASS='org.apache.hadoop.dfs.DataNode'
+elif [ "$COMMAND" = "datanodecluster" ] ; then
+  CLASS='org.apache.hadoop.dfs.DataNodeCluster'
 elif [ "$COMMAND" = "fs" ] ; then
   CLASS=org.apache.hadoop.fs.FsShell
 elif [ "$COMMAND" = "dfs" ] ; then

+ 1 - 0
conf/log4j.properties

@@ -76,6 +76,7 @@ log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
 
 #log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
 #log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
 
 #
 # Event Counter Appender

+ 8 - 5
src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java

@@ -327,7 +327,8 @@ class BlockCrcUpgradeUtils {
                            byte[] crcBuf) throws IOException {
     Block block = blockInfo.block;
     
-    File blockFile = blockInfo.dataNode.data.getBlockFile( block );
+    FSDataset data = (FSDataset) blockInfo.dataNode.data;
+    File blockFile = data.getBlockFile( block );
     File metaFile = FSDataset.getMetaFile( blockFile );
     
     if ( bytesPerChecksum <= 0 ) {
@@ -367,7 +368,7 @@ class BlockCrcUpgradeUtils {
     File tmpMetaFile = null;
     DataOutputStream out = null;
     try {
-      tmpBlockFile = blockInfo.dataNode.data.createTmpFile(null, block);
+      tmpBlockFile = data.createTmpFile(null, block);
       tmpMetaFile = FSDataset.getMetaFile( tmpBlockFile );
       out = new DataOutputStream( new FileOutputStream(tmpMetaFile) );
       
@@ -436,7 +437,8 @@ class BlockCrcUpgradeUtils {
     int oldCrcOffset = 0;
     
     Block block = blockInfo.block;
-    File blockFile = blockInfo.dataNode.data.getBlockFile( block );
+    FSDataset data = (FSDataset) blockInfo.dataNode.data;
+    File blockFile = data.getBlockFile( block );
     if ( blockFile == null || !blockFile.exists() ) {
       throw new IOException("Block file "  + 
                             ((blockFile != null) ? blockFile.getAbsolutePath()
@@ -779,7 +781,8 @@ class BlockCrcUpgradeUtils {
                                    throws IOException {
     
     Block block = blockInfo.block;
-    File blockFile = blockInfo.dataNode.data.getBlockFile( block );
+    FSDataset data = (FSDataset) blockInfo.dataNode.data;
+    File blockFile = data.getBlockFile( block );
     if (blockFile == null || !blockFile.exists()) {
       throw new IOException("Could not local file for block");
     }
@@ -1396,7 +1399,7 @@ class BlockCrcUpgradeObjectDatanode extends UpgradeObjectDatanode {
       return;
     }
     
-    FSDataset dataset = getDatanode().data;
+    FSDataset dataset = (FSDataset) getDatanode().data;
 
     // Set up the retry policy so that each attempt waits for one minute.
     Configuration conf = new Configuration();

+ 11 - 0
src/java/org/apache/hadoop/dfs/DataChecksum.java

@@ -99,6 +99,17 @@ public class DataChecksum implements Checksum {
     out.writeByte( type );
     out.writeInt( bytesPerChecksum );
   }
+
+  public byte[] getHeader() {
+    byte[] header = new byte[DataChecksum.HEADER_LEN];
+    header[0] = (byte) (type & 0xff);
+    // Writing into the buffer just like DataOutput.writeInt()
+    header[1+0] = (byte) ((bytesPerChecksum >>> 24) & 0xff);
+    header[1+1] = (byte) ((bytesPerChecksum >>> 16) & 0xff);
+    header[1+2] = (byte) ((bytesPerChecksum >>> 8) & 0xff);
+    header[1+3] = (byte) (bytesPerChecksum & 0xff);
+    return header;
+  }
   
   /**
    * Writes the current checksum to the stream.
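
The new getHeader() above packs the same five bytes that the header-writing code earlier in the hunk emits through DataOutput: one byte for the checksum type followed by bytesPerChecksum as a big-endian int. A rough sketch of decoding it back (not part of the patch; assumes the usual java.io imports and an existing DataChecksum instance named "checksum"):

    byte[] header = checksum.getHeader();
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(header));
    int type = in.readByte();            // the value stored in header[0]
    int bytesPerChecksum = in.readInt(); // big-endian, matching the byte-by-byte packing above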

+ 88 - 43
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -34,6 +34,8 @@ import org.apache.hadoop.mapred.StatusHttpServer;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.dfs.BlockCommand;
 import org.apache.hadoop.dfs.DatanodeProtocol;
+import org.apache.hadoop.dfs.FSDatasetInterface.MetaDataInputStream;
+
 import java.io.*;
 import java.net.*;
 import java.util.*;
@@ -108,7 +110,7 @@ public class DataNode implements FSConstants, Runnable {
   }
 
   DatanodeProtocol namenode = null;
-  FSDataset data = null;
+  FSDatasetInterface data = null;
   DatanodeRegistration dnRegistration = null;
   private String networkLoc;
   volatile boolean shouldRun = true;
@@ -222,6 +224,17 @@ public class DataNode implements FSConstants, Runnable {
     }
   }
     
+  
+  /**
+   * This method starts the data node with the specified conf.
+   * 
+   * @param conf - the configuration
+   *          if conf's CONFIG_PROPERTY_SIMULATED property is set
+   *          then a simulated storage based data node is created.
+   * 
+   * @param dataDirs - only for a non-simulated storage data node
+   * @throws IOException
+   */
   void startDataNode(Configuration conf, 
                      AbstractList<File> dataDirs
                      ) throws IOException {
@@ -248,16 +261,32 @@ public class DataNode implements FSConstants, Runnable {
                        conf);
     // get version and id info from the name-node
     NamespaceInfo nsInfo = handshake();
-
-    // read storage info, lock data dirs and transition fs state if necessary
     StartupOption startOpt = getStartupOption(conf);
     assert startOpt != null : "Startup option must be set.";
-    storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
-    // adjust
-    this.dnRegistration.setStorageInfo(storage);
-      
-    // initialize data node internal structure
-    this.data = new FSDataset(storage, conf);
+    
+    boolean simulatedFSDataset = 
+        conf.getBoolean("dfs.datanode.simulateddatastorage", false);
+    if (simulatedFSDataset) {
+        setNewStorageID(dnRegistration);
+        dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
+        dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
+ 
+        try {
+          //Equivalent of following (can't do because Simulated is in test dir)
+          //  this.data = new SimulatedFSDataset(conf);
+          this.data = (FSDatasetInterface) ReflectionUtils.newInstance(
+              Class.forName("org.apache.hadoop.dfs.SimulatedFSDataset"), conf);
+        } catch (ClassNotFoundException e) {
+          throw new IOException(StringUtils.stringifyException(e));
+        }
+    } else { // real storage
+      // read storage info, lock data dirs and transition fs state if necessary
+      storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
+      // adjust
+      this.dnRegistration.setStorageInfo(storage);
+      // initialize data node internal structure
+      this.data = new FSDataset(storage, conf);
+    }
       
     // find free port
     ServerSocket ss = null;
@@ -657,8 +686,7 @@ public class DataNode implements FSConstants, Runnable {
     case DatanodeProtocol.DNA_REGISTER:
       // namenode requested a registration
       register();
-      lastHeartbeat=0;
-      lastBlockReport=0;
+      scheduleBlockReport();
       break;
     case DatanodeProtocol.DNA_FINALIZE:
       storage.finalizeUpgrade();
@@ -708,7 +736,7 @@ public class DataNode implements FSConstants, Runnable {
         break;
       }
       if (xferTargets[i].length > 0) {
-        LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
+        LOG.info(dnRegistration + ":Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
         new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
       }
     }
@@ -778,7 +806,7 @@ public class DataNode implements FSConstants, Runnable {
         }
         ss.close();
       } catch (IOException ie) {
-        LOG.info("Exiting DataXceiveServer due to " + ie.toString());
+        LOG.info(dnRegistration + ":Exiting DataXceiveServer due to " + ie.toString());
       }
     }
     public void kill() {
@@ -836,9 +864,9 @@ public class DataNode implements FSConstants, Runnable {
           throw new IOException("Unknown opcode " + op + "in data stream");
         }
        } catch (Throwable t) {
-        LOG.error("DataXceiver: " + StringUtils.stringifyException(t));
+        LOG.error(dnRegistration + ":DataXceiver: " + StringUtils.stringifyException(t));
       } finally {
-        LOG.debug("Number of active connections is: "+xceiverCount);
+        LOG.debug(dnRegistration + ":Number of active connections is: "+xceiverCount);
         IOUtils.closeStream(in);
         IOUtils.closeSocket(s);
       }
@@ -877,7 +905,7 @@ public class DataNode implements FSConstants, Runnable {
 
         myMetrics.readBytes((int) read);
         myMetrics.readBlocks(1);
-        LOG.info("Served block " + block + " to " + s.getInetAddress());
+        LOG.info(dnRegistration + "Served block " + block + " to " + s.getInetAddress());
       } catch ( SocketException ignored ) {
         // Its ok for remote side to close the connection anytime.
         myMetrics.readBlocks(1);
@@ -885,7 +913,7 @@ public class DataNode implements FSConstants, Runnable {
         /* What exactly should we do here?
          * Earlier version shutdown() datanode if there is disk error.
          */
-        LOG.warn( "Got exception while serving " + block + " to " +
+        LOG.warn(dnRegistration +  ":Got exception while serving " + block + " to " +
                   s.getInetAddress() + ":\n" + 
                   StringUtils.stringifyException(ioe) );
         throw ioe;
@@ -992,7 +1020,7 @@ public class DataNode implements FSConstants, Runnable {
         try {
           sendResponse(s, opStatus);
         } catch (IOException ioe) {
-          LOG.warn("Error writing reply back to " + s.getInetAddress() +
+          LOG.warn(dnRegistration +":Error writing reply of status " + opStatus +  " back to " + s.getInetAddress() +
               " for writing block " + block +"\n" +
               StringUtils.stringifyException(ioe));
         }
@@ -1013,18 +1041,18 @@ public class DataNode implements FSConstants, Runnable {
       xceiverCount.incr();
 
       Block block = new Block( in.readLong(), 0 );
-      InputStream checksumIn = null;
+      MetaDataInputStream checksumIn = null;
       DataOutputStream out = null;
       
       try {
-        File blockFile = data.getBlockFile( block );
-        File checksumFile = FSDataset.getMetaFile( blockFile );
-        checksumIn = new FileInputStream(checksumFile);
 
-        long fileSize = checksumFile.length();
+        checksumIn = data.getMetaDataInputStream(block);
+        
+        long fileSize = checksumIn.getLength();
+
         if (fileSize >= 1L<<31 || fileSize <= 0) {
-          throw new IOException("Unexpected size for checksumFile " +
-                                checksumFile);
+            throw new IOException("Unexpected size for checksumFile of block" +
+                    block);
         }
 
         byte [] buf = new byte[(int)fileSize];
@@ -1243,20 +1271,17 @@ public class DataNode implements FSConstants, Runnable {
 
     BlockSender(Block block, long startOffset, long length,
         boolean corruptChecksumOk, boolean chunkOffsetOK) throws IOException {
-      RandomAccessFile blockInFile = null;
 
       try {
         this.block = block;
         this.chunkOffsetOK = chunkOffsetOK;
         this.corruptChecksumOk = corruptChecksumOk;
-        File blockFile = data.getBlockFile(block);
-        blockInFile = new RandomAccessFile(blockFile, "r");
 
-        File checksumFile = FSDataset.getMetaFile(blockFile);
 
-        if (!corruptChecksumOk || checksumFile.exists()) {
-          checksumIn = new DataInputStream(new BufferedInputStream(
-              new FileInputStream(checksumFile), BUFFER_SIZE));
+        if ( !corruptChecksumOk || data.metaFileExists(block) ) {
+          checksumIn = new DataInputStream(
+                  new BufferedInputStream(data.getMetaDataInputStream(block),
+                                          BUFFER_SIZE));
 
           // read and handle the common header here. For now just a version
           short version = checksumIn.readShort();
@@ -1284,7 +1309,7 @@ public class DataNode implements FSConstants, Runnable {
             || (length + startOffset) > endOffset) {
           String msg = " Offset " + startOffset + " and length " + length
           + " don't match block " + block + " ( blockLen " + endOffset + " )";
-          LOG.warn("sendBlock() : " + msg);
+          LOG.warn(dnRegistration + ":sendBlock() : " + msg);
           throw new IOException(msg);
         }
 
@@ -1304,18 +1329,17 @@ public class DataNode implements FSConstants, Runnable {
         // seek to the right offsets
         if (offset > 0) {
           long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
-          blockInFile.seek(offset);
+          // note: blockInStream is seeked when created below
           if (checksumSkip > 0) {
             // Should we use seek() for checksum file as well?
             IOUtils.skipFully(checksumIn, checksumSkip);
           }
         }
-
-        blockIn = new DataInputStream(new BufferedInputStream(
-            new FileInputStream(blockInFile.getFD()), BUFFER_SIZE));
+        InputStream blockInStream = data.getBlockInputStream(block, offset); // seek to offset
+        blockIn = new DataInputStream(new BufferedInputStream(blockInStream, BUFFER_SIZE));
       } catch (IOException ioe) {
         IOUtils.closeStream(this);
-        IOUtils.closeStream(blockInFile);
+        IOUtils.closeStream(blockIn);
         throw ioe;
       }
     }
@@ -1533,7 +1557,7 @@ public class DataNode implements FSConstants, Runnable {
           mirrorOut.writeInt(len);
           mirrorOut.write(buf, 0, len + checksumSize);
         } catch (IOException ioe) {
-          LOG.info("Exception writing to mirror " + mirrorAddr + "\n"
+          LOG.info(dnRegistration + ":Exception writing to mirror " + mirrorAddr + "\n"
               + StringUtils.stringifyException(ioe));
           //
           // If stream-copy fails, continue
@@ -1659,13 +1683,14 @@ public class DataNode implements FSConstants, Runnable {
         }
         // send data & checksum
         blockSender.sendBlock(out, null);
+
         
         // check the response
         receiveResponse(sock);
+        LOG.info(dnRegistration + ":Transmitted block " + b + " to " + curTarget);
 
-        LOG.info("Transmitted block " + b + " to " + curTarget);
       } catch (IOException ie) {
-        LOG.warn("Failed to transfer " + b + " to " + targets[0].getName()
+        LOG.warn(dnRegistration + ":Failed to transfer " + b + " to " + targets[0].getName()
             + " got " + StringUtils.stringifyException(ie));
       } finally {
         IOUtils.closeStream(blockSender);
@@ -1684,7 +1709,7 @@ public class DataNode implements FSConstants, Runnable {
    * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
    */
   public void run() {
-    LOG.info("In DataNode.run, data = " + data);
+    LOG.info(dnRegistration + "In DataNode.run, data = " + data);
 
     // start dataXceiveServer
     dataXceiveServer.start();
@@ -1710,7 +1735,7 @@ public class DataNode implements FSConstants, Runnable {
     } catch (InterruptedException ie) {
     }
         
-    LOG.info("Finishing DataNode in: "+data);
+    LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
   }
     
   /** Start datanode daemon.
@@ -1903,6 +1928,26 @@ public class DataNode implements FSConstants, Runnable {
 
     return networkLoc.toString();
   }
+  
+  /**
+   * This method arranges for the data node to send a block report at the next heartbeat.
+   */
+  public void scheduleBlockReport() {
+    lastHeartbeat=0;
+    lastBlockReport=0;
+  }
+  
+  
+  /**
+   * This method is used for testing. 
+   * Examples are adding and deleting blocks directly.
+   * The most common usage will be when the data node's storage is simulated.
+   * 
+   * @return the fsdataset that stores the blocks
+   */
+  public FSDatasetInterface getFSDataset() {
+    return data;
+  }
 
   /**
    */
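
With the changes above, whether a DataNode uses real or simulated storage is decided purely by configuration in startDataNode(). A minimal sketch of enabling it from test code (the property key is the one checked above; the rest is illustrative):

    Configuration conf = new Configuration();
    conf.setBoolean("dfs.datanode.simulateddatastorage", true);
    // With this flag set, dataDirs are ignored: the node takes a fresh storage ID
    // and loads org.apache.hadoop.dfs.SimulatedFSDataset via reflection instead of
    // creating an FSDataset over real directories.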

+ 43 - 12
src/java/org/apache/hadoop/dfs/FSDataset.java

@@ -31,7 +31,7 @@ import org.apache.hadoop.conf.*;
  * has a unique name and an extent on disk.
  *
  ***************************************************/
-class FSDataset implements FSConstants {
+class FSDataset implements FSConstants, FSDatasetInterface {
 
 
   /**
@@ -447,6 +447,7 @@ class FSDataset implements FSConstants {
       return sb.toString();
     }
   }
+  
   //////////////////////////////////////////////////////
   //
   // FSDataSet
@@ -457,9 +458,29 @@ class FSDataset implements FSConstants {
   public static final String METADATA_EXTENSION = ".meta";
   public static final short METADATA_VERSION = 1;
     
-  public static File getMetaFile( File f ) {
+  protected static File getMetaFile( File f ) {
     return new File( f.getAbsolutePath() + METADATA_EXTENSION );
   }
+  
+  protected File getMetaFile(Block b) throws IOException {
+    File blockFile = getBlockFile( b );
+    return new File( blockFile.getAbsolutePath() + METADATA_EXTENSION ); 
+  }
+  public boolean metaFileExists(Block b) throws IOException {
+    return getMetaFile(b).exists();
+  }
+  
+  public long getMetaDataLength(Block b) throws IOException {
+    File checksumFile = getMetaFile( b );
+    return checksumFile.length();
+  }
+
+  public MetaDataInputStream getMetaDataInputStream(Block b)
+      throws IOException {
+    File checksumFile = getMetaFile( b );
+    return new MetaDataInputStream(new FileInputStream(checksumFile),
+                                                    checksumFile.length());
+  }
     
   FSVolumeSet volumes;
   private HashMap<Block,File> ongoingCreates = new HashMap<Block,File>();
@@ -523,21 +544,31 @@ class FSDataset implements FSConstants {
   /**
    * Get File name for a given block.
    */
-  public synchronized File getBlockFile(Block b) throws IOException {
+  protected synchronized File getBlockFile(Block b) throws IOException {
     if (!isValidBlock(b)) {
       throw new IOException("Block " + b + " is not valid.");
     }
     return getFile(b);
   }
+  
+  public synchronized InputStream getBlockInputStream(Block b) throws IOException {
+    return new FileInputStream(getBlockFile(b));
+  }
 
-  static class BlockWriteStreams {
-    OutputStream dataOut;
-    OutputStream checksumOut;
-    
-    BlockWriteStreams( File f ) throws IOException {
-      dataOut = new FileOutputStream( f );
-      checksumOut = new FileOutputStream( getMetaFile( f ) );
+  public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {
+
+    File blockFile = getBlockFile(b);
+    RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
+    if (seekOffset > 0) {
+      blockInFile.seek(seekOffset);
     }
+    return new FileInputStream(blockInFile.getFD());
+  }
+    
+  BlockWriteStreams createBlockWriteStreams( File f ) throws IOException {
+      return new BlockWriteStreams(new FileOutputStream(f),
+          new FileOutputStream( getMetaFile( f ) ));
+
   }
   
   /**
@@ -590,7 +621,7 @@ class FSDataset implements FSConstants {
     // REMIND - mjc - make this a filter stream that enforces a max
     // block size, so clients can't go crazy
     //
-    return new BlockWriteStreams( f );
+    return createBlockWriteStreams( f );
   }
 
   File createTmpFile( FSVolume vol, Block blk ) throws IOException {
@@ -731,7 +762,7 @@ class FSDataset implements FSConstants {
    * check if a data diretory is healthy
    * @throws DiskErrorException
    */
-  void checkDataDir() throws DiskErrorException {
+  public void checkDataDir() throws DiskErrorException {
     volumes.checkDirs();
   }
     

+ 206 - 0
src/java/org/apache/hadoop/dfs/FSDatasetInterface.java

@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+
+
+
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+
+/**
+ * This is an interface for the underlying storage that stores blocks for
+ * a data node. 
+ * Examples are the FSDataset (which stores blocks on dirs)  and 
+ * SimulatedFSDataset (which simulates data).
+ *
+ */
+public interface FSDatasetInterface {
+  
+  
+  /**
+   * Returns the length of the metadata file of the specified block
+   * @param b - the block for which the metadata length is desired
+   * @return the length of the metadata file for the specified block.
+   * @throws IOException
+   */
+  public long getMetaDataLength(Block b) throws IOException;
+  
+  /**
+   * This class provides the input stream and length of the metadata
+   * of a block
+   *
+   */
+  static class MetaDataInputStream extends java.io.InputStream {
+    MetaDataInputStream(InputStream stream, long len) {
+      inStream = stream;
+      length = len;
+    }
+    private InputStream inStream;
+    private long length;
+    
+    @Override
+    public int read() throws IOException {
+      return inStream.read();
+    }
+    
+    @Override
+    public int read(byte[] b) throws IOException { 
+      return inStream.read(b);
+    }
+    
+    public long getLength() {
+      return length;
+    }
+  }
+  
+  /**
+   * Returns metaData of block b as an input stream (and its length)
+   * @param b - the block
+   * @return the metadata input stream; 
+   * @throws IOException
+   */
+  public MetaDataInputStream getMetaDataInputStream(Block b)
+        throws IOException;
+  
+  /**
+   * Does the meta file exist for this block?
+   * @param b - the block
+   * @return true if the metafile for the specified block exists
+   * @throws IOException
+   */
+  public boolean metaFileExists(Block b) throws IOException;
+    
+  /**
+   * Returns the total space (in bytes) used by dfs datanode
+   * @return  the total space used by dfs datanode
+   * @throws IOException
+   */  
+  public long getDfsUsed() throws IOException;
+    
+  /**
+   * Returns total capacity (in bytes) of storage (used and unused)
+   * @return  total capacity of storage (used and unused)
+   * @throws IOException
+   */
+  public long getCapacity() throws IOException;
+
+  /**
+   * Returns the amount of free storage space (in bytes)
+   * @return The amount of free storage space
+   * @throws IOException
+   */
+  public long getRemaining() throws IOException;
+
+  /**
+   * Returns the specified block's on-disk length (excluding metadata)
+   * @param b
+   * @return   the specified block's on-disk length (excluding metadata)
+   * @throws IOException
+   */
+  public long getLength(Block b) throws IOException;
+     
+  /**
+   * Returns an input stream to read the contents of the specified block
+   * @param b
+   * @return an input stream to read the contents of the specified block
+   * @throws IOException
+   */
+  public InputStream getBlockInputStream(Block b) throws IOException;
+  
+  /**
+   * Returns an input stream at specified offset of the specified block
+   * @param b
+   * @param seekOffset
+   * @return an input stream to read the contents of the specified block,
+   *  starting at the offset
+   * @throws IOException
+   */
+  public InputStream getBlockInputStream(Block b, long seekOffset)
+            throws IOException;
+
+     /**
+      * 
+      * This class contains the output streams for the data and checksum
+      * of a block
+      *
+      */
+     static class BlockWriteStreams {
+      OutputStream dataOut;
+      OutputStream checksumOut;
+      BlockWriteStreams(OutputStream dOut, OutputStream cOut) {
+        dataOut = dOut;
+        checksumOut = cOut;
+      }
+      
+    }
+    
+  /**
+   * Creates the block and returns output streams to write data and CRC
+   * @param b
+   * @return a BlockWriteStreams object to allow writing the block data
+   *  and CRC
+   * @throws IOException
+   */
+  public BlockWriteStreams writeToBlock(Block b) throws IOException;
+
+  /**
+   * Finalizes the block previously opened for writing using writeToBlock.
+   * The block size is what is in the parameter b and it must match the amount
+   *  of data written
+   * @param b
+   * @throws IOException
+   */
+  public void finalizeBlock(Block b) throws IOException;
+
+  /**
+   * Returns the block report - the full list of blocks stored
+   * @return - the block report - the full list of blocks stored
+   */
+  public Block[] getBlockReport();
+
+  /**
+   * Is the block valid?
+   * @param b
+   * @return - true if the specified block is valid
+   */
+  public boolean isValidBlock(Block b);
+
+  /**
+   * Invalidates the specified blocks
+   * @param invalidBlks - the blocks to be invalidated
+   * @throws IOException
+   */
+  public void invalidate(Block invalidBlks[]) throws IOException;
+
+    /**
+     * Check if all the data directories are healthy
+     * @throws DiskErrorException
+     */
+  public void checkDataDir() throws DiskErrorException;
+      
+    /**
+     * Stringifies the name of the storage
+     */
+  public String toString();
+
+}
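
Since DataNode now reaches storage only through this interface, test helpers can be written without caring whether blocks live on disk (FSDataset) or in memory (SimulatedFSDataset). A hypothetical helper, for illustration only:

    // Sums the stored length of every block reported by a dataset.
    static long totalStoredBytes(FSDatasetInterface dataset) throws IOException {
      long total = 0;
      for (Block b : dataset.getBlockReport()) {   // full list of stored blocks
        total += dataset.getLength(b);             // block length, excluding metadata
      }
      return total;
    }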

+ 2 - 0
src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java

@@ -88,6 +88,7 @@ class PendingReplicationBlocks {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found != null) {
+        FSNamesystem.LOG.debug("Removing pending replication for block " + block);
         found.decrementReplicas();
         if (found.getNumReplicas() <= 0) {
           pendingReplications.remove(block);
@@ -196,6 +197,7 @@ class PendingReplicationBlocks {
       synchronized (pendingReplications) {
         Iterator iter = pendingReplications.entrySet().iterator();
         long now = FSNamesystem.now();
+        FSNamesystem.LOG.debug("PendingReplicationMonitor checking Q");
         while (iter.hasNext()) {
           Map.Entry entry = (Map.Entry) iter.next();
           PendingBlockInfo pendingBlock = (PendingBlockInfo) entry.getValue();

+ 1 - 1
src/test/org/apache/hadoop/dfs/ClusterTestDFS.java

@@ -415,7 +415,7 @@ public class ClusterTestDFS extends TestCase implements FSConstants {
     // if this fails, the delete did not propagate because either
     //   awaitQuiescence() returned before the disk images were removed
     //   or a real failure was detected.
-    assertTrue(" data dir not empty: " + dn.data.volumes,
+    assertTrue(" data dir not empty: " + dn.data,
                blocks.length==0);
   }
 

+ 137 - 0
src/test/org/apache/hadoop/dfs/DataNodeCluster.java

@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.FSConstants.StartupOption;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.DNS;
+
+
+/**
+ * 
+ * This program starts a mini cluster of data nodes
+ *  (ie a mini cluster without the name node), all within one address space.
+ *  It is assumed that the name node has been started separately prior
+ *  to running this program.
+ *  
+ *  A use case of this is to run a real name node with a large number of
+ *  simulated data nodes for, say, a NN benchmark.
+ *
+ */
+
+public class DataNodeCluster {
+  
+  public static void main(String[] args) {
+    int numDataNodes = 0;
+    int numRacks = 0;
+    
+    Configuration conf = new Configuration();
+    String usage =
+     "Usage: datanodecluster " +
+     " -n <numDataNodes> " + 
+     " [-r <numRacks>] " +
+     " [-simulated] " +
+     "\n" + 
+     "  If -r not specified then default rack is used for all Data Nodes\n" +
+     "  Data nodes are simulated if -simulated OR conf file specifies simulated\n";
+    
+    for (int i = 0; i < args.length; i++) { // parse command line
+      if (args[i].equals("-n")) {
+        numDataNodes = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-r")) {
+        numRacks = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-simulated")) {
+        conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      } else {
+        System.out.println(usage);
+        System.exit(-1);
+      }
+    }
+    if (numDataNodes <= 0) {
+      System.out.println(usage);
+      System.exit(-1);
+    }
+    String nameNodeAdr = conf.get("fs.default.name");
+    if (nameNodeAdr == null) {
+      System.out.println("No name node address and port in config");
+      System.exit(-1);
+    }
+    System.out.println("Starting " + numDataNodes + 
+          " Data Nodes that will connect to Name Node at " + nameNodeAdr);
+  
+
+    MiniDFSCluster mc = new MiniDFSCluster();
+    try {
+      mc.formatDataNodeDirs();
+    } catch (IOException e) {
+      System.out.println("Error formating data node dirs:" + e);
+    }
+    String[] racks = null;
+    String[] rack4DataNode = null;
+    if (numRacks > 0) {
+      System.out.println("Using " + numRacks + " racks: ");
+      String rackPrefix = getUniqueRackPrefix();
+
+      rack4DataNode = new String[numDataNodes];
+      for (int i = 0; i < numDataNodes; ++i ) {
+        //rack4DataNode[i] = racks[i%numRacks];
+        rack4DataNode[i] = rackPrefix + "-" + i%numRacks;
+        System.out.println("Data Node " + i + " using " + rack4DataNode[i]);
+        
+        
+      }
+    }
+    try {
+      mc.startDataNodes(conf, numDataNodes, true, StartupOption.REGULAR,
+          rack4DataNode);
+    } catch (IOException e) {
+      System.out.println("Error creating data node:" + e);
+    }  
+  }
+
+  /*
+   * There is a high probability that the rack id generated here will
+   * not conflict with those of other data node clusters.
+   * Not perfect, but mostly-unique rack ids are good enough.
+   */
+  static private String getUniqueRackPrefix() {
+  
+    String ip = "unknownIP";
+    try {
+      ip = DNS.getDefaultIP("default");
+    } catch (UnknownHostException ignored) {
+      System.out.println("Could not find ip address of \"default\" inteface.");
+    }
+    
+    int rand = 0;
+    try {
+      rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
+    } catch (NoSuchAlgorithmException e) {
+      rand = (new Random()).nextInt(Integer.MAX_VALUE);
+    }
+    return "/Rack-" + rand + "-"+ ip  + "-" + 
+                      System.currentTimeMillis();
+  }
+}
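
Together with the bin/hadoop change above, this gives a command-line way to attach a batch of (optionally simulated) data nodes to an already running name node. A possible invocation built from the usage string (the counts are arbitrary):

    bin/hadoop datanodecluster -n 20 -r 2 -simulated

Note that fs.default.name must already point at the name node; the program exits if no name node address is found in the configuration.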

+ 171 - 12
src/test/org/apache/hadoop/dfs/MiniDFSCluster.java

@@ -32,7 +32,8 @@ import org.apache.hadoop.util.ToolRunner;
 
 /**
  * This class creates a single-process DFS cluster for junit testing.
- * The data directories for DFS are undering the testing directory.
+ * The data directories for non-simulated DFS are under the testing directory.
+ * For simulated data nodes, no underlying fs storage is used.
  */
 public class MiniDFSCluster {
 
@@ -42,6 +43,14 @@ public class MiniDFSCluster {
   private File base_dir;
   private File data_dir;
   
+  
+  /**
+   * This no-argument constructor is used only when starting a data node cluster
+   * without a name node (i.e. when the name node is started elsewhere).
+   */
+  public MiniDFSCluster() {
+  }
+  
   /**
    * Modify the config and start up the servers with the given operation.
    * Servers will be started on free ports.
@@ -80,10 +89,10 @@ public class MiniDFSCluster {
                         String[] racks) throws IOException {
     this(0, conf, numDataNodes, format, true, null, racks);
   }
-
+  
   /**
-   * NOTE: if possible, the other constructors should be used as they will
-   * ensure that the servers use free ports.
+   * NOTE: if possible, the other constructors that don't have nameNode port 
+   * parameter should be used as they will ensure that the servers use free ports.
    * <p>
    * Modify the config and start up the servers.  
    * 
@@ -106,6 +115,37 @@ public class MiniDFSCluster {
                         boolean manageDfsDirs,
                         StartupOption operation,
                         String[] racks) throws IOException {
+    this(0, conf, numDataNodes, format, manageDfsDirs, operation, racks, null);
+ 
+  }
+
+  /**
+   * NOTE: if possible, the other constructors that don't have nameNode port 
+   * parameter should be used as they will ensure that the servers use free ports.
+   * <p>
+   * Modify the config and start up the servers.  
+   * 
+   * @param nameNodePort suggestion for which rpc port to use.  caller should
+   *          use getNameNodePort() to get the actual port used.
+   * @param conf the base configuration to use in starting the servers.  This
+   *          will be modified as necessary.
+   * @param numDataNodes Number of DataNodes to start; may be zero
+   * @param format if true, format the NameNode and DataNodes before starting up
+   * @param manageDfsDirs if true, the data directories for servers will be
+   *          created and dfs.name.dir and dfs.data.dir will be set in the conf
+   * @param operation the operation with which to start the servers.  If null
+   *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
+   * @param racks array of strings indicating the rack that each DataNode is on
+   * @param simulatedCapacities array of capacities of the simulated data nodes
+   */
+  public MiniDFSCluster(int nameNodePort, 
+                        Configuration conf,
+                        int numDataNodes,
+                        boolean format,
+                        boolean manageDfsDirs,
+                        StartupOption operation,
+                        String[] racks,
+                        long[] simulatedCapacities) throws IOException {
     this.conf = conf;
     base_dir = new File(System.getProperty("test.build.data"), "dfs/");
     data_dir = new File(base_dir, "data");
@@ -139,7 +179,7 @@ public class MiniDFSCluster {
     nameNode = NameNode.createNameNode(args, conf);
     
     // Start the DataNodes
-    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks);
+    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, simulatedCapacities);
     
     if (numDataNodes > 0) {
       while (!isClusterUp()) {
@@ -155,6 +195,12 @@ public class MiniDFSCluster {
   /**
    * Modify the config and start up additional DataNodes.  The info port for
    * DataNodes is guaranteed to use a free port.
+   *  
+   *  Data nodes can run with the name node in the mini cluster or
+   *  a real name node. For example, running with a real name node is useful
+   *  when running simulated data nodes with a real name node.
+   *  If the mini cluster's name node is null, it is assumed that conf has
+   *  been set with the right address:port of the name node.
    *
    * @param conf the base configuration to use in starting the DataNodes.  This
    *          will be modified as necessary.
@@ -164,14 +210,23 @@ public class MiniDFSCluster {
    * @param operation the operation with which to start the DataNodes.  If null
    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
    * @param racks array of strings indicating the rack that each DataNode is on
+   * @param simulatedCapacities array of capacities of the simulated data nodes
    *
    * @throws IllegalStateException if NameNode has been shutdown
    */
   public void startDataNodes(Configuration conf, int numDataNodes, 
                              boolean manageDfsDirs, StartupOption operation, 
-                             String[] racks) throws IOException {
-    if (nameNode == null) {
-      throw new IllegalStateException("NameNode is not running");
+                             String[] racks,
+                             long[] simulatedCapacities) throws IOException {
+
+    // If minicluster's name node is null assume that the conf has been
+    // set with the right address:port of the name node.
+    //
+    if (nameNode != null) { // set conf from the name node
+      InetSocketAddress nnAddr = nameNode.getNameNodeAddress(); 
+      int nameNodePort = nnAddr.getPort(); 
+      conf.set("fs.default.name", 
+               nnAddr.getHostName()+ ":" + Integer.toString(nameNodePort));
     }
     
     if (racks != null && numDataNodes > racks.length ) {
@@ -181,10 +236,6 @@ public class MiniDFSCluster {
 
     // Set up the right ports for the datanodes
     conf.setInt("dfs.datanode.info.port", 0);
-    InetSocketAddress nnAddr = nameNode.getNameNodeAddress(); 
-    int nameNodePort = nnAddr.getPort(); 
-    conf.set("fs.default.name", 
-             nnAddr.getHostName()+ ":" + Integer.toString(nameNodePort));
     
     String[] args = (operation == null ||
                      operation == StartupOption.FORMAT ||
@@ -209,12 +260,44 @@ public class MiniDFSCluster {
       if (racks != null) {
         dnConf.set("dfs.datanode.rack", racks[i-curDatanodesNum]);
       }
+      if (simulatedCapacities != null) {
+        dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
+      }
+      if (simulatedCapacities != null && i < simulatedCapacities.length) {
+        dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY, simulatedCapacities[i]);
+      }
       System.out.println("Starting DataNode " + i + " with dfs.data.dir: " 
                          + dnConf.get("dfs.data.dir"));
       dataNodes.add(DataNode.createDataNode(dnArgs, dnConf));
     }
   }
   
+  
+  
+  /**
+   * Modify the config and start up the DataNodes.  The info port for
+   * DataNodes is guaranteed to use a free port.
+   *
+   * @param conf the base configuration to use in starting the DataNodes.  This
+   *          will be modified as necessary.
+   * @param numDataNodes Number of DataNodes to start; may be zero
+   * @param manageDfsDirs if true, the data directories for DataNodes will be
+   *          created and dfs.data.dir will be set in the conf
+   * @param operation the operation with which to start the DataNodes.  If null
+   *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
+   * @param racks array of strings indicating the rack that each DataNode is on
+   * @param simulatedCapacities array of capacities of the simulated data nodes
+   *
+   * @throws IllegalStateException if NameNode has been shutdown
+   */
+  
+  public void startDataNodes(Configuration conf, int numDataNodes, 
+      boolean manageDfsDirs, StartupOption operation, 
+      String[] racks
+      ) throws IOException {
+    startDataNodes( conf,  numDataNodes, manageDfsDirs,  operation, racks, null);
+  }
+  
   /**
    * If the NameNode is running, attempt to finalize a previous upgrade.
    * When this method return, the NameNode should be finalized, but
@@ -339,4 +422,80 @@ public class MiniDFSCluster {
 
     client.close();
   }
+  
+  public void formatDataNodeDirs() throws IOException {
+    base_dir = new File(System.getProperty("test.build.data"), "dfs/");
+    data_dir = new File(base_dir, "data");
+    if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
+      throw new IOException("Cannot remove data directory: " + data_dir);
+    }
+  }
+  
+  /**
+   * 
+   * @param dataNodeIndex - data node whose block report is desired - the index is the same as for getDataNodes()
+   * @return the block report for the specified data node
+   */
+  public Block[] getBlockReport(int dataNodeIndex) {
+    if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
+      throw new IndexOutOfBoundsException();
+    }
+    return dataNodes.get(dataNodeIndex).getFSDataset().getBlockReport();
+  }
+  
+  
+  /**
+   * 
+   * @return block reports from all data nodes
+   *    Block[] is indexed in the same order as the list of datanodes returned by getDataNodes()
+   */
+  public Block[][] getAllBlockReports() {
+    int numDataNodes = dataNodes.size();
+    Block[][] result = new Block[numDataNodes][];
+    for (int i = 0; i < numDataNodes; ++i) {
+     result[i] = getBlockReport(i);
+    }
+    return result;
+  }
+  
+  
+  /**
+   * This method is valid only if the data nodes have simulated data
+   * @param dataNodeIndex - data node in which to inject - the index is the same as for getDataNodes()
+   * @param blocksToInject - the blocks
+   * @throws IOException
+   *             if the data node's storage is not a SimulatedFSDataset, or
+   *             if any of the blocks already exist in the data node
+   *   
+   */
+  public void injectBlocks(int dataNodeIndex, Block[] blocksToInject) throws IOException {
+    if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
+      throw new IndexOutOfBoundsException();
+    }
+    FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).getFSDataset();
+    if (!(dataSet instanceof SimulatedFSDataset)) {
+      throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
+    }
+    SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
+    sdataset.injectBlocks(blocksToInject);
+    dataNodes.get(dataNodeIndex).scheduleBlockReport();
+  }
+  
+  /**
+   * This method is valid only if the data nodes have simulated data
+   * @param blocksToInject - blocksToInject[] is indexed in the same order as the list 
+   *             of datanodes returned by getDataNodes()
+   * @throws IOException
+   *             if the data nodes' storage is not a SimulatedFSDataset, or
+   *             if any of the blocks already exist in the data nodes.
+   *             Note that the rest of the blocks are not injected.
+   */
+  public void injectBlocks(Block[][] blocksToInject) throws IOException {
+    if (blocksToInject.length >  dataNodes.size()) {
+      throw new IndexOutOfBoundsException();
+    }
+    for (int i = 0; i < blocksToInject.length; ++i) {
+     injectBlocks(i, blocksToInject[i]);
+    }
+  }
 }
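
The new simulatedCapacities parameter together with injectBlocks()/getBlockReport() makes it possible to populate a simulated cluster with blocks without writing any real data. A rough test-side sketch (the block id and sizes are made up; the constructor is the new eight-argument one above):

    Configuration conf = new Configuration();
    long[] capacities = {100L * 1024 * 1024, 100L * 1024 * 1024};  // two simulated nodes
    MiniDFSCluster cluster = new MiniDFSCluster(0, conf, 2, true, true,
                                                null, null, capacities);
    Block[] toInject = { new Block(1L, 4096L) };   // hypothetical block id and length
    cluster.injectBlocks(0, toInject);             // only legal for SimulatedFSDataset
    Block[] report = cluster.getBlockReport(0);    // now includes the injected block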

+ 498 - 0
src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java

@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+
+/**
+ * This class implements a simulated FSDataset.
+ * 
+ * Blocks that are created are recorded but their data (plus their CRCs) are
+ *  discarded.
+ * Fixed data is returned when blocks are read; a null CRC meta file is
+ * created for such data.
+ * 
+ * This FSDataset does not remember any block information across its
+ * restarts; it does however offer an operation to inject blocks
+ *  (see TestInjectionForSimulatedStorage
+ *  for a usage example of injection).
+ * 
+ * Note the synchronization is coarse grained - it is at each method. 
+ */
+
+public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Configurable{
+  
+  public static final String CONFIG_PROPERTY_SIMULATED =
+                                    "dfs.datanode.simulateddatastorage";
+  public static final String CONFIG_PROPERTY_CAPACITY =
+                            "dfs.datanode.simulateddatastorage.capacity";
+  
+  public static final long DEFAULT_CAPACITY = 2L<<40; // 2 terabytes
+  public static final byte DEFAULT_DATABYTE = 9; // byte value used for simulated block data
+  byte simulatedDataByte = DEFAULT_DATABYTE;
+  Configuration conf = null;
+  
+  static byte[] nullCrcFileData;
+  {
+    DataChecksum checksum = DataChecksum.newDataChecksum( DataChecksum.
+                              CHECKSUM_NULL, 16*1024 );
+    byte[] nullCrcHeader = checksum.getHeader();
+    nullCrcFileData =  new byte[2 + nullCrcHeader.length];
+    nullCrcFileData[0] = (byte) ((FSDataset.METADATA_VERSION >>> 8) & 0xff);
+    nullCrcFileData[1] = (byte) (FSDataset.METADATA_VERSION & 0xff);
+    for (int i = 0; i < nullCrcHeader.length; i++) {
+      nullCrcFileData[i+2] = nullCrcHeader[i];
+    }
+  }
+  
+  private class BInfo { // information about a single block
+    Block theBlock;
+    private boolean finalized = false; // if not finalized => ongoing creation
+    SimulatedOutputStream oStream = null;
+    BInfo(Block b, boolean forWriting) throws IOException {
+      theBlock = new Block(b);
+      if (theBlock.len < 0) {
+        theBlock.len = 0;
+      }
+      if (!storage.alloc(theBlock.len)) { // expected length - actual length may
+                                          // be more - we find out at finalize
+        DataNode.LOG.warn("Lack of free storage on a block alloc");
+        throw new IOException("Creating block, no free space available");
+      }
+
+      if (forWriting) {
+        finalized = false;
+        oStream = new SimulatedOutputStream();
+      } else {
+        finalized = true;
+        oStream = null;
+      }
+    }
+    
+    synchronized long getlength() {
+      if (!finalized) {
+         return oStream.getLength();
+      } else {
+        return theBlock.len;
+      }
+    }
+    
+    synchronized SimulatedInputStream getIStream() throws IOException {
+      if (!finalized) {
+        // throw new IOException("Trying to read an unfinalized block");
+         return new SimulatedInputStream(oStream.getLength(), DEFAULT_DATABYTE);
+      } else {
+        return new SimulatedInputStream(theBlock.len, DEFAULT_DATABYTE);
+      }
+    }
+    
+    synchronized void finalizeBlock(long finalSize) throws IOException {
+      if (finalized) {
+        throw new IOException(
+            "Finalizing a block that has already been finalized" + 
+            theBlock.blkid);
+      }
+      if (oStream == null) {
+        DataNode.LOG.error("Null oStream on unfinalized block - bug");
+        throw new IOException("Unexpected error on finalize");
+      }
+
+      if (oStream.getLength() != finalSize) {
+        DataNode.LOG.warn("Size passed to finalize (" + finalSize +
+                    ")does not match what was written:" + oStream.getLength());
+        throw new IOException(
+          "Size passed to finalize does not match the amount of data written");
+      }
+      // We had allocated the expected length when block was created; 
+      // adjust if necessary
+      long extraLen = finalSize - theBlock.len;
+      if (extraLen > 0) {
+        if (!storage.alloc(extraLen)) {
+          DataNode.LOG.warn("Lack of free storage on a block alloc");
+          throw new IOException("Creating block, no free space available");
+        }
+      } else {
+        storage.free(extraLen);
+      }
+      theBlock.len = finalSize;  
+
+      finalized = true;
+      oStream = null;
+      return;
+    }
+    
+    SimulatedInputStream getMetaIStream() {
+      return new SimulatedInputStream(nullCrcFileData);  
+    }
+  }
+  
+  static private class SimulatedStorage {
+    private long capacity;  // in bytes
+    private long used;    // in bytes
+    
+    synchronized long getFree() {
+      return capacity - used;
+    }
+    
+    synchronized long getCapacity() {
+      return capacity;
+    }
+    
+    synchronized long getUsed() {
+      return used;
+    }
+    
+    synchronized boolean alloc(long amount) {
+      if (getFree() >= amount) {
+        used += amount;
+        return true;
+      } else {
+        return false;    
+      }
+    }
+    
+    synchronized void free(long amount) {
+      used -= amount;
+    }
+    
+    SimulatedStorage(long cap) {
+      capacity = cap;
+      used = 0;   
+    }
+  }
+  
+  private HashMap<Block, BInfo> blockMap = null;
+  private SimulatedStorage storage = null;
+  
+  public SimulatedFSDataset(Configuration conf) throws IOException {
+    setConf(conf);
+  }
+  
+  private SimulatedFSDataset() { // real construction when setConf called.. Uggg
+  }
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration iconf)  {
+    conf = iconf;
+    storage = new SimulatedStorage(
+        conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
+    //DataNode.LOG.info("Starting Simulated storage; Capacity = " + getCapacity() + 
+    //    "Used = " + getDfsUsed() + "Free =" + getRemaining());
+
+    blockMap = new HashMap<Block,BInfo>(); 
+  }
+
+  public synchronized void injectBlocks(Block[] injectBlocks)
+                                            throws IOException {
+    if (injectBlocks != null) {
+      for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
+        if (b == null) {
+          throw new NullPointerException("Null blocks in block list");
+        }
+        if (isValidBlock(b)) {
+          throw new IOException("Block already exists in  block list");
+        }
+      }
+      
+      blockMap = new HashMap<Block,BInfo>(injectBlocks.length);
+      for (Block b: injectBlocks) {
+          BInfo binfo = new BInfo(b, false);
+          blockMap.put(b, binfo);
+      }
+    }
+  }
+
+  public synchronized void finalizeBlock(Block b) throws IOException {
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null) {
+      throw new IOException("Finalizing a non existing block " + b);
+    }
+    binfo.finalizeBlock(b.getNumBytes());
+
+  }
+
+  public synchronized Block[] getBlockReport() {
+    Block[] blockTable = new Block[blockMap.size()];
+    int i = 0;
+    for (Block b: blockMap.keySet()) {  
+      blockTable[i++] = blockMap.get(b).theBlock;
+    }
+    return blockTable;
+  }
+
+  public long getCapacity() throws IOException {
+    return storage.getCapacity();
+  }
+
+  public long getDfsUsed() throws IOException {
+    return storage.getUsed();
+  }
+
+  public long getRemaining() throws IOException {
+    return storage.getFree();
+  }
+
+  public synchronized long getLength(Block b) throws IOException {
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null) {
+      throw new IOException("Finalizing a non existing block " + b);
+    }
+    return binfo.getlength();
+  }
+
+  public synchronized void invalidate(Block[] invalidBlks) throws IOException {
+    boolean error = false;
+    if (invalidBlks == null) {
+      return;
+    }
+    for (Block b: invalidBlks) {
+      if (b == null) {
+        continue;
+      }
+      BInfo binfo = blockMap.get(b);
+      if (binfo == null) {
+        error = true;
+        DataNode.LOG.warn("Invalidate: Missing block");
+        continue;
+      }
+      storage.free(binfo.getlength());
+      blockMap.remove(b);
+    }
+      if (error) {
+          throw new IOException("Invalidate: Missing blocks.");
+      }
+  }
+
+  public synchronized boolean isValidBlock(Block b) {
+    return (blockMap.containsKey(b));
+  }
+
+  public String toString() {
+    return "Simulated FSDataset";
+  }
+
+  public synchronized BlockWriteStreams writeToBlock(Block b)
+                                            throws IOException {
+    if (isValidBlock(b)) {
+          throw new IOException("Block " + b + 
+              " is valid, and cannot be written to.");
+      }
+      BInfo binfo = new BInfo(b, true);
+      blockMap.put(b, binfo);
+      SimulatedOutputStream crcStream = new SimulatedOutputStream();
+      return new BlockWriteStreams(binfo.oStream, crcStream);
+  }
+
+  public synchronized InputStream getBlockInputStream(Block b)
+                                            throws IOException {
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null) {
+      throw new IOException("No such Block " + b );  
+    }
+    
+    //DataNode.LOG.info("Opening block(" + b.blkid + ") of length " + b.len);
+    return binfo.getIStream();
+  }
+  
+  public synchronized InputStream getBlockInputStream(Block b, long seekOffset)
+                              throws IOException {
+    InputStream result = getBlockInputStream(b);
+    result.skip(seekOffset);
+    return result;
+  }
+
+  /**
+   * Returns metaData of block b as an input stream
+   * @param b - the block for which the metadata is desired
+   * @return metaData of block b as an input stream
+   * @throws IOException - block does not exist or problems accessing
+   *  the meta file
+   */
+  private synchronized InputStream getMetaDataInStream(Block b)
+                                              throws IOException {
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null) {
+      throw new IOException("No such Block " + b );  
+    }
+    if (!binfo.finalized) {
+      throw new IOException("Block " + b + 
+          " is being written, its meta cannot be read");
+    }
+    return binfo.getMetaIStream();
+  }
+
+  public synchronized long getMetaDataLength(Block b) throws IOException {
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null) {
+      throw new IOException("No such Block " + b );  
+    }
+    if (!binfo.finalized) {
+      throw new IOException("Block " + b +
+          " is being written, its metalength cannot be read");
+    }
+    return binfo.getMetaIStream().getLength();
+  }
+  
+  public MetaDataInputStream getMetaDataInputStream(Block b)
+  throws IOException {
+
+       return new MetaDataInputStream(getMetaDataInStream(b),
+                                                getMetaDataLength(b));
+  }
+
+  public synchronized boolean metaFileExists(Block b) throws IOException {
+    if (!isValidBlock(b)) {
+          throw new IOException("Block " + b +
+              " is valid, and cannot be written to.");
+      }
+    return true; // crc exists for all valid blocks
+  }
+
+  public void checkDataDir() throws DiskErrorException {
+    // nothing to check for simulated data set
+  }
+  
+  
+  /**
+   * Simulated input stream: serves either a supplied byte array or a
+   * repeated byte, with no backing storage.
+   */
+  static private class SimulatedInputStream extends java.io.InputStream {
+    
+
+    byte theRepeatedData = 7;
+    long length; // bytes
+    int currentPos = 0;
+    byte[] data = null;
+    
+    /**
+     * An input stream of size l in which every byte is iRepeatedData.
+     * @param l length of the stream in bytes
+     * @param iRepeatedData the byte repeated for the entire stream
+     */
+    SimulatedInputStream(long l, byte iRepeatedData) {
+      length = l;
+      theRepeatedData = iRepeatedData;
+    }
+    
+    /**
+     * An input stream over the supplied data.
+     * @param iData the bytes the stream returns
+     */
+    SimulatedInputStream(byte[] iData) {
+      data = iData;
+      length = data.length;
+    }
+    
+    /**
+     * @return the length of the input stream
+     */
+    long getLength() {
+      return length;
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (currentPos >= length)
+        return -1;
+      if (data != null) {
+        return data[currentPos++];
+      } else {
+        currentPos++;
+        return theRepeatedData;
+      }
+    }
+    
+    @Override
+    public int read(byte[] b) throws IOException { 
+
+      if (b == null) {
+        throw new NullPointerException();
+      }
+      if (b.length == 0) {
+        return 0;
+      }
+      if (currentPos >= length) { // EOF
+        return -1;
+      }
+      int bytesRead = (int) Math.min(b.length, length-currentPos);
+      if (data != null) {
+        System.arraycopy(data, currentPos, b, 0, bytesRead);
+      } else { // no backing data; fill with the repeated byte
+        for (int i = 0; i < bytesRead; i++) {
+          b[i] = theRepeatedData;
+        }
+      }
+      currentPos += bytesRead;
+      return bytesRead;
+    }
+  }
+  
+  /**
+   * This class implements an output stream that discards the data written
+   * to it but records its length.
+   */
+  static private class SimulatedOutputStream extends OutputStream {
+    long length = 0;
+    
+    /**
+     * Constructor for SimulatedOutputStream.
+     */
+    SimulatedOutputStream() {
+    }
+    
+    /**
+     * @return the length of the data written so far.
+     */
+    long getLength() {
+      return length;
+    }
+    
+    @Override
+    public void write(int arg0) throws IOException {
+      length++;
+    }
+    
+    @Override
+    public void write(byte[] b) throws IOException {
+      length += b.length;
+    }
+    
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      length += len;
+    }
+  }
+
+
+}

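The key trick in the simulated dataset, visible in the two stream classes above, is that block contents are never stored: writes only advance a byte counter, and reads regenerate a single repeated byte on demand. A minimal, self-contained sketch of that idea follows; the class and field names below are illustrative only, not the actual Hadoop classes.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Writes are counted, never stored.
class LengthOnlyOutputStream extends OutputStream {
  long length = 0;

  @Override
  public void write(int b) throws IOException {
    length++;                       // discard the byte, remember only the count
  }
}

// Reads regenerate the same byte until the recorded length is exhausted.
class RepeatedByteInputStream extends InputStream {
  private final long length;
  private final byte value;
  private long pos = 0;

  RepeatedByteInputStream(long length, byte value) {
    this.length = length;
    this.value = value;
  }

  @Override
  public int read() {
    return (pos++ < length) ? (value & 0xff) : -1;
  }
}

Because no bytes are retained, capacity and usage can be tracked purely as counters, which is why the tests below can assert exact getDfsUsed() and getRemaining() values.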
+ 33 - 5
src/test/org/apache/hadoop/dfs/TestFileCreation.java

@@ -38,6 +38,7 @@ public class TestFileCreation extends TestCase {
   static final int blockSize = 8192;
   static final int numBlocks = 2;
   static final int fileSize = numBlocks * blockSize + 1;
+  boolean simulatedStorage = false;
 
   // The test file is 2 times the blocksize plus one. This means that when the
   // entire file is written, the first two blocks definitely get flushed to
@@ -96,8 +97,14 @@ public class TestFileCreation extends TestCase {
     }
     FSDataInputStream stm = fileSys.open(name);
     byte[] expected = new byte[numBlocks * blockSize];
-    Random rand = new Random(seed);
-    rand.nextBytes(expected);
+    if (simulatedStorage) {
+      for (int i= 0; i < expected.length; i++) {  
+        expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
+      }
+    } else {
+      Random rand = new Random(seed);
+      rand.nextBytes(expected);
+    }
     // do a sanity check. Read the file
     byte[] actual = new byte[numBlocks * blockSize];
     stm.readFully(0, actual);
@@ -108,7 +115,7 @@ public class TestFileCreation extends TestCase {
     for (int idx = 0; idx < actual.length; idx++) {
       this.assertEquals(message+" byte "+(from+idx)+" differs. expected "+
                         expected[from+idx]+" actual "+actual[idx],
-                        actual[idx], expected[from+idx]);
+                        expected[from+idx], actual[idx]);
       actual[idx] = 0;
     }
   }
@@ -118,6 +125,9 @@ public class TestFileCreation extends TestCase {
    */
   public void testFileCreation() throws IOException {
     Configuration conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fs = cluster.getFileSystem();
     try {
@@ -163,10 +173,28 @@ public class TestFileCreation extends TestCase {
       assertTrue(file1 + " should be of size " + fileSize +
                  " but found to be of size " + len, 
                   len == fileSize);
-
+
+      // Check storage usage.
+      // Capacities cannot be checked for real storage since the OS file
+      // system may be changing underneath us.
+      if (simulatedStorage) {
+        DataNode dn = cluster.getDataNodes().get(0);
+        assertEquals(fileSize, dn.getFSDataset().getDfsUsed());
+        assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize, dn.getFSDataset().getRemaining());
+      }
     } finally {
       fs.close();
       cluster.shutdown();
     }
   }
-}
+
+
+  /**
+   * Test file creation, as above, but against simulated storage.
+   */
+  public void testFileCreationSimulated() throws IOException {
+    simulatedStorage = true;
+    testFileCreation();
+    simulatedStorage = false;
+  }
+}

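Each of the modified tests follows the same toggle pattern shown above: a simulatedStorage flag switches the configuration property on before the MiniDFSCluster starts, and expected file contents switch from random bytes to the dataset's fixed data byte. A condensed sketch of that pattern, using the SimulatedFSDataset constants that appear in this patch; the helper method itself is illustrative, not part of the patch.

// Sketch of the expected-data half of the simulated-storage toggle.
byte[] expectedContents(boolean simulatedStorage, int fileSize, long seed) {
  byte[] expected = new byte[fileSize];
  if (simulatedStorage) {
    // Simulated blocks always read back the same byte, regardless of what
    // the writer sent, so the expectation is a constant fill.
    java.util.Arrays.fill(expected, SimulatedFSDataset.DEFAULT_DATABYTE);
  } else {
    new java.util.Random(seed).nextBytes(expected);
  }
  return expected;
}

Tests then re-run themselves with the flag flipped (for example testFileCreationSimulated above), so the same assertions cover both real and simulated storage.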
+ 194 - 0
src/test/org/apache/hadoop/dfs/TestInjectionForSimulatedStorage.java

@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.HashSet;
+import java.util.Set;
+import java.net.*;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * This class tests the replication and injection of blocks of a DFS file for simulated storage.
+ */
+public class TestInjectionForSimulatedStorage extends TestCase {
+  private int checksumSize = 16;
+  private int blockSize = checksumSize*2;
+  private int numBlocks = 4;
+  private int filesize = blockSize*numBlocks;
+  private int numDataNodes = 4;
+  private static final Log LOG = LogFactory.getLog(
+      "org.apache.hadoop.dfs.TestInjectionForSimulatedStorage");
+
+  
+  private void writeFile(FileSystem fileSys, Path name, int repl)
+                                                throws IOException {
+    // create and write a file that contains three blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true,
+          fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                      (short)repl, (long)blockSize);
+    byte[] buffer = new byte[filesize];
+    for (int i=0; i<buffer.length; i++) {
+      buffer[i] = '1';
+    }
+    stm.write(buffer);
+    stm.close();
+  }
+  
+  // Waits for all of the blocks to have the expected replication.
+  private void waitForBlockReplication(String filename, 
+                                       ClientProtocol namenode,
+                                       int expected, long maxWaitSec) 
+                                       throws IOException {
+    long start = System.currentTimeMillis();
+    
+    //wait for all the blocks to be replicated;
+    LOG.info("Checking for block replication for " + filename);
+    
+    LocatedBlocks blocks = namenode.getBlockLocations(filename, 0, Long.MAX_VALUE);
+    assertEquals(numBlocks, blocks.locatedBlockCount());
+    
+    for (int i = 0; i < numBlocks; ++i) {
+      LOG.info("Checking for block:" + (i+1));
+      while (true) { // Loop to check for block i (usually once block 0 is done, all are done)
+        blocks = namenode.getBlockLocations(filename, 0, Long.MAX_VALUE);
+        assertEquals(numBlocks, blocks.locatedBlockCount());
+        LocatedBlock block = blocks.get(i);
+        int actual = block.getLocations().length;
+        if ( actual == expected ) {
+          LOG.info("Got enough replicas for " + (i+1) + "th block " + block.getBlock() +
+              ", got " + actual + ".");
+          break;
+        }
+        LOG.info("Not enough replicas for " + (i+1) + "th block " + block.getBlock() +
+                               " yet. Expecting " + expected + ", got " + 
+                               actual + ".");
+      
+        if (maxWaitSec > 0 && 
+            (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
+          throw new IOException("Timed out while waiting for all blocks to" +
+                                " be replicated for " + filename);
+        }
+      
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ignored) {}
+      }
+    }
+  }
+ 
+  
+  
+  /* This test makes sure that the NameNode retries all the available blocks
+   * for under-replicated blocks. It uses simulated storage and its
+   * block-injection feature.
+   *
+   * It creates a file with several blocks and a replication factor of 4.
+   * The cluster is then shut down - the NN retains its state, but the DNs
+   * are all simulated and hence lose their blocks.
+   * The blocks are then injected into one of the DNs. The expected behaviour
+   * is that the NN will arrange for the missing replicas to be copied from
+   * a valid source.
+   */
+  public void testInjection() throws IOException {
+    
+    MiniDFSCluster cluster = null;
+
+    String testFile = "/replication-test-file";
+    Path testPath = new Path(testFile);
+    
+    byte buffer[] = new byte[1024];
+    for (int i=0; i<buffer.length; i++) {
+      buffer[i] = '1';
+    }
+    
+    try {
+      Configuration conf = new Configuration();
+      conf.set("dfs.replication", Integer.toString(numDataNodes));
+      conf.setInt("io.bytes.per.checksum", checksumSize);
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      //first time format
+      cluster = new MiniDFSCluster(0, conf, numDataNodes, true,
+                                   true, null, null);
+      cluster.waitActive();
+      DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                                            cluster.getNameNodePort()),
+                                            conf);
+      
+      writeFile(cluster.getFileSystem(), testPath, numDataNodes);
+
+      
+      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, 20);
+
+      
+      Block[][] blocksList = cluster.getAllBlockReports();
+                    
+      
+      cluster.shutdown();
+      cluster = null;
+      
+
+      
+      /* Start the MiniDFSCluster with more datanodes since once a writeBlock
+       * to a datanode fails, the same block cannot be written to it
+       * immediately. In our case some replication attempts will fail.
+       */
+      
+      LOG.info("Restarting minicluster");
+      conf = new Configuration();
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      conf.set("dfs.safemode.threshold.pct", "0.0f"); 
+      
+      cluster = new MiniDFSCluster(0, conf, numDataNodes*2, false,
+                                   true, null, null);
+      
+      Set<Block> uniqueBlocks = new HashSet<Block>();
+      for (int i=0; i<blocksList.length; ++i) {
+        for (int j=0; j < blocksList[i].length; ++j) {
+          uniqueBlocks.add(blocksList[i][j]);
+        }
+      }
+      // Insert all the blocks in the first data node
+      
+      LOG.info("Inserting " + uniqueBlocks.size() + " blocks");
+      Block[] blocks = uniqueBlocks.toArray(new Block[uniqueBlocks.size()]);
+      cluster.injectBlocks(0, blocks);
+      
+      dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                                  cluster.getNameNodePort()),
+                                  conf);
+      
+      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
+      
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }  
+}

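The injection test above boils down to three cluster operations: snapshot the block reports, restart with simulated DataNodes, and inject the blocks into a single node so the NameNode must re-replicate them. A condensed sketch of that flow, using only the MiniDFSCluster and SimulatedFSDataset calls visible in this patch; conf, numDataNodes and the other variables are as in the test above, and setup and error handling are omitted.

Block[][] reports = cluster.getAllBlockReports();   // one report per DataNode
cluster.shutdown();                                  // simulated DNs lose their blocks

Set<Block> unique = new HashSet<Block>();
for (Block[] perNode : reports) {
  for (Block b : perNode) {
    unique.add(b);
  }
}

cluster = new MiniDFSCluster(0, conf, numDataNodes * 2, false, true, null, null);
cluster.injectBlocks(0, unique.toArray(new Block[unique.size()]));

// The NameNode now sees a single replica of each block and schedules
// re-replication up to the configured factor, which waitForBlockReplication
// above waits for.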
+ 19 - 2
src/test/org/apache/hadoop/dfs/TestPread.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 public class TestPread extends TestCase {
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 4096;
+  boolean simulatedStorage = false;
 
   private void writeFile(FileSystem fileSys, Path name) throws IOException {
     // create and write a file that contains three blocks of data
@@ -80,11 +81,18 @@ public class TestPread extends TestCase {
       nread += nbytes;
     }
   }
+  
   private void pReadFile(FileSystem fileSys, Path name) throws IOException {
     FSDataInputStream stm = fileSys.open(name);
     byte[] expected = new byte[(int)(12*blockSize)];
-    Random rand = new Random(seed);
-    rand.nextBytes(expected);
+    if (simulatedStorage) {
+      for (int i= 0; i < expected.length; i++) {  
+        expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
+      }
+    } else {
+      Random rand = new Random(seed);
+      rand.nextBytes(expected);
+    }
     // do a sanity check. Read first 4K bytes
     byte[] actual = new byte[4096];
     stm.readFully(actual);
@@ -156,6 +164,9 @@ public class TestPread extends TestCase {
     Configuration conf = new Configuration();
     conf.setLong("dfs.block.size", 4096);
     conf.setLong("dfs.read.prefetch.size", 4096);
+    if (simulatedStorage) {
+      conf.setBoolean("dfs.datanode.simulateddatastorage", true);
+    }
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
     FileSystem fileSys = cluster.getFileSystem();
     try {
@@ -169,6 +180,12 @@ public class TestPread extends TestCase {
     }
   }
   
+  public void testPreadDFSSimulated() throws IOException {
+    simulatedStorage = true;
+    testPreadDFS();
+    simulatedStorage = false;
+  }
+  
   /**
    * Tests positional read in LocalFS.
    */

+ 22 - 5
src/test/org/apache/hadoop/dfs/TestReplication.java

@@ -44,7 +44,7 @@ public class TestReplication extends TestCase {
   };
   private static final int numDatanodes = racks.length;
   private static final Log LOG = LogFactory.getLog(
-                                                   "org.apache.hadoop.dfs.TestReplication");
+                                       "org.apache.hadoop.dfs.TestReplication");
 
   private void writeFile(FileSystem fileSys, Path name, int repl)
     throws IOException {
@@ -133,9 +133,12 @@ public class TestReplication extends TestCase {
   /**
    * Tests replication in DFS.
    */
-  public void testReplication() throws IOException {
+  public void runReplication(boolean simulated) throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean("dfs.replication.considerLoad", false);
+    if (simulated) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
     MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, racks);
     cluster.waitActive();
     
@@ -168,6 +171,16 @@ public class TestReplication extends TestCase {
       cluster.shutdown();
     }
   }
+
+
+  public void testReplicationSimulatedStorage() throws IOException {
+    runReplication(true);
+  }
+  
+  
+  public void testReplication() throws IOException {
+    runReplication(false);
+  }
   
   // Waits for all of the blocks to have expected replication
   private void waitForBlockReplication(String filename, 
@@ -177,7 +190,7 @@ public class TestReplication extends TestCase {
     long start = System.currentTimeMillis();
     
     //wait for all the blocks to be replicated;
-    System.out.println("Checking for block replication for " + filename);
+    LOG.info("Checking for block replication for " + filename);
     int iters = 0;
     while (true) {
       boolean replOk = true;
@@ -190,7 +203,7 @@ public class TestReplication extends TestCase {
         int actual = block.getLocations().length;
         if ( actual < expected ) {
           if (true || iters > 0) {
-            System.out.println("Not enough replicas for " + block.getBlock() +
+            LOG.info("Not enough replicas for " + block.getBlock() +
                                " yet. Expecting " + expected + ", got " + 
                                actual + ".");
           }
@@ -272,13 +285,15 @@ public class TestReplication extends TestCase {
       int fileCount = 0;
       for (int i=0; i<6; i++) {
         File blockFile = new File(baseDir, "data" + (i+1) + "/current/" + block);
-        System.out.println("Checking for file " + blockFile);
+        LOG.info("Checking for file " + blockFile);
         
         if (blockFile.exists()) {
           if (fileCount == 0) {
+            LOG.info("Deleting file " + blockFile);
             assertTrue(blockFile.delete());
           } else {
             // corrupt it.
+            LOG.info("Corrupting file " + blockFile);
             long len = blockFile.length();
             assertTrue(len > 50);
             RandomAccessFile blockOut = new RandomAccessFile(blockFile, "rw");
@@ -294,6 +309,8 @@ public class TestReplication extends TestCase {
        * to a datanode node fails, same block can not be written to it
        * immediately. In our case some replication attempts will fail.
        */
+      
+      LOG.info("Restarting minicluster after deleting a replica and corrupting 2 crcs");
       conf = new Configuration();
       conf.set("dfs.replication", Integer.toString(numDataNodes));
       conf.set("dfs.replication.pending.timeout.sec", Integer.toString(2));

+ 1 - 1
src/test/org/apache/hadoop/dfs/TestSetrepDecreasing.java

@@ -23,6 +23,6 @@ import junit.framework.TestCase;
 
 public class TestSetrepDecreasing extends TestCase {
   public void testSetrepDecreasing() throws IOException {
-    TestSetrepIncreasing.setrep(5, 3);
+    TestSetrepIncreasing.setrep(5, 3, false);
   }
 }

+ 9 - 3
src/test/org/apache/hadoop/dfs/TestSetrepIncreasing.java

@@ -24,8 +24,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 
 public class TestSetrepIncreasing extends TestCase {
-  static void setrep(int fromREP, int toREP) throws IOException {
+  static void setrep(int fromREP, int toREP, boolean simulatedStorage) throws IOException {
     Configuration conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
     conf.set("dfs.replication", "" + fromREP);
     conf.setLong("dfs.blockreport.intervalMsec", 1000L);
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 10, true, null);
@@ -63,6 +66,9 @@ public class TestSetrepIncreasing extends TestCase {
   }
 
   public void testSetrepIncreasing() throws IOException {
-    setrep(3, 7);
+    setrep(3, 7, false);
+  }
+  public void testSetrepIncreasingSimulatedStorage() throws IOException {
+    setrep(3, 7, true);
   }
-}
+}

+ 232 - 0
src/test/org/apache/hadoop/dfs/TestSimulatedFSDataset.java

@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.FSDatasetInterface;
+import org.apache.hadoop.dfs.SimulatedFSDataset;
+
+/**
+ * This class tests the methods of SimulatedFSDataset.
+ */
+public class TestSimulatedFSDataset extends TestCase {
+  
+  Configuration conf = null;
+  
+  FSDatasetInterface fsdataset = null;
+  
+  static final int NUMBLOCKS = 20;
+  static final int BLOCK_LENGTH_MULTIPLIER = 79;
+
+  protected void setUp() throws Exception {
+    super.setUp();
+    conf = new Configuration();
+    conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    fsdataset = new SimulatedFSDataset(conf);
+  }
+
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+  
+  long blockIdToLen(long blkid) {
+    return blkid*BLOCK_LENGTH_MULTIPLIER;
+  }
+  
+  int addSomeBlocks() throws IOException {
+    int bytesAdded = 0;
+    for (int i = 1; i <= NUMBLOCKS; ++i) {
+      // We pass the expected length as zero; fsdataset should use the size
+      // of the data actually written.
+      Block b = new Block(i, 0);
+      OutputStream dataOut  = fsdataset.writeToBlock(b).dataOut;
+      assertEquals(0, fsdataset.getLength(b));
+      for (int j=1; j <= blockIdToLen(i); ++j) {
+        dataOut.write(j);
+        assertEquals(j, fsdataset.getLength(b)); // correct length even as we write
+        bytesAdded++;
+      }
+      dataOut.close();
+      b.setNumBytes(blockIdToLen(i));
+      fsdataset.finalizeBlock(b);
+      assertEquals(blockIdToLen(i), fsdataset.getLength(b));
+    }
+    return bytesAdded;  
+  }
+
+  public void testGetMetaData() throws IOException {
+    Block b = new Block(1, 5);
+    try {
+      assertFalse(fsdataset.metaFileExists(b));
+      assertTrue("Expected an IO exception", false);
+    } catch (IOException e) {
+      // ok - as expected
+    }
+    addSomeBlocks(); // Only one block is needed, but adding several is harmless.
+    b = new Block(1, 0);
+    InputStream metaInput = fsdataset.getMetaDataInputStream(b);
+    DataInputStream metaDataInput = new DataInputStream(metaInput);
+    short version = metaDataInput.readShort();
+    assertEquals(FSDataset.METADATA_VERSION, version);
+    DataChecksum checksum = DataChecksum.newDataChecksum(metaDataInput);
+    assertEquals(DataChecksum.CHECKSUM_NULL, checksum.getChecksumType());
+    assertEquals(0, checksum.getChecksumSize());  
+  }
+
+
+  public void testStorageUsage() throws IOException {
+    assertEquals(0, fsdataset.getDfsUsed());
+    assertEquals(fsdataset.getCapacity(), fsdataset.getRemaining());
+    int bytesAdded = addSomeBlocks();
+    assertEquals(bytesAdded, fsdataset.getDfsUsed());
+    assertEquals(fsdataset.getCapacity()-bytesAdded,  fsdataset.getRemaining());
+    
+  }
+
+
+
+  void  checkBlockDataAndSize(Block b, long expectedLen) throws IOException {
+    InputStream input = fsdataset.getBlockInputStream(b);
+    long lengthRead = 0;
+    int data;
+    while ((data = input.read()) != -1) {
+      assertEquals(SimulatedFSDataset.DEFAULT_DATABYTE, data);
+      lengthRead++;
+    }
+    assertEquals(expectedLen, lengthRead);
+  }
+  
+  public void testWriteRead() throws IOException {
+    addSomeBlocks();
+    for (int i=1; i <= NUMBLOCKS; ++i) {
+      Block b = new Block(i, 0);
+      assertTrue(fsdataset.isValidBlock(b));
+      assertEquals(blockIdToLen(i), fsdataset.getLength(b));
+      checkBlockDataAndSize(b, blockIdToLen(i));
+    }
+  }
+
+
+
+  public void testGetBlockReport() throws IOException {
+    Block[] blockReport = fsdataset.getBlockReport();
+    assertEquals(0, blockReport.length);
+    int bytesAdded = addSomeBlocks();
+    blockReport = fsdataset.getBlockReport();
+    assertEquals(NUMBLOCKS, blockReport.length);
+    for (Block b: blockReport) {
+      assertNotNull(b);
+      assertEquals(blockIdToLen(b.blkid), b.len);
+    }
+    
+    // Inject blocks
+    // Now reset fsdataset with an initial block report (Use the blocks we got above)
+  
+   
+    SimulatedFSDataset sfsdataset = new SimulatedFSDataset(conf);
+    sfsdataset.injectBlocks(blockReport);
+    blockReport = sfsdataset.getBlockReport();
+    assertEquals(NUMBLOCKS, blockReport.length);
+    for (Block b: blockReport) {
+      assertNotNull(b);
+      assertEquals(blockIdToLen(b.blkid), b.len);
+      assertEquals(blockIdToLen(b.blkid), sfsdataset.getLength(b));
+    }
+    assertEquals(bytesAdded, sfsdataset.getDfsUsed());
+    assertEquals(sfsdataset.getCapacity()-bytesAdded,  sfsdataset.getRemaining());
+
+    
+    // Now test that blocks cannot be injected if the dataset does not have
+    // sufficient capacity.
+
+    conf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY, 10);
+ 
+    
+    try {
+      sfsdataset = new SimulatedFSDataset(conf);
+      sfsdataset.injectBlocks(blockReport);
+      assertTrue("Expected an IO exception", false);
+    } catch (IOException e) {
+      // ok - as expected
+    }
+
+  }
+
+  public void checkInvalidBlock(Block b) {
+    assertFalse(fsdataset.isValidBlock(b));
+    try {
+      fsdataset.getLength(b);
+      assertTrue("Expected an IO exception", false);
+    } catch (IOException e) {
+      // ok - as expected
+    }
+    
+    try {
+      fsdataset.getBlockInputStream(b);
+      assertTrue("Expected an IO exception", false);
+    } catch (IOException e) {
+      // ok - as expected
+    }
+    
+    try {
+      fsdataset.finalizeBlock(b);
+      assertTrue("Expected an IO exception", false);
+    } catch (IOException e) {
+      // ok - as expected
+    }
+    
+  }
+  
+  public void testInValidBlocks() throws IOException {
+    Block b = new Block(1, 5);
+    checkInvalidBlock(b);
+    
+    // Now check an invalid block after adding some blocks
+    addSomeBlocks();
+    b = new Block(NUMBLOCKS + 99, 5);
+    checkInvalidBlock(b);
+    
+  }
+
+  public void testInvalidate() throws IOException {
+    int bytesAdded = addSomeBlocks();
+    Block[] deleteBlocks = new Block[2];
+    deleteBlocks[0] = new Block(1, 0);
+    deleteBlocks[1] = new Block(2, 0);
+    fsdataset.invalidate(deleteBlocks);
+    checkInvalidBlock(deleteBlocks[0]);
+    checkInvalidBlock(deleteBlocks[1]);
+    long sizeDeleted = blockIdToLen(1) + blockIdToLen(2);
+    assertEquals(bytesAdded-sizeDeleted, fsdataset.getDfsUsed());
+    assertEquals(fsdataset.getCapacity()-bytesAdded+sizeDeleted,  fsdataset.getRemaining());
+    
+    
+    
+    // Now make sure the rest of the blocks are valid
+    for (int i=3; i <= NUMBLOCKS; ++i) {
+      Block b = new Block(i, 0);
+      assertTrue(fsdataset.isValidBlock(b));
+    }
+  }
+
+}

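TestSimulatedFSDataset above exercises the full block lifecycle of FSDatasetInterface against the simulated implementation. A short sketch of that lifecycle, assuming only the SimulatedFSDataset API shown in this patch; the wrapper method and its imports are illustrative, not part of the patch.

// Illustrative walk through the write -> finalize -> read lifecycle that the
// test above asserts on.
void writeAndReadOneBlock() throws IOException {
  Configuration conf = new Configuration();
  conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
  FSDatasetInterface fsdataset = new SimulatedFSDataset(conf);

  Block b = new Block(1, 0);                    // expected length filled in later
  OutputStream dataOut = fsdataset.writeToBlock(b).dataOut;
  for (int i = 0; i < 100; i++) {
    dataOut.write(i);                           // bytes are discarded, only counted
  }
  dataOut.close();
  b.setNumBytes(100);
  fsdataset.finalizeBlock(b);                   // block becomes valid and readable

  InputStream in = fsdataset.getBlockInputStream(b);
  long len = 0;
  for (int c = in.read(); c != -1; c = in.read()) {
    // every byte reads back as SimulatedFSDataset.DEFAULT_DATABYTE,
    // not what was written
    len++;
  }
  // len now equals fsdataset.getLength(b), i.e. 100
}

Finalizing is what flips the block from "being written" to "valid": before finalizeBlock, the meta stream cannot be read, and after it, getLength and the block report reflect the number of bytes counted during the write.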
+ 17 - 2
src/test/org/apache/hadoop/dfs/TestSmallBlock.java

@@ -34,6 +34,7 @@ public class TestSmallBlock extends TestCase {
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 1;
   static final int fileSize = 20;
+  boolean simulatedStorage = false;
 
   private void writeFile(FileSystem fileSys, Path name) throws IOException {
     // create and write a file that contains three blocks of data
@@ -61,8 +62,14 @@ public class TestSmallBlock extends TestCase {
     assertEquals("Number of blocks", fileSize, locations.length);
     FSDataInputStream stm = fileSys.open(name);
     byte[] expected = new byte[fileSize];
-    Random rand = new Random(seed);
-    rand.nextBytes(expected);
+    if (simulatedStorage) {
+      for (int i = 0; i < expected.length; ++i) {  
+        expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
+      }
+    } else {
+      Random rand = new Random(seed);
+      rand.nextBytes(expected);
+    }
     // do a sanity check. Read the file
     byte[] actual = new byte[fileSize];
     stm.readFully(0, actual);
@@ -81,6 +88,9 @@ public class TestSmallBlock extends TestCase {
    */
   public void testSmallBlock() throws IOException {
     Configuration conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean("dfs.datanode.simulateddatastorage", true);
+    }
     conf.set("io.bytes.per.checksum", "1");
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fileSys = cluster.getFileSystem();
@@ -94,4 +104,9 @@ public class TestSmallBlock extends TestCase {
       cluster.shutdown();
     }
   }
+  public void testSmallBlockSimulatedStorage() throws IOException {
+    simulatedStorage = true;
+    testSmallBlock();
+    simulatedStorage = false;
+  }
 }