Browse Source

HDFS-12519. Ozone: Lease Manager framework. Contributed by Nandakumar.

Nandakumar 7 years ago
parent
commit
747aefaa81

+ 189 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/lease/Lease.java

@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+import org.apache.hadoop.util.Time;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * This class represents the lease created on a resource. Callback can be
+ * registered on the lease which will be executed in case of timeout.
+ *
+ * @param <T> Resource type for which the lease can be associated
+ */
+public class Lease<T> {
+
+  /**
+   * The resource for which this lease is created.
+   */
+  private final T resource;
+
+  private final long creationTime;
+
+  /**
+   * Lease lifetime in milliseconds.
+   */
+  private volatile long leaseTimeout;
+
+  private boolean expired;
+
+  /**
+   * Functions to be called in case of timeout.
+   */
+  private List<Callable<Void>> callbacks;
+
+
+  /**
+   * Creates a lease on the specified resource with given timeout.
+   *
+   * @param resource
+   *        Resource for which the lease has to be created
+   * @param timeout
+   *        Lease lifetime in milliseconds
+   */
+  public Lease(T resource, long timeout) {
+    this.resource = resource;
+    this.leaseTimeout = timeout;
+    this.callbacks = Collections.synchronizedList(new ArrayList<>());
+    this.creationTime = Time.monotonicNow();
+    this.expired = false;
+  }
+
+  /**
+   * Returns true if the lease has expired, else false.
+   *
+   * @return true if expired, else false
+   */
+  public boolean hasExpired() {
+    return expired;
+  }
+
+  /**
+   * Registers a callback which will be executed in case of timeout. Callbacks
+   * are executed in a separate Thread.
+   *
+   * @param callback
+   *        The Callable which has to be executed
+   * @throws LeaseExpiredException
+   *         If the lease has already timed out
+   */
+  public void registerCallBack(Callable<Void> callback)
+      throws LeaseExpiredException {
+    if(hasExpired()) {
+      throw new LeaseExpiredException("Resource: " + resource);
+    }
+    callbacks.add(callback);
+  }
+
+  /**
+   * Returns the time elapsed since the creation of lease.
+   *
+   * @return elapsed time in milliseconds
+   * @throws LeaseExpiredException
+   *         If the lease has already timed out
+   */
+  public long getElapsedTime() throws LeaseExpiredException {
+    if(hasExpired()) {
+      throw new LeaseExpiredException("Resource: " + resource);
+    }
+    return Time.monotonicNow() - creationTime;
+  }
+
+  /**
+   * Returns the time available before timeout.
+   *
+   * @return remaining time in milliseconds
+   * @throws LeaseExpiredException
+   *         If the lease has already timed out
+   */
+  public long getRemainingTime() throws LeaseExpiredException {
+    if(hasExpired()) {
+      throw new LeaseExpiredException("Resource: " + resource);
+    }
+    return leaseTimeout - getElapsedTime();
+  }
+
+  /**
+   * Returns total lease lifetime.
+   *
+   * @return total lifetime of lease in milliseconds
+   * @throws LeaseExpiredException
+   *         If the lease has already timed out
+   */
+  public long getLeaseLifeTime() throws LeaseExpiredException {
+    if(hasExpired()) {
+      throw new LeaseExpiredException("Resource: " + resource);
+    }
+    return leaseTimeout;
+  }
+
+  /**
+   * Renews the lease timeout period.
+   *
+   * @param timeout
+   *        Time to be added to the lease in milliseconds
+   * @throws LeaseExpiredException
+   *         If the lease has already timed out
+   */
+  public void renew(long timeout) throws LeaseExpiredException {
+    if(hasExpired()) {
+      throw new LeaseExpiredException("Resource: " + resource);
+    }
+    leaseTimeout += timeout;
+  }
+
+  @Override
+  public int hashCode() {
+    return resource.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if(obj instanceof Lease) {
+      return resource.equals(((Lease) obj).resource);
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "Lease<" + resource.toString() + ">";
+  }
+
+  /**
+   * Returns the callbacks to be executed for the lease in case of timeout.
+   *
+   * @return callbacks to be executed
+   */
+  List<Callable<Void>> getCallbacks() {
+    return callbacks;
+  }
+
+  /**
+   * Expires/Invalidates the lease.
+   */
+  void invalidate() {
+    callbacks = null;
+    expired = true;
+  }
+
+}

+ 46 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/lease/LeaseAlreadyExistException.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+/**
+ * This exception represents that there is already a lease acquired on the
+ * same resource.
+ */
+public class LeaseAlreadyExistException  extends LeaseException {
+
+  /**
+   * Constructs an {@code LeaseAlreadyExistException} with {@code null}
+   * as its error detail message.
+   */
+  public LeaseAlreadyExistException() {
+    super();
+  }
+
+  /**
+   * Constructs an {@code LeaseAlreadyExistException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public LeaseAlreadyExistException(String message) {
+    super(message);
+  }
+
+}

+ 65 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/lease/LeaseCallbackExecutor.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * This class is responsible for executing the callbacks of a lease in case of
+ * timeout.
+ */
+public class LeaseCallbackExecutor<T> implements Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Lease.class);
+
+  private final T resource;
+  private final List<Callable<Void>> callbacks;
+
+  /**
+   * Constructs LeaseCallbackExecutor instance with list of callbacks.
+   *
+   * @param resource
+   *        The resource for which the callbacks are executed
+   * @param callbacks
+   *        Callbacks to be executed by this executor
+   */
+  public LeaseCallbackExecutor(T resource, List<Callable<Void>> callbacks) {
+    this.resource = resource;
+    this.callbacks = callbacks;
+  }
+
+  @Override
+  public void run() {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Executing callbacks for lease on {}", resource);
+    }
+    for(Callable<Void> callback : callbacks) {
+      try {
+        callback.call();
+      } catch (Exception e) {
+        LOG.warn("Exception while executing callback for lease on {}",
+            resource, e);
+      }
+    }
+  }
+
+}

+ 45 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/lease/LeaseException.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+/**
+ * This exception represents all lease related exceptions.
+ */
+public class LeaseException extends Exception {
+
+  /**
+   * Constructs an {@code LeaseException} with {@code null}
+   * as its error detail message.
+   */
+  public LeaseException() {
+    super();
+  }
+
+  /**
+   * Constructs an {@code LeaseException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public LeaseException(String message) {
+    super(message);
+  }
+
+}

+ 45 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/lease/LeaseExpiredException.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+/**
+ * This exception represents that the lease that is being accessed has expired.
+ */
+public class LeaseExpiredException extends LeaseException {
+
+  /**
+   * Constructs an {@code LeaseExpiredException} with {@code null}
+   * as its error detail message.
+   */
+  public LeaseExpiredException() {
+    super();
+  }
+
+  /**
+   * Constructs an {@code LeaseExpiredException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public LeaseExpiredException(String message) {
+    super(message);
+  }
+
+}

+ 247 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java

@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * LeaseManager is someone who can provide you leases based on your
+ * requirement. If you want to return the lease back before it expires,
+ * you can give it back to Lease Manager. He is the one responsible for
+ * the lifecycle of leases. The resource for which lease is created
+ * should have proper {@code equals} method implementation, resource
+ * equality is checked while the lease is created.
+ *
+ * @param <T> Type of leases that this lease manager can create
+ */
+public class LeaseManager<T> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LeaseManager.class);
+
+  private final long defaultTimeout;
+  private Map<T, Lease<T>> activeLeases;
+  private LeaseMonitor leaseMonitor;
+  private Thread leaseMonitorThread;
+  private boolean isRunning;
+
+  /**
+   * Creates an instance of lease manager.
+   *
+   * @param defaultTimeout
+   *        Default timeout value to be used for lease creation.
+   */
+  public LeaseManager(long defaultTimeout) {
+    this.defaultTimeout = defaultTimeout;
+  }
+
+  /**
+   * Starts the lease manager service.
+   */
+  public void start() {
+    LOG.debug("Starting LeaseManager service");
+    activeLeases = new ConcurrentHashMap<>();
+    leaseMonitor = new LeaseMonitor();
+    leaseMonitorThread = new Thread(leaseMonitor);
+    leaseMonitorThread.setName("LeaseManager#LeaseMonitor");
+    leaseMonitorThread.setDaemon(true);
+    leaseMonitorThread.setUncaughtExceptionHandler((thread, throwable) -> {
+      // Let us just restart this thread after logging an error.
+      // if this thread is not running we cannot handle Lease expiry.
+      LOG.error("LeaseMonitor thread encountered an error. Thread: {}",
+          thread.toString(), throwable);
+      leaseMonitorThread.start();
+    });
+    LOG.debug("Starting LeaseManager#LeaseMonitor Thread");
+    leaseMonitorThread.start();
+    isRunning = true;
+  }
+
+  /**
+   * Returns a lease for the specified resource with default timeout.
+   *
+   * @param resource
+   *        Resource for which lease has to be created
+   * @throws LeaseAlreadyExistException
+   *         If there is already a lease on the resource
+   */
+  public synchronized Lease<T> acquire(T resource)
+      throws LeaseAlreadyExistException {
+    return acquire(resource, defaultTimeout);
+  }
+
+  /**
+   * Returns a lease for the specified resource with the timeout provided.
+   *
+   * @param resource
+   *        Resource for which lease has to be created
+   * @param timeout
+   *        The timeout in milliseconds which has to be set on the lease
+   * @throws LeaseAlreadyExistException
+   *         If there is already a lease on the resource
+   */
+  public synchronized Lease<T> acquire(T resource, long timeout)
+      throws LeaseAlreadyExistException {
+    checkStatus();
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Acquiring lease on {} for {} milliseconds", resource, timeout);
+    }
+    if(activeLeases.containsKey(resource)) {
+      throw new LeaseAlreadyExistException("Resource: " + resource);
+    }
+    Lease<T> lease = new Lease<>(resource, timeout);
+    activeLeases.put(resource, lease);
+    leaseMonitorThread.interrupt();
+    return lease;
+  }
+
+  /**
+   * Returns a lease associated with the specified resource.
+   *
+   * @param resource
+   *        Resource for which the lease has to be returned
+   * @throws LeaseNotFoundException
+   *         If there is no active lease on the resource
+   */
+  public Lease<T> get(T resource) throws LeaseNotFoundException {
+    checkStatus();
+    Lease<T> lease = activeLeases.get(resource);
+    if(lease != null) {
+      return lease;
+    }
+    throw new LeaseNotFoundException("Resource: " + resource);
+  }
+
+  /**
+   * Releases the lease associated with the specified resource.
+   *
+   * @param resource
+   *        The for which the lease has to be released
+   * @throws LeaseNotFoundException
+   *         If there is no active lease on the resource
+   */
+  public synchronized void release(T resource)
+      throws LeaseNotFoundException {
+    checkStatus();
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Releasing lease on {}", resource);
+    }
+    Lease<T> lease = activeLeases.remove(resource);
+    if(lease == null) {
+      throw new LeaseNotFoundException("Resource: " + resource);
+    }
+    lease.invalidate();
+  }
+
+  /**
+   * Shuts down the LeaseManager and releases the resources. All the active
+   * {@link Lease} will be released (callbacks on leases will not be
+   * executed).
+   */
+  public void shutdown() {
+    checkStatus();
+    LOG.debug("Shutting down LeaseManager service");
+    leaseMonitor.disable();
+    leaseMonitorThread.interrupt();
+    for(T resource : activeLeases.keySet()) {
+      try {
+        release(resource);
+      }  catch(LeaseNotFoundException ex) {
+        //Ignore the exception, someone might have released the lease
+      }
+    }
+    isRunning = false;
+  }
+
+  /**
+   * Throws {@link LeaseManagerNotRunningException} if the service is not
+   * running.
+   */
+  private void checkStatus() {
+    if(!isRunning) {
+      throw new LeaseManagerNotRunningException("LeaseManager not running.");
+    }
+  }
+
+  /**
+   * Monitors the leases and expires them based on the timeout, also
+   * responsible for executing the callbacks of expired leases.
+   */
+  private final class LeaseMonitor implements Runnable {
+
+    private boolean monitor = true;
+    private ExecutorService executorService;
+
+    private LeaseMonitor() {
+      this.monitor = true;
+      this.executorService = Executors.newCachedThreadPool();
+    }
+
+    @Override
+    public void run() {
+      while(monitor) {
+        LOG.debug("LeaseMonitor: checking for lease expiry");
+        long sleepTime = Long.MAX_VALUE;
+
+        for (T resource : activeLeases.keySet()) {
+          try {
+            Lease<T> lease = get(resource);
+            long remainingTime = lease.getRemainingTime();
+            if (remainingTime <= 0) {
+              //Lease has timed out
+              List<Callable<Void>> leaseCallbacks = lease.getCallbacks();
+              release(resource);
+              executorService.execute(
+                  new LeaseCallbackExecutor(resource, leaseCallbacks));
+            } else {
+              sleepTime = remainingTime > sleepTime ?
+                  sleepTime : remainingTime;
+            }
+          } catch (LeaseNotFoundException | LeaseExpiredException ex) {
+            //Ignore the exception, someone might have released the lease
+          }
+        }
+
+        try {
+          if(!Thread.interrupted()) {
+            Thread.sleep(sleepTime);
+          }
+        } catch (InterruptedException ignored) {
+          // This means a new lease is added to activeLeases.
+        }
+      }
+    }
+
+    /**
+     * Disables lease monitor, next interrupt call on the thread
+     * will stop lease monitor.
+     */
+    public void disable() {
+      monitor = false;
+    }
+  }
+
+}

+ 45 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/lease/LeaseManagerNotRunningException.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+/**
+ * This exception represents that there LeaseManager service is not running.
+ */
+public class LeaseManagerNotRunningException  extends RuntimeException {
+
+  /**
+   * Constructs an {@code LeaseManagerNotRunningException} with {@code null}
+   * as its error detail message.
+   */
+  public LeaseManagerNotRunningException() {
+    super();
+  }
+
+  /**
+   * Constructs an {@code LeaseManagerNotRunningException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public LeaseManagerNotRunningException(String message) {
+    super(message);
+  }
+
+}

+ 46 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/lease/LeaseNotFoundException.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+/**
+ * This exception represents that the lease that is being accessed does not
+ * exist.
+ */
+public class LeaseNotFoundException extends LeaseException {
+
+  /**
+   * Constructs an {@code LeaseNotFoundException} with {@code null}
+   * as its error detail message.
+   */
+  public LeaseNotFoundException() {
+    super();
+  }
+
+  /**
+   * Constructs an {@code LeaseNotFoundException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public LeaseNotFoundException(String message) {
+    super(message);
+  }
+
+}

+ 26 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/lease/package-info.java

@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * A generic lease management API which can be used if a service
+ * needs any kind of lease management.
+ */
+
+package org.apache.hadoop.ozone.lease;
+/*
+ This package contains lease management related classes.
+ */

+ 374 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java

@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * A generic lease management API which can be used if a service
+ * needs any kind of lease management.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test class to check functionality and consistency of LeaseManager.
+ */
+public class TestLeaseManager {
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Dummy resource on which leases can be acquired.
+   */
+  private final class DummyResource {
+
+    private final String name;
+
+    private DummyResource(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if(obj instanceof DummyResource) {
+        return name.equals(((DummyResource) obj).name);
+      }
+      return false;
+    }
+  }
+
+  @Test
+  public void testLeaseAcquireAndRelease() throws LeaseException {
+    //It is assumed that the test case execution won't take more than 5 seconds,
+    //if it takes more time increase the defaultTimeout value of LeaseManager.
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    DummyResource resourceTwo = new DummyResource("two");
+    DummyResource resourceThree = new DummyResource("three");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo);
+    Lease<DummyResource> leaseThree = manager.acquire(resourceThree);
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertEquals(leaseTwo, manager.get(resourceTwo));
+    Assert.assertEquals(leaseThree, manager.get(resourceThree));
+    Assert.assertFalse(leaseOne.hasExpired());
+    Assert.assertFalse(leaseTwo.hasExpired());
+    Assert.assertFalse(leaseThree.hasExpired());
+    //The below releases should not throw LeaseNotFoundException.
+    manager.release(resourceOne);
+    manager.release(resourceTwo);
+    manager.release(resourceThree);
+    Assert.assertTrue(leaseOne.hasExpired());
+    Assert.assertTrue(leaseTwo.hasExpired());
+    Assert.assertTrue(leaseThree.hasExpired());
+    manager.shutdown();
+  }
+
+  @Test
+  public void testLeaseAlreadyExist() throws LeaseException {
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    DummyResource resourceTwo = new DummyResource("two");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo);
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertEquals(leaseTwo, manager.get(resourceTwo));
+
+    exception.expect(LeaseAlreadyExistException.class);
+    exception.expectMessage("Resource: " + resourceOne);
+    manager.acquire(resourceOne);
+
+    manager.release(resourceOne);
+    manager.release(resourceTwo);
+    manager.shutdown();
+  }
+
+  @Test
+  public void testLeaseNotFound() throws LeaseException, InterruptedException {
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    DummyResource resourceTwo = new DummyResource("two");
+    DummyResource resourceThree = new DummyResource("three");
+
+    //Case 1: lease was never acquired.
+    exception.expect(LeaseNotFoundException.class);
+    exception.expectMessage("Resource: " + resourceOne);
+    manager.get(resourceOne);
+
+    //Case 2: lease is acquired and released.
+    Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo);
+    Assert.assertEquals(leaseTwo, manager.get(resourceTwo));
+    Assert.assertFalse(leaseTwo.hasExpired());
+    manager.release(resourceTwo);
+    Assert.assertTrue(leaseTwo.hasExpired());
+    exception.expect(LeaseNotFoundException.class);
+    exception.expectMessage("Resource: " + resourceTwo);
+    manager.get(resourceTwo);
+
+    //Case 3: lease acquired and timed out.
+    Lease<DummyResource> leaseThree = manager.acquire(resourceThree);
+    Assert.assertEquals(leaseThree, manager.get(resourceThree));
+    Assert.assertFalse(leaseThree.hasExpired());
+    long sleepTime = leaseThree.getRemainingTime() + 5;
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException ex) {
+      //even in case of interrupt we have to wait till lease times out.
+      Thread.sleep(sleepTime);
+    }
+    Assert.assertTrue(leaseThree.hasExpired());
+    exception.expect(LeaseNotFoundException.class);
+    exception.expectMessage("Resource: " + resourceThree);
+    manager.get(resourceThree);
+    manager.shutdown();
+  }
+
+  @Test
+  public void testCustomLeaseTimeout() throws LeaseException {
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    DummyResource resourceTwo = new DummyResource("two");
+    DummyResource resourceThree = new DummyResource("three");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo, 10000);
+    Lease<DummyResource> leaseThree = manager.acquire(resourceThree, 50000);
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertEquals(leaseTwo, manager.get(resourceTwo));
+    Assert.assertEquals(leaseThree, manager.get(resourceThree));
+    Assert.assertFalse(leaseOne.hasExpired());
+    Assert.assertFalse(leaseTwo.hasExpired());
+    Assert.assertFalse(leaseThree.hasExpired());
+    Assert.assertEquals(5000, leaseOne.getLeaseLifeTime());
+    Assert.assertEquals(10000, leaseTwo.getLeaseLifeTime());
+    Assert.assertEquals(50000, leaseThree.getLeaseLifeTime());
+    // Releasing of leases is done in shutdown, so don't have to worry about
+    // lease release
+    manager.shutdown();
+  }
+
+  @Test
+  public void testLeaseCallback() throws LeaseException, InterruptedException {
+    Map<DummyResource, String> leaseStatus = new HashMap<>();
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    leaseStatus.put(resourceOne, "lease in use");
+    leaseOne.registerCallBack(() -> {
+      leaseStatus.put(resourceOne, "lease expired");
+      return null;
+    });
+    // wait for lease to expire
+    long sleepTime = leaseOne.getRemainingTime() + 5;
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException ex) {
+      //even in case of interrupt we have to wait till lease times out.
+      Thread.sleep(sleepTime);
+    }
+    Assert.assertTrue(leaseOne.hasExpired());
+    exception.expect(LeaseNotFoundException.class);
+    exception.expectMessage("Resource: " + resourceOne);
+    manager.get(resourceOne);
+    // check if callback has been executed
+    Assert.assertEquals("lease expired", leaseStatus.get(resourceOne));
+  }
+
+  @Test
+  public void testCallbackExecutionInCaseOfLeaseRelease()
+      throws LeaseException, InterruptedException {
+    // Callbacks should not be executed in case of lease release
+    Map<DummyResource, String> leaseStatus = new HashMap<>();
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    leaseStatus.put(resourceOne, "lease in use");
+    leaseOne.registerCallBack(() -> {
+      leaseStatus.put(resourceOne, "lease expired");
+      return null;
+    });
+    leaseStatus.put(resourceOne, "lease released");
+    manager.release(resourceOne);
+    Assert.assertTrue(leaseOne.hasExpired());
+    exception.expect(LeaseNotFoundException.class);
+    exception.expectMessage("Resource: " + resourceOne);
+    manager.get(resourceOne);
+    Assert.assertEquals("lease released", leaseStatus.get(resourceOne));
+  }
+
+  @Test
+  public void testLeaseCallbackWithMultipleLeases()
+      throws LeaseException, InterruptedException {
+    Map<DummyResource, String> leaseStatus = new HashMap<>();
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    DummyResource resourceTwo = new DummyResource("two");
+    DummyResource resourceThree = new DummyResource("three");
+    DummyResource resourceFour = new DummyResource("four");
+    DummyResource resourceFive = new DummyResource("five");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo);
+    Lease<DummyResource> leaseThree = manager.acquire(resourceThree);
+    Lease<DummyResource> leaseFour = manager.acquire(resourceFour);
+    Lease<DummyResource> leaseFive = manager.acquire(resourceFive);
+    leaseStatus.put(resourceOne, "lease in use");
+    leaseStatus.put(resourceTwo, "lease in use");
+    leaseStatus.put(resourceThree, "lease in use");
+    leaseStatus.put(resourceFour, "lease in use");
+    leaseStatus.put(resourceFive, "lease in use");
+    leaseOne.registerCallBack(() -> {
+      leaseStatus.put(resourceOne, "lease expired");
+      return null;
+    });
+    leaseTwo.registerCallBack(() -> {
+      leaseStatus.put(resourceTwo, "lease expired");
+      return null;
+    });
+    leaseThree.registerCallBack(() -> {
+      leaseStatus.put(resourceThree, "lease expired");
+      return null;
+    });
+    leaseFour.registerCallBack(() -> {
+      leaseStatus.put(resourceFour, "lease expired");
+      return null;
+    });
+    leaseFive.registerCallBack(() -> {
+      leaseStatus.put(resourceFive, "lease expired");
+      return null;
+    });
+
+    // release lease one, two and three
+    leaseStatus.put(resourceOne, "lease released");
+    manager.release(resourceOne);
+    leaseStatus.put(resourceTwo, "lease released");
+    manager.release(resourceTwo);
+    leaseStatus.put(resourceThree, "lease released");
+    manager.release(resourceThree);
+
+    // wait for other leases to expire
+    long sleepTime = leaseFive.getRemainingTime() + 10;
+
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException ex) {
+      //even in case of interrupt we have to wait till lease times out.
+      Thread.sleep(sleepTime);
+    }
+    Assert.assertTrue(leaseOne.hasExpired());
+    Assert.assertTrue(leaseTwo.hasExpired());
+    Assert.assertTrue(leaseThree.hasExpired());
+    Assert.assertTrue(leaseFour.hasExpired());
+    Assert.assertTrue(leaseFive.hasExpired());
+
+    Assert.assertEquals("lease released", leaseStatus.get(resourceOne));
+    Assert.assertEquals("lease released", leaseStatus.get(resourceTwo));
+    Assert.assertEquals("lease released", leaseStatus.get(resourceThree));
+    Assert.assertEquals("lease expired", leaseStatus.get(resourceFour));
+    Assert.assertEquals("lease expired", leaseStatus.get(resourceFive));
+    manager.shutdown();
+  }
+
+  @Test
+  public void testReuseReleasedLease() throws LeaseException {
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertFalse(leaseOne.hasExpired());
+
+    manager.release(resourceOne);
+    Assert.assertTrue(leaseOne.hasExpired());
+
+    Lease<DummyResource> sameResourceLease = manager.acquire(resourceOne);
+    Assert.assertEquals(sameResourceLease, manager.get(resourceOne));
+    Assert.assertFalse(sameResourceLease.hasExpired());
+
+    manager.release(resourceOne);
+    Assert.assertTrue(sameResourceLease.hasExpired());
+    manager.shutdown();
+  }
+
+  @Test
+  public void testReuseTimedOutLease()
+      throws LeaseException, InterruptedException {
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertFalse(leaseOne.hasExpired());
+
+    // wait for lease to expire
+    long sleepTime = leaseOne.getRemainingTime() + 5;
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException ex) {
+      //even in case of interrupt we have to wait till lease times out.
+      Thread.sleep(sleepTime);
+    }
+    Assert.assertTrue(leaseOne.hasExpired());
+
+    Lease<DummyResource> sameResourceLease = manager.acquire(resourceOne);
+    Assert.assertEquals(sameResourceLease, manager.get(resourceOne));
+    Assert.assertFalse(sameResourceLease.hasExpired());
+
+    manager.release(resourceOne);
+    Assert.assertTrue(sameResourceLease.hasExpired());
+    manager.shutdown();
+  }
+
+  @Test
+  public void testRenewLease() throws LeaseException, InterruptedException {
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertFalse(leaseOne.hasExpired());
+
+    // add 5 more seconds to the lease
+    leaseOne.renew(5000);
+
+    Thread.sleep(5000);
+
+    // lease should still be active
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertFalse(leaseOne.hasExpired());
+    manager.release(resourceOne);
+    manager.shutdown();
+  }
+
+}

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/lease/package-info.java

@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+/*
+ This package contains lease management unit test classes.
+ */