
HDFS-9945. Datanode command for evicting writers. Contributed by Kihwal Lee

Eric Payne · 9 years ago · commit aede8c10ec

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java

@@ -121,6 +121,13 @@ public interface ClientDatanodeProtocol {
    */
   void shutdownDatanode(boolean forUpgrade) throws IOException;
 
+  /**
+   * Evict clients that are writing to a datanode.
+   *
+   * @throws IOException if the eviction request fails
+   */
+  void evictWriters() throws IOException;
+
   /**
    * Obtains datanode info
    *
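
Note: a caller that already holds a ClientDatanodeProtocol proxy for the target datanode drives eviction with a single call. A minimal sketch (a method fragment, usual imports assumed; obtaining the proxy is left to the caller, and DFSAdmin#getDataNodeProxy further down in this commit is one concrete way):

    // Sketch only: using the new protocol method once a proxy is in hand.
    static void evict(ClientDatanodeProtocol dnProxy) throws IOException {
      dnProxy.evictWriters(); // interrupt all active block writers on that datanode
    }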

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
@@ -97,6 +98,8 @@ public class ClientDatanodeProtocolTranslatorPB implements
   private static final GetBalancerBandwidthRequestProto
       VOID_GET_BALANCER_BANDWIDTH =
       GetBalancerBandwidthRequestProto.newBuilder().build();
+  private final static EvictWritersRequestProto VOID_EVICT_WRITERS =
+      EvictWritersRequestProto.newBuilder().build();
 
   public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
       Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
@@ -243,6 +246,15 @@ public class ClientDatanodeProtocolTranslatorPB implements
     }
   }
 
+  @Override
+  public void evictWriters() throws IOException {
+    try {
+      rpcProxy.evictWriters(NULL_CONTROLLER, VOID_EVICT_WRITERS);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
   @Override
   public DatanodeLocalInfo getDatanodeInfo() throws IOException {
     GetDatanodeInfoResponseProto response;

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto

@@ -114,6 +114,13 @@ message ShutdownDatanodeRequestProto {
 message ShutdownDatanodeResponseProto {
 }
 
+/** Tell datanode to evict active clients that are writing */
+message EvictWritersRequestProto {
+}
+
+message EvictWritersResponseProto {
+}
+
 /**
  * Ping datanode for liveness and quick info
  */
@@ -176,6 +183,9 @@ service ClientDatanodeProtocolService {
   rpc shutdownDatanode(ShutdownDatanodeRequestProto)
       returns(ShutdownDatanodeResponseProto);
 
+  rpc evictWriters(EvictWritersRequestProto)
+      returns(EvictWritersResponseProto);
+
   rpc getDatanodeInfo(GetDatanodeInfoRequestProto)
       returns(GetDatanodeInfoResponseProto);
 

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java

@@ -25,6 +25,8 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
@@ -67,6 +69,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       StartReconfigurationResponseProto.newBuilder().build();
   private final static TriggerBlockReportResponseProto TRIGGER_BLOCK_REPORT_RESP =
       TriggerBlockReportResponseProto.newBuilder().build();
+  private final static EvictWritersResponseProto EVICT_WRITERS_RESP =
+      EvictWritersResponseProto.newBuilder().build();
   
   private final ClientDatanodeProtocol impl;
 
@@ -142,6 +146,17 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
     return SHUTDOWN_DATANODE_RESP;
   }
 
+  @Override
+  public EvictWritersResponseProto evictWriters(RpcController unused,
+      EvictWritersRequestProto request) throws ServiceException {
+    try {
+      impl.evictWriters();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return EVICT_WRITERS_RESP;
+  }
+
   public GetDatanodeInfoResponseProto getDatanodeInfo(RpcController unused,
       GetDatanodeInfoRequestProto request) throws ServiceException {
     GetDatanodeInfoResponseProto res;

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -889,6 +889,9 @@ class BlockReceiver implements Closeable {
   }
   
   public void sendOOB() throws IOException, InterruptedException {
+    if (isDatanode) {
+      // The restart OOB ack is part of the client write pipeline protocol;
+      // skip datanode-initiated writes (e.g. replication), which have no
+      // client to notify.
+      return;
+    }
     ((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
         .getRestartOOBStatus());
   }

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -2973,6 +2973,13 @@ public class DataNode extends ReconfigurableBase
     shutdownThread.start();
   }
 
+  @Override //ClientDatanodeProtocol
+  public void evictWriters() throws IOException {
+    checkSuperuserPrivilege();
+    LOG.info("Evicting all writers.");
+    xserver.stopWriters();
+  }
+
   @Override //ClientDatanodeProtocol
   public DatanodeLocalInfo getDatanodeInfo() {
     long uptime = ManagementFactory.getRuntimeMXBean().getUptime()/1000;

+ 40 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -116,6 +116,7 @@ class DataXceiver extends Receiver implements Runnable {
   private BlockReceiver blockReceiver = null;
   private final int ioFileBufferSize;
   private final int smallBufferSize;
+  private Thread xceiver = null;
 
   /**
    * Client Name used in previous operation. Not available on first request
@@ -178,9 +179,38 @@ class DataXceiver extends Receiver implements Runnable {
   }
 
   public void sendOOB() throws IOException, InterruptedException {
+    BlockReceiver br = getCurrentBlockReceiver();
+    if (br == null) {
+      return;
+    }
+    // This doesn't need to be in a critical section. Although the client
+    // can reuse the connection to issue a different request, trying to send
+    // an OOB through the recently closed block receiver is harmless.
     LOG.info("Sending OOB to peer: " + peer);
-    if(blockReceiver!=null)
-      blockReceiver.sendOOB();
+    br.sendOOB();
+  }
+
+  public void stopWriter() {
+    // We want to interrupt the xceiver only when it is serving writes.
+    synchronized(this) {
+      if (getCurrentBlockReceiver() == null) {
+        return;
+      }
+      xceiver.interrupt();
+    }
+    LOG.info("Stopped the writer: " + peer);
+  }
+
+  /**
+   * blockReceiver is updated in multiple places. Use the synchronized setter
+   * and getter.
+   */
+  private synchronized void setCurrentBlockReceiver(BlockReceiver br) {
+    blockReceiver = br;
+  }
+
+  private synchronized BlockReceiver getCurrentBlockReceiver() {
+    return blockReceiver;
   }
   
   /**
@@ -192,6 +222,9 @@ class DataXceiver extends Receiver implements Runnable {
     Op op = null;
 
     try {
+      synchronized(this) {
+        xceiver = Thread.currentThread();
+      }
       dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
       peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
@@ -679,12 +712,12 @@ class DataXceiver extends Receiver implements Runnable {
       if (isDatanode || 
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         // open a block receiver
-        blockReceiver = getBlockReceiver(block, storageType, in,
+        setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
             peer.getRemoteAddressString(),
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
-            cachingStrategy, allowLazyPersist, pinning);
+            cachingStrategy, allowLazyPersist, pinning));
         replica = blockReceiver.getReplica();
       } else {
         replica = datanode.data.recoverClose(
@@ -853,7 +886,7 @@ class DataXceiver extends Receiver implements Runnable {
       IOUtils.closeStream(replyOut);
       IOUtils.closeSocket(mirrorSock);
       IOUtils.closeStream(blockReceiver);
-      blockReceiver = null;
+      setCurrentBlockReceiver(null);
     }
 
     //update metrics
@@ -1060,7 +1093,6 @@ class DataXceiver extends Receiver implements Runnable {
     DataOutputStream proxyOut = null;
     Status opStatus = SUCCESS;
     String errMsg = null;
-    BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
     boolean IoeDuringCopyBlockOperation = false;
     try {
@@ -1119,11 +1151,11 @@ class DataXceiver extends Receiver implements Runnable {
         DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
             checksumInfo.getChecksum());
         // open a block receiver and check if the block does not exist
-        blockReceiver = getBlockReceiver(block, storageType,
+        setCurrentBlockReceiver(getBlockReceiver(block, storageType,
             proxyReply, proxySock.getRemoteSocketAddress().toString(),
             proxySock.getLocalSocketAddress().toString(),
             null, 0, 0, 0, "", null, datanode, remoteChecksum,
-            CachingStrategy.newDropBehind(), false, false);
+            CachingStrategy.newDropBehind(), false, false));
         
         // receive a block
         blockReceiver.receiveBlock(null, null, replyOut, null, 

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -256,6 +256,12 @@ class DataXceiverServer implements Runnable {
       }
     }
   }
+
+  public synchronized void stopWriters() {
+    for (Peer p : peers.keySet()) {
+      peersXceiver.get(p).stopWriter();
+    }
+  }
   
   // Notify all peers of the shutdown and restart.
   // datanode.shouldRun should still be true and datanode.restarting should
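
Note: the two pieces above work together. DataXceiverServer.stopWriters() fans out over all connected peers, and each DataXceiver.stopWriter() interrupts its serving thread only while a BlockReceiver is registered, so idle or read-serving xceivers are left untouched. A minimal, self-contained sketch of the same pattern with invented names (Worker, WorkerPool), not the HDFS classes themselves:

    import java.util.ArrayList;
    import java.util.List;

    // Invented names; this only illustrates the guarded-interrupt pattern used
    // by DataXceiver/DataXceiverServer above.
    class Worker implements Runnable {
      private Thread self;            // the thread executing run()
      private Object currentWrite;    // non-null only while a write is in progress

      @Override
      public void run() {
        synchronized (this) {
          self = Thread.currentThread();   // record the serving thread first
        }
        // ... serve requests, bracketing each write with setCurrentWrite(...)/null ...
      }

      synchronized void setCurrentWrite(Object w) {
        currentWrite = w;
      }

      /** Interrupt the worker only if it is currently writing. */
      synchronized void stopIfWriting() {
        if (currentWrite != null && self != null) {
          self.interrupt();   // wakes the writer out of blocking I/O or waits
        }
      }
    }

    class WorkerPool {
      private final List<Worker> workers = new ArrayList<>();

      /** Fan-out used by an "evict writers" style request. */
      synchronized void stopWriters() {
        for (Worker w : workers) {
          w.stopIfWriting();
        }
      }
    }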

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java

@@ -1090,6 +1090,10 @@ public class DFSAdmin extends FsShell {
         + "\tclients will timeout and ignore the datanode. In such case, the\n"
         + "\tfast start-up mode will also be disabled.\n";
 
+    String evictWriters = "-evictWriters <datanode_host:ipc_port>\n"
+        + "\tMake the datanode evict all clients that are writing a block.\n"
+        + "\tThis is useful if decommissioning is hung due to slow writers.\n";
+
     String getDatanodeInfo = "-getDatanodeInfo <datanode_host:ipc_port>\n"
         + "\tGet the information about the given datanode. This command can\n"
         + "\tbe used for checking if a datanode is alive.\n";
@@ -1159,6 +1163,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(disallowSnapshot);
     } else if ("shutdownDatanode".equalsIgnoreCase(cmd)) {
       System.out.println(shutdownDatanode);
+    } else if ("evictWriters".equalsIgnoreCase(cmd)) {
+      System.out.println(evictWriters);
     } else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) {
       System.out.println(getDatanodeInfo);
     } else if ("help".equals(cmd)) {
@@ -1193,6 +1199,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(allowSnapshot);
       System.out.println(disallowSnapshot);
       System.out.println(shutdownDatanode);
+      System.out.println(evictWriters);
       System.out.println(getDatanodeInfo);
       System.out.println(triggerBlockReport);
       System.out.println(help);
@@ -2047,6 +2054,8 @@ public class DFSAdmin extends FsShell {
         exitCode = fetchImage(argv, i);
       } else if ("-shutdownDatanode".equals(cmd)) {
         exitCode = shutdownDatanode(argv, i);
+      } else if ("-evictWriters".equals(cmd)) {
+        exitCode = evictWriters(argv, i);
       } else if ("-getDatanodeInfo".equals(cmd)) {
         exitCode = getDatanodeInfo(argv, i);
       } else if ("-reconfig".equals(cmd)) {
@@ -2171,6 +2180,18 @@ public class DFSAdmin extends FsShell {
     return 0;
   }
 
+  private int evictWriters(String[] argv, int i) throws IOException {
+    final String dn = argv[i];
+    ClientDatanodeProtocol dnProxy = getDataNodeProxy(dn);
+    try {
+      dnProxy.evictWriters();
+      System.out.println("Requested writer eviction to datanode " + dn);
+    } catch (IOException ioe) {
+      return -1;
+    }
+    return 0;
+  }
+
   private int getDatanodeInfo(String[] argv, int i) throws IOException {
     ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);
     try {
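
Note: with the plumbing above, operators can run `hdfs dfsadmin -evictWriters <datanode_host:ipc_port>` against a specific datanode; the datanode enforces HDFS superuser privilege on the call (see checkSuperuserPrivilege() in DataNode.evictWriters above). A minimal sketch of the equivalent programmatic invocation, mirroring how the test below drives the command (the datanode address is a placeholder):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;

    public class EvictWritersExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        DFSAdmin dfsadmin = new DFSAdmin(conf);
        // "dnhost:50020" is a placeholder for <datanode_host:ipc_port>.
        int rc = dfsadmin.run(new String[] {"-evictWriters", "dnhost:50020"});
        System.exit(rc);   // 0 on success, non-zero if the request failed
      }
    }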

+ 49 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

@@ -271,6 +271,55 @@ public class TestClientProtocolForPipelineRecovery {
     }
   }
 
+  /**
+   * Test that the writer is kicked out of a node.
+   */
+  @Test
+  public void testEvictWriter() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(3)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      Path file = new Path("testEvictWriter.dat");
+      FSDataOutputStream out = fs.create(file, (short)2);
+      out.write(0x31);
+      out.hflush();
+
+      // get nodes in the pipeline
+      DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream();
+      DatanodeInfo[] nodes = dfsOut.getPipeline();
+      Assert.assertEquals(2, nodes.length);
+      String dnAddr = nodes[1].getIpcAddr(false);
+
+      // evict the writer from the second datanode and wait until
+      // the pipeline is rebuilt.
+      DFSAdmin dfsadmin = new DFSAdmin(conf);
+      final String[] args1 = {"-evictWriters", dnAddr};
+      Assert.assertEquals(0, dfsadmin.run(args1));
+      out.write(0x31);
+      out.hflush();
+
+      // get the new pipeline and check that the evicted node is not in it.
+      nodes = dfsOut.getPipeline();
+      try {
+        Assert.assertTrue(nodes.length > 0);
+        for (int i = 0; i < nodes.length; i++) {
+          Assert.assertFalse(dnAddr.equals(nodes[i].getIpcAddr(false)));
+        }
+      } finally {
+        out.close();
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /** Test restart timeout */
   @Test
   public void testPipelineRecoveryOnRestartFailure() throws Exception {