
HDFS-17358. EC: infinite lease recovery caused by the length of RWR equals to zero or datanode does not have the replica. (#6509). Contributed by farmmamba.

Reviewed-by: Tao Li <tomscut@apache.org>
Reviewed-by: Haiyang Hu <haiyang.hu@shopee.com>
Signed-off-by: Shuyan Zhang <zhangshuyan@apache.org>
hfutatzhanghb committed 1 year ago
commit 15af52954f
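
For context before the diff: in the striped (EC) recovery path, safe-length recovery is only possible while enough internal blocks remain usable. The snippet below is a stand-alone illustration of the threshold this patch introduces (hypothetical class and method names; the real logic is in the BlockRecoveryWorker diff below). Recovery proceeds only when the number of datanodes without a replica plus the number of zero-length (RWR) replicas does not exceed locs.length - numDataUnits; otherwise the block is reported to the NameNode with length 0 and deleteBlock=true instead of retrying forever. The 6+3 policy numbers are just an example.

// Stand-alone illustration of the patch's feasibility check (hypothetical
// names; the actual change is in the BlockRecoveryWorker diff below).
public class EcRecoveryFeasibilitySketch {
  static boolean canComputeSafeLength(int locations, int numDataUnits,
      int dnWithoutReplica, int zeroLengthReplicas) {
    // Mirrors: dnNotHaveReplicaCnt + zeroLenReplicaCnt
    //            <= locs.length - ecPolicy.getNumDataUnits()
    return dnWithoutReplica + zeroLengthReplicas <= locations - numDataUnits;
  }

  public static void main(String[] args) {
    // Example with a 6+3 policy: 6 data units, block group spread over 9 datanodes.
    System.out.println(canComputeSafeLength(9, 6, 2, 1)); // true  -> recover normally
    System.out.println(canComputeSafeLength(9, 6, 3, 1)); // false -> commit length 0, drop block
  }
}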

+ 35 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java

@@ -386,6 +386,8 @@ public class BlockRecoveryWorker {
      Map<Long, BlockRecord> syncBlocks = new HashMap<>(locs.length);
      final int dataBlkNum = ecPolicy.getNumDataUnits();
      final int totalBlkNum = dataBlkNum + ecPolicy.getNumParityUnits();
+      int zeroLenReplicaCnt = 0;
+      int dnNotHaveReplicaCnt = 0;
      //check generation stamps
      for (int i = 0; i < locs.length; i++) {
        DatanodeID id = locs[i];
@@ -419,10 +421,14 @@ public class BlockRecoveryWorker {
            if (info == null) {
              LOG.debug("Block recovery: DataNode: {} does not have " +
                  "replica for block: (block={}, internalBlk={})", id, block, internalBlk);
+              dnNotHaveReplicaCnt++;
            } else {
              LOG.debug("Block recovery: Ignored replica with invalid "
                  + "generation stamp or length: {} from DataNode: {} by block: {}",
                  info, id, block);
+              if (info.getNumBytes() == 0) {
+                zeroLenReplicaCnt++;
+              }
            }
          }
        } catch (RecoveryInProgressException ripE) {
@@ -436,9 +442,18 @@ public class BlockRecoveryWorker {
                   "datanode={})", block, internalBlk, id, e);
                   "datanode={})", block, internalBlk, id, e);
         }
         }
       }
       }
-      checkLocations(syncBlocks.size());

-      final long safeLength = getSafeLength(syncBlocks);
+      final long safeLength;
+      if (dnNotHaveReplicaCnt + zeroLenReplicaCnt <= locs.length - ecPolicy.getNumDataUnits()) {
+        checkLocations(syncBlocks.size());
+        safeLength = getSafeLength(syncBlocks);
+      } else {
+        safeLength = 0;
+        LOG.warn("Block recovery: {} datanodes do not have the replica of block {}." +
+            " {} datanodes have zero-length replica. Will remove this block.",
+            dnNotHaveReplicaCnt, block, zeroLenReplicaCnt);
+      }
+
       LOG.debug("Recovering block {}, length={}, safeLength={}, syncList={}", block,
       LOG.debug("Recovering block {}, length={}, safeLength={}, syncList={}", block,
           block.getNumBytes(), safeLength, syncBlocks);
           block.getNumBytes(), safeLength, syncBlocks);
 
 
@@ -452,11 +467,13 @@ public class BlockRecoveryWorker {
          rurList.add(r);
        }
      }
-      assert rurList.size() >= dataBlkNum : "incorrect safe length";

-      // Recovery the striped block by truncating internal blocks to the safe
-      // length. Abort if there is any failure in this step.
-      truncatePartialBlock(rurList, safeLength);
+      if (safeLength > 0) {
+        Preconditions.checkArgument(rurList.size() >= dataBlkNum, "incorrect safe length");
+        // Recovery the striped block by truncating internal blocks to the safe
+        // length. Abort if there is any failure in this step.
+        truncatePartialBlock(rurList, safeLength);
+      }

      // notify Namenode the new size and locations
      final DatanodeID[] newLocs = new DatanodeID[totalBlkNum];
@@ -469,11 +486,20 @@ public class BlockRecoveryWorker {
        int index = (int) (r.rInfo.getBlockId() &
            HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
        newLocs[index] = r.id;
-        newStorages[index] = r.storageID;
+        if (r.storageID != null) {
+          newStorages[index] = r.storageID;
+        }
      }
      ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(),
          safeLength, recoveryId);
      DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(bpid);
+      if (safeLength == 0) {
+        nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(),
+            newBlock.getNumBytes(), true, true, newLocs, newStorages);
+        LOG.info("After block recovery, the length of new block is 0. " +
+            "Will remove this block: {} from file.", newBlock);
+        return;
+      }
      nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(),
          newBlock.getNumBytes(), true, false, newLocs, newStorages);
    }
@@ -527,8 +553,8 @@ public class BlockRecoveryWorker {
    private void checkLocations(int locationCount)
        throws IOException {
      if (locationCount < ecPolicy.getNumDataUnits()) {
-        throw new IOException(block + " has no enough internal blocks" +
-            ", unable to start recovery. Locations=" + Arrays.asList(locs));
+        throw new IOException(block + " has no enough internal blocks(current: " + locationCount +
+            "), unable to start recovery. Locations=" + Arrays.asList(locs));
      }
    }
  }
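
A note on getSafeLength, which the guarded branch above still calls: in the simplest case, where every internal data block reports the same, stripe-aligned length L, the safe length of the block group works out to numDataUnits * L (the existing checkSafeLength case in the test below covers 6 x 1 GiB = 6442450944 bytes). A rough stand-alone sketch of that simple case only follows; it is not the real implementation, which also handles uneven internal-block lengths cell by cell.

// Rough sketch of the simplest safe-length case: every internal data block
// reports the same, stripe-aligned length. Not the real implementation.
public class SafeLengthSketch {
  static long safeLengthEqualBlocks(int numDataUnits, long internalBlockLength) {
    return numDataUnits * internalBlockLength;
  }

  public static void main(String[] args) {
    // Matches the test expectation checkSafeLength(1024 * 1024 * 1024, 6442450944L):
    // 6 data units x 1 GiB per internal block = 6 GiB.
    System.out.println(safeLengthEqualBlocks(6, 1024L * 1024 * 1024)); // 6442450944
  }
}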

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java

@@ -133,8 +133,8 @@ class StripedBlockReader {
          block.getNumBytes() - offsetInBlock, true, "", peer, source,
          null, stripedReader.getCachingStrategy(), -1, conf);
    } catch (IOException e) {
-      LOG.info("Exception while creating remote block reader, datanode {}",
-          source, e);
+      LOG.info("Exception while creating remote block reader for {}, datanode {}",
+          block, source, e);
      IOUtils.closeStream(peer);
      return null;
    }

+ 29 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java

@@ -259,6 +259,35 @@ public class TestLeaseRecoveryStriped {
    checkSafeLength(1024 * 1024 * 1024, 6442450944L); // Length of: 1 GiB
  }

+  /**
+   * 1. Write 1MB data, then flush it.
+   * 2. Mock the client quitting unexpectedly.
+   * 3. Trigger lease recovery.
+   * 4. Lease recovery completes successfully.
+   */
+  @Test
+  public void testLeaseRecoveryWithManyZeroLengthReplica() {
+    int curCellSize = (int)1024 * 1024;
+    try {
+      final FSDataOutputStream out = dfs.create(p);
+      final DFSStripedOutputStream stripedOut = (DFSStripedOutputStream) out
+          .getWrappedStream();
+      for (int pos = 0; pos < curCellSize; pos++) {
+        out.write(StripedFileTestUtil.getByte(pos));
+      }
+      for (int i = 0; i < dataBlocks + parityBlocks; i++) {
+        StripedDataStreamer s = stripedOut.getStripedDataStreamer(i);
+        waitStreamerAllAcked(s);
+        stopBlockStream(s);
+      }
+      recoverLease();
+      LOG.info("Trigger recover lease manually successfully.");
+    } catch (Throwable e) {
+      String msg = "failed testCase" + StringUtils.stringifyException(e);
+      Assert.fail(msg);
+    }
+  }
+
  private void checkSafeLength(int blockLength, long expectedSafeLength) {
    int[] blockLengths = new int[]{blockLength, blockLength, blockLength, blockLength,
        blockLength, blockLength};
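
The test's recoverLease() helper is not shown in this diff. For readers who want to reproduce the scenario against a running cluster, a typical client-side way to trigger lease recovery and wait for it to finish looks roughly like the sketch below; the path and polling interval are illustrative, and it assumes fs.defaultFS points at the target HDFS cluster.

// Illustrative only: manually trigger lease recovery for an open file and
// poll until the NameNode reports it closed. Path and sleep are examples.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class TriggerLeaseRecovery {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/ec/dir/partially-written-file");
    DistributedFileSystem dfs =
        (DistributedFileSystem) file.getFileSystem(conf);

    boolean closed = dfs.recoverLease(file);  // ask the NameNode to start recovery
    while (!closed) {
      Thread.sleep(1000L);
      closed = dfs.isFileClosed(file);        // poll until recovery completes
    }
    System.out.println("Lease recovery finished for " + file);
  }
}

The same thing can be done from the command line with hdfs debug recoverLease -path <file> -retries <n>.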