
HADOOP-3503. Fix a race condition when client and namenode start simultaneous
recovery of the same block. (dhruba & Tsz Wo (Nicholas), SZE)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@664041 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur, 17 years ago
commit 82b5394be9
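
The heart of the fix is in LeaseManager.recoverBlock below: the block is registered in a shared ongoingRecovery map before any datanode is contacted, and unregistered in a finally clause, so a second concurrent request for the same block fails fast instead of racing. A minimal sketch of that guard pattern (the method name recoverGuarded and the long key are simplifications; the real code keys the map by Block and normalizes the generation stamp with a wildcard before the lookup):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    class RecoveryGuard {
      /** blocks currently being recovered */
      private static final Map<Long, Long> ongoingRecovery = new HashMap<Long, Long>();

      static void recoverGuarded(long blockId) throws IOException {
        synchronized (ongoingRecovery) {
          if (ongoingRecovery.containsKey(blockId)) {
            // a client- or namenode-initiated recovery is already in flight
            throw new IOException("Block " + blockId + " is already being recovered");
          }
          ongoingRecovery.put(blockId, blockId);
        }
        try {
          // ... query datanodes, compute the minimum replica length, sync ...
        } finally {
          synchronized (ongoingRecovery) {
            ongoingRecovery.remove(blockId);  // always release, even on failure
          }
        }
      }
    }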

+ 3 - 0
CHANGES.txt

@@ -495,6 +495,9 @@ Trunk (unreleased changes)
     HADOOP-3135. Get the system directory from the JobTracker instead of from
     the conf. (Subramaniam Krishnan via ddas)
 
+    HADOOP-3503. Fix a race condition when client and namenode start simultaneous
+    recovery of the same block.  (dhruba & Tsz Wo (Nicholas), SZE)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

+ 19 - 21
src/java/org/apache/hadoop/dfs/BlocksMap.java

@@ -33,7 +33,7 @@ class BlocksMap {
     private INodeFile          inode;
 
     /**
-     * This array contains trpilets of references.
+     * This array contains triplets of references.
      * For each i-th data-node the block belongs to
      * triplets[3*i] is the reference to the DatanodeDescriptor
      * and triplets[3*i+1] and triplets[3*i+2] are references 
@@ -138,23 +138,6 @@ class BlocksMap {
       return 0;
     }
 
-    /** Update this object */
-    void update(long newgenerationstamp, long newlength,
-        DatanodeDescriptor[] newtargets) {
-      //remove all nodes  
-      for(int n = numNodes(); n >= 0; ) {
-        removeNode(--n);
-      }
-
-      //add all targets  
-      for(DatanodeDescriptor d : newtargets) {
-        addNode(d);
-      }
-
-      generationStamp = newgenerationstamp;
-      len = newlength;
-    }
-
     /**
      * Add data-node this block belongs to.
      */
@@ -339,10 +322,10 @@ class BlocksMap {
 
   /**
    * Remove INode reference from block b.
-   * Remove the block from the block map
-   * only if it does not belong to any file and data-nodes.
+   * If it does not belong to any file and data-nodes,
+   * then remove the block from the block map.
    */
-  public void removeINode(Block b) {
+  void removeINode(Block b) {
     BlockInfo info = map.get(b);
     if (info != null) {
       info.inode = null;
@@ -352,6 +335,21 @@ class BlocksMap {
     }
   }
 
+  /**
+   * Remove the block from the block map.
+   * If the mapped BlockInfo is not null,
+   * it also removes the datanodes associated with the BlockInfo. 
+   */
+  void remove(Block b) {
+    BlockInfo info = map.remove(b);
+    if (info != null) {
+      info.inode = null;
+      for(int n = info.numNodes(); n >= 0; ) {
+        info.removeNode(--n);
+      }
+    }
+  }
+
  /** Returns the block object if it exists in the map. */
   BlockInfo getStoredBlock(Block b) {
     return map.get(b);
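
A note on why BlockInfo.update() is gone: commitBlockSynchronization (in FSNamesystem below) rewrites the very fields, length and generation stamp, that identify the block in the map, and mutating a live HashMap key is a classic Java hazard. Whether Block's own equals()/hashCode() would actually be affected is beside the point; removing the stale entry first and re-inserting a fresh BlockInfo afterwards makes the question moot. A generic illustration with a hypothetical Key class (not the real Block type):

    // hypothetical key: both fields feed equals()/hashCode()
    class Key {
      long id, stamp;
      Key(long id, long stamp) { this.id = id; this.stamp = stamp; }
      public boolean equals(Object o) {
        return o instanceof Key && ((Key)o).id == id && ((Key)o).stamp == stamp;
      }
      public int hashCode() { return (int)(31 * id + stamp); }
    }

    Map<Key, String> map = new HashMap<Key, String>();
    Key k = new Key(1, 100);
    map.put(k, "info");
    k.stamp = 200;   // mutate a field used by hashCode()
    map.get(k);      // returns null: the entry is stranded in the bucket
                     // chosen by the old hash value
    // safe pattern, as in commitBlockSynchronization:
    //   map.remove(k);  k.stamp = 200;  map.put(k, "info");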

+ 2 - 6
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -3067,9 +3067,7 @@ public class DataNode extends Configured
 
   /** {@inheritDoc} */
   public void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("oldblock=" + oldblock + ", newblock=" + newblock);
-    }
+    LOG.info("oldblock=" + oldblock + ", newblock=" + newblock);
     data.updateBlock(oldblock, newblock);
     if (finalize) {
       data.finalizeBlock(newblock);
@@ -3097,9 +3095,7 @@ public class DataNode extends Configured
   /** {@inheritDoc} */
   public Block recoverBlock(Block block, DatanodeInfo[] targets
       ) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("recoverBlock for block " + block);
-    }
+    LOG.info("Client invoking recoverBlock for block " + block);
     return LeaseManager.recoverBlock(block, targets, namenode, 
                                      getConf(), false);
   }

+ 4 - 3
src/java/org/apache/hadoop/dfs/DatanodeProtocol.java

@@ -31,7 +31,7 @@ import org.apache.hadoop.ipc.VersionedProtocol;
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 15: added DNA_RECOVERBLOCK, nextGenerationStamp and commitBlockSynchronization
+   * 16: Block parameter added to nextGenerationStamp().
    */
   public static final long versionID = 15L;
   
@@ -135,9 +135,10 @@ interface DatanodeProtocol extends VersionedProtocol {
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
   
   /**
-   * @return the next GenerationStamp
+   * @return the next GenerationStamp to be associated with the specified
+   * block. 
    */
-  public long nextGenerationStamp() throws IOException;
+  public long nextGenerationStamp(Block block) throws IOException;
 
   /**
    * Commit block synchronization in lease recovery
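
The caller side of the new signature is LeaseManager.syncBlock below: the datanode coordinating recovery passes the block, and the namenode refuses to issue a stamp unless that block still belongs to a file under construction. A simplified sketch of how the failure surfaces on the datanode side (condensed from the real call site; see nextGenerationStampForBlock in FSNamesystem below):

    long gs;
    try {
      // throws if the block is unknown or its file is no longer under
      // construction, i.e. a concurrent recovery already committed it
      gs = namenode.nextGenerationStamp(block);
    } catch (IOException e) {
      LOG.info("Aborting recovery of " + block + ": " + e.getMessage());
      throw e;
    }
    Block newblock = new Block(block.blkid, minlength, gs);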

+ 63 - 30
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -952,21 +952,17 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
         }
         //
         // If the original holder has not renewed in the last SOFTLIMIT 
-        // period, then reclaim all resources and allow this request 
-        // to proceed. Otherwise, prevent this request from creating file.
+        // period, then start lease recovery.
         //
         if (lease.expiredSoftLimit()) {
           LOG.info("startFile: recover lease " + lease + ", src=" + src);
           internalReleaseLease(lease, src);
-          leaseManager.renewLease(lease);
-        } else {
-          throw new AlreadyBeingCreatedException(
-                                                 "failed to create file " + src + " for " + holder +
-                                                 " on client " + clientMachine + 
-                                                 ", because this file is already being created by " +
-                                                 pendingFile.getClientName() + 
-                                                 " on " + pendingFile.getClientMachine());
         }
+        throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
+                                               " on client " + clientMachine + 
+                                               ", because this file is already being created by " +
+                                               pendingFile.getClientName() + 
+                                               " on " + pendingFile.getClientMachine());
       }
 
       try {
@@ -1644,9 +1640,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
    * @param holder The datanode that was creating the file
    */
   void internalReleaseLease(Lease lease, String src) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("lease=" + lease + ", src=" + src);
-    }
+    LOG.info("Recovering lease=" + lease + ", src=" + src);
 
     INodeFile iFile = dir.getFileINode(src);
     if (iFile == null) {
@@ -1671,7 +1665,8 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     // Initialize lease recovery for pendingFile. If there are no blocks 
     // associated with this file, then reap lease immediately. Otherwise 
     // renew the lease and trigger lease recovery.
-    if (pendingFile.getTargets().length == 0) {
+    if (pendingFile.getTargets() == null ||
+        pendingFile.getTargets().length == 0) {
       if (pendingFile.getBlocks().length == 0) {
         finalizeINodeFileUnderConstruction(src, pendingFile);
         NameNode.stateChangeLog.warn("BLOCK*"
@@ -1714,48 +1709,67 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       long newgenerationstamp, long newlength,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
       ) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("commitBlockSynchronization(lastblock=" + lastblock
+    LOG.info("commitBlockSynchronization(lastblock=" + lastblock
           + ", newgenerationstamp=" + newgenerationstamp
           + ", newlength=" + newlength
           + ", newtargets=" + Arrays.asList(newtargets) + ")");
-    }
-    BlockInfo blockinfo = blocksMap.getStoredBlock(lastblock);
-    if (blockinfo == null) {
+    final BlockInfo oldblockinfo = blocksMap.getStoredBlock(lastblock);
+    if (oldblockinfo == null) {
       throw new IOException("Block (=" + lastblock + ") not found");
     }
-    INodeFile iFile = blockinfo.getINode();
+    INodeFile iFile = oldblockinfo.getINode();
     if (!iFile.isUnderConstruction()) {
       throw new IOException("Unexpected block (=" + lastblock
           + ") since the file (=" + iFile.getLocalName()
           + ") is not under construction");
     }
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
 
-    //update block info
-    if (newtargets.length > 0) {
-      DatanodeDescriptor[] descriptors = new DatanodeDescriptor[newtargets.length];
-      for(int i = 0; i < newtargets.length; i++) {
-        descriptors[i] = getDatanode(newtargets[i]);
+    // Remove old block from blocks map. This always has to be done
+    // because the generationstamp of this block is changing.
+    blocksMap.remove(lastblock);
+
+    if (deleteblock) {
+      pendingFile.removeBlock(lastblock);
+    }
+    else {
+      // update last block, construct newblockinfo and add it to the blocks map
+      lastblock.set(lastblock.blkid, newlength, newgenerationstamp);
+      final BlockInfo newblockinfo = blocksMap.addINode(lastblock, pendingFile);
+    
+      //update block info
+      DatanodeDescriptor[] descriptors = null;
+      if (newtargets.length > 0) {
+        descriptors = new DatanodeDescriptor[newtargets.length];
+        for(int i = 0; i < newtargets.length; i++) {
+          descriptors[i] = getDatanode(newtargets[i]);
+          descriptors[i].addBlock(newblockinfo);
+        }
       }
-      blockinfo.update(newgenerationstamp, newlength, descriptors);
+
+      pendingFile.setLastBlock(newblockinfo, descriptors);
     }
 
     // If this commit does not want to close the file, just persist
     // block locations and return
-    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
     String src = leaseManager.findPath(pendingFile);
-    if (deleteblock) {
-      dir.removeBlock(src, pendingFile, lastblock);
-    }
     if (!closeFile) {
       dir.persistBlocks(src, pendingFile);
       getEditLog().logSync();
+      LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+          + ", newgenerationstamp=" + newgenerationstamp
+          + ", newlength=" + newlength
+          + ", newtargets=" + Arrays.asList(newtargets) + ") successful");
       return;
     }
     
     //remove lease, close file
     finalizeINodeFileUnderConstruction(src, pendingFile);
     getEditLog().logSync();
+    LOG.info("commitBlockSynchronization(newblock=" + lastblock
+          + ", newgenerationstamp=" + newgenerationstamp
+          + ", newlength=" + newlength
+          + ", newtargets=" + Arrays.asList(newtargets) + ") successful");
   }
 
 
@@ -4302,6 +4316,25 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     return gs;
   }
 
+  /**
+   * Verifies that the block is associated with a file that has a lease.
+   * Increments, logs and then returns the stamp
+   */
+  synchronized long nextGenerationStampForBlock(Block block) throws IOException {
+    String msg = "Block " + block + " is already committed.";
+    BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+    if (storedBlock == null) {
+      LOG.info(msg);
+      throw new IOException(msg);
+    }
+    INode fileINode = storedBlock.getINode();
+    if (!fileINode.isUnderConstruction()) {
+      LOG.info(msg);
+      throw new IOException(msg);
+    }
+    return nextGenerationStamp();
+  }
+
   // rename was successful. If any part of the renamed subtree had
   // files that were being written to, update with new filename.
   //

+ 12 - 0
src/java/org/apache/hadoop/dfs/INode.java

@@ -1029,6 +1029,16 @@ class INodeFileUnderConstruction extends INodeFile {
     targets = null;
   }
 
+  void setLastBlock(BlockInfo newblock, DatanodeDescriptor[] newtargets
+      ) throws IOException {
+    if (blocks == null) {
+      throw new IOException("Trying to update non-existant block (newblock="
+          + newblock + ")");
+    }
+    blocks[blocks.length - 1] = newblock;
+    setTargets(newtargets);
+  }
+
   /**
    * Initialize lease recovery for this object
    */
@@ -1048,6 +1058,8 @@ class INodeFileUnderConstruction extends INodeFile {
       if (targets[j].isAlive) {
         DatanodeDescriptor primary = targets[primaryNodeIndex = j]; 
         primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
+        NameNode.stateChangeLog.info("BLOCK* " + blocks[blocks.length - 1]
+          + " recovery started.");
       }
     }
   }

+ 53 - 28
src/java/org/apache/hadoop/dfs/LeaseManager.java

@@ -82,6 +82,10 @@ class LeaseManager {
   /** @return the lease containing src */
   Lease getLeaseByPath(String src) {return sortedLeasesByPath.get(src);}
 
+  /** list of blocks being recovered */
+  private static Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
+
+
   /** @return the number of leases currently in the system */
   synchronized int countLease() {return sortedLeases.size();}
 
@@ -416,39 +420,60 @@ class LeaseManager {
   static Block recoverBlock(Block block, DatanodeID[] datanodeids,
       DatanodeProtocol namenode, Configuration conf,
       boolean closeFile) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("block=" + block
-          + ", datanodeids=" + Arrays.asList(datanodeids));
-    }
-    List<BlockRecord> syncList = new ArrayList<BlockRecord>();
-    long minlength = Long.MAX_VALUE;
-    int errorCount = 0;
 
-    //check generation stamps
-    for(DatanodeID id : datanodeids) {
-      try {
-        InterDatanodeProtocol datanode
-            = DataNode.createInterDataNodeProtocolProxy(id, conf);
-        BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
-        if (info != null && info.getGenerationStamp() >= block.generationStamp) {
-          syncList.add(new BlockRecord(id, datanode, new Block(info)));
-          if (info.len < minlength) {
-            minlength = info.len;
+    // If the block is already being recovered, then skip recovering it.
+    // This can happen if the namenode and client start recovering the same
+    // file at the same time.
+    synchronized (ongoingRecovery) {
+      Block tmp = new Block();
+      tmp.set(block.blkid, block.len, GenerationStamp.WILDCARD_STAMP);
+      if (ongoingRecovery.get(tmp) != null) {
+        String msg = "Block " + block + " is already being recovered, " +
+                     " ignoring this request to recover it.";
+        LOG.info(msg);
+        throw new IOException(msg);
+      }
+      ongoingRecovery.put(block, block);
+    }
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("block=" + block
+            + ", datanodeids=" + Arrays.asList(datanodeids));
+      }
+      List<BlockRecord> syncList = new ArrayList<BlockRecord>();
+      long minlength = Long.MAX_VALUE;
+      int errorCount = 0;
+
+      //check generation stamps
+      for(DatanodeID id : datanodeids) {
+        try {
+          InterDatanodeProtocol datanode
+              = DataNode.createInterDataNodeProtocolProxy(id, conf);
+          BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
+          if (info != null && info.getGenerationStamp() >= block.generationStamp) {
+            syncList.add(new BlockRecord(id, datanode, new Block(info)));
+            if (info.len < minlength) {
+              minlength = info.len;
+            }
           }
+        } catch (IOException e) {
+          ++errorCount;
+          InterDatanodeProtocol.LOG.warn(
+              "Failed to getBlockMetaDataInfo for block (=" + block 
+              + ") from datanode (=" + id + ")", e);
         }
-      } catch (IOException e) {
-        ++errorCount;
-        InterDatanodeProtocol.LOG.warn(
-            "Failed to getBlockMetaDataInfo for block (=" + block 
-            + ") from datanode (=" + id + ")", e);
       }
-    }
 
-    if (syncList.isEmpty() && errorCount > 0) {
-      throw new IOException("All datanodes failed: block=" + block
-          + ", datanodeids=" + Arrays.asList(datanodeids));
+      if (syncList.isEmpty() && errorCount > 0) {
+        throw new IOException("All datanodes failed: block=" + block
+            + ", datanodeids=" + Arrays.asList(datanodeids));
+      }
+      return syncBlock(block, minlength, syncList, namenode, closeFile);
+    } finally {
+      synchronized (ongoingRecovery) {
+        ongoingRecovery.remove(block);
+      }
     }
-    return syncBlock(block, minlength, syncList, namenode, closeFile);
   }
 
   /** Block synchronization */
@@ -470,7 +495,7 @@ class LeaseManager {
 
     List<DatanodeID> successList = new ArrayList<DatanodeID>();
 
-    long generationstamp = namenode.nextGenerationStamp();
+    long generationstamp = namenode.nextGenerationStamp(block);
     Block newblock = new Block(block.blkid, minlength, generationstamp);
 
     for(BlockRecord r : syncList) {

+ 2 - 2
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -373,8 +373,8 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
   }
 
   /** {@inheritDoc} */
-  public long nextGenerationStamp() {
-    return namesystem.nextGenerationStamp();
+  public long nextGenerationStamp(Block block) throws IOException{
+    return namesystem.nextGenerationStampForBlock(block);
   }
 
   /** {@inheritDoc} */

+ 132 - 0
src/test/org/apache/hadoop/dfs/TestLeaseRecovery2.java

@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+public class TestLeaseRecovery2 extends junit.framework.TestCase {
+  static final int BLOCK_SIZE = 64;
+  static final int FILE_SIZE = 1024;
+  static final short REPLICATION_NUM = (short)3;
+  static final Random RANDOM = new Random();
+  static byte[] buffer = new byte[FILE_SIZE];
+
+  static void checkMetaInfo(Block b, InterDatanodeProtocol idp
+      ) throws IOException {
+    TestInterDatanodeProtocol.checkMetaInfo(b, idp, null);
+  }
+  
+  static int min(Integer... x) {
+    int m = x[0];
+    for(int i = 1; i < x.length; i++) {
+      if (x[i] < m) {
+        m = x[i];
+      }
+    }
+    return m;
+  }
+
+  /**
+   * Trigger soft-limit lease recovery by re-opening a file another client
+   * is still writing, then verify the recovered length and contents.
+   */
+  public void testBlockSynchronization() throws Exception {
+    final long softLease = 1000;
+    final long hardLease = 60 * 60 * 1000;
+    final short repl = 3;
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.block.size", BLOCK_SIZE);
+    conf.setInt("io.bytes.per.checksum", 16);
+    MiniDFSCluster cluster = null;
+    byte[] actual = new byte[FILE_SIZE];
+
+    try {
+      cluster = new MiniDFSCluster(conf, 5, true, null);
+      cluster.waitActive();
+
+      //create a file
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      // create a random file name
+      String filestr = "/foo" + RANDOM.nextInt();
+      Path filepath = new Path(filestr);
+      FSDataOutputStream stm = dfs.create(filepath, true,
+                                 dfs.getConf().getInt("io.file.buffer.size", 4096),
+                                 (short)repl, (long)BLOCK_SIZE);
+      assertTrue(dfs.dfs.exists(filestr));
+
+      // write random number of bytes into it.
+      int size = RANDOM.nextInt(FILE_SIZE);
+      stm.write(buffer, 0, size);
+
+      // sync file
+      stm.sync();
+
+      // set the soft limit to 1 second so that the namenode triggers
+      // lease recovery on the next attempt to open the file for writing.
+      cluster.setLeasePeriod(softLease, hardLease);
+
+      // try to re-open the file before closing the previous handle. This
+      // should fail but will trigger lease recovery.
+      String oldClientName = dfs.dfs.clientName;
+      dfs.dfs.clientName += "_1";
+      while (true) {
+        try {
+          FSDataOutputStream newstm = dfs.create(filepath, false,
+            dfs.getConf().getInt("io.file.buffer.size", 4096),
+            (short)repl, (long)BLOCK_SIZE);
+          assertTrue("Creation of an existing file should never succeed.", false);
+        } catch (IOException e) {
+          if (e.getMessage().contains("file exists")) {
+            break;
+          }
+          e.printStackTrace();
+        }
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
+      }
+      System.out.println("Lease for file " +  filepath + " is recovered. " +
+                         "validating its contents now...");
+
+      // revert to the original client identity
+      dfs.dfs.clientName = oldClientName;
+
+      // verify that file-size matches
+      assertTrue("File should be " + size + " bytes, but is actually " +
+                 " found to be " + dfs.getFileStatus(filepath).getLen() +
+                 " bytes",
+                 dfs.getFileStatus(filepath).getLen() == size);
+
+      // verify that there is enough data to read.
+      System.out.println("File size is good. Now validating sizes from datanodes...");
+      FSDataInputStream stmin = dfs.open(filepath);
+      stmin.readFully(0, actual, 0, size);
+      stmin.close();
+    }
+    finally {
+      try {
+        if (cluster != null) {cluster.shutdown();}
+      } catch (Exception e) {
+        // ignore
+      }
+    }
+  }
+}