Browse Source

HDFS-16320. Datanode retrieve slownode information from NameNode (#3654)

Symious 3 years ago
parent
commit
c88640c4ad

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java

@@ -183,7 +183,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       rollingUpdateStatus = PBHelperClient.convert(resp.getRollingUpgradeStatus());
       rollingUpdateStatus = PBHelperClient.convert(resp.getRollingUpgradeStatus());
     }
     }
     return new HeartbeatResponse(cmds, PBHelper.convert(resp.getHaStatus()),
     return new HeartbeatResponse(cmds, PBHelper.convert(resp.getHaStatus()),
-        rollingUpdateStatus, resp.getFullBlockReportLeaseId());
+        rollingUpdateStatus, resp.getFullBlockReportLeaseId(),
+        resp.getIsSlownode());
   }
   }
 
 
   @Override
   @Override

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -853,4 +853,12 @@ class BPOfferService {
     return isAlive();
     return isAlive();
   }
   }
 
 
+  /**
+   * Checks the actors of this block pool service for a slow-node report.
+   *
+   * @return {@code true} as soon as one actor in {@code bpServices} reports
+   *         the slow-node flag, {@code false} if none does
+   */
+  boolean isSlownode() {
+    boolean slowReported = false;
+    for (BPServiceActor serviceActor : bpServices) {
+      slowReported = serviceActor.isSlownode();
+      if (slowReported) {
+        // One positive report is enough; the remaining actors are not asked.
+        break;
+      }
+    }
+    return slowReported;
+  }
 }
 }

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -112,6 +112,7 @@ class BPServiceActor implements Runnable {
   private String nnId = null;
   private String nnId = null;
   private volatile RunningState runningState = RunningState.CONNECTING;
   private volatile RunningState runningState = RunningState.CONNECTING;
   private volatile boolean shouldServiceRun = true;
   private volatile boolean shouldServiceRun = true;
+  private volatile boolean isSlownode = false;
   private final DataNode dn;
   private final DataNode dn;
   private final DNConf dnConf;
   private final DNConf dnConf;
   private long prevBlockReportId;
   private long prevBlockReportId;
@@ -205,6 +206,7 @@ class BPServiceActor implements Runnable {
         String.valueOf(getScheduler().getLastBlockReportTime()));
         String.valueOf(getScheduler().getLastBlockReportTime()));
     info.put("maxBlockReportSize", String.valueOf(getMaxBlockReportSize()));
     info.put("maxBlockReportSize", String.valueOf(getMaxBlockReportSize()));
     info.put("maxDataLength", String.valueOf(maxDataLength));
     info.put("maxDataLength", String.valueOf(maxDataLength));
+    info.put("isSlownode", String.valueOf(isSlownode));
     return info;
     return info;
   }
   }
 
 
@@ -729,6 +731,7 @@ class BPServiceActor implements Runnable {
               handleRollingUpgradeStatus(resp);
               handleRollingUpgradeStatus(resp);
             }
             }
             commandProcessingThread.enqueue(resp.getCommands());
             commandProcessingThread.enqueue(resp.getCommands());
+            isSlownode = resp.getIsSlownode();
           }
           }
         }
         }
 
 
@@ -1474,4 +1477,8 @@ class BPServiceActor implements Runnable {
       commandProcessingThread.interrupt();
       commandProcessingThread.interrupt();
     }
     }
   }
   }
+
+  /**
+   * Returns this actor's slow-node flag. The flag starts out {@code false}
+   * and is overwritten with {@code resp.getIsSlownode()} when a heartbeat
+   * response is handled.
+   *
+   * @return the current value of the volatile {@code isSlownode} field
+   */
+  boolean isSlownode() {
+    return isSlownode;
+  }
 }
 }

+ 23 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java

@@ -307,4 +307,27 @@ class BlockPoolManager {
   Map<String, BPOfferService> getBpByNameserviceId() {
   Map<String, BPOfferService> getBpByNameserviceId() {
     return bpByNameserviceId;
     return bpByNameserviceId;
   }
   }
+
+  /**
+   * Reports the slow-node flag of the block pool service registered under
+   * the given nameservice ID.
+   *
+   * @param nsId nameservice ID to look up in {@code bpByNameserviceId}
+   * @return the service's {@code isSlownode()} value, or {@code false} when
+   *         no service is registered under {@code nsId}
+   */
+  boolean isSlownodeByNameserviceId(String nsId) {
+    // Single lookup: a separate containsKey() + get() pair is not atomic, so
+    // the entry could vanish in between and get() would hand back null.
+    BPOfferService bpos = bpByNameserviceId.get(nsId);
+    return bpos != null && bpos.isSlownode();
+  }
+
+  /**
+   * Reports the slow-node flag of the block pool service registered under
+   * the given block pool ID.
+   *
+   * @param bpId block pool ID to look up in {@code bpByBlockPoolId}
+   * @return the service's {@code isSlownode()} value, or {@code false} when
+   *         no service is registered under {@code bpId}
+   */
+  boolean isSlownodeByBlockPoolId(String bpId) {
+    // Single lookup: a separate containsKey() + get() pair is not atomic, so
+    // the entry could vanish in between and get() would hand back null.
+    BPOfferService bpos = bpByBlockPoolId.get(bpId);
+    return bpos != null && bpos.isSlownode();
+  }
+
+  /**
+   * Checks every block pool service held in {@code bpByBlockPoolId}.
+   *
+   * @return {@code true} if at least one service reports the slow-node flag,
+   *         {@code false} otherwise (including when the map is empty)
+   */
+  boolean isSlownode() {
+    // anyMatch short-circuits on the first positive report, like the
+    // early-return loop it replaces.
+    return bpByBlockPoolId.values().stream()
+        .anyMatch(BPOfferService::isSlownode);
+  }
 }
 }

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -3814,4 +3814,16 @@ public class DataNode extends ReconfigurableBase
     return (stage == PIPELINE_SETUP_STREAMING_RECOVERY
     return (stage == PIPELINE_SETUP_STREAMING_RECOVERY
         || stage == PIPELINE_SETUP_APPEND_RECOVERY);
         || stage == PIPELINE_SETUP_APPEND_RECOVERY);
   }
   }
+
+  /**
+   * Delegates to {@code blockPoolManager.isSlownodeByNameserviceId(nsId)}.
+   *
+   * @param nsId nameservice ID passed through to the block pool manager
+   * @return whatever the block pool manager reports for {@code nsId}
+   */
+  boolean isSlownodeByNameserviceId(String nsId) {
+    return blockPoolManager.isSlownodeByNameserviceId(nsId);
+  }
+
+  /**
+   * Delegates to {@code blockPoolManager.isSlownodeByBlockPoolId(bpId)}.
+   *
+   * @param bpId block pool ID passed through to the block pool manager
+   * @return whatever the block pool manager reports for {@code bpId}
+   */
+  boolean isSlownodeByBlockPoolId(String bpId) {
+    return blockPoolManager.isSlownodeByBlockPoolId(bpId);
+  }
+
+  /**
+   * Delegates to {@code blockPoolManager.isSlownode()}.
+   *
+   * @return whatever the block pool manager reports across its block pools
+   */
+  boolean isSlownode() {
+    return blockPoolManager.isSlownode();
+  }
 }
 }

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -4421,8 +4421,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           haContext.getState().getServiceState(),
           haContext.getState().getServiceState(),
           getFSImage().getCorrectLastAppliedOrWrittenTxId());
           getFSImage().getCorrectLastAppliedOrWrittenTxId());
 
 
+      Set<String> slownodes = DatanodeManager.getSlowNodesUuidSet();
+      boolean isSlownode = slownodes.contains(nodeReg.getDatanodeUuid());
+
       return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
       return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
-          blockReportLeaseId);
+          blockReportLeaseId, isSlownode);
     } finally {
     } finally {
       readUnlock("handleHeartbeat");
       readUnlock("handleHeartbeat");
     }
     }

+ 14 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java

@@ -36,14 +36,23 @@ public class HeartbeatResponse {
   private final RollingUpgradeStatus rollingUpdateStatus;
   private final RollingUpgradeStatus rollingUpdateStatus;
 
 
   private final long fullBlockReportLeaseId;
   private final long fullBlockReportLeaseId;
-  
+
+  private final boolean isSlownode;
+
   public HeartbeatResponse(DatanodeCommand[] cmds,
   public HeartbeatResponse(DatanodeCommand[] cmds,
       NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus,
       NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus,
       long fullBlockReportLeaseId) {
       long fullBlockReportLeaseId) {
+    this(cmds, haStatus, rollingUpdateStatus, fullBlockReportLeaseId, false);
+  }
+
+  public HeartbeatResponse(DatanodeCommand[] cmds,
+      NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus,
+      long fullBlockReportLeaseId, boolean isSlownode) {
     commands = cmds;
     commands = cmds;
     this.haStatus = haStatus;
     this.haStatus = haStatus;
     this.rollingUpdateStatus = rollingUpdateStatus;
     this.rollingUpdateStatus = rollingUpdateStatus;
     this.fullBlockReportLeaseId = fullBlockReportLeaseId;
     this.fullBlockReportLeaseId = fullBlockReportLeaseId;
+    this.isSlownode = isSlownode;
   }
   }
   
   
   public DatanodeCommand[] getCommands() {
   public DatanodeCommand[] getCommands() {
@@ -61,4 +70,8 @@ public class HeartbeatResponse {
   public long getFullBlockReportLeaseId() {
   public long getFullBlockReportLeaseId() {
     return fullBlockReportLeaseId;
     return fullBlockReportLeaseId;
   }
   }
+
+  /**
+   * Returns the slow-node flag carried by this response.
+   *
+   * @return the value given to the five-argument constructor; responses
+   *         built through the four-argument constructor carry {@code false}
+   */
+  public boolean getIsSlownode() {
+    return isSlownode;
+  }
 }
 }

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto

@@ -223,6 +223,7 @@ message HeartbeatResponseProto {
   optional RollingUpgradeStatusProto rollingUpgradeStatus = 3;
   optional RollingUpgradeStatusProto rollingUpgradeStatus = 3;
   optional RollingUpgradeStatusProto rollingUpgradeStatusV2 = 4;
   optional RollingUpgradeStatusProto rollingUpgradeStatusV2 = 4;
   optional uint64 fullBlockReportLeaseId = 5 [ default = 0 ];
   optional uint64 fullBlockReportLeaseId = 5 [ default = 0 ];
+  optional bool isSlownode = 6 [ default = false ];
 }
 }
 
 
 /**
 /**

+ 58 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

@@ -30,6 +30,7 @@ import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertSame;
@@ -127,7 +128,8 @@ public class TestBPOfferService {
   private final int[] heartbeatCounts = new int[3];
   private final int[] heartbeatCounts = new int[3];
   private DataNode mockDn;
   private DataNode mockDn;
   private FsDatasetSpi<?> mockFSDataset;
   private FsDatasetSpi<?> mockFSDataset;
-  
+  private boolean isSlownode;
+
   @Before
   @Before
   public void setupMocks() throws Exception {
   public void setupMocks() throws Exception {
     mockNN1 = setupNNMock(0);
     mockNN1 = setupNNMock(0);
@@ -216,6 +218,23 @@ public class TestBPOfferService {
     }
     }
   }
   }
 
 
+  /**
+   * Mockito answer that builds a {@link HeartbeatResponse} carrying the
+   * enclosing test's {@code isSlownode} field. The field is read each time
+   * the answer is invoked, so a test can change it between calls.
+   */
+  private class HeartbeatIsSlownodeAnswer implements Answer<HeartbeatResponse> {
+    /** Index into the per-NameNode command and HA-status arrays. */
+    private final int nameNodeIndex;
+
+    HeartbeatIsSlownodeAnswer(int nnIdx) {
+      nameNodeIndex = nnIdx;
+    }
+
+    @Override
+    public HeartbeatResponse answer(InvocationOnMock invocation)
+        throws Throwable {
+      // Null rolling-upgrade status and a zero block report lease ID; only
+      // the slow-node flag varies between responses.
+      return new HeartbeatResponse(datanodeCommands[nameNodeIndex],
+          mockHaStatuses[nameNodeIndex], null, 0, isSlownode);
+    }
+  }
 
 
   private class HeartbeatRegisterAnswer implements Answer<HeartbeatResponse> {
   private class HeartbeatRegisterAnswer implements Answer<HeartbeatResponse> {
     private final int nnIdx;
     private final int nnIdx;
@@ -1182,6 +1201,44 @@ public class TestBPOfferService {
     }
     }
   }
   }
 
 
+  /**
+   * Verifies that the slow-node flag returned in a heartbeat response is
+   * reflected by {@code BPOfferService#isSlownode()}, and that it follows
+   * the response when the flag is switched on and back off.
+   */
+  @Test(timeout = 15000)
+  public void testSetIsSlownode() throws Exception {
+    // assertFalse instead of assertEquals(actual, false): JUnit expects the
+    // expected value first, so the old call had its arguments reversed.
+    assertFalse(mockDn.isSlownode());
+    Mockito.when(mockNN1.sendHeartbeat(
+            Mockito.any(DatanodeRegistration.class),
+            Mockito.any(StorageReport[].class),
+            Mockito.anyLong(),
+            Mockito.anyLong(),
+            Mockito.anyInt(),
+            Mockito.anyInt(),
+            Mockito.anyInt(),
+            Mockito.any(VolumeFailureSummary.class),
+            Mockito.anyBoolean(),
+            Mockito.any(SlowPeerReports.class),
+            Mockito.any(SlowDiskReports.class)))
+        .thenAnswer(new HeartbeatIsSlownodeAnswer(0));
+
+    BPOfferService bpos = setupBPOSForNNs(mockNN1);
+    bpos.start();
+
+    try {
+      waitForInitialization(bpos);
+
+      // The answer reads the isSlownode field on every call, so changing the
+      // field changes the flag carried by the next heartbeat response.
+      bpos.triggerHeartbeatForTests();
+      assertFalse(bpos.isSlownode());
+
+      isSlownode = true;
+      bpos.triggerHeartbeatForTests();
+      assertTrue(bpos.isSlownode());
+
+      isSlownode = false;
+      bpos.triggerHeartbeatForTests();
+      assertFalse(bpos.isSlownode());
+    } finally {
+      bpos.stop();
+    }
+  }
+
   @Test(timeout = 15000)
   @Test(timeout = 15000)
   public void testCommandProcessingThread() throws Exception {
   public void testCommandProcessingThread() throws Exception {
     Configuration conf = new HdfsConfiguration();
     Configuration conf = new HdfsConfiguration();