ソースを参照

CachedRecordStore should check if the record state is expired (#6783)

dannytbecker 1 年間 前
コミット
881034ad45

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java

@@ -189,7 +189,7 @@ public abstract class CachedRecordStore<R extends BaseRecord>
           LOG.warn("Couldn't delete State Store record {}: {}", recordName,
               record);
         }
-      } else if (record.checkExpired(currentDriverTime)) {
+      } else if (!record.isExpired() && record.checkExpired(currentDriverTime)) {
         String recordName = StateStoreUtils.getRecordName(record.getClass());
         LOG.info("Override State Store record {}: {}", recordName, record);
         commitRecords.add(record);

+ 101 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java

@@ -29,11 +29,18 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -49,16 +56,22 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeat
 import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
 import org.apache.hadoop.util.Time;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test the basic {@link MembershipStore} membership functionality.
  */
 public class TestStateStoreMembershipState extends TestStateStoreBase {
 
+  private static Logger LOG = LoggerFactory.getLogger(
+      TestStateStoreMembershipState.class);
+
   private static MembershipStore membershipStore;
 
   @BeforeClass
@@ -529,6 +542,94 @@ public class TestStateStoreMembershipState extends TestStateStoreBase {
     }, 100, 3000);
   }
 
+  @Test
+  public void testRegistrationExpiredRaceCondition()
+      throws InterruptedException, IOException, TimeoutException, ExecutionException {
+
+    // Populate the state store with a single NN element
+    // 1) ns0:nn0 - Expired
+    // Create a thread to refresh the cached records, pulling the expired record
+    // into the thread's memory
+    // Then insert an active record, and confirm that the refresh thread does not
+    // override the active record with the expired record it has in memory
+
+    MembershipState.setDeletionMs(-1);
+
+    MembershipState expiredReport = createRegistration(
+        NAMESERVICES[0], NAMENODES[0], ROUTERS[0],
+        FederationNamenodeServiceState.ACTIVE);
+    expiredReport.setDateModified(Time.monotonicNow() - 5000);
+    expiredReport.setState(FederationNamenodeServiceState.EXPIRED);
+    assertTrue(namenodeHeartbeat(expiredReport));
+
+    // Load cache
+    MembershipStore memStoreSpy = spy(membershipStore);
+    DelayAnswer delayer = new DelayAnswer(LOG);
+    doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any());
+
+    ExecutorService pool = Executors.newFixedThreadPool(1);
+
+    Future<Boolean> cacheRefreshFuture = pool.submit(() -> {
+      try {
+        return memStoreSpy.loadCache(true);
+      } catch (IOException e) {
+        LOG.error("Exception while loading cache:", e);
+      }
+      return false;
+    });
+
+    // Verify quorum and entry
+    MembershipState quorumEntry = getNamenodeRegistration(
+        expiredReport.getNameserviceId(), expiredReport.getNamenodeId());
+    assertNull(quorumEntry);
+
+
+    MembershipState record = membershipStore.getDriver()
+        .get(MembershipState.class).getRecords().get(0);
+    assertNotNull(record);
+    assertEquals(ROUTERS[0], record.getRouterId());
+    assertEquals(FederationNamenodeServiceState.EXPIRED,
+        record.getState());
+
+    // Insert active while the other thread refreshing it's cache
+    MembershipState activeReport = createRegistration(
+        NAMESERVICES[0], NAMENODES[0], ROUTERS[0],
+        FederationNamenodeServiceState.ACTIVE);
+
+    delayer.waitForCall();
+    assertTrue(namenodeHeartbeat(activeReport));
+
+    record = membershipStore.getDriver()
+        .get(MembershipState.class).getRecords().get(0);
+    assertNotNull(record);
+    assertEquals(ROUTERS[0], record.getRouterId());
+    assertEquals(FederationNamenodeServiceState.ACTIVE,
+        record.getState());
+
+    quorumEntry = getExpiredNamenodeRegistration(
+        expiredReport.getNameserviceId(), expiredReport.getNamenodeId());
+    assertNull(quorumEntry);
+
+    // Allow the thread to finish refreshing the cache
+    delayer.proceed();
+    assertTrue(cacheRefreshFuture.get(5, TimeUnit.SECONDS));
+
+    // The state store should still be the active report
+    record = membershipStore.getDriver()
+        .get(MembershipState.class).getRecords().get(0);
+    assertNotNull(record);
+    assertEquals(ROUTERS[0], record.getRouterId());
+    assertEquals(FederationNamenodeServiceState.ACTIVE,
+        record.getState());
+
+    membershipStore.loadCache(true);
+
+    quorumEntry = getExpiredNamenodeRegistration(
+        expiredReport.getNameserviceId(),
+        expiredReport.getNamenodeId());
+    assertNull(quorumEntry);
+  }
+
   @Test
   public void testNamespaceInfoWithUnavailableNameNodeRegistration()
       throws IOException {