瀏覽代碼

Reverting revision r1405173 that accidentally committed extraneous change from my workspace

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1406915 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 12 年之前
父節點
當前提交
dd7d8b17e7

+ 4 - 4
CHANGES.txt

@@ -307,10 +307,6 @@ Release 1.2.0 - unreleased
     MAPREDUCE-1806. CombineFileInputFormat does not work with paths not on 
     default FS. (Gera Shegalov via tucu)
 
-    HDFS-3791. HDFS-173 Backport - Namenode will not block until a large 
-    directory deletion completes. It allows other operations when the 
-    deletion is in progress. (umamahesh via suresh)
-
     MAPREDUCE-4765. Restarting the JobTracker programmatically can cause
     DelegationTokenRenewal to throw an exception. (rkanter via tucu)
 
@@ -326,6 +322,10 @@ Release 1.1.1 - Unreleased
 
   BUG FIXES
 
+    HDFS-3791. HDFS-173 Backport - Namenode will not block until a large 
+    directory deletion completes. It allows other operations when the 
+    deletion is in progress. (umamahesh via suresh)
+
 Release 1.1.0 - 2012.09.28
 
   INCOMPATIBLE CHANGES

+ 1 - 1
src/core/org/apache/hadoop/ipc/Client.java

@@ -109,7 +109,7 @@ public class Client {
    * @param conf Configuration
    * @return the ping interval
    */
-  public final static int getPingInterval(Configuration conf) {
+  final static int getPingInterval(Configuration conf) {
     return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
   }
   

+ 116 - 12
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -86,7 +86,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   volatile boolean clientRunning = true;
   Random r = new Random();
   final String clientName;
-  final LeaseRenewer leaserenewer;
+  final LeaseChecker leasechecker = new LeaseChecker();
   private Configuration conf;
   private long defaultBlockSize;
   private short defaultReplication;
@@ -250,9 +250,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
     this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
-    
-    // TODO: review this
-    leaserenewer = new LeaseRenewer(this, Client.getPingInterval(conf));
 
     ugi = UserGroupInformation.getCurrentUser();
 
@@ -320,10 +317,10 @@ public class DFSClient implements FSConstants, java.io.Closeable {
    */
   public synchronized void close() throws IOException {
     if(clientRunning) {
-      leaserenewer.close();
+      leasechecker.close();
       clientRunning = false;
       try {
-        leaserenewer.interruptAndJoin();
+        leasechecker.interruptAndJoin();
       } catch (InterruptedException ie) {
       }
   
@@ -763,7 +760,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     OutputStream result = new DFSOutputStream(src, masked,
         overwrite, createParent, replication, blockSize, progress, buffersize,
         conf.getInt("io.bytes.per.checksum", 512));
-    leaserenewer.put(src, result);
+    leasechecker.put(src, result);
     return result;
   }
 
@@ -818,7 +815,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     }
     final DFSOutputStream result = new DFSOutputStream(src, buffersize, progress,
         lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
-    leaserenewer.put(src, result);
+    leasechecker.put(src, result);
     return result;
   }
 
@@ -1395,8 +1392,115 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     throw new IOException("No live nodes contain current block");
   }
 
-  boolean isLeaseRenewerStarted() {
-    return leaserenewer.isRunning();
+  boolean isLeaseCheckerStarted() {
+    return leasechecker.daemon != null;
+  }
+
+  /** Lease management*/
+  class LeaseChecker implements Runnable {
+    /** A map from src -> DFSOutputStream of files that are currently being
+     * written by this client.
+     */
+    private final SortedMap<String, OutputStream> pendingCreates
+        = new TreeMap<String, OutputStream>();
+
+    private Daemon daemon = null;
+    
+    synchronized void put(String src, OutputStream out) {
+      if (clientRunning) {
+        if (daemon == null) {
+          daemon = new Daemon(this);
+          daemon.start();
+        }
+        pendingCreates.put(src, out);
+      }
+    }
+    
+    synchronized void remove(String src) {
+      pendingCreates.remove(src);
+    }
+    
+    void interruptAndJoin() throws InterruptedException {
+      Daemon daemonCopy = null;
+      synchronized (this) {
+        if (daemon != null) {
+          daemon.interrupt();
+          daemonCopy = daemon;
+        }
+      }
+     
+      if (daemonCopy != null) {
+        LOG.debug("Wait for lease checker to terminate");
+        daemonCopy.join();
+      }
+    }
+
+    void close() {
+      while (true) {
+        String src;
+        OutputStream out;
+        synchronized (this) {
+          if (pendingCreates.isEmpty()) {
+            return;
+          }
+          src = pendingCreates.firstKey();
+          out = pendingCreates.remove(src);
+        }
+        if (out != null) {
+          try {
+            out.close();
+          } catch (IOException ie) {
+            LOG.error("Exception closing file " + src+ " : " + ie, ie);
+          }
+        }
+      }
+    }
+
+    private void renew() throws IOException {
+      synchronized(this) {
+        if (pendingCreates.isEmpty()) {
+          return;
+        }
+      }
+      namenode.renewLease(clientName);
+    }
+
+    /**
+     * Periodically check in with the namenode and renew all the leases
+     * when the lease period is half over.
+     */
+    public void run() {
+      long lastRenewed = 0;
+      while (clientRunning && !Thread.interrupted()) {
+        if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
+          try {
+            renew();
+            lastRenewed = System.currentTimeMillis();
+          } catch (IOException ie) {
+            LOG.warn("Problem renewing lease for " + clientName, ie);
+          }
+        }
+
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(this + " is interrupted.", ie);
+          }
+          return;
+        }
+      }
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      String s = getClass().getSimpleName();
+      if (LOG.isTraceEnabled()) {
+        return s + "@" + DFSClient.this + ": "
+               + StringUtils.stringifyException(new Throwable("for testing"));
+      }
+      return s;
+    }
   }
 
   /** Utility class to encapsulate data node info and its address. */
@@ -3890,7 +3994,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
           throw e;
       }
       closeInternal();
-      leaserenewer.remove(src);
+      leasechecker.remove(src);
       
       if (s != null) {
         s.close();
@@ -3907,7 +4011,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       response.close();
       closed = true;
     }
-    
+ 
     // shutdown datastreamer and responseprocessor threads.
     private void closeThreads() throws IOException {
       try {

+ 0 - 243
src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java

@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.SocketTimeoutException;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.StringUtils;
-
-public class LeaseRenewer {
-  private static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
-
-  static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
-  static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
-  /** A map from src -> DFSOutputStream of files that are currently being
-   * written by this client.
-   */
-  private final SortedMap<String, OutputStream> pendingCreates
-      = new TreeMap<String, OutputStream>();
-  /** The time in milliseconds that the map became empty. */
-  private long emptyTime = Long.MAX_VALUE;
-  /** A fixed lease renewal time period in milliseconds */
-  private final long renewal;
-
-  /** A daemon for renewing lease */
-  private Daemon daemon = null;
-  /** Only the daemon with currentId should run. */
-  private int currentId = 0;
-
-  /** 
-   * A period in milliseconds that the lease renewer thread should run
-   * after the map became empty.
-   * If the map is empty for a time period longer than the grace period,
-   * the renewer should terminate.  
-   */
-  private long gracePeriod;
-  /**
-   * The time period in milliseconds
-   * that the renewer sleeps for each iteration. 
-   */
-  private volatile long sleepPeriod;
-
-  private final DFSClient dfsclient;
-
-  LeaseRenewer(final DFSClient dfsclient, final long timeout) {
-    this.dfsclient = dfsclient;
-    this.renewal = (timeout > 0 && timeout < FSConstants.LEASE_SOFTLIMIT_PERIOD)? 
-        timeout/2: FSConstants.LEASE_SOFTLIMIT_PERIOD/2;
-    setGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
-  }
-
-  /** Set the grace period and adjust the sleep period accordingly. */
-  void setGraceSleepPeriod(final long gracePeriod) {
-    if (gracePeriod < 100L) {
-      throw new IllegalArgumentException(gracePeriod
-          + " = gracePeriod < 100ms is too small.");
-    }
-    synchronized(this) {
-      this.gracePeriod = gracePeriod;
-    }
-    final long half = gracePeriod/2;
-    this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
-        half: LEASE_RENEWER_SLEEP_DEFAULT;
-  }
-
-  /** Is the daemon running? */
-  synchronized boolean isRunning() {
-    return daemon != null && daemon.isAlive();
-  }
-
-  /** Is the empty period longer than the grace period? */  
-  private synchronized boolean isRenewerExpired() {
-    return emptyTime != Long.MAX_VALUE
-        && System.currentTimeMillis() - emptyTime > gracePeriod;
-  }
-
-  synchronized void put(String src, OutputStream out) {
-    if (dfsclient.clientRunning) {
-      if (daemon == null || isRenewerExpired()) {
-        //start a new deamon with a new id.
-        final int id = ++currentId;
-        daemon = new Daemon(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              LeaseRenewer.this.run(id);
-            } catch(InterruptedException e) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
-                    + " is interrupted.", e);
-              }
-            }
-          }
-        });
-        daemon.start();
-      }
-      pendingCreates.put(src, out);
-      emptyTime = Long.MAX_VALUE;
-    }
-  }
-  
-  synchronized void remove(String src) {
-    pendingCreates.remove(src);
-    if (pendingCreates.isEmpty() && emptyTime == Long.MAX_VALUE) {
-      //discover the first time that the map is empty.
-      emptyTime = System.currentTimeMillis();
-    }
-  }
-  
-  void interruptAndJoin() throws InterruptedException {
-    Daemon daemonCopy = null;
-    synchronized (this) {
-      if (isRunning()) {
-        daemon.interrupt();
-        daemonCopy = daemon;
-      }
-    }
-   
-    if (daemonCopy != null) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Wait for lease checker to terminate");
-      }
-      daemonCopy.join();
-    }
-  }
-
-  void close() {
-    while (true) {
-      String src;
-      OutputStream out;
-      synchronized (this) {
-        if (pendingCreates.isEmpty()) {
-          return;
-        }
-        src = pendingCreates.firstKey();
-        out = pendingCreates.remove(src);
-      }
-      if (out != null) {
-        try {
-          out.close();
-        } catch (IOException ie) {
-          LOG.error("Exception closing file " + src+ " : " + ie, ie);
-        }
-      }
-    }
-  }
-
-  /**
-   * Abort all open files. Release resources held. Ignore all errors.
-   */
-  synchronized void abort() {
-    dfsclient.clientRunning = false;
-    while (!pendingCreates.isEmpty()) {
-      String src = pendingCreates.firstKey();
-      DFSClient.DFSOutputStream out = (DFSClient.DFSOutputStream) pendingCreates
-          .remove(src);
-      if (out != null) {
-          // TODO:
-//        try {
-//          out.abort();
-//          
-//        } catch (IOException ie) {
-//          LOG.error("Exception aborting file " + src+ ": ", ie);
-//        }
-      }
-    }
-    RPC.stopProxy(dfsclient.namenode); // close connections to the namenode
-  }
-
-  private void renew() throws IOException {
-    synchronized(this) {
-      if (pendingCreates.isEmpty()) {
-        return;
-      }
-    }
-    dfsclient.namenode.renewLease(dfsclient.clientName);
-  }
-
-  /**
-   * Periodically check in with the namenode and renew all the leases
-   * when the lease period is half over.
-   */
-  private void run(final int id) throws InterruptedException {
-    for(long lastRenewed = System.currentTimeMillis();
-        dfsclient.clientRunning && !Thread.interrupted();
-        Thread.sleep(sleepPeriod)) {
-      if (System.currentTimeMillis() - lastRenewed >= renewal) {
-        try {
-          renew();
-          lastRenewed = System.currentTimeMillis();
-        } catch (SocketTimeoutException ie) {
-          LOG.warn("Failed to renew lease for " + dfsclient.clientName + " for "
-              + (renewal/1000) + " seconds.  Aborting ...", ie);
-          abort();
-          break;
-        } catch (IOException ie) {
-          LOG.warn("Failed to renew lease for " + dfsclient.clientName + " for "
-              + (renewal/1000) + " seconds.  Will retry shortly ...", ie);
-        }
-      }
-
-      synchronized(this) {
-        if (id != currentId || isRenewerExpired()) {
-          //no longer the current daemon or expired
-          return;
-        }
-      }
-    }
-  }
-
-  /** {@inheritDoc} */
-  public String toString() {
-    String s = getClass().getSimpleName();
-    if (LOG.isTraceEnabled()) {
-      return s + "@" + dfsclient + ": "
-             + StringUtils.stringifyException(new Throwable("for testing"));
-    }
-    return s;
-  }
-}

+ 1 - 1
src/test/org/apache/hadoop/hdfs/AppendTestUtil.java

@@ -165,7 +165,7 @@ public class AppendTestUtil {
     LOG.info("leasechecker.interruptAndJoin()");
     // lose the lease on the client
     DistributedFileSystem dfs = (DistributedFileSystem)whichfs;
-    dfs.dfs.leaserenewer.interruptAndJoin();
+    dfs.dfs.leasechecker.interruptAndJoin();
   }
   
   public static void recoverFile(MiniDFSCluster cluster, FileSystem fs,

+ 8 - 8
src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

@@ -111,31 +111,31 @@ public class TestDistributedFileSystem {
 
       {
         DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseRenewerStarted());
+        assertFalse(dfs.dfs.isLeaseCheckerStarted());
   
         //create a file
         FSDataOutputStream out = dfs.create(filepath);
-        assertTrue(dfs.dfs.isLeaseRenewerStarted());
+        assertTrue(dfs.dfs.isLeaseCheckerStarted());
   
         //write something and close
         out.writeLong(millis);
-        assertTrue(dfs.dfs.isLeaseRenewerStarted());
+        assertTrue(dfs.dfs.isLeaseCheckerStarted());
         out.close();
-        assertTrue(dfs.dfs.isLeaseRenewerStarted());
+        assertTrue(dfs.dfs.isLeaseCheckerStarted());
         dfs.close();
       }
 
       {
         DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseRenewerStarted());
+        assertFalse(dfs.dfs.isLeaseCheckerStarted());
 
         //open and check the file
         FSDataInputStream in = dfs.open(filepath);
-        assertFalse(dfs.dfs.isLeaseRenewerStarted());
+        assertFalse(dfs.dfs.isLeaseCheckerStarted());
         assertEquals(millis, in.readLong());
-        assertFalse(dfs.dfs.isLeaseRenewerStarted());
+        assertFalse(dfs.dfs.isLeaseCheckerStarted());
         in.close();
-        assertFalse(dfs.dfs.isLeaseRenewerStarted());
+        assertFalse(dfs.dfs.isLeaseCheckerStarted());
         dfs.close();
       }
     }

+ 2 - 2
src/test/org/apache/hadoop/hdfs/TestFileAppend4.java

@@ -656,7 +656,7 @@ public class TestFileAppend4 extends TestCase {
       // has not been completed in the NN.
       // Lose the leases
       LOG.info("Killing lease checker");
-      client.leaserenewer.interruptAndJoin();
+      client.leasechecker.interruptAndJoin();
 
       FileSystem fs1 = cluster.getFileSystem();
       FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
@@ -726,7 +726,7 @@ public class TestFileAppend4 extends TestCase {
       // has not been completed in the NN.
       // Lose the leases
       LOG.info("Killing lease checker");
-      client.leaserenewer.interruptAndJoin();
+      client.leasechecker.interruptAndJoin();
 
       FileSystem fs1 = cluster.getFileSystem();
       FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(

+ 1 - 1
src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java

@@ -157,7 +157,7 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
     stm.sync();
     if (triggerSoftLease) {
       AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
-      dfs.dfs.leaserenewer.interruptAndJoin();
+      dfs.dfs.leasechecker.interruptAndJoin();
     }
     return filepath;
   }