
HDFS-11160. VolumeScanner reports write-in-progress replicas as corrupt incorrectly. Contributed by Wei-Chiu Chuang and Yongjun Zhang.

(cherry picked from commit 0cb99db9d91d113e7fbe229f90a61a33433cecb9)

Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
Wei-Chiu Chuang 8 years ago
parent
commit
162c6cc9f4

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java

@@ -64,7 +64,15 @@ public class BlockScanner {
   /**
    * The scanner configuration.
    */
-  private final Conf conf;
+  private Conf conf;
+
+  @VisibleForTesting
+  void setConf(Conf conf) {
+    this.conf = conf;
+    for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+      entry.getValue().setConf(conf);
+    }
+  }
 
   /**
    * The cached scanner configuration.
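
The new setter is package-private and marked @VisibleForTesting, and it forwards the updated Conf to every registered VolumeScanner. A minimal sketch of how a test in the same package could use it, assuming a DataNode handle from a MiniDFSCluster (the helper class name is made up, and the config key is assumed to be DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND):

    package org.apache.hadoop.hdfs.server.datanode;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    class BlockScannerReconfigSketch {
      // Rebuild the scanner configuration and push it to a running DataNode.
      // Must live in this package because setConf() is package-private.
      static void retune(DataNode datanode, long scannerBytesPerSecond) {
        Configuration conf = new Configuration();
        conf.setLong(DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
            scannerBytesPerSecond);
        // setConf(...) loops over the scanners map and updates each one.
        datanode.getBlockScanner().setConf(new BlockScanner.Conf(conf));
      }
    }

The new test at the bottom of this commit uses the same pattern to lift the throttle so the suspect block is rescanned right away.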

+ 13 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -242,14 +242,24 @@ class BlockSender implements java.io.Closeable {
         Preconditions.checkArgument(sendChecksum,
             "If verifying checksum, currently must also send it.");
       }
-      
+
+      // If an append starts right after the BlockSender is constructed,
+      // the last partial checksum may be overwritten by the append, so the
+      // BlockSender needs to use the partial checksum captured before the
+      // append write.
+      ChunkChecksum chunkChecksum = null;
       final long replicaVisibleLength;
       try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
         replica = getReplica(block, datanode);
         replicaVisibleLength = replica.getVisibleLength();
+        if (replica instanceof FinalizedReplica) {
+          // Load last checksum in case the replica is being written
+          // concurrently
+          final FinalizedReplica frep = (FinalizedReplica) replica;
+          chunkChecksum = frep.getLastChecksumAndDataLen();
+        }
       }
       // if there is a write in progress
-      ChunkChecksum chunkChecksum = null;
       if (replica instanceof ReplicaBeingWritten) {
         final ReplicaBeingWritten rbw = (ReplicaBeingWritten)replica;
         waitForMinLength(rbw, startOffset + length);
@@ -498,7 +508,7 @@ class BlockSender implements java.io.Closeable {
               bytesOnDisk));
     }
   }
-  
+
   /**
   * Converts an IOException (not subclasses) to SocketException.
    * This is typically done to indicate to upper layers that the error 
@@ -572,7 +582,6 @@ class BlockSender implements java.io.Closeable {
       if (lastDataPacket && lastChunkChecksum != null) {
         int start = checksumOff + checksumDataLen - checksumSize;
         byte[] updatedChecksum = lastChunkChecksum.getChecksum();
-        
         if (updatedChecksum != null) {
           System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
         }
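
Taken together, these hunks pin down an ordering guarantee: the checksum of the last partial chunk is snapshotted while the dataset lock is held, and that snapshot, rather than whatever a concurrent append later writes to the meta file, is what gets copied into the final packet. A condensed, illustrative sketch (simplified, not the literal BlockSender code):

    // 1. Snapshot the last partial chunk checksum under the dataset lock,
    //    before any concurrent append can overwrite it on disk.
    ChunkChecksum chunkChecksum = null;
    try (AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
      if (replica instanceof FinalizedReplica) {
        chunkChecksum = ((FinalizedReplica) replica).getLastChecksumAndDataLen();
      }
    }

    // 2. ... an append may now reopen the replica and rewrite the
    //    partial-chunk checksum in the meta file ...

    // 3. When the last data packet is built, patch its checksum bytes with
    //    the snapshot so they match the data actually being sent.
    if (lastDataPacket && chunkChecksum != null) {
      System.arraycopy(chunkChecksum.getChecksum(), 0, buf,
          checksumOff + checksumDataLen - checksumSize, checksumSize);
    }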

+ 29 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -88,4 +90,31 @@ public class FinalizedReplica extends ReplicaInfo {
   public String toString() {
     return super.toString();
   }
+
+  /**
+   * Gets the last chunk checksum and the length of the block data that
+   * the checksum covers.
+   * Note: must be called with the FsDataset lock held. This may be improved
+   * in the future to lock only the FsVolume.
+   * @throws IOException
+   */
+  public ChunkChecksum getLastChecksumAndDataLen() throws IOException {
+    ChunkChecksum chunkChecksum = null;
+    try {
+      byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
+          getBlockFile(), getMetaFile());
+      if (lastChecksum != null) {
+        chunkChecksum =
+            new ChunkChecksum(getVisibleLength(), lastChecksum);
+      }
+    } catch (FileNotFoundException e) {
+      // meta file is lost. Try to continue anyway.
+      DataNode.LOG.warn("meta file " + getMetaFile() +
+          " is missing!");
+    } catch (IOException ioe) {
+      DataNode.LOG.warn("Unable to read checksum from meta file " +
+          getMetaFile(), ioe);
+    }
+    return chunkChecksum;
+  }
 }

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java

@@ -69,7 +69,12 @@ public class VolumeScanner extends Thread {
   /**
    * The configuration.
    */
-  private final Conf conf;
+  private Conf conf;
+
+  @VisibleForTesting
+  void setConf(Conf conf) {
+    this.conf = conf;
+  }
 
   /**
   * The DataNode this VolumeScanner is associated with.
@@ -430,6 +435,7 @@ public class VolumeScanner extends Thread {
     if (block == null) {
       return -1; // block not found.
     }
+    LOG.debug("start scanning block {}", block);
     BlockSender blockSender = null;
     try {
       blockSender = new BlockSender(block, 0, -1,
@@ -611,6 +617,7 @@ public class VolumeScanner extends Thread {
               break;
             }
             if (timeout > 0) {
+              LOG.debug("{}: wait for {} milliseconds", this, timeout);
               wait(timeout);
               if (stopping) {
                 break;

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java

@@ -186,4 +186,15 @@ public interface FsVolumeSpi {
    * Get the FSDatasetSpi which this volume is a part of.
    */
   FsDatasetSpi getDataset();
+
+  /**
+   * Loads the last partial chunk checksum from the block's meta (checksum)
+   * file. Must be called with the FsDataset lock held.
+   * @param blockFile the block data file
+   * @param metaFile the block metadata (checksum) file
+   * @return the checksum bytes of the last partial chunk, or null if the
+   *         last chunk is complete
+   * @throws IOException
+   */
+  byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile)
+      throws IOException;
 }
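
Every FsVolumeSpi implementation, including test doubles, now has to provide this method. A volume that keeps no real on-disk metadata can simply return null, which callers treat as "nothing to preserve"; the three test stubs further down in this commit do exactly that:

    @Override
    public byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile)
        throws IOException {
      // No backing meta file to read, so there is no partial-chunk checksum
      // to carry over.
      return null;
    }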

+ 2 - 26
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -1169,30 +1169,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
-
-  private byte[] loadLastPartialChunkChecksum(
-      File blockFile, File metaFile) throws IOException {
-    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
-    final int checksumSize = dcs.getChecksumSize();
-    final long onDiskLen = blockFile.length();
-    final int bytesPerChecksum = dcs.getBytesPerChecksum();
-
-    if (onDiskLen % bytesPerChecksum == 0) {
-      // the last chunk is a complete one. No need to preserve its checksum
-      // because it will not be modified.
-      return null;
-    }
-
-    int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
-        (int)(onDiskLen / bytesPerChecksum * checksumSize);
-    byte[] lastChecksum = new byte[checksumSize];
-    try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
-      raf.seek(offsetInChecksum);
-      raf.read(lastChecksum, 0, checksumSize);
-    }
-    return lastChecksum;
-  }
-
   /** Append to a finalized replica
    * Change a finalized replica to be a RBW replica and 
    * bump its generation stamp to be the newGS
@@ -1231,7 +1207,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
 
       // load last checksum and datalen
-      byte[] lastChunkChecksum = loadLastPartialChunkChecksum(
+      byte[] lastChunkChecksum = v.loadLastPartialChunkChecksum(
           replicaInfo.getBlockFile(), replicaInfo.getMetaFile());
       newReplicaInfo.setLastChecksumAndDataLen(
           replicaInfo.getNumBytes(), lastChunkChecksum);
@@ -1619,7 +1595,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // load last checksum and datalen
       final File destMeta = FsDatasetUtil.getMetaFile(dest,
           b.getGenerationStamp());
-      byte[] lastChunkChecksum = loadLastPartialChunkChecksum(dest, destMeta);
+      byte[] lastChunkChecksum = v.loadLastPartialChunkChecksum(dest, destMeta);
       rbw.setLastChecksumAndDataLen(numBytes, lastChunkChecksum);
       // overwrite the RBW in the volume map
       volumeMap.add(b.getBlockPoolId(), rbw);

+ 39 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.io.RandomAccessFile;
 import java.nio.channels.ClosedChannelException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -58,6 +60,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.CloseableReferenceCount;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -1017,5 +1020,41 @@ public class FsVolumeImpl implements FsVolumeSpi {
   DatanodeStorage toDatanodeStorage() {
     return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
   }
+
+
+  @Override
+  public byte[] loadLastPartialChunkChecksum(
+      File blockFile, File metaFile) throws IOException {
+    // readHeader closes the temporary FileInputStream.
+    DataChecksum dcs = BlockMetadataHeader
+        .readHeader(metaFile).getChecksum();
+    final int checksumSize = dcs.getChecksumSize();
+    final long onDiskLen = blockFile.length();
+    final int bytesPerChecksum = dcs.getBytesPerChecksum();
+
+    if (onDiskLen % bytesPerChecksum == 0) {
+      // the last chunk is a complete one. No need to preserve its checksum
+      // because it will not be modified.
+      return null;
+    }
+
+    long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+        (onDiskLen / bytesPerChecksum) * checksumSize;
+    byte[] lastChecksum = new byte[checksumSize];
+    try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
+      raf.seek(offsetInChecksum);
+      int readBytes = raf.read(lastChecksum, 0, checksumSize);
+      if (readBytes == -1) {
+        throw new IOException("Expected to read " + checksumSize +
+            " bytes from offset " + offsetInChecksum +
+            " but reached end of file.");
+      } else if (readBytes != checksumSize) {
+        throw new IOException("Expected to read " + checksumSize +
+            " bytes from offset " + offsetInChecksum + " but read " +
+            readBytes + " bytes.");
+      }
+    }
+    return lastChecksum;
+  }
 }
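
The offset arithmetic is easiest to follow with concrete numbers. A small self-contained example, assuming the common defaults of 512 bytes per checksum, 4-byte CRC checksums, and a 7-byte meta file header (at runtime these values are read from BlockMetadataHeader, not hard-coded):

    public class LastChunkChecksumOffsetDemo {
      public static void main(String[] args) {
        final int headerSize = 7;          // assumed BlockMetadataHeader size
        final int bytesPerChecksum = 512;  // assumed dfs.bytes-per-checksum
        final int checksumSize = 4;        // assumed CRC32C checksum width
        // Same block length the new test writes up front: 2 MiB + 100 bytes.
        final long onDiskLen = 2L * 1024 * 1024 + 100;

        if (onDiskLen % bytesPerChecksum == 0) {
          System.out.println("last chunk is complete; nothing to preserve");
          return;
        }
        // 2097252 / 512 = 4096 full chunks, so the partial chunk's checksum
        // is the 4097th entry: 7 + 4096 * 4 = 16391.
        long offsetInChecksum =
            headerSize + (onDiskLen / bytesPerChecksum) * checksumSize;
        System.out.println("read " + checksumSize + " bytes at offset "
            + offsetInChecksum);
      }
    }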
 

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -523,6 +523,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     public FsDatasetSpi getDataset() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public byte[] loadLastPartialChunkChecksum(
+        File blockFile, File metaFile) throws IOException {
+      return null;
+    }
   }
 
   private final Map<String, Map<Block, BInfo>> blockMap

+ 100 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java

@@ -34,8 +34,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
 
 import com.google.common.base.Supplier;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -806,4 +810,100 @@ public class TestBlockScanner {
       info.blocksScanned = 0;
     }
   }
+
+  /**
+   * Test concurrent append and scan.
+   * @throws Exception
+   */
+  @Test(timeout=120000)
+  public void testAppendWhileScanning() throws Exception {
+    GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
+    Configuration conf = new Configuration();
+    // throttle the block scanner: 1MB per second
+    conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 1048576);
+    // Set a really long scan period.
+    conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
+    final int numExpectedFiles = 1;
+    final int numExpectedBlocks = 1;
+    final int numNameServices = 1;
+    // The initial file length must not be too small; otherwise the checksum
+    // file stream buffer will be pre-filled and BlockSender will not see
+    // the updated checksum.
+    final int initialFileLength = 2*1024*1024+100;
+    final TestContext ctx = new TestContext(conf, numNameServices);
+    // create one file, with one block.
+    ctx.createFiles(0, numExpectedFiles, initialFileLength);
+    final TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    String storageID = ctx.volumes.get(0).getStorageID();
+    synchronized (info) {
+      info.sem = new Semaphore(numExpectedBlocks*2);
+      info.shouldRun = true;
+      info.notify();
+    }
+    // VolumeScanner scans the first block when DN starts.
+    // Due to the throttler, this should take approximately 2 seconds.
+    waitForRescan(info, numExpectedBlocks);
+
+    // Update the throttler so a rescan is scheduled immediately.
+    // This number must be larger than the initial file length, otherwise
+    // the throttler prevents an immediate rescan.
+    conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
+        initialFileLength+32*1024);
+    BlockScanner.Conf newConf = new BlockScanner.Conf(conf);
+    ctx.datanode.getBlockScanner().setConf(newConf);
+    // schedule the first block for scanning
+    ExtendedBlock first = ctx.getFileBlock(0, 0);
+    ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
+
+    // Append to the file before the VolumeScanner completes scanning the
+    // block, which takes approximately 2 seconds.
+    FileSystem fs = ctx.cluster.getFileSystem();
+    FSDataOutputStream os = fs.append(ctx.getPath(0));
+    long seed = -1;
+    int size = 200;
+    final byte[] bytes = AppendTestUtil.randomBytes(seed, size);
+    os.write(bytes);
+    os.hflush();
+    os.close();
+    fs.close();
+
+    // verify that volume scanner does not find bad blocks after append.
+    waitForRescan(info, numExpectedBlocks);
+
+    GenericTestUtils.setLogLevel(DataNode.LOG, Level.INFO);
+  }
+
+  private void waitForRescan(final TestScanResultHandler.Info info,
+      final int numExpectedBlocks)
+      throws TimeoutException, InterruptedException {
+    LOG.info("Waiting for the first {} block(s) to be scanned.",
+        numExpectedBlocks);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          if (info.blocksScanned >= numExpectedBlocks) {
+            LOG.info("info = {}.  blockScanned has now reached 1.", info);
+            return true;
+          } else {
+            LOG.info("info = {}.  Waiting for blockScanned to reach 1.", info);
+            return false;
+          }
+        }
+      }
+    }, 1000, 30000);
+
+    synchronized (info) {
+      assertEquals("Expected 1 good block.",
+          numExpectedBlocks, info.goodBlocks.size());
+      info.goodBlocks.clear();
+      assertEquals("Expected 1 blocksScanned",
+          numExpectedBlocks, info.blocksScanned);
+      assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
+      info.blocksScanned = 0;
+    }
+  }
 }
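
The timing in the test is deliberate: with the throttle at 1 MiB/s, scanning the roughly 2 MiB block takes about two seconds, which is the window in which the append has to land. A quick back-of-the-envelope check (plain Java, nothing HDFS-specific):

    public class ScanWindowDemo {
      public static void main(String[] args) {
        long initialFileLength = 2L * 1024 * 1024 + 100; // bytes written up front
        long bytesPerSecond = 1048576L;                   // scanner throttle, 1 MiB/s
        double scanSeconds = (double) initialFileLength / bytesPerSecond;
        // Prints: scan takes ~2.0 s, leaving time for the append
        System.out.printf("scan takes ~%.1f s, leaving time for the append%n",
            scanSeconds);
      }
    }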

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

@@ -874,6 +874,12 @@ public class TestDirectoryScanner {
     public FsDatasetSpi getDataset() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public byte[] loadLastPartialChunkChecksum(
+        File blockFile, File metaFile) throws IOException {
+      return null;
+    }
   }
 
   private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java

@@ -91,6 +91,12 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
     return null;
   }
 
+  @Override
+  public byte[] loadLastPartialChunkChecksum(
+      File blockFile, File metaFile) throws IOException {
+    return null;
+  }
+
   @Override
   public BlockIterator loadBlockIterator(String bpid, String name)
       throws IOException {