浏览代码

HDFS-4904. Remove JournalService. Contributed by Arpit Agarwal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1493235 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth 12 年之前
父节点
当前提交
0d1a994fbb

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -120,6 +120,8 @@ Trunk (Unreleased)
     HDFS-4633 TestDFSClientExcludedNodes fails sporadically if excluded nodes
     cache expires too quickly  (Chris Nauroth via Sanjay)
 
+    HDFS-4904. Remove JournalService. (Arpit Agarwal via cnauroth)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 0 - 64
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java

@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.journalservice;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-
-/**
- * JournalListener is a callback interface to handle journal records
- * received from the namenode.
- */
-public interface JournalListener {
-  /**
-   * Check the namespace information returned by a namenode
-   * @param service service that is making the callback
-   * @param info returned namespace information from the namenode
-   * 
-   * The application using {@link JournalService} can stop the service if
-   * {@code info} validation fails.
-   */
-  public void verifyVersion(JournalService service, NamespaceInfo info);
-  
-  /**
-   * Process the received Journal record
-   * @param service {@link JournalService} making the callback
-   * @param firstTxnId first transaction Id in the journal
-   * @param numTxns number of records
-   * @param records journal records
-   * @throws IOException on error
-   * 
-   * Any IOException thrown from the listener is thrown back in 
-   * {@link JournalProtocol#journal}
-   */
-  public void journal(JournalService service, long firstTxnId, int numTxns,
-      byte[] records) throws IOException;
-  
-  /**
-   * Roll the editlog
-   * @param service {@link JournalService} making the callback
-   * @param txid transaction ID to roll at
-   * 
-   * Any IOException thrown from the listener is thrown back in 
-   * {@link JournalProtocol#startLogSegment}
-   */
-  public void startLogSegment(JournalService service, long txid) throws IOException;
-}

+ 0 - 366
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java

@@ -1,366 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.journalservice;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
-import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
-import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
-import org.apache.hadoop.hdfs.server.protocol.FencedException;
-import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
-import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingService;
-
-/**
- * This class interfaces with the namenode using {@link JournalProtocol} over
- * RPC. It has two modes: <br>
- * <ul>
- * <li>Mode where an RPC.Server is provided from outside, on which it
- * {@link JournalProtocol} is registered. The RPC.Server stop and start is
- * managed outside by the application.</li>
- * <li>Stand alone mode where an RPC.Server is started and managed by the
- * JournalListener.</li>
- * </ul>
- * 
- * The received journal operations are sent to a listener over callbacks. The
- * listener implementation can handle the callbacks based on the application
- * requirement.
- */
-public class JournalService implements JournalProtocol {
-  public static final Log LOG = LogFactory.getLog(JournalService.class.getName());
-
-  private final JournalListener listener;
-  private final InetSocketAddress nnAddress;
-  private final NamenodeRegistration registration;
-  private final NamenodeProtocol namenode;
-  private final StateHandler stateHandler = new StateHandler();
-  private final RPC.Server rpcServer;
-  private long epoch = 0;
-  private String fencerInfo;
-  
-  enum State {
-    /** The service is initialized and ready to start. */
-    INIT(false, false),
-    /**
-     * RPC server is started.
-     * The service is ready to receive requests from namenode.
-     */
-    STARTED(false, false),
-    /** The service is fenced by a namenode and waiting for roll. */
-    WAITING_FOR_ROLL(false, true),
-    /**
-     * The existing log is syncing with another source
-     * and it accepts journal from Namenode.
-     */
-    SYNCING(true, true),
-    /** The existing log is in sync and it accepts journal from Namenode. */
-    IN_SYNC(true, true),
-    /** The service is stopped. */
-    STOPPED(false, false);
-
-    final boolean isJournalAllowed;
-    final boolean isStartLogSegmentAllowed;
-    
-    State(boolean isJournalAllowed, boolean isStartLogSegmentAllowed) {
-      this.isJournalAllowed = isJournalAllowed;
-      this.isStartLogSegmentAllowed = isStartLogSegmentAllowed;
-    }
-  }
-  
-  static class StateHandler {
-    State current = State.INIT;
-    
-    synchronized void start() {
-      if (current != State.INIT) {
-        throw new IllegalStateException("Service cannot be started in "
-            + current + " state.");
-      }
-      current = State.STARTED;
-    }
-
-    synchronized void waitForRoll() {
-      if (current != State.STARTED) {
-        throw new IllegalStateException("Cannot wait-for-roll in " + current
-            + " state.");
-      }
-      current = State.WAITING_FOR_ROLL;
-    }
-
-    synchronized void startLogSegment() {
-      if (current == State.WAITING_FOR_ROLL) {
-        current = State.SYNCING;
-      }
-    }
-
-    synchronized void isStartLogSegmentAllowed() throws IOException {
-      if (!current.isStartLogSegmentAllowed) {
-        throw new IOException("Cannot start log segment in " + current
-            + " state.");
-      }
-    }
-
-    synchronized void isJournalAllowed() throws IOException {
-      if (!current.isJournalAllowed) {
-        throw new IOException("Cannot journal in " + current + " state.");
-      }
-    }
-
-    synchronized boolean isStopped() {
-      if (current == State.STOPPED) {
-        LOG.warn("Ignore stop request since the service is in " + current
-            + " state.");
-        return true;
-      }
-      current = State.STOPPED;
-      return false;
-    }
-  }
-  
-  /**
-   * Constructor to create {@link JournalService} where an RPC server is
-   * created by this service.
-   * @param conf Configuration
-   * @param nnAddr host:port for the active Namenode's RPC server
-   * @param serverAddress address to start RPC server to receive
-   *          {@link JournalProtocol} requests. This can be null, if
-   *          {@code server} is a valid server that is managed out side this
-   *          service.
-   * @param listener call-back interface to listen to journal activities
-   * @throws IOException on error
-   */
-  JournalService(Configuration conf, InetSocketAddress nnAddr,
-      InetSocketAddress serverAddress, JournalListener listener)
-      throws IOException {
-    this.nnAddress = nnAddr;
-    this.listener = listener;
-    this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
-        NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
-        .getProxy();
-    this.rpcServer = createRpcServer(conf, serverAddress, this);
-
-    String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
-    StorageInfo storage = new StorageInfo(
-        LayoutVersion.getCurrentLayoutVersion(), 0, "", 0);
-    registration = new NamenodeRegistration(addr, "", storage,
-        NamenodeRole.BACKUP);
-  }
-  
-  /**
-   * Start the service.
-   */
-  public void start() {
-    stateHandler.start();
-
-    // Start the RPC server
-    LOG.info("Starting rpc server");
-    rpcServer.start();
-
-    for(boolean registered = false, handshakeComplete = false; ; ) {
-      try {
-        // Perform handshake
-        if (!handshakeComplete) {
-          handshake();
-          handshakeComplete = true;
-          LOG.info("handshake completed");
-        }
-        
-        // Register with the namenode
-        if (!registered) {
-          registerWithNamenode();
-          registered = true;
-          LOG.info("Registration completed");
-          break;
-        }
-      } catch (IOException ioe) {
-        LOG.warn("Encountered exception ", ioe);
-      } catch (Exception e) {
-        LOG.warn("Encountered exception ", e);
-      }
-      
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException ie) {
-        LOG.warn("Encountered exception ", ie);
-      }
-    }
-
-    stateHandler.waitForRoll();
-    try {
-      namenode.rollEditLog();
-    } catch (IOException e) {
-      LOG.warn("Encountered exception ", e);
-    }
-  }
-
-  /**
-   * Stop the service. For application with RPC Server managed outside, the
-   * RPC Server must be stopped the application.
-   */
-  public void stop() {
-    if (!stateHandler.isStopped()) {
-      rpcServer.stop();
-    }
-  }
-
-  @Override
-  public void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
-      int numTxns, byte[] records) throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Received journal " + firstTxnId + " " + numTxns);
-    }
-    stateHandler.isJournalAllowed();
-    verify(epoch, journalInfo);
-    listener.journal(this, firstTxnId, numTxns, records);
-  }
-
-  @Override
-  public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
-      throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Received startLogSegment " + txid);
-    }
-    stateHandler.isStartLogSegmentAllowed();
-    verify(epoch, journalInfo);
-    listener.startLogSegment(this, txid);
-    stateHandler.startLogSegment();
-  }
-
-  @Override
-  public FenceResponse fence(JournalInfo journalInfo, long epoch,
-      String fencerInfo) throws IOException {
-    LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
-    verifyFence(epoch, fencerInfo);
-    verify(journalInfo);
-    long previousEpoch = epoch;
-    this.epoch = epoch;
-    this.fencerInfo = fencerInfo;
-    
-    // TODO:HDFS-3092 set lastTransId and inSync
-    return new FenceResponse(previousEpoch, 0, false);
-  }
-
-  /** Create an RPC server. */
-  private static RPC.Server createRpcServer(Configuration conf,
-      InetSocketAddress address, JournalProtocol impl) throws IOException {
-    RPC.setProtocolEngine(conf, JournalProtocolPB.class,
-        ProtobufRpcEngine.class);
-    JournalProtocolServerSideTranslatorPB xlator = 
-        new JournalProtocolServerSideTranslatorPB(impl);
-    BlockingService service = 
-        JournalProtocolService.newReflectiveBlockingService(xlator);
-    return new RPC.Builder(conf).setProtocol(JournalProtocolPB.class)
-        .setInstance(service).setBindAddress(address.getHostName())
-        .setPort(address.getPort()).setNumHandlers(1).setVerbose(false).build();
-  }
-  
-  private void verifyEpoch(long e) throws FencedException {
-    if (epoch != e) {
-      String errorMsg = "Epoch " + e + " is not valid. "
-          + "Resource has already been fenced by " + fencerInfo
-          + " with epoch " + epoch;
-      LOG.warn(errorMsg);
-      throw new FencedException(errorMsg);
-    }
-  }
-  
-  private void verifyFence(long e, String fencer) throws FencedException {
-    if (e <= epoch) {
-      String errorMsg = "Epoch " + e + " from fencer " + fencer
-          + " is not valid. " + "Resource has already been fenced by "
-          + fencerInfo + " with epoch " + epoch;
-      LOG.warn(errorMsg);
-      throw new FencedException(errorMsg);
-    }
-  }
-  
-  /** 
-   * Verifies a journal request
-   */
-  private void verify(JournalInfo journalInfo) throws IOException {
-    String errorMsg = null;
-    int expectedNamespaceID = registration.getNamespaceID();
-    if (journalInfo.getNamespaceId() != expectedNamespaceID) {
-      errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
-          + " actual " + journalInfo.getNamespaceId();
-      LOG.warn(errorMsg);
-      throw new UnregisteredNodeException(journalInfo);
-    } 
-    if (!journalInfo.getClusterId().equals(registration.getClusterID())) {
-      errorMsg = "Invalid clusterId in journal request - expected "
-          + journalInfo.getClusterId() + " actual " + registration.getClusterID();
-      LOG.warn(errorMsg);
-      throw new UnregisteredNodeException(journalInfo);
-    }
-  }
-  
-  /** 
-   * Verifies a journal request
-   */
-  private void verify(long e, JournalInfo journalInfo) throws IOException {
-    verifyEpoch(e);
-    verify(journalInfo);
-  }
-  
-  /**
-   * Register this service with the active namenode.
-   */
-  private void registerWithNamenode() throws IOException {
-    NamenodeRegistration nnReg = namenode.register(registration);
-    String msg = null;
-    if(nnReg == null) { // consider as a rejection
-      msg = "Registration rejected by " + nnAddress;
-    } else if(!nnReg.isRole(NamenodeRole.NAMENODE)) {
-      msg = " Name-node " + nnAddress + " is not active";
-    }
-    if(msg != null) {
-      LOG.error(msg);
-      throw new IOException(msg); // stop the node
-    }
-  }
-  
-  private void handshake() throws IOException {
-    NamespaceInfo nsInfo = namenode.versionRequest();
-    listener.verifyVersion(this, nsInfo);
-    registration.setStorageInfo(nsInfo);
-  }
-
-  @VisibleForTesting
-  long getEpoch() {
-    return epoch;
-  }
-}

+ 0 - 127
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java

@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.journalservice;
-
-import static org.junit.Assert.assertNotNull;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystemTestHelper;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
-import org.apache.hadoop.hdfs.server.protocol.FencedException;
-import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-/**
- * Tests for {@link JournalService}
- */
-public class TestJournalService {
-  private MiniDFSCluster cluster;
-  private Configuration conf = new HdfsConfiguration();
-  
-  /**
-   * Test calls backs {@link JournalListener#startLogSegment(JournalService, long)} and
-   * {@link JournalListener#journal(JournalService, long, int, byte[])} are
-   * called.
-   */
-  @Test
-  public void testCallBacks() throws Exception {
-    JournalListener listener = Mockito.mock(JournalListener.class);
-    JournalService service = null;
-    try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-      cluster.waitActive(0);
-      service = startJournalService(listener);
-      verifyRollLogsCallback(service, listener);
-      verifyJournalCallback(service, listener);
-      verifyFence(service, cluster.getNameNode(0));
-    } finally {
-      if (service != null) {
-        service.stop();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
-  private JournalService startJournalService(JournalListener listener)
-      throws IOException {
-    InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
-    InetSocketAddress serverAddr = new InetSocketAddress(0);
-    JournalService service = new JournalService(conf, nnAddr, serverAddr,
-        listener);
-    service.start();
-    return service;
-  }
-  
-  /**
-   * Starting {@link JournalService} should result in Namenode calling
-   * {@link JournalService#startLogSegment}, resulting in callback 
-   * {@link JournalListener#rollLogs}
-   */
-  private void verifyRollLogsCallback(JournalService s, JournalListener l)
-      throws IOException {
-    Mockito.verify(l, Mockito.times(1)).startLogSegment(Mockito.eq(s), Mockito.anyLong());
-  }
-
-  /**
-   * File system write operations should result in JournalListener call
-   * backs.
-   */
-  private void verifyJournalCallback(JournalService s, JournalListener l) throws IOException {
-    Path fileName = new Path("/tmp/verifyJournalCallback");
-    FileSystem fs = cluster.getFileSystem();
-    FileSystemTestHelper.createFile(fs, fileName);
-    fs.delete(fileName, true);
-    Mockito.verify(l, Mockito.atLeastOnce()).journal(Mockito.eq(s),
-        Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any());
-  }
-  
-  public void verifyFence(JournalService s, NameNode nn) throws Exception {
-    String cid = nn.getNamesystem().getClusterId();
-    int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
-    int lv = nn.getNamesystem().getFSImage().getLayoutVersion();
-    
-    // Fence the journal service
-    JournalInfo info = new JournalInfo(lv, cid, nsId);
-    long currentEpoch = s.getEpoch();
-    
-    // New epoch lower than the current epoch is rejected
-    try {
-      s.fence(info, (currentEpoch - 1), "fencer");
-    } catch (FencedException ignore) { /* Ignored */ } 
-    
-    // New epoch equal to the current epoch is rejected
-    try {
-      s.fence(info, currentEpoch, "fencer");
-    } catch (FencedException ignore) { /* Ignored */ } 
-    
-    // New epoch higher than the current epoch is successful
-    FenceResponse resp = s.fence(info, currentEpoch+1, "fencer");
-    assertNotNull(resp);
-  }
-}