commit 5e19a1e832bf827f04bc5ab9e9d71efa163e57cb
Author: Jitendra Nath Pandey <jitendra@sufferhome-lm.(none)>
Date: Sun May 9 01:30:36 2010 -0700

MAPREDUCE-1733 from https://issues.apache.org/jira/secure/attachment/12444054/MR-1733-y20.3.patch

+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1733. Authentication between pipes processes and java counterparts.
+ (jitendra)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077456 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
c40786e1e3

+ 14 - 10
src/c++/pipes/Makefile.in

@@ -1,4 +1,4 @@
-# Makefile.in generated by automake 1.9 from Makefile.am.
+# Makefile.in generated by automake 1.9.2 from Makefile.am.
 # @configure_input@
 
 # Copyright (C) 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002,
@@ -37,13 +37,12 @@ POST_INSTALL = :
 NORMAL_UNINSTALL = :
 PRE_UNINSTALL = :
 POST_UNINSTALL = :
+build_triplet = @build@
 host_triplet = @host@
-DIST_COMMON = config.guess config.guess config.sub config.sub \
-	$(srcdir)/Makefile.in $(srcdir)/Makefile.am \
-	$(top_srcdir)/configure $(am__configure_deps) \
-	$(top_srcdir)/impl/config.h.in depcomp depcomp ltmain.sh \
-	ltmain.sh config.guess config.guess config.sub config.sub \
-	$(api_HEADERS)
+DIST_COMMON = config.guess config.sub $(srcdir)/Makefile.in \
+	$(srcdir)/Makefile.am $(top_srcdir)/configure \
+	$(am__configure_deps) $(top_srcdir)/impl/config.h.in depcomp \
+	ltmain.sh config.guess config.sub $(api_HEADERS)
 subdir = .
 ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
 am__aclocal_m4_deps = $(top_srcdir)/../utils/m4/hadoop_utils.m4 \
@@ -115,6 +114,7 @@ EGREP = @EGREP@
 EXEEXT = @EXEEXT@
 F77 = @F77@
 FFLAGS = @FFLAGS@
+GREP = @GREP@
 HADOOP_UTILS_PREFIX = @HADOOP_UTILS_PREFIX@
 INSTALL_DATA = @INSTALL_DATA@
 INSTALL_PROGRAM = @INSTALL_PROGRAM@
@@ -140,12 +140,9 @@ SET_MAKE = @SET_MAKE@
 SHELL = @SHELL@
 STRIP = @STRIP@
 VERSION = @VERSION@
-ac_ct_AR = @ac_ct_AR@
 ac_ct_CC = @ac_ct_CC@
 ac_ct_CXX = @ac_ct_CXX@
 ac_ct_F77 = @ac_ct_F77@
-ac_ct_RANLIB = @ac_ct_RANLIB@
-ac_ct_STRIP = @ac_ct_STRIP@
 am__fastdepCC_FALSE = @am__fastdepCC_FALSE@
 am__fastdepCC_TRUE = @am__fastdepCC_TRUE@
 am__fastdepCXX_FALSE = @am__fastdepCXX_FALSE@
@@ -162,23 +159,30 @@ build_cpu = @build_cpu@
 build_os = @build_os@
 build_vendor = @build_vendor@
 datadir = @datadir@
+datarootdir = @datarootdir@
+docdir = @docdir@
+dvidir = @dvidir@
 exec_prefix = @exec_prefix@
 host = @host@
 host_alias = @host_alias@
 host_cpu = @host_cpu@
 host_os = @host_os@
 host_vendor = @host_vendor@
+htmldir = @htmldir@
 includedir = @includedir@
 infodir = @infodir@
 install_sh = @install_sh@
 libdir = @libdir@
 libexecdir = @libexecdir@
+localedir = @localedir@
 localstatedir = @localstatedir@
 mandir = @mandir@
 mkdir_p = @mkdir_p@
 oldincludedir = @oldincludedir@
+pdfdir = @pdfdir@
 prefix = @prefix@
 program_transform_name = @program_transform_name@
+psdir = @psdir@
 sbindir = @sbindir@
 sharedstatedir = @sharedstatedir@
 sysconfdir = @sysconfdir@

File diff suppressed because it is too large
+ 198 - 312
src/c++/pipes/aclocal.m4


File diff suppressed because it is too large
+ 639 - 166
src/c++/pipes/configure


+ 106 - 3
src/c++/pipes/impl/HadoopPipes.cc

@@ -31,6 +31,11 @@
 #include <strings.h>
 #include <sys/socket.h>
 #include <pthread.h>
+#include <iostream>
+#include <fstream>
+
+#include <openssl/hmac.h>
+#include <openssl/buffer.h>
 
 using std::map;
 using std::string;
@@ -289,9 +294,9 @@ namespace HadoopPipes {
 
   enum MESSAGE_TYPE {START_MESSAGE, SET_JOB_CONF, SET_INPUT_TYPES, RUN_MAP, 
                      MAP_ITEM, RUN_REDUCE, REDUCE_KEY, REDUCE_VALUE, 
-                     CLOSE, ABORT, 
+                     CLOSE, ABORT, AUTHENTICATION_REQ,
                      OUTPUT=50, PARTITIONED_OUTPUT, STATUS, PROGRESS, DONE,
-                     REGISTER_COUNTER, INCREMENT_COUNTER};
+                     REGISTER_COUNTER, INCREMENT_COUNTER, AUTHENTICATION_RESP};
 
   class BinaryUpwardProtocol: public UpwardProtocol {
   private:
@@ -302,6 +307,12 @@ namespace HadoopPipes {
       HADOOP_ASSERT(stream->open(_stream), "problem opening stream");
     }
 
+    virtual void authenticate(const string &responseDigest) {
+      serializeInt(AUTHENTICATION_RESP, *stream);
+      serializeString(responseDigest, *stream);
+      stream->flush();
+    }
+
     virtual void output(const string& key, const string& value) {
       serializeInt(OUTPUT, *stream);
       serializeString(key, *stream);
@@ -358,6 +369,82 @@ namespace HadoopPipes {
     BinaryUpwardProtocol * uplink;
     string key;
     string value;
+    string password;
+    bool authDone;
+    void getPassword(string &password) {
+      const char *passwordFile = getenv("hadoop.pipes.shared.secret.location");
+      if (passwordFile == NULL) {
+        return;
+      }
+      std::ifstream fstr(passwordFile, std::fstream::binary);
+      if (fstr.fail()) {
+        std::cerr << "Could not open the password file" << std::endl;
+        return;
+      } 
+      unsigned char * passBuff = new unsigned char [513]; // room for the NUL written below
+      fstr.read((char *)passBuff, 512);
+      int passwordLength = fstr.gcount();
+      fstr.close();
+      passBuff[passwordLength] = 0;
+      password.replace(0, passwordLength, (const char *) passBuff, passwordLength);
+      delete [] passBuff;
+      return; 
+    }
+
+    void verifyDigestAndRespond(string& digest, string& challenge) {
+      if (password.empty()) {
+        //password can be empty if process is running in debug mode from
+        //command file.
+        authDone = true;
+        return;
+      }
+
+      if (!verifyDigest(password, digest, challenge)) {
+        std::cerr << "Server failed to authenticate. Exiting" << std::endl;
+        exit(-1);
+      }
+      authDone = true;
+      string responseDigest = createDigest(password, digest);
+      uplink->authenticate(responseDigest);
+    }
+
+    bool verifyDigest(string &password, string& digest, string& challenge) {
+      string expectedDigest = createDigest(password, challenge);
+      if (digest == expectedDigest) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    string createDigest(string &password, string& msg) {
+      HMAC_CTX ctx;
+      unsigned char digest[EVP_MAX_MD_SIZE];
+      HMAC_Init(&ctx, (const unsigned char *)password.c_str(), 
+          password.length(), EVP_sha1());
+      HMAC_Update(&ctx, (const unsigned char *)msg.c_str(), msg.length());
+      unsigned int digestLen;
+      HMAC_Final(&ctx, digest, &digestLen);
+      HMAC_cleanup(&ctx);
+
+      //now apply base64 encoding
+      BIO *bmem, *b64;
+      BUF_MEM *bptr;
+
+      b64 = BIO_new(BIO_f_base64());
+      bmem = BIO_new(BIO_s_mem());
+      b64 = BIO_push(b64, bmem);
+      BIO_write(b64, digest, digestLen);
+      BIO_flush(b64);
+      BIO_get_mem_ptr(b64, &bptr);
+
+      char digestBuffer[bptr->length];
+      memcpy(digestBuffer, bptr->data, bptr->length-1);
+      digestBuffer[bptr->length-1] = 0;
+      BIO_free_all(b64);
+
+      return string(digestBuffer);
+    }
 
   public:
     BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
@@ -365,6 +452,8 @@ namespace HadoopPipes {
       downStream->open(down);
       uplink = new BinaryUpwardProtocol(up);
       handler = _handler;
+      authDone = false;
+      getPassword(password);
     }
 
     UpwardProtocol* getUplink() {
@@ -374,7 +463,22 @@ namespace HadoopPipes {
     virtual void nextEvent() {
       int32_t cmd;
       cmd = deserializeInt(*downStream);
+      if (!authDone && cmd != AUTHENTICATION_REQ) {
+        //Authentication request must be the first message if
+        //authentication is not complete
+        std::cerr << "Command:" << cmd << "received before authentication. " 
+            << "Exiting.." << std::endl;
+        exit(-1);
+      }
       switch (cmd) {
+      case AUTHENTICATION_REQ: {
+        string digest;
+        string challenge;
+        deserializeString(digest, *downStream);
+        deserializeString(challenge, *downStream);
+        verifyDigestAndRespond(digest, challenge);
+        break;
+      }
       case START_MESSAGE: {
         int32_t prot;
         prot = deserializeInt(*downStream);
@@ -1021,7 +1125,6 @@ namespace HadoopPipes {
         setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize);
         HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ")
                                      + strerror(errno));
-
         connection = new BinaryProtocol(stream, context, outStream);
       } else if (getenv("hadoop.pipes.command.file")) {
         char* filename = getenv("hadoop.pipes.command.file");

+ 14 - 10
src/c++/utils/Makefile.in

@@ -1,4 +1,4 @@
-# Makefile.in generated by automake 1.9 from Makefile.am.
+# Makefile.in generated by automake 1.9.2 from Makefile.am.
 # @configure_input@
 
 # Copyright (C) 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002,
@@ -37,13 +37,12 @@ POST_INSTALL = :
 NORMAL_UNINSTALL = :
 PRE_UNINSTALL = :
 POST_UNINSTALL = :
+build_triplet = @build@
 host_triplet = @host@
-DIST_COMMON = config.guess config.guess config.sub config.sub \
-	$(srcdir)/Makefile.in $(srcdir)/Makefile.am \
-	$(top_srcdir)/configure $(am__configure_deps) \
-	$(top_srcdir)/impl/config.h.in depcomp depcomp ltmain.sh \
-	ltmain.sh config.guess config.guess config.sub config.sub \
-	$(api_HEADERS)
+DIST_COMMON = config.guess config.sub $(srcdir)/Makefile.in \
+	$(srcdir)/Makefile.am $(top_srcdir)/configure \
+	$(am__configure_deps) $(top_srcdir)/impl/config.h.in depcomp \
+	ltmain.sh config.guess config.sub $(api_HEADERS)
 subdir = .
 ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
 am__aclocal_m4_deps = $(top_srcdir)/m4/hadoop_utils.m4 \
@@ -116,6 +115,7 @@ EGREP = @EGREP@
 EXEEXT = @EXEEXT@
 F77 = @F77@
 FFLAGS = @FFLAGS@
+GREP = @GREP@
 INSTALL_DATA = @INSTALL_DATA@
 INSTALL_PROGRAM = @INSTALL_PROGRAM@
 INSTALL_SCRIPT = @INSTALL_SCRIPT@
@@ -140,12 +140,9 @@ SET_MAKE = @SET_MAKE@
 SHELL = @SHELL@
 STRIP = @STRIP@
 VERSION = @VERSION@
-ac_ct_AR = @ac_ct_AR@
 ac_ct_CC = @ac_ct_CC@
 ac_ct_CXX = @ac_ct_CXX@
 ac_ct_F77 = @ac_ct_F77@
-ac_ct_RANLIB = @ac_ct_RANLIB@
-ac_ct_STRIP = @ac_ct_STRIP@
 am__fastdepCC_FALSE = @am__fastdepCC_FALSE@
 am__fastdepCC_TRUE = @am__fastdepCC_TRUE@
 am__fastdepCXX_FALSE = @am__fastdepCXX_FALSE@
@@ -162,23 +159,30 @@ build_cpu = @build_cpu@
 build_os = @build_os@
 build_vendor = @build_vendor@
 datadir = @datadir@
+datarootdir = @datarootdir@
+docdir = @docdir@
+dvidir = @dvidir@
 exec_prefix = @exec_prefix@
 host = @host@
 host_alias = @host_alias@
 host_cpu = @host_cpu@
 host_os = @host_os@
 host_vendor = @host_vendor@
+htmldir = @htmldir@
 includedir = @includedir@
 infodir = @infodir@
 install_sh = @install_sh@
 libdir = @libdir@
 libexecdir = @libexecdir@
+localedir = @localedir@
 localstatedir = @localstatedir@
 mandir = @mandir@
 mkdir_p = @mkdir_p@
 oldincludedir = @oldincludedir@
+pdfdir = @pdfdir@
 prefix = @prefix@
 program_transform_name = @program_transform_name@
+psdir = @psdir@
 sbindir = @sbindir@
 sharedstatedir = @sharedstatedir@
 sysconfdir = @sysconfdir@

File diff suppressed because it is too large
+ 198 - 312
src/c++/utils/aclocal.m4


File diff suppressed because it is too large
+ 637 - 166
src/c++/utils/configure


+ 2 - 0
src/c++/utils/m4/hadoop_utils.m4

@@ -51,6 +51,8 @@ AC_CHECK_HEADERS([pthread.h], [],
   AC_MSG_ERROR(Please check if you have installed the pthread library)) 
 AC_CHECK_LIB([pthread], [pthread_create], [], 
   AC_MSG_ERROR(Cannot find libpthread.so, please check))
+AC_CHECK_LIB([ssl], [HMAC_Init], [], 
+  AC_MSG_ERROR(Cannot find libssl.so, please check))
 ])
 
 # define a macro for using hadoop pipes

+ 14 - 9
src/examples/pipes/Makefile.in

@@ -1,4 +1,4 @@
-# Makefile.in generated by automake 1.9 from Makefile.am.
+# Makefile.in generated by automake 1.9.2 from Makefile.am.
 # @configure_input@
 
 # Copyright (C) 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002,
@@ -36,14 +36,14 @@ POST_INSTALL = :
 NORMAL_UNINSTALL = :
 PRE_UNINSTALL = :
 POST_UNINSTALL = :
+build_triplet = @build@
 host_triplet = @host@
 bin_PROGRAMS = wordcount-simple$(EXEEXT) wordcount-part$(EXEEXT) \
 	wordcount-nopipe$(EXEEXT) pipes-sort$(EXEEXT)
-DIST_COMMON = config.guess config.guess config.sub config.sub \
-	$(srcdir)/Makefile.in $(srcdir)/Makefile.am \
-	$(top_srcdir)/configure $(am__configure_deps) \
-	$(top_srcdir)/impl/config.h.in depcomp depcomp ltmain.sh \
-	ltmain.sh config.guess config.guess config.sub config.sub
+DIST_COMMON = config.guess config.sub $(srcdir)/Makefile.in \
+	$(srcdir)/Makefile.am $(top_srcdir)/configure \
+	$(am__configure_deps) $(top_srcdir)/impl/config.h.in depcomp \
+	ltmain.sh config.guess config.sub
 subdir = .
 ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
 am__aclocal_m4_deps =  \
@@ -120,6 +120,7 @@ EGREP = @EGREP@
 EXEEXT = @EXEEXT@
 F77 = @F77@
 FFLAGS = @FFLAGS@
+GREP = @GREP@
 HADOOP_PIPES_PREFIX = @HADOOP_PIPES_PREFIX@
 HADOOP_UTILS_PREFIX = @HADOOP_UTILS_PREFIX@
 INSTALL_DATA = @INSTALL_DATA@
@@ -146,12 +147,9 @@ SET_MAKE = @SET_MAKE@
 SHELL = @SHELL@
 STRIP = @STRIP@
 VERSION = @VERSION@
-ac_ct_AR = @ac_ct_AR@
 ac_ct_CC = @ac_ct_CC@
 ac_ct_CXX = @ac_ct_CXX@
 ac_ct_F77 = @ac_ct_F77@
-ac_ct_RANLIB = @ac_ct_RANLIB@
-ac_ct_STRIP = @ac_ct_STRIP@
 am__fastdepCC_FALSE = @am__fastdepCC_FALSE@
 am__fastdepCC_TRUE = @am__fastdepCC_TRUE@
 am__fastdepCXX_FALSE = @am__fastdepCXX_FALSE@
@@ -168,23 +166,30 @@ build_cpu = @build_cpu@
 build_os = @build_os@
 build_vendor = @build_vendor@
 datadir = @datadir@
+datarootdir = @datarootdir@
+docdir = @docdir@
+dvidir = @dvidir@
 exec_prefix = @exec_prefix@
 host = @host@
 host_alias = @host_alias@
 host_cpu = @host_cpu@
 host_os = @host_os@
 host_vendor = @host_vendor@
+htmldir = @htmldir@
 includedir = @includedir@
 infodir = @infodir@
 install_sh = @install_sh@
 libdir = @libdir@
 libexecdir = @libexecdir@
+localedir = @localedir@
 localstatedir = @localstatedir@
 mandir = @mandir@
 mkdir_p = @mkdir_p@
 oldincludedir = @oldincludedir@
+pdfdir = @pdfdir@
 prefix = @prefix@
 program_transform_name = @program_transform_name@
+psdir = @psdir@
 sbindir = @sbindir@
 sharedstatedir = @sharedstatedir@
 sysconfdir = @sysconfdir@

File diff suppressed because it is too large
+ 198 - 312
src/examples/pipes/aclocal.m4


File diff suppressed because it is too large
+ 641 - 166
src/examples/pipes/configure


+ 79 - 2
src/mapred/org/apache/hadoop/mapred/pipes/Application.java

@@ -26,11 +26,18 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+
+import javax.crypto.SecretKey;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
@@ -41,6 +48,11 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -82,6 +94,18 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
     env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
     env.put("hadoop.pipes.command.port", 
             Integer.toString(serverSocket.getLocalPort()));
+    
+    //Add token to the environment if security is enabled
+    Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf
+        .getCredentials());
+    // This password is used as shared secret key between this application and
+    // child pipes process
+    byte[]  password = jobToken.getPassword();
+    String localPasswordFile = conf.getJobLocalDir() + Path.SEPARATOR
+        + "jobTokenPassword";
+    writePasswordToLocalFile(localPasswordFile, password, conf);
+    env.put("hadoop.pipes.shared.secret.location", localPasswordFile);
+ 
     List<String> cmd = new ArrayList<String>();
     String interpretor = conf.get("hadoop.pipes.executable.interpretor");
     if (interpretor != null) {
@@ -107,17 +131,52 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
 
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();
-    handler = new OutputHandler<K2, V2>(output, reporter, recordReader);
+    
+    String challenge = getSecurityChallenge();
+    String digestToSend = createDigest(password, challenge);
+    String digestExpected = createDigest(password, digestToSend);
+    
+    handler = new OutputHandler<K2, V2>(output, reporter, recordReader, 
+        digestExpected);
     K2 outputKey = (K2)
       ReflectionUtils.newInstance(outputKeyClass, conf);
     V2 outputValue = (V2) 
       ReflectionUtils.newInstance(outputValueClass, conf);
     downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler, 
                                   outputKey, outputValue, conf);
+    
+    downlink.authenticate(digestToSend, challenge);
+    waitForAuthentication();
+    LOG.debug("Authentication succeeded");
     downlink.start();
     downlink.setJobConf(conf);
   }
 
+  private String getSecurityChallenge() {
+    Random rand = new Random(System.currentTimeMillis());
+    //Use 4 random integers so as to have 16 random bytes.
+    StringBuilder strBuilder = new StringBuilder();
+    strBuilder.append(rand.nextInt(0x7fffffff));
+    strBuilder.append(rand.nextInt(0x7fffffff));
+    strBuilder.append(rand.nextInt(0x7fffffff));
+    strBuilder.append(rand.nextInt(0x7fffffff));
+    return strBuilder.toString();
+  }
+
+  private void writePasswordToLocalFile(String localPasswordFile,
+      byte[] password, JobConf conf) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path localPath = new Path(localPasswordFile);
+    if (localFs.isFile(localPath)) {
+      LOG.debug("Password file is already created by previous path");
+      return;
+    }
+    FSDataOutputStream out = FileSystem.create(localFs, localPath,
+        new FsPermission("400"));
+    out.write(password);
+    out.close();
+  }
+
   /**
    * Get the downward protocol object that can send commands down to the
    * application.
@@ -126,7 +185,19 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
   DownwardProtocol<K1, V1> getDownlink() {
     return downlink;
   }
-
+  
+  /**
+   * Wait for authentication response.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void waitForAuthentication() throws IOException,
+      InterruptedException {
+    downlink.flush();
+    LOG.debug("Waiting for authentication response");
+    handler.waitForAuthentication();
+  }
+  
   /**
    * Wait for the application to finish
    * @return did the application finish correctly?
@@ -190,5 +261,11 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
     Process result = builder.start();
     return result;
   }
+  
+  public static String createDigest(byte[] password, String data)
+      throws IOException {
+    SecretKey key = JobTokenSecretManager.createSecretKey(password);
+    return SecureShuffleUtils.hashFromString(data, key);
+  }
 
 }

+ 25 - 3
src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java

@@ -24,6 +24,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import javax.crypto.SecretKey;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
@@ -34,6 +36,8 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -69,13 +73,15 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
                                     REDUCE_VALUE(7),
                                     CLOSE(8),
                                     ABORT(9),
+                                    AUTHENTICATION_REQ(10),
                                     OUTPUT(50),
                                     PARTITIONED_OUTPUT(51),
                                     STATUS(52),
                                     PROGRESS(53),
                                     DONE(54),
                                     REGISTER_COUNTER(55),
-                                    INCREMENT_COUNTER(56);
+                                    INCREMENT_COUNTER(56),
+                                    AUTHENTICATION_RESP(57);
     final int code;
     MessageType(int code) {
       this.code = code;
@@ -90,6 +96,7 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
     private UpwardProtocol<K2, V2> handler;
     private K2 key;
     private V2 value;
+    private boolean authPending = true;
     
     public UplinkReaderThread(InputStream stream,
                               UpwardProtocol<K2, V2> handler, 
@@ -113,7 +120,14 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
           }
           int cmd = WritableUtils.readVInt(inStream);
           LOG.debug("Handling uplink command " + cmd);
-          if (cmd == MessageType.OUTPUT.code) {
+          if (cmd == MessageType.AUTHENTICATION_RESP.code) {
+            String digest = Text.readString(inStream);
+            authPending = !handler.authenticate(digest);
+          } else if (authPending) {
+            LOG.warn("Message " + cmd + " received before authentication is "
+                + "complete. Ignoring");
+            continue;
+          } else if (cmd == MessageType.OUTPUT.code) {
             readObject(key);
             readObject(value);
             handler.output(key, value);
@@ -244,6 +258,15 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
     uplink.interrupt();
     uplink.join();
   }
+  
+  public void authenticate(String digest, String challenge)
+      throws IOException {
+    LOG.debug("Sending AUTHENTICATION_REQ, digest=" + digest + ", challenge="
+        + challenge);
+    WritableUtils.writeVInt(stream, MessageType.AUTHENTICATION_REQ.code);
+    Text.writeString(stream, digest);
+    Text.writeString(stream, challenge);
+  }
 
   public void start() throws IOException {
     LOG.debug("starting downlink");
@@ -344,5 +367,4 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
       stream.write(buffer.getData(), 0, length);
     }
   }
-  
 }

+ 6 - 0
src/mapred/org/apache/hadoop/mapred/pipes/DownwardProtocol.java

@@ -31,6 +31,12 @@ import org.apache.hadoop.mapred.JobConf;
  * processed.
  */
 interface DownwardProtocol<K extends WritableComparable, V extends Writable> {
+  /**
+   * request authentication
+   * @throws IOException
+   */
+  void authenticate(String digest, String challenge) throws IOException;
+  
   /**
    * Start communication
    * @throws IOException

+ 33 - 1
src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java

@@ -44,21 +44,26 @@ class OutputHandler<K extends WritableComparable,
   private OutputCollector<K, V> collector;
   private float progressValue = 0.0f;
   private boolean done = false;
+  
   private Throwable exception = null;
   RecordReader<FloatWritable,NullWritable> recordReader = null;
   private Map<Integer, Counters.Counter> registeredCounters = 
     new HashMap<Integer, Counters.Counter>();
 
+  private String expectedDigest = null;
+  private boolean digestReceived = false;
   /**
    * Create a handler that will handle any records output from the application.
    * @param collector the "real" collector that takes the output
    * @param reporter the reporter for reporting progress
    */
   public OutputHandler(OutputCollector<K, V> collector, Reporter reporter, 
-                       RecordReader<FloatWritable,NullWritable> recordReader) {
+                       RecordReader<FloatWritable,NullWritable> recordReader,
+                       String expectedDigest) {
     this.reporter = reporter;
     this.collector = collector;
     this.recordReader = recordReader;
+    this.expectedDigest = expectedDigest;
   }
 
   /**
@@ -155,5 +160,32 @@ class OutputHandler<K extends WritableComparable,
       throw new IOException("Invalid counter with id: " + id);
     }
   }
+  
+  public synchronized boolean authenticate(String digest) throws IOException {
+    boolean success = true;
+    if (!expectedDigest.equals(digest)) {
+      exception = new IOException("Authentication Failed: Expected digest="
+          + expectedDigest + ", received=" + digest);
+      success = false;
+    }
+    digestReceived = true;
+    notify();
+    return success;
+  }
 
+  /**
+   * This is called by Application and blocks the thread until
+   * authentication response is received.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  synchronized void waitForAuthentication()
+      throws IOException, InterruptedException {
+    while (digestReceived == false && exception == null) {
+      wait();
+    }
+    if (exception != null) {
+      throw new IOException(exception.getMessage());
+    }
+  }
 }

+ 10 - 0
src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java

@@ -88,4 +88,14 @@ interface UpwardProtocol<K extends WritableComparable, V extends Writable> {
    * @throws IOException
    */
   void incrementCounter(int id, long amount) throws IOException;
+
+  /**
+   * Handles authentication response from client.
+   * It must notify the threads waiting for authentication response.
+   * @param digest
+   * @return true if authentication is successful
+   * @throws IOException
+   */
+  boolean authenticate(String digest) throws IOException;
+
 }

Some files were not shown because too many files changed in this diff