|
@@ -17,42 +17,100 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
-import org.apache.hadoop.io.*;
|
|
|
-import org.apache.hadoop.io.retry.RetryPolicies;
|
|
|
-import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
-import org.apache.hadoop.io.retry.RetryProxy;
|
|
|
-import org.apache.hadoop.fs.*;
|
|
|
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.BLOCK_CHECKSUM;
|
|
|
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
|
|
|
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
|
|
|
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
|
|
|
+
|
|
|
+import java.io.BufferedInputStream;
|
|
|
+import java.io.BufferedOutputStream;
|
|
|
+import java.io.DataInputStream;
|
|
|
+import java.io.DataOutputStream;
|
|
|
+import java.io.FileNotFoundException;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.OutputStream;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+import java.net.Socket;
|
|
|
+import java.net.SocketTimeoutException;
|
|
|
+import java.nio.BufferOverflowException;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
+import java.util.AbstractMap;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.EnumSet;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.LinkedList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Random;
|
|
|
+import java.util.SortedMap;
|
|
|
+import java.util.TreeMap;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+
|
|
|
+import javax.net.SocketFactory;
|
|
|
+import javax.security.auth.login.LoginException;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.BlockLocation;
|
|
|
+import org.apache.hadoop.fs.ChecksumException;
|
|
|
+import org.apache.hadoop.fs.ContentSummary;
|
|
|
+import org.apache.hadoop.fs.CreateFlag;
|
|
|
+import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
+import org.apache.hadoop.fs.FSInputChecker;
|
|
|
+import org.apache.hadoop.fs.FSInputStream;
|
|
|
+import org.apache.hadoop.fs.FSOutputSummer;
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.FsStatus;
|
|
|
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.Syncable;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
-import org.apache.hadoop.ipc.*;
|
|
|
-import org.apache.hadoop.net.NetUtils;
|
|
|
-import org.apache.hadoop.net.NodeBase;
|
|
|
-import org.apache.hadoop.conf.*;
|
|
|
-import org.apache.hadoop.hdfs.protocol.*;
|
|
|
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
|
|
+import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
|
|
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
|
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
|
|
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
|
|
|
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
+import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
|
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
|
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
|
|
-import org.apache.hadoop.security.InvalidAccessTokenException;
|
|
|
+import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
+import org.apache.hadoop.io.EnumSetWritable;
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
|
+import org.apache.hadoop.io.MD5Hash;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
+import org.apache.hadoop.io.retry.RetryPolicies;
|
|
|
+import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
+import org.apache.hadoop.io.retry.RetryProxy;
|
|
|
+import org.apache.hadoop.ipc.Client;
|
|
|
+import org.apache.hadoop.ipc.RPC;
|
|
|
+import org.apache.hadoop.ipc.RemoteException;
|
|
|
+import org.apache.hadoop.net.NetUtils;
|
|
|
+import org.apache.hadoop.net.NodeBase;
|
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
|
import org.apache.hadoop.security.AccessToken;
|
|
|
+import org.apache.hadoop.security.InvalidAccessTokenException;
|
|
|
import org.apache.hadoop.security.UnixUserGroupInformation;
|
|
|
-import org.apache.hadoop.util.*;
|
|
|
-
|
|
|
-import org.apache.commons.logging.*;
|
|
|
-
|
|
|
-import java.io.*;
|
|
|
-import java.net.*;
|
|
|
-import java.util.*;
|
|
|
-import java.util.zip.CRC32;
|
|
|
-import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.nio.BufferOverflowException;
|
|
|
-import java.nio.ByteBuffer;
|
|
|
-
|
|
|
-import javax.net.SocketFactory;
|
|
|
-import javax.security.auth.login.LoginException;
|
|
|
+import org.apache.hadoop.util.Daemon;
|
|
|
+import org.apache.hadoop.util.DataChecksum;
|
|
|
+import org.apache.hadoop.util.Progressable;
|
|
|
+import org.apache.hadoop.util.PureJavaCrc32;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
/********************************************************
|
|
|
* DFSClient can connect to a Hadoop Filesystem and
|
|
@@ -69,8 +127,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
public static final Log LOG = LogFactory.getLog(DFSClient.class);
|
|
|
public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
|
|
|
private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
|
|
|
- public final ClientProtocol namenode;
|
|
|
- final private ClientProtocol rpcNamenode;
|
|
|
+ private ClientProtocol namenode;
|
|
|
+ private ClientProtocol rpcNamenode;
|
|
|
final UnixUserGroupInformation ugi;
|
|
|
volatile boolean clientRunning = true;
|
|
|
Random r = new Random();
|
|
@@ -161,6 +219,29 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
|
|
|
FileSystem.Statistics stats)
|
|
|
throws IOException {
|
|
|
+ this(conf, stats); // common setup (conf, stats, socket timeout, ...) is delegated to the private constructor
|
|
|
+ this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi); // NOTE(review): ugi is presumably initialized by this(conf, stats) above -- confirm
|
|
|
+ this.namenode = createNamenode(this.rpcNamenode); // built from the RPC handle assigned on the previous line
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a new DFSClient connected to the given namenode
|
|
|
+ * and rpcNamenode objects.
|
|
|
+ *
|
|
|
+ * This constructor was written to allow easy testing of the DFSClient class.
|
|
|
+ * End users will most likely want to use one of the other constructors.
|
|
|
+ */
|
|
|
+ public DFSClient(ClientProtocol namenode, ClientProtocol rpcNamenode,
|
|
|
+ Configuration conf, FileSystem.Statistics stats)
|
|
|
+ throws IOException {
|
|
|
+ this(conf, stats); // same common setup as the address-based constructor
|
|
|
+ this.namenode = namenode; // stored exactly as passed in; nothing is created or wrapped here
|
|
|
+ this.rpcNamenode = rpcNamenode; // likewise stored as passed in
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private DFSClient(Configuration conf, FileSystem.Statistics stats)
|
|
|
+ throws IOException {
|
|
|
this.conf = conf;
|
|
|
this.stats = stats;
|
|
|
this.socketTimeout = conf.getInt("dfs.socket.timeout",
|
|
@@ -182,9 +263,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
throw (IOException)(new IOException().initCause(e));
|
|
|
}
|
|
|
|
|
|
- this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
|
|
|
- this.namenode = createNamenode(rpcNamenode);
|
|
|
-
|
|
|
String taskId = conf.get("mapred.task.id");
|
|
|
if (taskId != null) {
|
|
|
this.clientName = "DFSClient_" + taskId;
|
|
@@ -372,6 +450,14 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
return create(src, overwrite, replication, blockSize, null);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get the namenode associated with this DFSClient object (accessor for the private {@code namenode} field).
|
|
|
+ * @return the current value of the {@code namenode} field, as assigned by whichever constructor built this client
|
|
|
+ */
|
|
|
+ public ClientProtocol getNamenode() {
|
|
|
+ return namenode;
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* Create a new dfs file with the specified block replication
|
|
@@ -619,15 +705,14 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
try {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("write to " + datanodes[j].getName() + ": "
|
|
|
- + DataTransferProtocol.OP_BLOCK_CHECKSUM +
|
|
|
- ", block=" + block);
|
|
|
+ + BLOCK_CHECKSUM + ", block=" + block);
|
|
|
}
|
|
|
DataTransferProtocol.Sender.opBlockChecksum(out, block.getBlockId(),
|
|
|
block.getGenerationStamp(), lb.getAccessToken());
|
|
|
|
|
|
- final short reply = in.readShort();
|
|
|
- if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
|
|
|
- if (reply == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN
|
|
|
+ final DataTransferProtocol.Status reply = DataTransferProtocol.Status.read(in);
|
|
|
+ if (reply != SUCCESS) {
|
|
|
+ if (reply == ERROR_ACCESS_TOKEN
|
|
|
&& i > lastRetriedIndex) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
|
|
@@ -1353,9 +1438,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
new BufferedInputStream(NetUtils.getInputStream(sock),
|
|
|
bufferSize));
|
|
|
|
|
|
- short status = in.readShort();
|
|
|
- if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
|
|
|
- if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
|
|
|
+ DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
|
|
|
+ if (status != SUCCESS) {
|
|
|
+ if (status == ERROR_ACCESS_TOKEN) {
|
|
|
throw new InvalidAccessTokenException(
|
|
|
"Got access token error in response to OP_READ_BLOCK "
|
|
|
+ "for file " + file + " for block " + blockId);
|
|
@@ -1402,9 +1487,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
private void checksumOk(Socket sock) {
|
|
|
try {
|
|
|
OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
|
|
|
- byte buf[] = { (DataTransferProtocol.OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
|
|
|
- (DataTransferProtocol.OP_STATUS_CHECKSUM_OK) & 0xff };
|
|
|
- out.write(buf);
|
|
|
+ CHECKSUM_OK.writeOutputStream(out);
|
|
|
out.flush();
|
|
|
} catch (IOException e) {
|
|
|
// its ok not to be able to send this.
|
|
@@ -2476,8 +2559,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
|
|
|
// processes response status from all datanodes.
|
|
|
for (int i = 0; i < targets.length && clientRunning; i++) {
|
|
|
- short reply = blockReplyStream.readShort();
|
|
|
- if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
|
|
|
+ final DataTransferProtocol.Status reply
|
|
|
+ = DataTransferProtocol.Status.read(blockReplyStream);
|
|
|
+ if (reply != SUCCESS) {
|
|
|
errorIndex = i; // first bad datanode
|
|
|
throw new IOException("Bad response " + reply +
|
|
|
" for block " + block +
|
|
@@ -2716,7 +2800,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
//
|
|
|
private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
|
|
|
boolean recoveryFlag) {
|
|
|
- short pipelineStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
|
|
|
+ DataTransferProtocol.Status pipelineStatus = SUCCESS;
|
|
|
String firstBadLink = "";
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
for (int i = 0; i < nodes.length; i++) {
|
|
@@ -2755,10 +2839,10 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
out.flush();
|
|
|
|
|
|
// receive ack for connect
|
|
|
- pipelineStatus = blockReplyStream.readShort();
|
|
|
+ pipelineStatus = DataTransferProtocol.Status.read(blockReplyStream);
|
|
|
firstBadLink = Text.readString(blockReplyStream);
|
|
|
- if (pipelineStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
|
|
|
- if (pipelineStatus == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
|
|
|
+ if (pipelineStatus != SUCCESS) {
|
|
|
+ if (pipelineStatus == ERROR_ACCESS_TOKEN) {
|
|
|
throw new InvalidAccessTokenException(
|
|
|
"Got access token error for connect ack with firstBadLink as "
|
|
|
+ firstBadLink);
|
|
@@ -2792,7 +2876,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
}
|
|
|
|
|
|
private LocatedBlock locateFollowingBlock(long start) throws IOException {
|
|
|
- int retries = 5;
|
|
|
+ int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
|
|
|
long sleeptime = 400;
|
|
|
while (true) {
|
|
|
long localstart = System.currentTimeMillis();
|
|
@@ -2808,26 +2892,32 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
if (ue != e) {
|
|
|
throw ue; // no need to retry these exceptions
|
|
|
}
|
|
|
-
|
|
|
- if (--retries == 0 &&
|
|
|
- !NotReplicatedYetException.class.getName().
|
|
|
+
|
|
|
+
|
|
|
+ if (NotReplicatedYetException.class.getName().
|
|
|
equals(e.getClassName())) {
|
|
|
- throw e;
|
|
|
- } else {
|
|
|
- LOG.info(StringUtils.stringifyException(e));
|
|
|
- if (System.currentTimeMillis() - localstart > 5000) {
|
|
|
- LOG.info("Waiting for replication for "
|
|
|
- + (System.currentTimeMillis() - localstart) / 1000
|
|
|
- + " seconds");
|
|
|
- }
|
|
|
- try {
|
|
|
- LOG.warn("NotReplicatedYetException sleeping " + src
|
|
|
- + " retries left " + retries);
|
|
|
- Thread.sleep(sleeptime);
|
|
|
- sleeptime *= 2;
|
|
|
- } catch (InterruptedException ie) {
|
|
|
+ if (retries == 0) {
|
|
|
+ throw e;
|
|
|
+ } else {
|
|
|
+ --retries;
|
|
|
+ LOG.info(StringUtils.stringifyException(e));
|
|
|
+ if (System.currentTimeMillis() - localstart > 5000) {
|
|
|
+ LOG.info("Waiting for replication for "
|
|
|
+ + (System.currentTimeMillis() - localstart) / 1000
|
|
|
+ + " seconds");
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ LOG.warn("NotReplicatedYetException sleeping " + src
|
|
|
+ + " retries left " + retries);
|
|
|
+ Thread.sleep(sleeptime);
|
|
|
+ sleeptime *= 2;
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
+ } else {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -2919,7 +3009,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
|
|
|
private DFSOutputStream(String src, long blockSize, Progressable progress,
|
|
|
int bytesPerChecksum) throws IOException {
|
|
|
- super(new CRC32(), bytesPerChecksum, 4);
|
|
|
+ super(new PureJavaCrc32(), bytesPerChecksum, 4);
|
|
|
this.src = src;
|
|
|
this.blockSize = blockSize;
|
|
|
this.progress = progress;
|