Преглед изворни кода

HADOOP-3337. Loading FSEditLog was broken by HADOOP-3283 since it
changed Writable serialization of DatanodeInfo. This patch handles it.
(Tsz Wo (Nicholas), SZE via rangadi)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@653264 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi пре 17 година
родитељ
комит
596de87a65

+ 4 - 0
CHANGES.txt

@@ -138,6 +138,10 @@ Trunk (unreleased changes)
     committed. Updated all references to use the new JobID representation.
     (taton via nigel)
 
+    HADOOP-3337. Loading FSEditLog was broken by HADOOP-3283 since it 
+    changed Writable serialization of DatanodeInfo. This patch handles it.
+    (Tsz Wo (Nicholas), SZE via rangadi)
+
 Release 0.17.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 40 - 0
src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java

@@ -17,9 +17,16 @@
  */
 package org.apache.hadoop.dfs;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.*;
 
 import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
+import org.apache.hadoop.dfs.DatanodeInfo.AdminStates;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.WritableUtils;
 
 /**************************************************
  * DatanodeDescriptor tracks stats on a given DataNode,
@@ -365,4 +372,37 @@ public class DatanodeDescriptor extends DatanodeInfo {
       toRemove.add(it.next());
     this.removeBlock(delimiter);
   }
+
+  /** Serialization for FSEditLog */
+  //TODO: remove this method in HADOOP-3329
+  void write2FSEditLog(DataOutput out) throws IOException {
+    UTF8.writeString(out, name);
+    UTF8.writeString(out, storageID);
+    out.writeShort(infoPort);
+    out.writeLong(capacity);
+    out.writeLong(dfsUsed);
+    out.writeLong(remaining);
+    out.writeLong(lastUpdate);
+    out.writeInt(xceiverCount);
+    Text.writeString(out, location);
+    Text.writeString(out, hostName == null? "": hostName);
+    WritableUtils.writeEnum(out, getAdminState());
+  }
+  
+  /** Serialization for FSEditLog */
+  //TODO: remove this method in HADOOP-3329
+  void readFieldsFromFSEditLog(DataInput in) throws IOException {
+    this.name = UTF8.readString(in);
+    this.storageID = UTF8.readString(in);
+    this.infoPort = in.readShort() & 0x0000ffff;
+
+    this.capacity = in.readLong();
+    this.dfsUsed = in.readLong();
+    this.remaining = in.readLong();
+    this.lastUpdate = in.readLong();
+    this.xceiverCount = in.readInt();
+    this.location = Text.readString(in);
+    this.hostName = Text.readString(in);
+    setAdminState(WritableUtils.readEnum(in, AdminStates.class));
+  }
 }

+ 6 - 13
src/java/org/apache/hadoop/dfs/DatanodeInfo.java

@@ -24,7 +24,6 @@ import java.util.Date;
 
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
@@ -44,12 +43,12 @@ public class DatanodeInfo extends DatanodeID implements Node {
   protected long remaining;
   protected long lastUpdate;
   protected int xceiverCount;
-  private String location = NetworkTopology.UNRESOLVED;
+  protected String location = NetworkTopology.UNRESOLVED;
 
   /** HostName as suplied by the datanode during registration as its 
    * name. Namenode uses datanode IP address as the name.
    */
-  private String hostName = null;
+  protected String hostName = null;
   
   // administrative states of a datanode
   public enum AdminStates {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
@@ -285,7 +284,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
   public void write(DataOutput out) throws IOException {
     super.write(out);
 
-    //TODO: move it to DatanodeID once HADOOP-2797 has been committed
+    //TODO: move it to DatanodeID once DatanodeID is not stored in FSImage
     out.writeShort(ipcPort);
 
     out.writeLong(capacity);
@@ -294,11 +293,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
     out.writeLong(lastUpdate);
     out.writeInt(xceiverCount);
     Text.writeString(out, location);
-    if (hostName == null) {
-      Text.writeString(out, "");
-    } else {
-      Text.writeString(out, hostName);
-    }
+    Text.writeString(out, hostName == null? "": hostName);
     WritableUtils.writeEnum(out, getAdminState());
   }
 
@@ -306,7 +301,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
 
-    //TODO: move it to DatanodeID once HADOOP-2797 has been committed
+    //TODO: move it to DatanodeID once DatanodeID is not stored in FSImage
     this.ipcPort = in.readShort() & 0x0000ffff;
 
     this.capacity = in.readLong();
@@ -316,8 +311,6 @@ public class DatanodeInfo extends DatanodeID implements Node {
     this.xceiverCount = in.readInt();
     this.location = Text.readString(in);
     this.hostName = Text.readString(in);
-    AdminStates newState = WritableUtils.readEnum(in,
-                                                                AdminStates.class);
-    setAdminState(newState);
+    setAdminState(WritableUtils.readEnum(in, AdminStates.class));
   }
 }

+ 32 - 7
src/java/org/apache/hadoop/dfs/FSEditLog.java

@@ -18,7 +18,9 @@
 package org.apache.hadoop.dfs;
 
 import java.io.BufferedInputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
@@ -492,15 +494,12 @@ class FSEditLog {
             if (opcode == OP_ADD && logVersion <= -12) {
               UTF8 uu = new UTF8();
               UTF8 cl = new UTF8();
-              aw = new ArrayWritable(DatanodeDescriptor.class);
               uu.readFields(in);
               cl.readFields(in);
-              aw.readFields(in);
+              lastLocations = readDatanodeDescriptorArray(in);
               clientName = uu.toString();
               clientMachine = cl.toString();
               writables = aw.get();
-              lastLocations = new DatanodeDescriptor[writables.length];
-              System.arraycopy(writables, 0, lastLocations, 0, writables.length);
             } else {
               lastLocations = new DatanodeDescriptor[0];
             }
@@ -855,8 +854,7 @@ class FSEditLog {
    */
   void logOpenFile(String path, INodeFileUnderConstruction newNode) 
                    throws IOException {
-
-    DatanodeDescriptor[] locations = newNode.getLastBlockLocations();
+    final DatanodeDescriptor[] locations = newNode.getLastBlockLocations();
 
     UTF8 nameReplicationPair[] = new UTF8[] { 
       new UTF8(path), 
@@ -869,7 +867,12 @@ class FSEditLog {
             newNode.getPermissionStatus(),
             new UTF8(newNode.getClientName()),
             new UTF8(newNode.getClientMachine()),
-            new ArrayWritable(DatanodeDescriptor.class, locations));
+            new Writable() {
+              public void readFields(DataInput in) {}
+              public void write(DataOutput out) throws IOException {
+                writeDatanodeDescriptorArray(out, locations);
+              }
+    });
   }
 
   /** 
@@ -1066,4 +1069,26 @@ class FSEditLog {
   static void setBufferCapacity(int size) {
     sizeFlushBuffer = size;
   }
+
+  /** This method is defined for compatibility reason. */
+  //TODO: remove this class in HADOOP-3329
+  static private void writeDatanodeDescriptorArray(DataOutput out,
+      DatanodeDescriptor[] locations) throws IOException {
+    out.writeInt(locations.length);                 // write values
+    for (int i = 0; i < locations.length; i++) {
+      locations[i].write2FSEditLog(out);
+    }
+  }
+
+  /** This method is defined for compatibility reason. */
+  //TODO: remove this class in HADOOP-3329
+  static private DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in
+      ) throws IOException {
+    DatanodeDescriptor[] locations = new DatanodeDescriptor[in.readInt()];
+    for (int i = 0; i < locations.length; i++) {
+      locations[i] = new DatanodeDescriptor();
+      locations[i].readFieldsFromFSEditLog(in);
+    }
+    return locations;
+  }
 }