
HADOOP-2012. Periodic data verification on Datanodes.
(Raghu Angadi via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@612610 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 17 years ago
parent
commit
cf32115cea

+ 3 - 0
CHANGES.txt

@@ -53,6 +53,9 @@ Trunk (unreleased changes)
     "fs.trash.root" parameter is no longer used.  Full source paths
     "fs.trash.root" parameter is no longer used.  Full source paths
     are also no longer reproduced within the trash.
     are also no longer reproduced within the trash.
 
 
+    HADOOP-2012. Periodic data verification on Datanodes.
+    (Raghu Angadi via dhruba)
+
   NEW FEATURES
   NEW FEATURES
 
 
     HADOOP-1857.  Ability to run a script when a task fails to capture stack
     HADOOP-1857.  Ability to run a script when a task fails to capture stack
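For orientation, the new scanner is controlled by a single configuration key, dfs.datanode.scan.period.hours, visible in the DataNode.java hunk below: a negative value disables periodic verification, and the upgrade/rollback tests in this commit set it to -1 so block files on disk stay byte-identical. A minimal sketch of switching it off (the class name is ours, not part of the patch):

    import org.apache.hadoop.conf.Configuration;

    public class DisableScannerSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // A negative value disables the scanner (see the check in the
        // DataNode.java hunk below); the upgrade/rollback tests in this
        // commit set -1 so on-disk block files are left untouched.
        conf.setInt("dfs.datanode.scan.period.hours", -1);
      }
    }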

+ 21 - 0
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -838,6 +838,24 @@ class DFSClient implements FSConstants {
     int readAll(byte[] buf, int offset, int len) throws IOException {
       return readFully(this, buf, offset, len);
     }
+    
+    /* When the reader reaches end of a block and there are no checksum
+     * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that 
+     * checksum was verified and there was no error.
+     */ 
+    void checksumOk(Socket sock) {
+      try {
+        OutputStream out = sock.getOutputStream();
+        byte buf[] = { (OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
+                       (OP_STATUS_CHECKSUM_OK) & 0xff };
+        out.write(buf);
+        out.flush();
+      } catch (IOException e) {
+        // it's ok not to be able to send this.
+        LOG.debug("Could not write to datanode " + sock.getInetAddress() +
+                  ": " + e.getMessage());
+      }
+    }
   }
     
   /****************************************************************
@@ -1135,6 +1153,9 @@ class DFSClient implements FSConstants {
             
             if (result >= 0) {
               pos += result;
+              if ( pos > blockEnd ) {
+                blockReader.checksumOk(s);
+              }
             } else {
               // got a EOS from reader though we expect more data on it.
               throw new IOException("Unexpected EOS from the reader");
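A note on the hand-rolled encoding in checksumOk() above: the two bytes it writes are the big-endian short that the datanode reads back with in.readShort() in the DataNode.java hunk below. A small self-contained check of that equivalence (the constant's value 5 comes from the FSConstants.java hunk; the class name is ours):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.util.Arrays;

    public class ChecksumOkEncoding {
      // Value added to FSConstants in this commit.
      static final int OP_STATUS_CHECKSUM_OK = 5;

      public static void main(String[] args) throws Exception {
        // The manual big-endian packing used in DFSClient.checksumOk() ...
        byte[] manual = { (OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
                          OP_STATUS_CHECKSUM_OK & 0xff };
        // ... matches DataOutputStream.writeShort(), which is why the
        // datanode side can decode it with in.readShort().
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeShort(OP_STATUS_CHECKSUM_OK);
        System.out.println(Arrays.equals(manual, bos.toByteArray())); // true
      }
    }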

+ 130 - 14
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.dfs;
 
 import org.apache.commons.logging.*;
 
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.*;
@@ -115,6 +116,9 @@ public class DataNode implements FSConstants, Runnable {
   private Thread dataNodeThread = null;
   String machineName;
   int defaultBytesPerChecksum = 512;
+  
+  private DataBlockScanner blockScanner;
+  private Daemon blockScannerThread;
 
   // The following three fields are to support balancing
   final static short MAX_BALANCING_THREADS = 5;
@@ -130,7 +134,7 @@ public class DataNode implements FSConstants, Runnable {
     return System.currentTimeMillis();
   }
 
-  private static class DataNodeMetrics implements Updater {
+  static class DataNodeMetrics implements Updater {
     private final MetricsRecord metricsRecord;
     private int bytesWritten = 0;
     private int bytesRead = 0;
@@ -138,6 +142,8 @@ public class DataNode implements FSConstants, Runnable {
     private int blocksRead = 0;
     private int blocksReplicated = 0;
     private int blocksRemoved = 0;
+    private int blocksVerified = 0;
+    private int blockVerificationFailures = 0;
       
     DataNodeMetrics(Configuration conf) {
       String sessionId = conf.get("session.id"); 
@@ -162,6 +168,9 @@ public class DataNode implements FSConstants, Runnable {
         metricsRecord.incrMetric("blocks_written", blocksWritten);
         metricsRecord.incrMetric("blocks_written", blocksWritten);
         metricsRecord.incrMetric("blocks_replicated", blocksReplicated);
         metricsRecord.incrMetric("blocks_replicated", blocksReplicated);
         metricsRecord.incrMetric("blocks_removed", blocksRemoved);
         metricsRecord.incrMetric("blocks_removed", blocksRemoved);
+        metricsRecord.incrMetric("blocks_verified", blocksVerified);
+        metricsRecord.incrMetric("block_verification_failures", 
+                                                  blockVerificationFailures);        
               
         bytesWritten = 0;
         bytesRead = 0;
@@ -169,6 +178,8 @@ public class DataNode implements FSConstants, Runnable {
         blocksRead = 0;
         blocksReplicated = 0;
         blocksRemoved = 0;
+        blocksVerified = 0;
+        blockVerificationFailures = 0;
       }
       metricsRecord.update();
     }
@@ -196,6 +207,14 @@ public class DataNode implements FSConstants, Runnable {
     synchronized void removedBlocks(int nblocks) {
       blocksRemoved += nblocks;
     }
+    
+    synchronized void verifiedBlocks(int nblocks) {
+      blocksVerified += nblocks;
+    }
+    
+    synchronized void verificationFailures(int failures) {
+      blockVerificationFailures += failures;
+    }    
   }
     
   /**
@@ -310,6 +329,21 @@ public class DataNode implements FSConstants, Runnable {
     LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s");
     LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s");
     this.balancingThrottler = new Throttler(balanceBandwidth);
     this.balancingThrottler = new Throttler(balanceBandwidth);
 
 
+    //initialize periodic block scanner
+    String reason = null;
+    if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
+      reason = "verification is turned off by configuration";
+    } else if ( !(data instanceof FSDataset) ) {
+      reason = "verifcation is supported only with FSDataset";
+    } 
+    if ( reason == null ) {
+      blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
+      blockScannerThread = new Daemon(blockScanner);
+    } else {
+      LOG.info("Periodic Block Verification is disabled because " +
+               reason + ".");
+    }
+    
     //create a servlet to serve full-file content
     String infoAddr = conf.get("dfs.datanode.http.bindAddress", "0.0.0.0:50075");
     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
@@ -317,6 +351,9 @@ public class DataNode implements FSConstants, Runnable {
     int tmpInfoPort = infoSocAddr.getPort();
     this.infoServer = new StatusHttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0);
     this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
+    this.infoServer.setAttribute("datanode.blockScanner", blockScanner);
+    this.infoServer.addServlet(null, "/blockScannerReport", 
+                               DataBlockScanner.Servlet.class);
     this.infoServer.start();
     // adjust info port
     this.dnRegistration.setInfoPort(this.infoServer.getPort());
@@ -372,6 +409,10 @@ public class DataNode implements FSConstants, Runnable {
     return nameNodeAddr;
   }
     
+  DataNodeMetrics getMetrics() {
+    return myMetrics;
+  }
+  
   /**
    * Return the namenode's identifier
    */
@@ -473,6 +514,10 @@ public class DataNode implements FSConstants, Runnable {
     }
     if(upgradeManager != null)
       upgradeManager.shutdownUpgrade();
+    if (blockScannerThread != null) {
+      blockScanner.shutdown();
+      blockScannerThread.interrupt();
+    }
     if (storage != null) {
       try {
         this.storage.unlockAll();
@@ -684,6 +729,9 @@ public class DataNode implements FSConstants, Runnable {
       //
       Block toDelete[] = ((BlockCommand)cmd).getBlocks();
       try {
+        if (blockScanner != null) {
+          blockScanner.deleteBlocks(toDelete);
+        }
         data.invalidate(toDelete);
       } catch(IOException e) {
         checkDiskError();
@@ -910,7 +958,8 @@ public class DataNode implements FSConstants, Runnable {
       BlockSender blockSender = null;
       try {
         try {
-          blockSender = new BlockSender(block, startOffset, length, true, true);
+          blockSender = new BlockSender(block, startOffset, length, 
+                                        true, true, false);
         } catch(IOException e) {
           out.writeShort(OP_STATUS_ERROR);
           throw e;
@@ -919,6 +968,17 @@ public class DataNode implements FSConstants, Runnable {
         out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status
         long read = blockSender.sendBlock(out, null); // send data
 
+        if (blockSender.isBlockReadFully()) {
+          // See if client verification succeeded. 
+          // This is an optional response from client.
+          try {
+            if (in.readShort() == OP_STATUS_CHECKSUM_OK  && 
+                blockScanner != null) {
+              blockScanner.verifiedByClient(block);
+            }
+          } catch (IOException ignored) {}
+        }
+        
         myMetrics.readBytes((int) read);
         myMetrics.readBlocks(1);
         LOG.info(dnRegistration + "Served block " + block + " to " + s.getInetAddress());
@@ -1013,6 +1073,10 @@ public class DataNode implements FSConstants, Runnable {
         // notify name node
         notifyNamenodeReceivedBlock(block, EMPTY_DEL_HINT);
 
+        if (blockScanner != null) {
+          blockScanner.addBlock(block);
+        }
+        
         String msg = "Received block " + block + " from " +
         String msg = "Received block " + block + " from " +
                      s.getInetAddress();
                      s.getInetAddress();
 
 
@@ -1111,7 +1175,7 @@ public class DataNode implements FSConstants, Runnable {
         balancingSem.acquireUninterruptibly();
         
         // check if the block exists or not
-        blockSender = new BlockSender(block, 0, -1, false, false);
+        blockSender = new BlockSender(block, 0, -1, false, false, false);
 
         // get the output stream to the target
         InetSocketAddress targetAddr = NetUtils.createSocketAddr(target.getName());
@@ -1217,12 +1281,19 @@ public class DataNode implements FSConstants, Runnable {
     private long curReserve;     // remaining bytes can be sent in the period
     private long bytesAlreadyUsed;
 
-    /** Constructor */
+    /** Constructor 
+     * @param bandwidthPerSec bandwidth allowed in bytes per second. 
+     */
     Throttler(long bandwidthPerSec) {
       this(500, bandwidthPerSec);  // by default throttling period is 500ms 
     }
 
-    /** Constructor */
+    /**
+     * Constructor
+     * @param period in milliseconds. Bandwidth is enforced over this
+     *        period.
+     * @param bandwidthPerSec bandwidth allowed in bytes per second. 
+     */
     Throttler(long period, long bandwidthPerSec) {
       this.curPeriodStart = System.currentTimeMillis();
       this.period = period;
@@ -1230,6 +1301,26 @@ public class DataNode implements FSConstants, Runnable {
       this.periodExtension = period*3;
     }
 
+    /**
+     * @return current throttle bandwidth in bytes per second.
+     */
+    public synchronized long getBandwidth() {
+      return bytesPerPeriod*1000/period;
+    }
+    
+    /**
+     * Sets throttle bandwidth. This takes effect at the latest by the end of the current
+     * period.
+     * 
+     * @param bytesPerSecond 
+     */
+    public synchronized void setBandwidth(long bytesPerSecond) {
+      if ( bytesPerSecond <= 0 ) {
+        throw new IllegalArgumentException("" + bytesPerSecond);
+      }
+      bytesPerPeriod = bytesPerSecond*period/1000;
+    }
+    
     /** Given the numOfBytes sent/received since last time throttle was called,
      * make the current thread sleep if I/O rate is too fast
      * compared to the given bandwidth
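The setter above quantizes bandwidth to whole periods: bytesPerPeriod = bytesPerSecond*period/1000, and getBandwidth() inverts that. A worked sketch of the arithmetic, assuming the default 500 ms period (class name ours):

    public class ThrottlerMath {
      public static void main(String[] args) {
        long period = 500;                                     // ms, the default
        long bytesPerSecond = 1024 * 1024;                     // request 1 MiB/s
        long bytesPerPeriod = bytesPerSecond * period / 1000;  // 524288 bytes per 500 ms
        System.out.println(bytesPerPeriod * 1000 / period);    // getBandwidth(): 1048576

        // Integer division rounds odd rates down: 999 B/s becomes
        // 499 bytes/period, which reads back as 998 B/s.
        System.out.println(999L * period / 1000 * 1000 / period); // 998
      }
    }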
@@ -1269,30 +1360,35 @@ public class DataNode implements FSConstants, Runnable {
     }
   }
 
-  private class BlockSender implements java.io.Closeable {
+  class BlockSender implements java.io.Closeable {
     private Block block; // the block to read from
     private DataInputStream blockIn; // data strean
     private DataInputStream checksumIn; // checksum datastream
     private DataChecksum checksum; // checksum stream
     private long offset; // starting position to read
     private long endOffset; // ending position
+    private long blockLength;
     private byte buf[]; // buffer to store data read from the block file & crc
     private int bytesPerChecksum; // chunk size
     private int checksumSize; // checksum size
     private boolean corruptChecksumOk; // if need to verify checksum
     private boolean chunkOffsetOK; // if need to send chunk offset
 
+    private boolean blockReadFully; //set when the whole block is read
+    private boolean verifyChecksum; //if true, checksum is verified while reading
     private Throttler throttler;
     private DataOutputStream out;
 
     BlockSender(Block block, long startOffset, long length,
-        boolean corruptChecksumOk, boolean chunkOffsetOK) throws IOException {
+                boolean corruptChecksumOk, boolean chunkOffsetOK,
+                boolean verifyChecksum) throws IOException {
 
       try {
         this.block = block;
         this.chunkOffsetOK = chunkOffsetOK;
         this.corruptChecksumOk = corruptChecksumOk;
-
+        this.verifyChecksum = verifyChecksum;
+        this.blockLength = data.getLength(block);
 
         if ( !corruptChecksumOk || data.metaFileExists(block) ) {
           checksumIn = new DataInputStream(
@@ -1317,10 +1413,10 @@ public class DataNode implements FSConstants, Runnable {
         checksumSize = checksum.getChecksumSize();
 
         if (length < 0) {
-          length = data.getLength(block);
+          length = blockLength;
         }
 
-        endOffset = data.getLength(block);
+        endOffset = blockLength;
         if (startOffset < 0 || startOffset > endOffset
             || (length + startOffset) > endOffset) {
           String msg = " Offset " + startOffset + " and length " + length
@@ -1397,10 +1493,18 @@ public class DataNode implements FSConstants, Runnable {
       if (checksumSize > 0 && checksumIn != null) {
         try {
           checksumIn.readFully(buf, len, checksumSize);
+          
+          if (verifyChecksum) {
+            checksum.reset();
+            checksum.update(buf, 0, len);
+            if (!checksum.compare(buf, len)) {
+              throw new ChecksumException("Checksum failed at " + offset, len);
+            }
+          }
         } catch (IOException e) {
-          LOG.warn(" Could not read checksum for data at offset " + offset
-              + " for block " + block + " got : "
-              + StringUtils.stringifyException(e));
+          LOG.warn(" Could not read or failed to veirfy checksum for data" +
+                   " at offset " + offset + " for block " + block + " got : "
+                   + StringUtils.stringifyException(e));
           IOUtils.closeStream(checksumIn);
           checksumIn = null;
           if (corruptChecksumOk) {
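The verifyChecksum branch above recomputes the checksum over the data portion of the buffer and compares it with the stored value before the chunk goes out. A generic illustration of that verify-while-reading pattern, using plain java.util.zip.CRC32 rather than Hadoop's DataChecksum (names and signature are ours, for illustration only):

    import java.io.IOException;
    import java.util.zip.CRC32;

    public class VerifyWhileReading {
      // Recompute the checksum over the data chunk and compare it with
      // the value read from the metadata stream, as BlockSender now does.
      static void verifyChunk(byte[] chunk, int len, long storedCrc, long offset)
          throws IOException {
        CRC32 crc = new CRC32();
        crc.update(chunk, 0, len);
        if (crc.getValue() != storedCrc) {
          // Mirrors the ChecksumException thrown in the hunk above.
          throw new IOException("Checksum failed at " + offset);
        }
      }

      public static void main(String[] args) throws IOException {
        byte[] data = "periodic verification".getBytes();
        CRC32 crc = new CRC32();
        crc.update(data, 0, data.length);
        verifyChunk(data, data.length, crc.getValue(), 0); // passes silently
      }
    }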
@@ -1437,6 +1541,7 @@ public class DataNode implements FSConstants, Runnable {
       this.out = out;
       this.throttler = throttler;
 
+      long initialOffset = offset;
       long totalRead = 0;
       try {
         checksum.writeHeader(out);
@@ -1456,8 +1561,14 @@ public class DataNode implements FSConstants, Runnable {
         close();
       }
 
+      blockReadFully = (initialOffset == 0 && offset >= blockLength);
+
       return totalRead;
     }
+    
+    boolean isBlockReadFully() {
+      return blockReadFully;
+    }
   }
 
   /* A class that receives a block and wites to its own disk, meanwhile
@@ -1684,7 +1795,7 @@ public class DataNode implements FSConstants, Runnable {
 
         out = new DataOutputStream(new BufferedOutputStream(
             sock.getOutputStream(), BUFFER_SIZE));
-        blockSender = new BlockSender(b, 0, -1, false, false);
+        blockSender = new BlockSender(b, 0, -1, false, false, false);
 
         //
         // Header info
@@ -1727,6 +1838,11 @@ public class DataNode implements FSConstants, Runnable {
   public void run() {
     LOG.info(dnRegistration + "In DataNode.run, data = " + data);
 
+    // start block scanner
+    if (blockScannerThread != null) {
+      blockScannerThread.start();
+    }
+
     // start dataXceiveServer
     dataXceiveServer.start();
         

+ 12 - 2
src/java/org/apache/hadoop/dfs/DataStorage.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.dfs;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
@@ -31,6 +33,7 @@ import org.apache.hadoop.dfs.FSConstants.StartupOption;
 import org.apache.hadoop.dfs.FSConstants.NodeType;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.fs.FileUtil.HardLink;
+import org.apache.hadoop.io.IOUtils;
 
 /** 
  * Data storage information file.
@@ -41,6 +44,7 @@ class DataStorage extends Storage {
   // Constants
   final static String BLOCK_SUBDIR_PREFIX = "subdir";
   final static String BLOCK_FILE_PREFIX = "blk_";
+  final static String COPY_FILE_PREFIX = "dncp_";
   
   private String storageID;
 
@@ -424,7 +428,12 @@ class DataStorage extends Storage {
   
   static void linkBlocks(File from, File to) throws IOException {
     if (!from.isDirectory()) {
-      HardLink.createHardLink(from, to);
+      if (from.getName().startsWith(COPY_FILE_PREFIX)) {
+        IOUtils.copyBytes(new FileInputStream(from), 
+                          new FileOutputStream(to), 16*1024, true);
+      } else {
+        HardLink.createHardLink(from, to);
+      }
       return;
     }
     // from is a directory
@@ -433,7 +442,8 @@ class DataStorage extends Storage {
     String[] blockNames = from.list(new java.io.FilenameFilter() {
         public boolean accept(File dir, String name) {
           return name.startsWith(BLOCK_SUBDIR_PREFIX) 
-            || name.startsWith(BLOCK_FILE_PREFIX);
+            || name.startsWith(BLOCK_FILE_PREFIX)
+            || name.startsWith(COPY_FILE_PREFIX);
         }
       });
     

+ 6 - 1
src/java/org/apache/hadoop/dfs/DatanodeProtocol.java

@@ -31,7 +31,7 @@ import org.apache.hadoop.ipc.VersionedProtocol;
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
   /*
-   * 10: blockReceived also sends hints for deletion
+   * 11 : reportBadBlocks() is added.
    * 11 Block reports as long[]
    */
   public static final long versionID = 11L;
@@ -141,4 +141,9 @@ interface DatanodeProtocol extends VersionedProtocol {
    */
   public BlockCrcInfo blockCrcUpgradeGetBlockLocations(Block block)
                                                       throws IOException;  
+
+  /**
+   * same as {@link ClientProtocol#reportBadBlocks(LocatedBlock[] blocks)}
+   */
+  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
 }

+ 1 - 0
src/java/org/apache/hadoop/dfs/FSConstants.java

@@ -93,6 +93,7 @@ public interface FSConstants {
   public static final int OP_STATUS_ERROR_CHECKSUM = 2;  
   public static final int OP_STATUS_ERROR_INVALID = 3;  
   public static final int OP_STATUS_ERROR_EXISTS = 4;  
+  public static final int OP_STATUS_CHECKSUM_OK = 5;  
 
   
   /** Version for data transfers between clients and datanodes

+ 4 - 0
src/java/org/apache/hadoop/dfs/FSDataset.java

@@ -317,6 +317,10 @@ class FSDataset implements FSConstants, FSDatasetInterface {
       return usage.getMount();
     }
       
+    File getDir() {
+      return dataDir.dir;
+    }
+    
     File createTmpFile(Block b) throws IOException {
       File f = new File(tmpDir, b.getBlockName());
       try {

+ 10 - 0
src/java/org/apache/hadoop/io/IOUtils.java

@@ -147,4 +147,14 @@ public class IOUtils {
       }
     }
   }
+  
+  /** /dev/null of OutputStreams.
+   */
+  public static class NullOutputStream extends OutputStream {
+    public void write(byte[] b, int off, int len) throws IOException {
+    }
+
+    public void write(int b) throws IOException {
+    }
+  }  
 }
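Presumably the scanner uses this sink to pull a block through BlockSender with verification enabled while discarding the bytes (an assumption on our part; the DataBlockScanner class itself is not shown in this diff). A hedged usage sketch (class name ours):

    import java.io.OutputStream;
    import org.apache.hadoop.io.IOUtils;

    public class NullSinkSketch {
      public static void main(String[] args) throws Exception {
        // Everything written here is discarded; only side effects of the
        // read path (e.g. checksum verification) remain observable.
        OutputStream sink = new IOUtils.NullOutputStream();
        sink.write(new byte[4096], 0, 4096);
        sink.write(42);
        sink.close(); // OutputStream's default close() is a no-op
      }
    }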

+ 8 - 0
src/test/org/apache/hadoop/dfs/DFSTestUtil.java

@@ -23,6 +23,7 @@ import java.net.URI;
 import java.util.Random;
 import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -212,4 +213,11 @@ public class DFSTestUtil extends TestCase {
     fs.delete(root);
     files = null;
   }
+  
+  static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
+    DFSDataInputStream in = 
+      (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
+    in.readByte();
+    return in.getCurrentBlock();
+  }  
 }

+ 8 - 1
src/test/org/apache/hadoop/dfs/TestDFSFinalize.java

@@ -83,7 +83,14 @@ public class TestDFSFinalize extends TestCase {
     UpgradeUtilities.initialize();
     
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
-      conf = UpgradeUtilities.initializeStorageStateConf(numDirs);
+      /* This test requires that "current" directory not change after
+       * the upgrade. Actually it is ok for those contents to change.
+       * For now disabling block verification so that the contents are 
+       * not changed.
+       */
+      conf = new Configuration();
+      conf.setInt("dfs.datanode.scan.period.hours", -1);
+      conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
       String[] nameNodeDirs = conf.getStrings("dfs.name.dir");
       String[] dataNodeDirs = conf.getStrings("dfs.data.dir");
       

+ 3 - 1
src/test/org/apache/hadoop/dfs/TestDFSRollback.java

@@ -117,7 +117,9 @@ public class TestDFSRollback extends TestCase {
     UpgradeUtilities.initialize();
     
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
-      conf = UpgradeUtilities.initializeStorageStateConf(numDirs);
+      conf = new Configuration();
+      conf.setInt("dfs.datanode.scan.period.hours", -1);      
+      conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
       String[] nameNodeDirs = conf.getStrings("dfs.name.dir");
       String[] dataNodeDirs = conf.getStrings("dfs.data.dir");
       

+ 2 - 1
src/test/org/apache/hadoop/dfs/TestDFSStartupVersions.java

@@ -165,7 +165,8 @@ public class TestDFSStartupVersions extends TestCase {
    */
   public void testVersions() throws Exception {
     UpgradeUtilities.initialize();
-    Configuration conf = UpgradeUtilities.initializeStorageStateConf(1);
+    Configuration conf = UpgradeUtilities.initializeStorageStateConf(1, 
+                                                      new Configuration());
     StorageInfo[] versions = initializeVersions();
     UpgradeUtilities.createStorageDirs(
                                        NAME_NODE, conf.getStrings("dfs.name.dir"), "current");

+ 3 - 2
src/test/org/apache/hadoop/dfs/TestDFSStorageStateRecovery.java

@@ -27,7 +27,6 @@ import org.apache.hadoop.dfs.FSConstants.NodeType;
 import static org.apache.hadoop.dfs.FSConstants.NodeType.NAME_NODE;
 import static org.apache.hadoop.dfs.FSConstants.NodeType.DATA_NODE;
 import org.apache.hadoop.dfs.FSConstants.StartupOption;
-import org.apache.hadoop.fs.Path;
 
 /**
 * This test ensures the appropriate response (successful or failure) from
@@ -179,7 +178,9 @@ public class TestDFSStorageStateRecovery extends TestCase {
     UpgradeUtilities.initialize();
 
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
-      conf = UpgradeUtilities.initializeStorageStateConf(numDirs);
+      conf = new Configuration();
+      conf.setInt("dfs.datanode.scan.period.hours", -1);      
+      conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
       for (int i = 0; i < testCases.length; i++) {
         boolean[] testCase = testCases[i];
         boolean shouldRecover = testCase[4];

+ 3 - 1
src/test/org/apache/hadoop/dfs/TestDFSUpgrade.java

@@ -123,7 +123,9 @@ public class TestDFSUpgrade extends TestCase {
     UpgradeUtilities.initialize();
     
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
-      conf = UpgradeUtilities.initializeStorageStateConf(numDirs);
+      conf = new Configuration();
+      conf.setInt("dfs.datanode.scan.period.hours", -1);      
+      conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
       String[] nameNodeDirs = conf.getStrings("dfs.name.dir");
       String[] dataNodeDirs = conf.getStrings("dfs.data.dir");
       

+ 1 - 8
src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java

@@ -114,13 +114,6 @@ public class TestDataTransferProtocol extends TestCase {
     in.readFully(arr);
   }
   
-  Block getFirstBlock(FileSystem fs, Path path) throws IOException {
-    DFSDataInputStream in = 
-      (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
-    in.readByte();
-    return in.getCurrentBlock();
-  }
-  
   public void testDataTransferProtocol() throws IOException {
     Random random = new Random();
     int oneMil = 1024*1024;
@@ -143,7 +136,7 @@ public class TestDataTransferProtocol extends TestCase {
     createFile(fileSys, file, fileLen);
 
     // get the first blockid for the file
-    Block firstBlock = getFirstBlock(fileSys, file);
+    Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
     long newBlockId = firstBlock.getBlockId() + 1;
 
     recvByteBuf.position(1);

+ 2 - 1
src/test/org/apache/hadoop/dfs/TestDistributedUpgrade.java

@@ -88,7 +88,8 @@ public class TestDistributedUpgrade extends TestCase {
     UpgradeObjectCollection.registerUpgrade(new UpgradeObject_Test_Datanode());
     UpgradeObjectCollection.registerUpgrade(new UpgradeObject_Test_Namenode());
 
-    conf = UpgradeUtilities.initializeStorageStateConf(numDirs);
+    conf = UpgradeUtilities.initializeStorageStateConf(numDirs, 
+                                                       new Configuration());
     String[] nameNodeDirs = conf.getStrings("dfs.name.dir");
     String[] dataNodeDirs = conf.getStrings("dfs.data.dir");
     DFSAdmin dfsAdmin = new DFSAdmin();

+ 5 - 2
src/test/org/apache/hadoop/dfs/UpgradeUtilities.java

@@ -139,7 +139,8 @@ public class UpgradeUtilities {
    * Initialize dfs.name.dir and dfs.data.dir with the specified number of
    * directory entries. Also initialize dfs.blockreport.intervalMsec.
    */
-  public static Configuration initializeStorageStateConf(int numDirs) {
+  public static Configuration initializeStorageStateConf(int numDirs,
+                                                         Configuration conf) {
     StringBuffer nameNodeDirs =
       new StringBuffer(new File(TEST_ROOT_DIR, "name1").toString());
     StringBuffer dataNodeDirs =
@@ -148,7 +149,9 @@ public class UpgradeUtilities {
       nameNodeDirs.append("," + new File(TEST_ROOT_DIR, "name"+i));
       dataNodeDirs.append("," + new File(TEST_ROOT_DIR, "data"+i));
     }
-    Configuration conf = new Configuration();
+    if (conf == null) {
+      conf = new Configuration();
+    }
     conf.set("dfs.name.dir", nameNodeDirs.toString());
     conf.set("dfs.name.dir", nameNodeDirs.toString());
     conf.set("dfs.data.dir", dataNodeDirs.toString());
     conf.set("dfs.data.dir", dataNodeDirs.toString());
     conf.setInt("dfs.blockreport.intervalMsec", 10000);
     conf.setInt("dfs.blockreport.intervalMsec", 10000);