|
@@ -18,6 +18,8 @@
|
|
|
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
|
|
|
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
|
|
|
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
|
|
|
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
|
|
|
import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
|
|
@@ -60,6 +62,8 @@ import org.apache.hadoop.io.DataInputByteBuffer;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
import org.apache.hadoop.io.ReadaheadPool;
|
|
|
import org.apache.hadoop.io.SecureIOUtils;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
+import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos.JobShuffleInfoProto;
|
|
|
import org.apache.hadoop.mapreduce.MRConfig;
|
|
|
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
|
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
|
@@ -72,6 +76,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
|
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
|
|
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
|
|
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
|
|
import org.apache.hadoop.security.ssl.SSLFactory;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.util.Shell;
|
|
@@ -81,7 +86,14 @@ import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
|
|
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
|
|
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
|
|
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
+import org.fusesource.leveldbjni.JniDBFactory;
|
|
|
+import org.fusesource.leveldbjni.internal.NativeDB;
|
|
|
+import org.iq80.leveldb.DB;
|
|
|
+import org.iq80.leveldb.DBException;
|
|
|
+import org.iq80.leveldb.Logger;
|
|
|
+import org.iq80.leveldb.Options;
|
|
|
import org.jboss.netty.bootstrap.ServerBootstrap;
|
|
|
import org.jboss.netty.buffer.ChannelBuffers;
|
|
|
import org.jboss.netty.channel.Channel;
|
|
@@ -115,6 +127,7 @@ import org.mortbay.jetty.HttpHeaders;
|
|
|
|
|
|
import com.google.common.base.Charsets;
|
|
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
+import com.google.protobuf.ByteString;
|
|
|
|
|
|
public class ShuffleHandler extends AuxiliaryService {
|
|
|
|
|
@@ -132,6 +145,10 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
"^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
|
|
|
Pattern.CASE_INSENSITIVE);
|
|
|
|
|
|
+  private static final String STATE_DB_NAME = "mapreduce_shuffle_state"; // child name of the state DB under the recovery root (see startStore)
|
|
|
+  private static final String STATE_DB_SCHEMA_VERSION_KEY = "schema-version"; // DB key under which the schema version string is stored
|
|
|
+  private static final String STATE_DB_SCHEMA_VERSION = "1.0"; // only version accepted by startStore; any other stored value is rejected
|
|
|
+
|
|
|
private int port;
|
|
|
private ChannelFactory selector;
|
|
|
private final ChannelGroup accepted = new DefaultChannelGroup();
|
|
@@ -149,14 +166,14 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
private boolean shuffleTransferToAllowed;
|
|
|
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
|
|
|
|
|
|
+  private Map<String,String> userRsrc; // job ID string -> user; created in serviceStart, filled by addJobToken
|
|
|
+  private JobTokenSecretManager secretManager; // holds the job tokens keyed by job ID string; created in serviceStart
|
|
|
+
|
|
|
+  private DB stateDb = null; // only assigned by startStore; stays null when recoverState finds no recovery path
|
|
|
+
|
|
|
public static final String MAPREDUCE_SHUFFLE_SERVICEID =
|
|
|
"mapreduce_shuffle";
|
|
|
|
|
|
- private static final Map<String,String> userRsrc =
|
|
|
- new ConcurrentHashMap<String,String>();
|
|
|
- private static final JobTokenSecretManager secretManager =
|
|
|
- new JobTokenSecretManager();
|
|
|
-
|
|
|
public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
|
|
|
public static final int DEFAULT_SHUFFLE_PORT = 13562;
|
|
|
|
|
@@ -292,9 +309,7 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
|
|
|
// TODO: Once SHuffle is out of NM, this can use MR APIs
|
|
|
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
|
|
|
- userRsrc.put(jobId.toString(), user);
|
|
|
- LOG.info("Added token for " + jobId.toString());
|
|
|
- secretManager.addTokenForJob(jobId.toString(), jt);
|
|
|
+ recordJobShuffleInfo(jobId, user, jt);
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("Error during initApp", e);
|
|
|
// TODO add API to AuxiliaryServices to report failures
|
|
@@ -305,8 +320,12 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
  public void stopApplication(ApplicationTerminationContext context) { // drops the shuffle info held for the terminating application's job
|
|
|
    ApplicationId appId = context.getApplicationId();
|
|
|
    JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId()); // same app -> job ID mapping as initializeApplication uses
|
|
|
-    secretManager.removeTokenForJob(jobId.toString());
|
|
|
-    userRsrc.remove(jobId.toString());
|
|
|
+    try {
|
|
|
+      removeJobShuffleInfo(jobId); // clears the in-memory entries and, when a state DB is open, the persisted record
|
|
|
+    } catch (IOException e) {
|
|
|
+      LOG.error("Error during stopApp", e); // failure is logged only; nothing is propagated to the caller
|
|
|
+      // TODO add API to AuxiliaryServices to report failures
|
|
|
+    }
|
|
|
  }
|
|
|
|
|
|
@Override
|
|
@@ -350,6 +369,9 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
@Override
|
|
|
protected void serviceStart() throws Exception {
|
|
|
Configuration conf = getConfig();
|
|
|
+ userRsrc = new ConcurrentHashMap<String,String>();
|
|
|
+ secretManager = new JobTokenSecretManager();
|
|
|
+ recoverState(conf);
|
|
|
ServerBootstrap bootstrap = new ServerBootstrap(selector);
|
|
|
try {
|
|
|
pipelineFact = new HttpPipelineFactory(conf);
|
|
@@ -389,6 +411,9 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
if (pipelineFact != null) {
|
|
|
pipelineFact.destroy();
|
|
|
}
|
|
|
+ if (stateDb != null) {
|
|
|
+ stateDb.close();
|
|
|
+ }
|
|
|
super.serviceStop();
|
|
|
}
|
|
|
|
|
@@ -407,6 +432,140 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
return new Shuffle(conf);
|
|
|
}
|
|
|
|
|
|
+  private void recoverState(Configuration conf) throws IOException { // reloads persisted per-job shuffle info from the state DB; NOTE(review): conf is not used in this body
|
|
|
+    Path recoveryRoot = getRecoveryPath();
|
|
|
+    if (recoveryRoot != null) { // with no recovery path nothing is opened and stateDb stays null
|
|
|
+      startStore(recoveryRoot); // opens (or creates) stateDb and validates its schema version
|
|
|
+      Pattern jobPattern = Pattern.compile(JobID.JOBID_REGEX);
|
|
|
+      LeveldbIterator iter = null;
|
|
|
+      try {
|
|
|
+        iter = new LeveldbIterator(stateDb);
|
|
|
+        iter.seek(bytes(JobID.JOB)); // position the iterator at the JobID.JOB key prefix
|
|
|
+        while (iter.hasNext()) {
|
|
|
+          Map.Entry<byte[],byte[]> entry = iter.next();
|
|
|
+          String key = asString(entry.getKey());
|
|
|
+          if (!jobPattern.matcher(key).matches()) {
|
|
|
+            break; // stop the scan at the first key that is not a job ID
|
|
|
+          }
|
|
|
+          recoverJobShuffleInfo(key, entry.getValue()); // decode the record and register its user and token in memory
|
|
|
+        }
|
|
|
+      } catch (DBException e) {
|
|
|
+        throw new IOException("Database error during recovery", e); // surface DB failures as IOException, keeping the cause
|
|
|
+      } finally {
|
|
|
+        if (iter != null) {
|
|
|
+          iter.close(); // release the iterator on both success and failure
|
|
|
+        }
|
|
|
+      }
|
|
|
+    }
|
|
|
+  }
|
|
|
+
|
|
|
+  private void startStore(Path recoveryRoot) throws IOException { // opens the state DB under recoveryRoot, creating it if missing, and checks its schema version
|
|
|
+    Options options = new Options();
|
|
|
+    options.createIfMissing(false); // first attempt must not create the DB, so a missing store can be detected below
|
|
|
+    options.logger(new LevelDBLogger());
|
|
|
+    Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
|
|
|
+    LOG.info("Using state database at " + dbPath + " for recovery");
|
|
|
+    File dbfile = new File(dbPath.toString());
|
|
|
+    byte[] schemaVersionData;
|
|
|
+    try {
|
|
|
+      stateDb = JniDBFactory.factory.open(dbfile, options);
|
|
|
+      schemaVersionData = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY)); // stored schema version; a null result is rejected further down
|
|
|
+    } catch (NativeDB.DBException e) {
|
|
|
+      if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { // treated as "no DB yet"; NOTE(review): depends on the exception message text - confirm it is stable and non-null
|
|
|
+        LOG.info("Creating state database at " + dbfile);
|
|
|
+        options.createIfMissing(true); // retry the open, this time allowing creation
|
|
|
+        try {
|
|
|
+          stateDb = JniDBFactory.factory.open(dbfile, options);
|
|
|
+          schemaVersionData = bytes(STATE_DB_SCHEMA_VERSION);
|
|
|
+          stateDb.put(bytes(STATE_DB_SCHEMA_VERSION_KEY), schemaVersionData); // stamp the new DB with the current schema version
|
|
|
+        } catch (DBException dbExc) {
|
|
|
+          throw new IOException("Unable to create state store", dbExc);
|
|
|
+        }
|
|
|
+      } else {
|
|
|
+        throw e; // any other failure from the first open/read is rethrown unchanged
|
|
|
+      }
|
|
|
+    }
|
|
|
+    if (schemaVersionData != null) {
|
|
|
+      String schemaVersion = asString(schemaVersionData);
|
|
|
+      // only support exact schema matches for now
|
|
|
+      if (!STATE_DB_SCHEMA_VERSION.equals(schemaVersion)) {
|
|
|
+        throw new IOException("Incompatible state database schema, found "
|
|
|
+            + schemaVersion + " expected " + STATE_DB_SCHEMA_VERSION);
|
|
|
+      }
|
|
|
+    } else {
|
|
|
+      throw new IOException("State database schema version not found"); // an existing DB without the version key is not accepted
|
|
|
+    }
|
|
|
+  }
|
|
|
+
|
|
|
+  private void addJobToken(JobID jobId, String user, // registers a job's user and token in memory only; the state DB is not touched here
|
|
|
+      Token<JobTokenIdentifier> jobToken) {
|
|
|
+    userRsrc.put(jobId.toString(), user); // both maps are keyed by the job ID string
|
|
|
+    secretManager.addTokenForJob(jobId.toString(), jobToken);
|
|
|
+    LOG.info("Added token for " + jobId.toString());
|
|
|
+  }
|
|
|
+
|
|
|
+  private void recoverJobShuffleInfo(String jobIdStr, byte[] data) // rebuilds one job's user and token from a state DB entry (key = jobIdStr, value = data)
|
|
|
+      throws IOException {
|
|
|
+    JobID jobId;
|
|
|
+    try {
|
|
|
+      jobId = JobID.forName(jobIdStr);
|
|
|
+    } catch (IllegalArgumentException e) {
|
|
|
+      throw new IOException("Bad job ID " + jobIdStr + " in state store", e); // an unparsable key is reported as an IOException, keeping the cause
|
|
|
+    }
|
|
|
+
|
|
|
+    JobShuffleInfoProto proto = JobShuffleInfoProto.parseFrom(data); // value is the serialized form written by recordJobShuffleInfo
|
|
|
+    String user = proto.getUser();
|
|
|
+    TokenProto tokenProto = proto.getJobToken();
|
|
|
+    Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>( // reassemble the token from its four stored parts
|
|
|
+        tokenProto.getIdentifier().toByteArray(),
|
|
|
+        tokenProto.getPassword().toByteArray(),
|
|
|
+        new Text(tokenProto.getKind()), new Text(tokenProto.getService()));
|
|
|
+    addJobToken(jobId, user, jobToken); // in-memory registration only; the record is already in the DB
|
|
|
+  }
|
|
|
+
|
|
|
+  private void recordJobShuffleInfo(JobID jobId, String user, // persists a job's user and token when a state DB is open, then registers them in memory
|
|
|
+      Token<JobTokenIdentifier> jobToken) throws IOException {
|
|
|
+    if (stateDb != null) { // persistence is skipped entirely when no state DB was opened
|
|
|
+      TokenProto tokenProto = TokenProto.newBuilder()
|
|
|
+          .setIdentifier(ByteString.copyFrom(jobToken.getIdentifier()))
|
|
|
+          .setPassword(ByteString.copyFrom(jobToken.getPassword()))
|
|
|
+          .setKind(jobToken.getKind().toString())
|
|
|
+          .setService(jobToken.getService().toString())
|
|
|
+          .build();
|
|
|
+      JobShuffleInfoProto proto = JobShuffleInfoProto.newBuilder()
|
|
|
+          .setUser(user).setJobToken(tokenProto).build();
|
|
|
+      try {
|
|
|
+        stateDb.put(bytes(jobId.toString()), proto.toByteArray()); // key is the job ID string, the same form recoverState scans for
|
|
|
+      } catch (DBException e) {
|
|
|
+        throw new IOException("Error storing " + jobId, e); // thrown before addJobToken, so a failed write leaves the in-memory state unchanged
|
|
|
+      }
|
|
|
+    }
|
|
|
+    addJobToken(jobId, user, jobToken);
|
|
|
+  }
|
|
|
+
|
|
|
+  private void removeJobShuffleInfo(JobID jobId) throws IOException { // forgets a job's token and user, in memory and in the state DB when one is open
|
|
|
+    String jobIdStr = jobId.toString();
|
|
|
+    secretManager.removeTokenForJob(jobIdStr); // in-memory entries are cleared first, so they are gone even if the DB delete below fails
|
|
|
+    userRsrc.remove(jobIdStr);
|
|
|
+    if (stateDb != null) {
|
|
|
+      try {
|
|
|
+        stateDb.delete(bytes(jobIdStr)); // same key form that recordJobShuffleInfo writes
|
|
|
+      } catch (DBException e) {
|
|
|
+        throw new IOException("Unable to remove " + jobId
|
|
|
+            + " from state store", e);
|
|
|
+      }
|
|
|
+    }
|
|
|
+  }
|
|
|
+
|
|
|
+  private static class LevelDBLogger implements Logger { // Logger implementation handed to the DB Options in startStore
|
|
|
+    private static final Log LOG = LogFactory.getLog(LevelDBLogger.class); // separate Log named after this nested class
|
|
|
+
|
|
|
+    @Override
|
|
|
+    public void log(String message) {
|
|
|
+      LOG.info(message); // every message received is logged at INFO level
|
|
|
+    }
|
|
|
+  }
|
|
|
+
|
|
|
class HttpPipelineFactory implements ChannelPipelineFactory {
|
|
|
|
|
|
final Shuffle SHUFFLE;
|