Kaynağa Gözat

HDFS-2941. Add an administrative command to download a copy of the fsimage from the NN. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1305450 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 13 yıl önce
ebeveyn
işleme
7fb62a08e8

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -55,6 +55,9 @@ Release 0.23.3 - UNRELEASED
     Suresh Srinivas, Jitendra Nath Pandey, Hari Mankude, Brandon Li, Sanjay
     Radia, Mingjie Lai, and Gregory Chanan
 
+    HDFS-2941. Add an administrative command to download a copy of the fsimage
+    from the NN. (atm)
+
   IMPROVEMENTS
 
     HDFS-2018. Move all journal stream management code into one place.

+ 26 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java

@@ -30,11 +30,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -258,4 +261,27 @@ public class HAUtil {
     LOG.debug("Mapped HA service delegation token for logical URI " +
         haUri + " to namenode " + singleNNAddr);
   }
+
+  /**
+   * Get the internet address of the currently-active NN. This should rarely be
+   * used, since callers of this method who connect directly to the NN using the
+   * resulting InetSocketAddress will not be able to connect to the active NN if
+   * a failover were to occur after this method has been called.
+   * 
+   * @param fs the file system to get the active address of.
+   * @return the internet address of the currently-active NN.
+   * @throws IOException if an error occurs while resolving the active NN.
+   */
+  @SuppressWarnings("deprecation")
+  public static InetSocketAddress getAddressOfActive(FileSystem fs)
+      throws IOException {
+    if (!(fs instanceof DistributedFileSystem)) {
+      throw new IllegalArgumentException("FileSystem " + fs + " is not a DFS.");
+    }
+    // force client address resolution.
+    fs.exists(new Path("/"));
+    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DFSClient dfsClient = dfs.getClient();
+    return RPC.getServerAddress(dfsClient.getNamenode());
+  }
 }

+ 41 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java

@@ -57,10 +57,14 @@ public class GetImageServlet extends HttpServlet {
 
   private static final Log LOG = LogFactory.getLog(GetImageServlet.class);
 
+  public final static String CONTENT_DISPOSITION = "Content-Disposition";
+  public final static String HADOOP_IMAGE_EDITS_HEADER = "X-Image-Edits-Name";
+  
   private static final String TXID_PARAM = "txid";
   private static final String START_TXID_PARAM = "startTxId";
   private static final String END_TXID_PARAM = "endTxId";
   private static final String STORAGEINFO_PARAM = "storageInfo";
+  private static final String LATEST_FSIMAGE_VALUE = "latest";
   
   private static Set<Long> currentlyDownloadingCheckpoints =
     Collections.<Long>synchronizedSet(new HashSet<Long>());
@@ -101,10 +105,18 @@ public class GetImageServlet extends HttpServlet {
         public Void run() throws Exception {
           if (parsedParams.isGetImage()) {
             long txid = parsedParams.getTxId();
-            File imageFile = nnImage.getStorage().getFsImageName(txid);
+            File imageFile = null;
+            String errorMessage = "Could not find image";
+            if (parsedParams.shouldFetchLatest()) {
+              imageFile = nnImage.getStorage().getHighestFsImageName();
+            } else {
+              errorMessage += " with txid " + txid;
+              imageFile = nnImage.getStorage().getFsImageName(txid);
+            }
             if (imageFile == null) {
-              throw new IOException("Could not find image with txid " + txid);
+              throw new IOException(errorMessage);
             }
+            setFileNameHeaders(response, imageFile);
             setVerificationHeaders(response, imageFile);
             // send fsImage
             TransferFsImage.getFileServer(response.getOutputStream(), imageFile,
@@ -117,6 +129,7 @@ public class GetImageServlet extends HttpServlet {
                 .findFinalizedEditsFile(startTxId, endTxId);
             setVerificationHeaders(response, editFile);
             
+            setFileNameHeaders(response, editFile);
             // send edits
             TransferFsImage.getFileServer(response.getOutputStream(), editFile,
                 getThrottler(conf));
@@ -182,6 +195,13 @@ public class GetImageServlet extends HttpServlet {
     }
   }
   
+  private static void setFileNameHeaders(HttpServletResponse response,
+      File file) {
+    response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" +
+        file.getName());
+    response.setHeader(HADOOP_IMAGE_EDITS_HEADER, file.getName());
+  }
+  
   /**
    * Construct a throttler from conf
    * @param conf configuration
@@ -198,7 +218,6 @@ public class GetImageServlet extends HttpServlet {
     return throttler;
   }
   
-  @SuppressWarnings("deprecation")
   protected boolean isValidRequestor(String remoteUser, Configuration conf)
       throws IOException {
     if(remoteUser == null) { // This really shouldn't happen...
@@ -243,13 +262,16 @@ public class GetImageServlet extends HttpServlet {
       response.setHeader(TransferFsImage.MD5_HEADER, hash.toString());
     }
   }
+  
+  static String getParamStringForMostRecentImage() {
+    return "getimage=1&" + TXID_PARAM + "=" + LATEST_FSIMAGE_VALUE;
+  }
 
   static String getParamStringForImage(long txid,
       StorageInfo remoteStorageInfo) {
     return "getimage=1&" + TXID_PARAM + "=" + txid
       + "&" + STORAGEINFO_PARAM + "=" +
       remoteStorageInfo.toColonSeparatedString();
-    
   }
 
   static String getParamStringForLog(RemoteEditLog log,
@@ -280,6 +302,7 @@ public class GetImageServlet extends HttpServlet {
     private String machineName;
     private long startTxId, endTxId, txId;
     private String storageInfoString;
+    private boolean fetchLatest;
 
     /**
      * @param request the object from which this servlet reads the url contents
@@ -291,7 +314,7 @@ public class GetImageServlet extends HttpServlet {
                            ) throws IOException {
       @SuppressWarnings("unchecked")
       Map<String, String[]> pmap = request.getParameterMap();
-      isGetImage = isGetEdit = isPutImage = false;
+      isGetImage = isGetEdit = isPutImage = fetchLatest = false;
       remoteport = 0;
       machineName = null;
 
@@ -300,7 +323,15 @@ public class GetImageServlet extends HttpServlet {
         String[] val = entry.getValue();
         if (key.equals("getimage")) { 
           isGetImage = true;
-          txId = parseLongParam(request, TXID_PARAM);
+          try {
+            txId = parseLongParam(request, TXID_PARAM);
+          } catch (NumberFormatException nfe) {
+            if (request.getParameter(TXID_PARAM).equals(LATEST_FSIMAGE_VALUE)) {
+              fetchLatest = true;
+            } else {
+              throw nfe;
+            }
+          }
         } else if (key.equals("getedit")) { 
           isGetEdit = true;
           startTxId = parseLongParam(request, START_TXID_PARAM);
@@ -361,6 +392,10 @@ public class GetImageServlet extends HttpServlet {
       return machineName + ":" + remoteport;
     }
     
+    boolean shouldFetchLatest() {
+      return fetchLatest;
+    }
+    
     private static long parseLongParam(HttpServletRequest request, String param)
         throws IOException {
       // Parse the 'txid' parameter which indicates which image is to be

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java

@@ -530,6 +530,10 @@ public class NNStorage extends Storage implements Closeable {
     }
     return null;
   }
+  
+  public File getHighestFsImageName() {
+    return getFsImageName(getMostRecentCheckpointTxId());
+  }
 
   /** Create new dfs name directory.  Caution: this destroys all files
    * in this filesystem. */

+ 32 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java

@@ -21,6 +21,7 @@ import java.io.*;
 import java.net.*;
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
+import java.util.ArrayList;
 import java.util.List;
 import java.lang.Math;
 
@@ -51,6 +52,13 @@ public class TransferFsImage {
   public final static String MD5_HEADER = "X-MD5-Digest";
 
   private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
+  
+  public static void downloadMostRecentImageToDirectory(String fsName,
+      File dir) throws IOException {
+    String fileId = GetImageServlet.getParamStringForMostRecentImage();
+    getFileClient(fsName, fileId, Lists.newArrayList(dir),
+        null, false);
+  }
 
   public static MD5Hash downloadImageToStorage(
       String fsName, long imageTxId, NNStorage dstStorage, boolean needDigest)
@@ -227,6 +235,25 @@ public class TransferFsImage {
                             "by the namenode when trying to fetch " + str);
     }
     
+    if (localPaths != null) {
+      String fsImageName = connection.getHeaderField(
+          GetImageServlet.HADOOP_IMAGE_EDITS_HEADER);
+      // If the local paths refer to directories, use the server-provided header
+      // as the filename within that directory
+      List<File> newLocalPaths = new ArrayList<File>();
+      for (File localPath : localPaths) {
+        if (localPath.isDirectory()) {
+          if (fsImageName == null) {
+            throw new IOException("No filename header provided by server");
+          }
+          newLocalPaths.add(new File(localPath, fsImageName));
+        } else {
+          newLocalPaths.add(localPath);
+        }
+      }
+      localPaths = newLocalPaths;
+    }
+    
     MD5Hash advertisedDigest = parseMD5Header(connection);
 
     long received = 0;
@@ -251,7 +278,11 @@ public class TransferFsImage {
             outputStreams.add(new FileOutputStream(f));
           } catch (IOException ioe) {
             LOG.warn("Unable to download file " + f, ioe);
-            dstStorage.reportErrorOnFile(f);
+            // This will be null if we're downloading the fsimage to a file
+            // outside of an NNStorage directory.
+            if (dstStorage != null) {
+              dstStorage.reportErrorOnFile(f);
+            }
           }
         }
         

+ 42 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.tools;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -37,6 +38,7 @@ import org.apache.hadoop.fs.shell.CommandFormat;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
@@ -488,6 +491,25 @@ public class DFSAdmin extends FsShell {
     return exitCode;
   }
 
+  /**
+   * Download the most recent fsimage from the name node, and save it to a local
+   * file in the given directory.
+   * 
+   * @param argv
+   *          List of of command line parameters.
+   * @param idx
+   *          The index of the command that is being processed.
+   * @return an exit code indicating success or failure.
+   * @throws IOException
+   */
+  public int fetchImage(String[] argv, int idx) throws IOException {
+    String infoServer = DFSUtil.getInfoServer(
+        HAUtil.getAddressOfActive(getDFS()), getConf(), true);
+    TransferFsImage.downloadMostRecentImageToDirectory(infoServer,
+        new File(argv[idx]));
+    return 0;
+  }
+
   private void printHelp(String cmd) {
     String summary = "hadoop dfsadmin is the command to execute DFS administrative commands.\n" +
       "The full syntax is: \n\n" +
@@ -506,6 +528,7 @@ public class DFSAdmin extends FsShell {
       "\t[-refreshNamenodes datanodehost:port]\n"+
       "\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+
       "\t[-setBalancerBandwidth <bandwidth>]\n" +
+      "\t[-fetchImage <local directory>]\n" +
       "\t[-help [cmd]]\n";
 
     String report ="-report: \tReports basic filesystem information and statistics.\n";
@@ -592,6 +615,10 @@ public class DFSAdmin extends FsShell {
       "\t\tthe dfs.balance.bandwidthPerSec parameter.\n\n" +
       "\t\t--- NOTE: The new value is not persistent on the DataNode.---\n";
     
+    String fetchImage = "-fetchImage <local directory>:\n" +
+      "\tDownloads the most recent fsimage from the Name Node and saves it in" +
+      "\tthe specified local directory.\n";
+    
     String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
       "\t\tis specified.\n";
 
@@ -633,6 +660,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(deleteBlockPool);
     } else if ("setBalancerBandwidth".equals(cmd)) {
       System.out.println(setBalancerBandwidth);
+    } else if ("fetchImage".equals(cmd)) {
+      System.out.println(fetchImage);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
     } else {
@@ -655,6 +684,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(printTopology);
       System.out.println(refreshNamenodes);
       System.out.println(deleteBlockPool);
+      System.out.println(setBalancerBandwidth);
+      System.out.println(fetchImage);
       System.out.println(help);
       System.out.println();
       ToolRunner.printGenericCommandUsage(System.out);
@@ -917,6 +948,9 @@ public class DFSAdmin extends FsShell {
     } else if ("-setBalancerBandwidth".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                   + " [-setBalancerBandwidth <bandwidth in bytes per second>]");
+    } else if ("-fetchImage".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+          + " [-fetchImage <local directory>]");
     } else {
       System.err.println("Usage: java DFSAdmin");
       System.err.println("Note: Administrative commands can only be run as the HDFS superuser.");
@@ -939,6 +973,7 @@ public class DFSAdmin extends FsShell {
       System.err.println("           ["+SetSpaceQuotaCommand.USAGE+"]");
       System.err.println("           ["+ClearSpaceQuotaCommand.USAGE+"]");      
       System.err.println("           [-setBalancerBandwidth <bandwidth in bytes per second>]");
+      System.err.println("           [-fetchImage <local directory>]");
       System.err.println("           [-help [cmd]]");
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
@@ -1035,6 +1070,11 @@ public class DFSAdmin extends FsShell {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-fetchImage".equals(cmd)) {
+      if (argv.length != 2) {
+        printUsage(cmd);
+        return exitCode;
+      }
     }
     
     // initialize DFSAdmin
@@ -1089,6 +1129,8 @@ public class DFSAdmin extends FsShell {
         exitCode = deleteBlockPool(argv, i);
       } else if ("-setBalancerBandwidth".equals(cmd)) {
         exitCode = setBalancerBandwidth(argv, i);
+      } else if ("-fetchImage".equals(cmd)) {
+        exitCode = fetchImage(argv, i);
       } else if ("-help".equals(cmd)) {
         if (i < argv.length) {
           printHelp(argv[i]);

+ 2 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java

@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
-import java.net.InetSocketAddress;
 import java.net.URL;
 import java.net.URLConnection;
 import java.net.URLEncoder;
@@ -32,14 +31,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.Krb5AndCertsSslSocketConnector;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -226,20 +222,7 @@ public class DFSck extends Configured implements Tool {
       return null;
     }
     
-    // force client address resolution.
-    fs.exists(new Path("/"));
-    
-    // Derive the nameservice ID from the filesystem connection. The URI may
-    // have been provided by a human, the server name may be aliased, or there
-    // may be multiple possible actual addresses (e.g. in an HA setup) so
-    // compare InetSocketAddresses instead of URI strings, and test against both
-    // possible configurations of RPC address (DFS_NAMENODE_RPC_ADDRESS_KEY and
-    // DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY).
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
-    DFSClient dfsClient = dfs.getClient();
-    InetSocketAddress addr = RPC.getServerAddress(dfsClient.getNamenode());
-    
-    return DFSUtil.getInfoServer(addr, conf, true);
+    return DFSUtil.getInfoServer(HAUtil.getAddressOfActive(fs), conf, true);
   }
 
   private int doWork(final String[] args) throws IOException {

+ 118 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFetchImage.java

@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.net.URI;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.junit.Test;
+
+public class TestFetchImage {
+  
+  private static File FETCHED_IMAGE_FILE = new File(
+      System.getProperty("build.test.dir"), "fetched-image-dir");
+  // Shamelessly stolen from NNStorage.
+  private static final Pattern IMAGE_REGEX = Pattern.compile("fsimage_(\\d+)");
+
+  /**
+   * Download a few fsimages using `hdfs dfsadmin -fetchImage ...' and verify
+   * the results.
+   */
+  @Test
+  public void testFetchImage() throws Exception {
+    FETCHED_IMAGE_FILE.mkdirs();
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    FileSystem fs = null;
+    try {
+      DFSAdmin dfsAdmin = new DFSAdmin();
+      dfsAdmin.setConf(conf);
+      
+      runFetchImage(dfsAdmin, cluster);
+      
+      fs = cluster.getFileSystem();
+      fs.mkdirs(new Path("/foo"));
+      fs.mkdirs(new Path("/foo2"));
+      fs.mkdirs(new Path("/foo3"));
+      
+      cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      cluster.getNameNodeRpc().saveNamespace();
+      cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+      
+      runFetchImage(dfsAdmin, cluster);
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Run `hdfs dfsadmin -fetchImage ...' and verify that the downloaded image is
+   * correct.
+   */
+  private static void runFetchImage(DFSAdmin dfsAdmin, MiniDFSCluster cluster)
+      throws Exception {
+    int retVal = dfsAdmin.run(new String[]{"-fetchImage",
+        FETCHED_IMAGE_FILE.getPath() });
+    
+    assertEquals(0, retVal);
+    
+    File highestImageOnNn = getHighestFsImageOnCluster(cluster);
+    MD5Hash expected = MD5FileUtils.computeMd5ForFile(highestImageOnNn);
+    MD5Hash actual = MD5FileUtils.computeMd5ForFile(
+        new File(FETCHED_IMAGE_FILE, highestImageOnNn.getName()));
+    
+    assertEquals(expected, actual);
+  }
+  
+  /**
+   * @return the fsimage with highest transaction ID in the cluster.
+   */
+  private static File getHighestFsImageOnCluster(MiniDFSCluster cluster) {
+    long highestImageTxId = -1;
+    File highestImageOnNn = null;
+    for (URI nameDir : cluster.getNameDirs(0)) {
+      for (File imageFile : new File(new File(nameDir), "current").listFiles()) {
+        Matcher imageMatch = IMAGE_REGEX.matcher(imageFile.getName());
+        if (imageMatch.matches()) {
+          long imageTxId = Long.valueOf(imageMatch.group(1));
+          if (imageTxId > highestImageTxId) {
+            highestImageTxId = imageTxId;
+            highestImageOnNn = imageFile;
+          }
+        }
+      }
+    }
+    return highestImageOnNn;
+  }
+}