소스 검색

HDDS-195. Create generic CommandWatcher utility.
Contributed by Elek, Marton.

Anu Engineer 7 년 전
부모
커밋
85627e2cba

+ 157 - 0
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java

@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.ozone.lease.Lease;
+import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException;
+import org.apache.hadoop.ozone.lease.LeaseExpiredException;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.ozone.lease.LeaseNotFoundException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Event watcher the (re)send a message after timeout.
+ * <p>
+ * Event watcher will send the tracked payload/event after a timeout period
+ * unless a confirmation from the original event (completion event) is arrived.
+ *
+ * @param <TIMEOUT_PAYLOAD>    The type of the events which are tracked.
+ * @param <COMPLETION_PAYLOAD> The type of event which could cancel the
+ *                             tracking.
+ */
+@SuppressWarnings("CheckStyle")
+public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
+    IdentifiableEventPayload,
+    COMPLETION_PAYLOAD extends IdentifiableEventPayload> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(EventWatcher.class);
+
+  private final Event<TIMEOUT_PAYLOAD> startEvent;
+
+  private final Event<COMPLETION_PAYLOAD> completionEvent;
+
+  private final LeaseManager<UUID> leaseManager;
+
+  protected final Map<UUID, TIMEOUT_PAYLOAD> trackedEventsByUUID =
+      new ConcurrentHashMap<>();
+
+  protected final Set<TIMEOUT_PAYLOAD> trackedEvents = new HashSet<>();
+
+  public EventWatcher(Event<TIMEOUT_PAYLOAD> startEvent,
+      Event<COMPLETION_PAYLOAD> completionEvent,
+      LeaseManager<UUID> leaseManager) {
+    this.startEvent = startEvent;
+    this.completionEvent = completionEvent;
+    this.leaseManager = leaseManager;
+
+  }
+
+  public void start(EventQueue queue) {
+
+    queue.addHandler(startEvent, this::handleStartEvent);
+
+    queue.addHandler(completionEvent, (completionPayload, publisher) -> {
+      UUID uuid = completionPayload.getUUID();
+      try {
+        handleCompletion(uuid, publisher);
+      } catch (LeaseNotFoundException e) {
+        //It's already done. Too late, we already retried it.
+        //Not a real problem.
+        LOG.warn("Completion event without active lease. UUID={}", uuid);
+      }
+    });
+
+  }
+
+  private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload,
+      EventPublisher publisher) {
+    UUID identifier = payload.getUUID();
+    trackedEventsByUUID.put(identifier, payload);
+    trackedEvents.add(payload);
+    try {
+      Lease<UUID> lease = leaseManager.acquire(identifier);
+      try {
+        lease.registerCallBack(() -> {
+          handleTimeout(publisher, identifier);
+          return null;
+        });
+
+      } catch (LeaseExpiredException e) {
+        handleTimeout(publisher, identifier);
+      }
+    } catch (LeaseAlreadyExistException e) {
+      //No problem at all. But timer is not reset.
+    }
+  }
+
+  private synchronized void handleCompletion(UUID uuid,
+      EventPublisher publisher) throws LeaseNotFoundException {
+    leaseManager.release(uuid);
+    TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(uuid);
+    trackedEvents.remove(payload);
+    onFinished(publisher, payload);
+  }
+
+  private synchronized void handleTimeout(EventPublisher publisher,
+      UUID identifier) {
+    TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(identifier);
+    trackedEvents.remove(payload);
+    onTimeout(publisher, payload);
+  }
+
+
+  /**
+   * Check if a specific payload is in-progress.
+   */
+  public synchronized boolean contains(TIMEOUT_PAYLOAD payload) {
+    return trackedEvents.contains(payload);
+  }
+
+  public synchronized boolean remove(TIMEOUT_PAYLOAD payload) {
+    try {
+      leaseManager.release(payload.getUUID());
+    } catch (LeaseNotFoundException e) {
+      LOG.warn("Completion event without active lease. UUID={}",
+          payload.getUUID());
+    }
+    trackedEventsByUUID.remove(payload.getUUID());
+    return trackedEvents.remove(payload);
+
+  }
+
+  abstract void onTimeout(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
+
+  abstract void onFinished(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
+
+  public List<TIMEOUT_PAYLOAD> getTimeoutEvents(
+      Predicate<? super TIMEOUT_PAYLOAD> predicate) {
+    return trackedEventsByUUID.values().stream().filter(predicate)
+        .collect(Collectors.toList());
+  }
+}

+ 30 - 0
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IdentifiableEventPayload.java

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import java.util.UUID;
+
+/**
+ * Event with an additional unique identifier.
+ *
+ */
+public interface IdentifiableEventPayload {
+
+  UUID getUUID();
+
+}

+ 38 - 0
hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/EventHandlerStub.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Dummy class for testing to collect all the received events.
+ */
+public class EventHandlerStub<PAYLOAD> implements EventHandler<PAYLOAD> {
+
+  private List<PAYLOAD> receivedEvents = new ArrayList<>();
+
+  @Override
+  public void onMessage(PAYLOAD payload, EventPublisher publisher) {
+    receivedEvents.add(payload);
+  }
+
+  public List<PAYLOAD> getReceivedEvents() {
+    return receivedEvents;
+  }
+}

+ 220 - 0
hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java

@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+import org.apache.hadoop.ozone.lease.LeaseManager;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the basic functionality of event watcher.
+ */
+public class TestEventWatcher {
+
+  private static final TypedEvent<UnderreplicatedEvent> WATCH_UNDER_REPLICATED =
+      new TypedEvent<>(UnderreplicatedEvent.class);
+
+  private static final TypedEvent<UnderreplicatedEvent> UNDER_REPLICATED =
+      new TypedEvent<>(UnderreplicatedEvent.class);
+
+  private static final TypedEvent<ReplicationCompletedEvent>
+      REPLICATION_COMPLETED = new TypedEvent<>(ReplicationCompletedEvent.class);
+
+  LeaseManager<UUID> leaseManager;
+
+  @Before
+  public void startLeaseManager() {
+    leaseManager = new LeaseManager<>(2000l);
+    leaseManager.start();
+  }
+
+  @After
+  public void stopLeaseManager() {
+    leaseManager.shutdown();
+  }
+
+
+  @Test
+  public void testEventHandling() throws InterruptedException {
+
+    EventQueue queue = new EventQueue();
+
+    EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
+        replicationWatcher = createEventWatcher();
+
+    EventHandlerStub<UnderreplicatedEvent> underReplicatedEvents =
+        new EventHandlerStub<>();
+
+    queue.addHandler(UNDER_REPLICATED, underReplicatedEvents);
+
+    replicationWatcher.start(queue);
+
+    UUID uuid1 = UUID.randomUUID();
+    UUID uuid2 = UUID.randomUUID();
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED,
+        new UnderreplicatedEvent(uuid1, "C1"));
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED,
+        new UnderreplicatedEvent(uuid2, "C2"));
+
+    Assert.assertEquals(0, underReplicatedEvents.getReceivedEvents().size());
+
+    Thread.sleep(1000);
+
+    queue.fireEvent(REPLICATION_COMPLETED,
+        new ReplicationCompletedEvent(uuid1, "C2", "D1"));
+
+    Assert.assertEquals(0, underReplicatedEvents.getReceivedEvents().size());
+
+    Thread.sleep(1500);
+
+    queue.processAll(1000L);
+
+    Assert.assertEquals(1, underReplicatedEvents.getReceivedEvents().size());
+    Assert.assertEquals(uuid2,
+        underReplicatedEvents.getReceivedEvents().get(0).UUID);
+
+  }
+
+  @Test
+  public void testInprogressFilter() throws InterruptedException {
+
+    EventQueue queue = new EventQueue();
+
+    EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
+        replicationWatcher = createEventWatcher();
+
+    EventHandlerStub<UnderreplicatedEvent> underReplicatedEvents =
+        new EventHandlerStub<>();
+
+    queue.addHandler(UNDER_REPLICATED, underReplicatedEvents);
+
+    replicationWatcher.start(queue);
+
+    UnderreplicatedEvent event1 =
+        new UnderreplicatedEvent(UUID.randomUUID(), "C1");
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED, event1);
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED,
+        new UnderreplicatedEvent(UUID.randomUUID(), "C2"));
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED,
+        new UnderreplicatedEvent(UUID.randomUUID(), "C1"));
+
+    queue.processAll(1000L);
+    Thread.sleep(1000L);
+    List<UnderreplicatedEvent> c1todo = replicationWatcher
+        .getTimeoutEvents(e -> e.containerId.equalsIgnoreCase("C1"));
+
+    Assert.assertEquals(2, c1todo.size());
+    Assert.assertTrue(replicationWatcher.contains(event1));
+    Thread.sleep(1500L);
+
+    c1todo = replicationWatcher
+        .getTimeoutEvents(e -> e.containerId.equalsIgnoreCase("C1"));
+    Assert.assertEquals(0, c1todo.size());
+    Assert.assertFalse(replicationWatcher.contains(event1));
+
+
+  }
+
+  private EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
+  createEventWatcher() {
+    return new EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>(
+        WATCH_UNDER_REPLICATED, REPLICATION_COMPLETED, leaseManager) {
+
+      @Override
+      void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
+        publisher.fireEvent(UNDER_REPLICATED, payload);
+      }
+
+      @Override
+      void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
+        //Good job. We did it.
+      }
+    };
+  }
+
+  private static class ReplicationCompletedEvent
+      implements IdentifiableEventPayload {
+
+    private final UUID UUID;
+
+    private final String containerId;
+
+    private final String datanodeId;
+
+    public ReplicationCompletedEvent(UUID UUID, String containerId,
+        String datanodeId) {
+      this.UUID = UUID;
+      this.containerId = containerId;
+      this.datanodeId = datanodeId;
+    }
+
+    public UUID getUUID() {
+      return UUID;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ReplicationCompletedEvent that = (ReplicationCompletedEvent) o;
+      return Objects.equals(containerId, that.containerId) && Objects
+          .equals(datanodeId, that.datanodeId);
+    }
+
+    @Override
+    public int hashCode() {
+
+      return Objects.hash(containerId, datanodeId);
+    }
+  }
+
+  private static class UnderreplicatedEvent
+
+      implements IdentifiableEventPayload {
+
+    private final UUID UUID;
+
+    private final String containerId;
+
+    public UnderreplicatedEvent(UUID UUID, String containerId) {
+      this.containerId = containerId;
+      this.UUID = UUID;
+    }
+
+    public UUID getUUID() {
+      return UUID;
+    }
+  }
+
+}