
svn merge -c 1337423 from branch-2 for HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo and epoch in JournalProtocol.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.0.0-alpha@1338328 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 13 years ago · parent commit 1e17973e44

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -295,6 +295,9 @@ Release 2.0.0 - UNRELEASED
     so that INodeFile and INodeFileUnderConstruction do not have to be used in
     block management.  (John George via szetszwo)
 
+    HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo
+    and epoch in JournalProtocol. (suresh via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-2477. Optimize computing the diff between a block report and the

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 
 /**
@@ -33,6 +34,10 @@ import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 public class UnregisteredNodeException extends IOException {
   private static final long serialVersionUID = -5620209396945970810L;
 
+  public UnregisteredNodeException(JournalInfo info) {
+    super("Unregistered server: " + info.toString());
+  }
+
   public UnregisteredNodeException(NodeRegistration nodeReg) {
     super("Unregistered server: " + nodeReg.toString());
   }

+ 20 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java

@@ -20,10 +20,13 @@ package org.apache.hadoop.hdfs.protocolPB;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 
 import com.google.protobuf.RpcController;
@@ -48,9 +51,8 @@ public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB
   public JournalResponseProto journal(RpcController unused,
       JournalRequestProto req) throws ServiceException {
     try {
-      impl.journal(PBHelper.convert(req.getJournalInfo()),
-          req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
-              .toByteArray());
+      impl.journal(PBHelper.convert(req.getJournalInfo()), req.getEpoch(),
+          req.getFirstTxnId(), req.getNumTxns(), req.getRecords().toByteArray());
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -63,10 +65,24 @@ public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB
       StartLogSegmentRequestProto req) throws ServiceException {
     try {
       impl.startLogSegment(PBHelper.convert(req.getJournalInfo()),
-          req.getTxid());
+          req.getEpoch(), req.getTxid());
     } catch (IOException e) {
       throw new ServiceException(e);
     }
     return StartLogSegmentResponseProto.newBuilder().build();
   }
+
+  @Override
+  public FenceResponseProto fence(RpcController controller,
+      FenceRequestProto req) throws ServiceException {
+    try {
+      FenceResponse resp = impl.fence(PBHelper.convert(req.getJournalInfo()), req.getEpoch(),
+          req.getFencerInfo());
+      return FenceResponseProto.newBuilder().setInSync(resp.isInSync())
+          .setLastTransactionId(resp.getLastTransactionId())
+          .setPreviousEpoch(resp.getPreviousEpoch()).build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }
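
Note: the translator above only unwraps the protobuf messages; the epoch check itself belongs to the JournalProtocol implementation behind impl. As a rough sketch of what a fencing-aware receiver might do with the epoch (hypothetical class and field names, not part of this commit; a real receiver would persist the promised epoch durably before acknowledging the fence request):

// Hypothetical receiver-side state for epoch-based fencing.
class JournalReceiverState {
  private long promisedEpoch = 0;      // highest epoch granted so far
  private long lastTransactionId = 0;  // last txid durably written

  synchronized FenceResponse fence(JournalInfo info, long epoch,
      String fencerInfo) throws FencedException {
    if (epoch <= promisedEpoch) {
      // The caller is stale relative to an already-granted epoch.
      throw new FencedException("Epoch " + epoch
          + " is not greater than the promised epoch " + promisedEpoch);
    }
    long previousEpoch = promisedEpoch;
    promisedEpoch = epoch;  // older writers are fenced from this point on
    return new FenceResponse(previousEpoch, lastTransactionId, true);
  }

  // Would be called at the start of journal(..) and startLogSegment(..).
  synchronized void checkEpoch(long epoch) throws FencedException {
    if (epoch < promisedEpoch) {
      throw new FencedException("Writer with epoch " + epoch
          + " was fenced by a writer with epoch " + promisedEpoch);
    }
  }
}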

+ 24 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java

@@ -22,10 +22,13 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.RPC;
@@ -58,10 +61,11 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
   }
 
   @Override
-  public void journal(NamenodeRegistration reg, long firstTxnId,
+  public void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
       int numTxns, byte[] records) throws IOException {
     JournalRequestProto req = JournalRequestProto.newBuilder()
-        .setJournalInfo(PBHelper.convertToJournalInfo(reg))
+        .setJournalInfo(PBHelper.convert(journalInfo))
+        .setEpoch(epoch)
         .setFirstTxnId(firstTxnId)
         .setNumTxns(numTxns)
         .setRecords(PBHelper.getByteString(records))
@@ -74,10 +78,11 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
   }
 
   @Override
-  public void startLogSegment(NamenodeRegistration registration, long txid)
+  public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
       throws IOException {
     StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
-        .setJournalInfo(PBHelper.convertToJournalInfo(registration))
+        .setJournalInfo(PBHelper.convert(journalInfo))
+        .setEpoch(epoch)
         .setTxid(txid)
         .build();
     try {
@@ -86,6 +91,20 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public FenceResponse fence(JournalInfo journalInfo, long epoch,
+      String fencerInfo) throws IOException {
+    FenceRequestProto req = FenceRequestProto.newBuilder().setEpoch(epoch)
+        .setJournalInfo(PBHelper.convert(journalInfo)).build();
+    try {
+      FenceResponseProto resp = rpcProxy.fence(NULL_CONTROLLER, req);
+      return new FenceResponse(resp.getPreviousEpoch(),
+          resp.getLastTransactionId(), resp.getInSync());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 
   @Override
   public boolean isMethodSupported(String methodName) throws IOException {

+ 8 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -110,6 +110,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -117,6 +118,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -127,7 +129,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
@@ -1349,25 +1350,19 @@ public class PBHelper {
         .setStorageID(r.getStorageID()).build();
   }
 
-  public static NamenodeRegistration convert(JournalInfoProto info) {
+  public static JournalInfo convert(JournalInfoProto info) {
     int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
     int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
-    StorageInfo storage = new StorageInfo(lv, nsID, info.getClusterID(), 0);
-    
-    // Note that the role is always {@link NamenodeRole#NAMENODE} as this
-    // conversion happens for messages from Namenode to Journal receivers.
-    // Addresses in the registration are unused.
-    return new NamenodeRegistration("", "", storage, NamenodeRole.NAMENODE);
+    return new JournalInfo(lv, info.getClusterID(), nsID);
   }
 
   /**
    * Method used for converting {@link JournalInfoProto} sent from Namenode
    * to Journal receivers to {@link NamenodeRegistration}.
    */
-  public static JournalInfoProto convertToJournalInfo(
-      NamenodeRegistration reg) {
-    return JournalInfoProto.newBuilder().setClusterID(reg.getClusterID())
-        .setLayoutVersion(reg.getLayoutVersion())
-        .setNamespaceID(reg.getNamespaceID()).build();
+  public static JournalInfoProto convert(JournalInfo j) {
+    return JournalInfoProto.newBuilder().setClusterID(j.getClusterId())
+        .setLayoutVersion(j.getLayoutVersion())
+        .setNamespaceID(j.getNamespaceId()).build();
   }
 }
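
Note: with both directions now named convert(..), a JournalInfo round-trips symmetrically through its protobuf form. A small illustrative check (the values are made up):

JournalInfo info = new JournalInfo(-40, "test-cluster", 42);
JournalInfoProto proto = PBHelper.convert(info);
JournalInfo back = PBHelper.convert(proto);
assert back.getLayoutVersion() == info.getLayoutVersion();
assert back.getClusterId().equals(info.getClusterId());
assert back.getNamespaceId() == info.getNamespaceId();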

+ 6 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 
 /**
@@ -26,19 +27,20 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
  * to a BackupNode.
  */
 class BackupJournalManager implements JournalManager {
-
-  private final NamenodeRegistration nnReg;
   private final NamenodeRegistration bnReg;
+  private final JournalInfo journalInfo;
 
   BackupJournalManager(NamenodeRegistration bnReg,
       NamenodeRegistration nnReg) {
+    journalInfo = new JournalInfo(nnReg.getLayoutVersion(),
+        nnReg.getClusterID(), nnReg.getNamespaceID());
     this.bnReg = bnReg;
-    this.nnReg = nnReg;
   }
 
   @Override
   public EditLogOutputStream startLogSegment(long txId) throws IOException {
-    EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, nnReg);
+    EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg,
+        journalInfo);
     stm.startLogSegment(txId);
     return stm;
   }

+ 27 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java

@@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -207,7 +209,8 @@ public class BackupNode extends NameNode {
   }
 
   /* @Override */// NameNode
-  public boolean setSafeMode(SafeModeAction action) throws IOException {
+  public boolean setSafeMode(@SuppressWarnings("unused") SafeModeAction action)
+      throws IOException {
     throw new UnsupportedActionException("setSafeMode");
   }
 
@@ -226,51 +229,56 @@ public class BackupNode extends NameNode {
 
     /**
      * Verifies a journal request
-     * @param nodeReg node registration
-     * @throws UnregisteredNodeException if the registration is invalid
      */
-    void verifyJournalRequest(NamenodeRegistration reg) throws IOException {
-      verifyLayoutVersion(reg.getLayoutVersion());
+    private void verifyJournalRequest(JournalInfo journalInfo)
+        throws IOException {
+      verifyLayoutVersion(journalInfo.getLayoutVersion());
       String errorMsg = null;
       int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID();
-      if (reg.getNamespaceID() != expectedNamespaceID) {
+      if (journalInfo.getNamespaceId() != expectedNamespaceID) {
         errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
-            + " actual " + reg.getNamespaceID();
+            + " actual " + journalInfo.getNamespaceId();
         LOG.warn(errorMsg);
-        throw new UnregisteredNodeException(reg);
+        throw new UnregisteredNodeException(journalInfo);
       }
-      if (!reg.getClusterID().equals(namesystem.getClusterId())) {
+      if (!journalInfo.getClusterId().equals(namesystem.getClusterId())) {
         errorMsg = "Invalid clusterId in journal request - expected "
-            + reg.getClusterID() + " actual " + namesystem.getClusterId();
+            + journalInfo.getClusterId() + " actual " + namesystem.getClusterId();
         LOG.warn(errorMsg);
-        throw new UnregisteredNodeException(reg);
+        throw new UnregisteredNodeException(journalInfo);
       }
     }
 
-
     /////////////////////////////////////////////////////
     // BackupNodeProtocol implementation for backup node.
     /////////////////////////////////////////////////////
     @Override
-    public void startLogSegment(NamenodeRegistration registration, long txid)
-        throws IOException {
+    public void startLogSegment(JournalInfo journalInfo, long epoch,
+        long txid) throws IOException {
      namesystem.checkOperation(OperationCategory.JOURNAL);
-      verifyJournalRequest(registration);
+      verifyJournalRequest(journalInfo);
       getBNImage().namenodeStartedLogSegment(txid);
     }
 
     @Override
-    public void journal(NamenodeRegistration nnReg,
-        long firstTxId, int numTxns,
-        byte[] records) throws IOException {
+    public void journal(JournalInfo journalInfo, long epoch, long firstTxId,
+        int numTxns, byte[] records) throws IOException {
       namesystem.checkOperation(OperationCategory.JOURNAL);
-      verifyJournalRequest(nnReg);
+      verifyJournalRequest(journalInfo);
       getBNImage().journal(firstTxId, numTxns, records);
     }
 
     private BackupImage getBNImage() {
       return (BackupImage)nn.getFSImage();
    }
+
+    @Override
+    public FenceResponse fence(JournalInfo journalInfo, long epoch,
+        String fencerInfo) throws IOException {
+      LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
+      throw new UnsupportedOperationException(
+          "BackupNode does not support fence");
+    }
   }
 
   //////////////////////////////////////////////////////

+ 9 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java

@@ -24,6 +24,7 @@ import java.util.Arrays;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -42,18 +43,18 @@ import org.apache.hadoop.security.UserGroupInformation;
 class EditLogBackupOutputStream extends EditLogOutputStream {
   static int DEFAULT_BUFFER_SIZE = 256;
 
-  private JournalProtocol backupNode;  // RPC proxy to backup node
-  private NamenodeRegistration bnRegistration;  // backup node registration
-  private NamenodeRegistration nnRegistration;  // active node registration
+  private final JournalProtocol backupNode;  // RPC proxy to backup node
+  private final NamenodeRegistration bnRegistration;  // backup node registration
+  private final JournalInfo journalInfo;  // active node registration
+  private final DataOutputBuffer out;     // serialized output sent to backup node
   private EditsDoubleBuffer doubleBuf;
-  private DataOutputBuffer out;     // serialized output sent to backup node
 
   EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
-                            NamenodeRegistration nnReg) // active name-node
+                            JournalInfo journalInfo) // active name-node
   throws IOException {
     super();
     this.bnRegistration = bnReg;
-    this.nnRegistration = nnReg;
+    this.journalInfo = journalInfo;
     InetSocketAddress bnAddress =
       NetUtils.createSocketAddr(bnRegistration.getAddress());
     try {
@@ -127,8 +128,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
       out.reset();
       assert out.getLength() == 0 : "Output buffer is not empty";
 
-      backupNode.journal(nnRegistration,
-          firstTxToFlush, numReadyTxns, data);
+      backupNode.journal(journalInfo, 0, firstTxToFlush, numReadyTxns, data);
     }
   }
 
@@ -140,6 +140,6 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
   }
 
   void startLogSegment(long txId) throws IOException {
-    backupNode.startLogSegment(nnRegistration, txId);
+    backupNode.startLogSegment(journalInfo, 0, txId);
   }
 }
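
Note: this path hard-codes epoch 0 in both journal(..) and startLogSegment(..) calls. The BackupNode receiver does not participate in fencing (its fence(..) throws UnsupportedOperationException, see BackupNode.java above), so the epoch is effectively unused for the backup stream.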

+ 48 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Response to a journal fence request. See {@link JournalProtocol#fence}
+ */
+@InterfaceAudience.Private
+public class FenceResponse {
+  private final long previousEpoch;
+  private final long lastTransactionId;
+  private final boolean isInSync;
+  
+  public FenceResponse(long previousEpoch, long lastTransId, boolean inSync) {
+    this.previousEpoch = previousEpoch;
+    this.lastTransactionId = lastTransId;
+    this.isInSync = inSync;
+  }
+
+  public boolean isInSync() {
+    return isInSync;
+  }
+
+  public long getLastTransactionId() {
+    return lastTransactionId;
+  }
+
+  public long getPreviousEpoch() {
+    return previousEpoch;
+  }
+}

+ 32 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java

@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.IOException;
+
+/**
+ * Thrown when a previous user of a shared resource tries to use it after
+ * being fenced by another user.
+ */
+public class FencedException extends IOException {
+  private static final long serialVersionUID = 1L;
+  
+  public FencedException(String errorMsg) {
+    super(errorMsg);
+  }
+}

+ 48 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Information that describes a journal
+ */
+@InterfaceAudience.Private
+public class JournalInfo {
+  private final int layoutVersion;
+  private final String clusterId;
+  private final int namespaceId;
+
+  public JournalInfo(int lv, String clusterId, int nsId) {
+    this.layoutVersion = lv;
+    this.clusterId = clusterId;
+    this.namespaceId = nsId;
+  }
+
+  public int getLayoutVersion() {
+    return layoutVersion;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public int getNamespaceId() {
+    return namespaceId;
+  }
+}

+ 22 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java

@@ -21,7 +21,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.KerberosInfo;
 
 /**
@@ -53,12 +52,15 @@ public interface JournalProtocol {
    * via {@code EditLogBackupOutputStream} in order to synchronize meta-data
    * changes with the backup namespace image.
    *
-   * @param registration active node registration
+   * @param journalInfo journal information
+   * @param epoch marks the beginning of a new journal writer
    * @param firstTxnId the first transaction of this batch
   * @param numTxns number of transactions
    * @param records byte array containing serialized journal records
+   * @throws FencedException if the resource has been fenced
    */
-  public void journal(NamenodeRegistration registration,
+  public void journal(JournalInfo journalInfo,
+                      long epoch,
                       long firstTxnId,
                       int numTxns,
                       byte[] records) throws IOException;
@@ -66,9 +68,24 @@ public interface JournalProtocol {
   /**
    * Notify the BackupNode that the NameNode has rolled its edit logs
    * and is now writing a new log segment.
-   * @param registration the registration of the active NameNode
+   * @param journalInfo journal information
+   * @param epoch marks the beginning of a new journal writer
    * @param txid the first txid in the new log
+   * @throws FencedException if the resource has been fenced
    */
-  public void startLogSegment(NamenodeRegistration registration,
+  public void startLogSegment(JournalInfo journalInfo, long epoch,
       long txid) throws IOException;
+
+  /**
+   * Request to fence any other journal writers.
+   * Older writers at a previous epoch will be fenced and can no longer
+   * perform journal operations.
+   *
+   * @param journalInfo journal information
+   * @param epoch marks the beginning of a new journal writer
+   * @param fencerInfo info about fencer for debugging purposes
+   * @throws FencedException if the resource has been fenced
+   */
+  public FenceResponse fence(JournalInfo journalInfo, long epoch,
+      String fencerInfo) throws IOException;
 }
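
Note: from a writer's point of view, the contract reads roughly as follows. This is an illustrative sketch against the interface above; the proxy, epoch value, fencer string, and serializeEdits() helper are assumptions, not code from this commit:

// Hypothetical take-over flow for a new journal writer.
void becomeWriter(JournalProtocol proxy, JournalInfo info, long myEpoch,
    long startTxId) throws IOException {
  // 1. Fence older writers; their subsequent journal(..) and
  //    startLogSegment(..) calls fail with FencedException.
  FenceResponse resp = proxy.fence(info, myEpoch, "fencer@host:port");
  if (!resp.isInSync()) {
    // resp.getLastTransactionId() reports how far this journal got;
    // a recovery step would be needed here before writing.
  }
  // 2. Write under the new epoch.
  proxy.startLogSegment(info, myEpoch, startTxId);
  byte[] records = serializeEdits();  // hypothetical helper, elided
  proxy.journal(info, myEpoch, startTxId, 1, records);
}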

+ 34 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto

@@ -36,16 +36,18 @@ message JournalInfoProto {
 }
 
 /**
- * JournalInfo - the information about the journal
+ * journalInfo - the information about the journal
  * firstTxnId - the first txid in the journal records
  * numTxns - Number of transactions in editlog
  * records - bytes containing serialized journal records
+ * epoch - change to this represents change of journal writer
  */
 message JournalRequestProto {
   required JournalInfoProto journalInfo = 1;
   required uint64 firstTxnId = 2;
   required uint32 numTxns = 3;
   required bytes records = 4;
+  required uint64 epoch = 5;
 }
 
 /**
@@ -55,12 +57,13 @@ message JournalResponseProto {
 }
 
 /**
- * JournalInfo - the information about the journal
+ * journalInfo - the information about the journal
  * txid - first txid in the new log
 */
 message StartLogSegmentRequestProto {
-  required JournalInfoProto journalInfo = 1;
-  required uint64 txid = 2;
+  required JournalInfoProto journalInfo = 1; // Info about the journal
+  required uint64 txid = 2; // Transaction ID
+  required uint64 epoch = 3;
 }
 
 /**
@@ -69,6 +72,27 @@ message StartLogSegmentRequestProto {
 message StartLogSegmentResponseProto { 
 }
 
+/**
+ * journalInfo - the information about the journal
+ * epoch - the epoch of the new journal writer
+ */
+message FenceRequestProto {
+  required JournalInfoProto journalInfo = 1; // Info about the journal
+  required uint64 epoch = 2; // Epoch - change indicates change in writer
+  optional string fencerInfo = 3; // Info about fencer for debugging
+}
+
+/**
+ * previousEpoch - previous epoch if any or zero
+ * lastTransactionId - last valid transaction Id in the journal
+ * inSync - if all journal segments are available and in sync
+ */
+message FenceResponseProto {
+  optional uint64 previousEpoch = 1;
+  optional uint64 lastTransactionId = 2;
+  optional bool inSync = 3;
+}
+
 /**
 * Protocol used to journal edits to a remote node. Currently,
 * this is used to publish edits from the NameNode to a BackupNode.
@@ -89,4 +113,10 @@ service JournalProtocolService {
    */
   rpc startLogSegment(StartLogSegmentRequestProto) 
       returns (StartLogSegmentResponseProto);
+
+  /**
+   * Request to fence a journal receiver.
+   */
+  rpc fence(FenceRequestProto)
+      returns (FenceResponseProto);
 }
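
Note: on the Java side the new messages are assembled the same way JournalProtocolTranslatorPB.fence(..) does above; a standalone sketch with made-up values:

FenceRequestProto req = FenceRequestProto.newBuilder()
    .setJournalInfo(JournalInfoProto.newBuilder()
        .setClusterID("test-cluster")   // required
        .setLayoutVersion(-40)          // optional, illustrative
        .setNamespaceID(42))            // optional, illustrative
    .setEpoch(2)
    .setFencerInfo("fencer@host:port")  // optional, for debugging
    .build();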
 }