|
@@ -19,11 +19,9 @@
|
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.BLOCK_CHECKSUM;
|
|
|
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
|
|
|
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
|
|
|
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
|
|
|
|
|
|
-import java.io.BufferedInputStream;
|
|
|
import java.io.BufferedOutputStream;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.DataOutputStream;
|
|
@@ -33,37 +31,24 @@ import java.io.OutputStream;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.Socket;
|
|
|
import java.net.SocketTimeoutException;
|
|
|
-import java.nio.BufferOverflowException;
|
|
|
-import java.nio.ByteBuffer;
|
|
|
-import java.util.AbstractMap;
|
|
|
-import java.util.ArrayList;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.Iterator;
|
|
|
-import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
import java.util.SortedMap;
|
|
|
import java.util.TreeMap;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import javax.net.SocketFactory;
|
|
|
-import javax.security.auth.login.LoginException;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.BlockLocation;
|
|
|
-import org.apache.hadoop.fs.ChecksumException;
|
|
|
import org.apache.hadoop.fs.ContentSummary;
|
|
|
import org.apache.hadoop.fs.CreateFlag;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
-import org.apache.hadoop.fs.FSInputChecker;
|
|
|
-import org.apache.hadoop.fs.FSInputStream;
|
|
|
-import org.apache.hadoop.fs.FSOutputSummer;
|
|
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -71,7 +56,6 @@ import org.apache.hadoop.fs.FsServerDefaults;
|
|
|
import org.apache.hadoop.fs.FsStatus;
|
|
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.fs.Syncable;
|
|
|
import org.apache.hadoop.fs.Options;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
|
@@ -86,16 +70,11 @@ import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
|
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
|
|
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
|
|
|
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
|
|
|
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
|
|
|
-import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
|
|
|
import org.apache.hadoop.hdfs.security.token.DelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
import org.apache.hadoop.io.EnumSetWritable;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
@@ -114,9 +93,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
|
-import org.apache.hadoop.util.DataChecksum;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
-import org.apache.hadoop.util.PureJavaCrc32;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
/********************************************************
|
|
@@ -134,8 +111,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
public static final Log LOG = LogFactory.getLog(DFSClient.class);
|
|
|
public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
|
|
|
public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
|
|
|
- private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
|
|
|
- private final ClientProtocol namenode;
|
|
|
+ static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
|
|
|
+ final ClientProtocol namenode;
|
|
|
private final ClientProtocol rpcNamenode;
|
|
|
final UserGroupInformation ugi;
|
|
|
volatile boolean clientRunning = true;
|
|
@@ -144,16 +121,14 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
Random r = new Random();
|
|
|
final String clientName;
|
|
|
final LeaseChecker leasechecker = new LeaseChecker();
|
|
|
- private Configuration conf;
|
|
|
- private long defaultBlockSize;
|
|
|
+ Configuration conf;
|
|
|
+ long defaultBlockSize;
|
|
|
private short defaultReplication;
|
|
|
- private SocketFactory socketFactory;
|
|
|
- private int socketTimeout;
|
|
|
- private int datanodeWriteTimeout;
|
|
|
+ SocketFactory socketFactory;
|
|
|
+ int socketTimeout;
|
|
|
final int writePacketSize;
|
|
|
- private final FileSystem.Statistics stats;
|
|
|
- private int maxBlockAcquireFailures;
|
|
|
- private final int hdfsTimeout; // timeout value for a DFS operation.
|
|
|
+ final FileSystem.Statistics stats;
|
|
|
+ final int hdfsTimeout; // timeout value for a DFS operation.
|
|
|
|
|
|
/**
|
|
|
* The locking hierarchy is to first acquire lock on DFSClient object, followed by
|
|
@@ -253,13 +228,10 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
this.stats = stats;
|
|
|
this.socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
|
|
HdfsConstants.READ_TIMEOUT);
|
|
|
- this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
|
|
|
- HdfsConstants.WRITE_TIMEOUT);
|
|
|
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
|
|
|
// dfs.write.packet.size is an internal config variable
|
|
|
this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
|
|
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
|
|
- this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
|
|
|
// The hdfsTimeout is currently the same as the ipc timeout
|
|
|
this.hdfsTimeout = Client.getTimeout(conf);
|
|
|
|
|
@@ -287,13 +259,41 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void checkOpen() throws IOException {
|
|
|
+  /**
|
|
|
+   * Return the number of times the client should go back to the namenode
|
|
|
+   * to retrieve block locations when reading; read from conf on each call.
|
|
|
+   */
|
|
|
+  int getMaxBlockAcquireFailures() {
|
|
|
+    return conf.getInt("dfs.client.max.block.acquire.failures",
|
|
|
+        MAX_BLOCK_ACQUIRE_FAILURES); // second argument: getInt's default
|
|
|
+  }
|
|
|
+
|
|
|
+  /**
|
|
|
+   * Return the timeout that clients should use when writing to datanodes.
|
|
|
+   * @param numNodes the number of nodes in the pipeline; scales the extension.
|
|
|
+   */
|
|
|
+  int getDatanodeWriteTimeout(int numNodes) {
|
|
|
+    int confTime = // base write timeout as configured
|
|
|
+        conf.getInt("dfs.datanode.socket.write.timeout",
|
|
|
+                    HdfsConstants.WRITE_TIMEOUT);
|
|
|
+
|
|
|
+    return (confTime > 0) ? // else 0 (presumably "no timeout" - TODO confirm)
|
|
|
+      (confTime + HdfsConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
|
|
|
+  }
|
|
|
+
|
|
|
+  int getDatanodeReadTimeout(int numNodes) { // socketTimeout + per-node extension
|
|
|
+    return socketTimeout > 0 ? // else 0 (presumably "no timeout" - TODO confirm)
|
|
|
+        (HdfsConstants.READ_TIMEOUT_EXTENSION * numNodes + // grows with numNodes
|
|
|
+            socketTimeout) : 0;
|
|
|
+  }
|
|
|
+
|
|
|
+ void checkOpen() throws IOException {
|
|
|
if (!clientRunning) {
|
|
|
IOException result = new IOException("Filesystem closed");
|
|
|
throw result;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Close the file system, abandoning all of the leases and files being
|
|
|
* created and close connections to the namenode.
|
|
@@ -330,11 +330,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- static int getMaxBlockAcquireFailures(Configuration conf) {
|
|
|
- return conf.getInt("dfs.client.max.block.acquire.failures",
|
|
|
- MAX_BLOCK_ACQUIRE_FAILURES);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Get server default values for a number of configuration params.
|
|
|
*/
|
|
@@ -377,7 +372,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
return defaultReplication;
|
|
|
}
|
|
|
|
|
|
- private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
|
|
|
+ static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
|
|
|
String src, long start, long length) throws IOException {
|
|
|
try {
|
|
|
return namenode.getBlockLocations(src, start, length);
|
|
@@ -458,7 +453,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
) throws IOException {
|
|
|
checkOpen();
|
|
|
// Get block info from namenode
|
|
|
- return new DFSInputStream(src, buffersize, verifyChecksum);
|
|
|
+ return new DFSInputStream(this, src, buffersize, verifyChecksum);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -603,7 +598,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
}
|
|
|
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
|
|
|
LOG.debug(src + ": masked=" + masked);
|
|
|
- OutputStream result = new DFSOutputStream(src, masked,
|
|
|
+ OutputStream result = new DFSOutputStream(this, src, masked,
|
|
|
flag, createParent, replication, blockSize, progress, buffersize,
|
|
|
conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
|
|
|
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
|
|
@@ -628,7 +623,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
int bytesPerChecksum)
|
|
|
throws IOException {
|
|
|
checkOpen();
|
|
|
- OutputStream result = new DFSOutputStream(src, absPermission,
|
|
|
+ OutputStream result = new DFSOutputStream(this, src, absPermission,
|
|
|
flag, createParent, replication, blockSize, progress, buffersize,
|
|
|
bytesPerChecksum);
|
|
|
leasechecker.put(src, result);
|
|
@@ -659,7 +654,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
NSQuotaExceededException.class,
|
|
|
DSQuotaExceededException.class);
|
|
|
}
|
|
|
- OutputStream result = new DFSOutputStream(src, buffersize, progress,
|
|
|
+ OutputStream result = new DFSOutputStream(this, src, buffersize, progress,
|
|
|
lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
|
|
|
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
|
|
|
leasechecker.put(src, result);
|
|
@@ -1165,23 +1160,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Pick the best node from which to stream the data.
|
|
|
- * Entries in <i>nodes</i> are already in the priority order
|
|
|
- */
|
|
|
- private DatanodeInfo bestNode(DatanodeInfo nodes[],
|
|
|
- AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
|
|
|
- throws IOException {
|
|
|
- if (nodes != null) {
|
|
|
- for (int i = 0; i < nodes.length; i++) {
|
|
|
- if (!deadNodes.containsKey(nodes[i])) {
|
|
|
- return nodes[i];
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- throw new IOException("No live nodes contain current block");
|
|
|
- }
|
|
|
-
|
|
|
boolean isLeaseCheckerStarted() {
|
|
|
return leasechecker.daemon != null;
|
|
|
}
|
|
@@ -1324,2600 +1302,42 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** Utility class to encapsulate data node info and its ip address. */
|
|
|
- private static class DNAddrPair {
|
|
|
- DatanodeInfo info;
|
|
|
- InetSocketAddress addr;
|
|
|
- DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
|
|
|
- this.info = info;
|
|
|
- this.addr = addr;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /** This is a wrapper around connection to datadone
|
|
|
- * and understands checksum, offset etc
|
|
|
+ /**
|
|
|
+ * The Hdfs implementation of {@link FSDataInputStream}
|
|
|
*/
|
|
|
- public static class BlockReader extends FSInputChecker {
|
|
|
-
|
|
|
- Socket dnSock; //for now just sending checksumOk.
|
|
|
- private DataInputStream in;
|
|
|
- private DataChecksum checksum;
|
|
|
-
|
|
|
- /** offset in block of the last chunk received */
|
|
|
- private long lastChunkOffset = -1;
|
|
|
- private long lastChunkLen = -1;
|
|
|
- private long lastSeqNo = -1;
|
|
|
-
|
|
|
- /** offset in block where reader wants to actually read */
|
|
|
- private long startOffset;
|
|
|
-
|
|
|
- /** offset in block of of first chunk - may be less than startOffset
|
|
|
- if startOffset is not chunk-aligned */
|
|
|
- private final long firstChunkOffset;
|
|
|
-
|
|
|
- private int bytesPerChecksum;
|
|
|
- private int checksumSize;
|
|
|
-
|
|
|
- /**
|
|
|
- * The total number of bytes we need to transfer from the DN.
|
|
|
- * This is the amount that the user has requested plus some padding
|
|
|
- * at the beginning so that the read can begin on a chunk boundary.
|
|
|
- */
|
|
|
- private final long bytesNeededToFinish;
|
|
|
-
|
|
|
- private boolean gotEOS = false;
|
|
|
-
|
|
|
- byte[] skipBuf = null;
|
|
|
- ByteBuffer checksumBytes = null;
|
|
|
- int dataLeft = 0;
|
|
|
-
|
|
|
- /* FSInputChecker interface */
|
|
|
-
|
|
|
- /* same interface as inputStream java.io.InputStream#read()
|
|
|
- * used by DFSInputStream#read()
|
|
|
- * This violates one rule when there is a checksum error:
|
|
|
- * "Read should not modify user buffer before successful read"
|
|
|
- * because it first reads the data to user buffer and then checks
|
|
|
- * the checksum.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized int read(byte[] buf, int off, int len)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
- // This has to be set here, *before* the skip, since we can
|
|
|
- // hit EOS during the skip, in the case that our entire read
|
|
|
- // is smaller than the checksum chunk.
|
|
|
- boolean eosBefore = gotEOS;
|
|
|
-
|
|
|
- //for the first read, skip the extra bytes at the front.
|
|
|
- if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
|
|
|
- // Skip these bytes. But don't call this.skip()!
|
|
|
- int toSkip = (int)(startOffset - firstChunkOffset);
|
|
|
- if ( skipBuf == null ) {
|
|
|
- skipBuf = new byte[bytesPerChecksum];
|
|
|
- }
|
|
|
- if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
|
|
|
- // should never happen
|
|
|
- throw new IOException("Could not skip required number of bytes");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- int nRead = super.read(buf, off, len);
|
|
|
-
|
|
|
- // if gotEOS was set in the previous read and checksum is enabled :
|
|
|
- if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
|
|
|
- //checksum is verified and there are no errors.
|
|
|
- checksumOk(dnSock);
|
|
|
- }
|
|
|
- return nRead;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public synchronized long skip(long n) throws IOException {
|
|
|
- /* How can we make sure we don't throw a ChecksumException, at least
|
|
|
- * in majority of the cases?. This one throws. */
|
|
|
- if ( skipBuf == null ) {
|
|
|
- skipBuf = new byte[bytesPerChecksum];
|
|
|
- }
|
|
|
-
|
|
|
- long nSkipped = 0;
|
|
|
- while ( nSkipped < n ) {
|
|
|
- int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
|
|
|
- int ret = read(skipBuf, 0, toSkip);
|
|
|
- if ( ret <= 0 ) {
|
|
|
- return nSkipped;
|
|
|
- }
|
|
|
- nSkipped += ret;
|
|
|
- }
|
|
|
- return nSkipped;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public int read() throws IOException {
|
|
|
- throw new IOException("read() is not expected to be invoked. " +
|
|
|
- "Use read(buf, off, len) instead.");
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean seekToNewSource(long targetPos) throws IOException {
|
|
|
- /* Checksum errors are handled outside the BlockReader.
|
|
|
- * DFSInputStream does not always call 'seekToNewSource'. In the
|
|
|
- * case of pread(), it just tries a different replica without seeking.
|
|
|
- */
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void seek(long pos) throws IOException {
|
|
|
- throw new IOException("Seek() is not supported in BlockInputChecker");
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected long getChunkPosition(long pos) {
|
|
|
- throw new RuntimeException("getChunkPosition() is not supported, " +
|
|
|
- "since seek is not required");
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Makes sure that checksumBytes has enough capacity
|
|
|
- * and limit is set to the number of checksum bytes needed
|
|
|
- * to be read.
|
|
|
- */
|
|
|
- private void adjustChecksumBytes(int dataLen) {
|
|
|
- int requiredSize =
|
|
|
- ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
|
|
|
- if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
|
|
|
- checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
|
|
|
- } else {
|
|
|
- checksumBytes.clear();
|
|
|
- }
|
|
|
- checksumBytes.limit(requiredSize);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected synchronized int readChunk(long pos, byte[] buf, int offset,
|
|
|
- int len, byte[] checksumBuf)
|
|
|
- throws IOException {
|
|
|
- // Read one chunk.
|
|
|
- if ( gotEOS ) {
|
|
|
- // Already hit EOF
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- // Read one DATA_CHUNK.
|
|
|
- long chunkOffset = lastChunkOffset;
|
|
|
- if ( lastChunkLen > 0 ) {
|
|
|
- chunkOffset += lastChunkLen;
|
|
|
- }
|
|
|
-
|
|
|
- // pos is relative to the start of the first chunk of the read.
|
|
|
- // chunkOffset is relative to the start of the block.
|
|
|
- // This makes sure that the read passed from FSInputChecker is the
|
|
|
- // for the same chunk we expect to be reading from the DN.
|
|
|
- if ( (pos + firstChunkOffset) != chunkOffset ) {
|
|
|
- throw new IOException("Mismatch in pos : " + pos + " + " +
|
|
|
- firstChunkOffset + " != " + chunkOffset);
|
|
|
- }
|
|
|
-
|
|
|
- // Read next packet if the previous packet has been read completely.
|
|
|
- if (dataLeft <= 0) {
|
|
|
- //Read packet headers.
|
|
|
- int packetLen = in.readInt();
|
|
|
- long offsetInBlock = in.readLong();
|
|
|
- long seqno = in.readLong();
|
|
|
- boolean lastPacketInBlock = in.readBoolean();
|
|
|
-
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("DFSClient readChunk got seqno " + seqno +
|
|
|
- " offsetInBlock " + offsetInBlock +
|
|
|
- " lastPacketInBlock " + lastPacketInBlock +
|
|
|
- " packetLen " + packetLen);
|
|
|
- }
|
|
|
-
|
|
|
- int dataLen = in.readInt();
|
|
|
-
|
|
|
- // Sanity check the lengths
|
|
|
- if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
|
|
|
- ( dataLen != 0 && lastPacketInBlock) ||
|
|
|
- (seqno != (lastSeqNo + 1)) ) {
|
|
|
- throw new IOException("BlockReader: error in packet header" +
|
|
|
- "(chunkOffset : " + chunkOffset +
|
|
|
- ", dataLen : " + dataLen +
|
|
|
- ", seqno : " + seqno +
|
|
|
- " (last: " + lastSeqNo + "))");
|
|
|
- }
|
|
|
-
|
|
|
- lastSeqNo = seqno;
|
|
|
- dataLeft = dataLen;
|
|
|
- adjustChecksumBytes(dataLen);
|
|
|
- if (dataLen > 0) {
|
|
|
- IOUtils.readFully(in, checksumBytes.array(), 0,
|
|
|
- checksumBytes.limit());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Sanity checks
|
|
|
- assert len >= bytesPerChecksum;
|
|
|
- assert checksum != null;
|
|
|
- assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
|
|
|
-
|
|
|
-
|
|
|
- int checksumsToRead, bytesToRead;
|
|
|
-
|
|
|
- if (checksumSize > 0) {
|
|
|
-
|
|
|
- // How many chunks left in our stream - this is a ceiling
|
|
|
- // since we may have a partial chunk at the end of the file
|
|
|
- int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
|
|
|
-
|
|
|
- // How many chunks we can fit in databuffer
|
|
|
- // - note this is a floor since we always read full chunks
|
|
|
- int chunksCanFit = Math.min(len / bytesPerChecksum,
|
|
|
- checksumBuf.length / checksumSize);
|
|
|
-
|
|
|
- // How many chunks should we read
|
|
|
- checksumsToRead = Math.min(chunksLeft, chunksCanFit);
|
|
|
- // How many bytes should we actually read
|
|
|
- bytesToRead = Math.min(
|
|
|
- checksumsToRead * bytesPerChecksum, // full chunks
|
|
|
- dataLeft); // in case we have a partial
|
|
|
- } else {
|
|
|
- // no checksum
|
|
|
- bytesToRead = Math.min(dataLeft, len);
|
|
|
- checksumsToRead = 0;
|
|
|
- }
|
|
|
-
|
|
|
- if ( bytesToRead > 0 ) {
|
|
|
- // Assert we have enough space
|
|
|
- assert bytesToRead <= len;
|
|
|
- assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
|
|
|
- assert checksumBuf.length >= checksumSize * checksumsToRead;
|
|
|
- IOUtils.readFully(in, buf, offset, bytesToRead);
|
|
|
- checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
|
|
|
- }
|
|
|
-
|
|
|
- dataLeft -= bytesToRead;
|
|
|
- assert dataLeft >= 0;
|
|
|
-
|
|
|
- lastChunkOffset = chunkOffset;
|
|
|
- lastChunkLen = bytesToRead;
|
|
|
-
|
|
|
- // If there's no data left in the current packet after satisfying
|
|
|
- // this read, and we have satisfied the client read, we expect
|
|
|
- // an empty packet header from the DN to signify this.
|
|
|
- // Note that pos + bytesToRead may in fact be greater since the
|
|
|
- // DN finishes off the entire last chunk.
|
|
|
- if (dataLeft == 0 &&
|
|
|
- pos + bytesToRead >= bytesNeededToFinish) {
|
|
|
-
|
|
|
- // Read header
|
|
|
- int packetLen = in.readInt();
|
|
|
- long offsetInBlock = in.readLong();
|
|
|
- long seqno = in.readLong();
|
|
|
- boolean lastPacketInBlock = in.readBoolean();
|
|
|
- int dataLen = in.readInt();
|
|
|
-
|
|
|
- if (!lastPacketInBlock ||
|
|
|
- dataLen != 0) {
|
|
|
- throw new IOException("Expected empty end-of-read packet! Header: " +
|
|
|
- "(packetLen : " + packetLen +
|
|
|
- ", offsetInBlock : " + offsetInBlock +
|
|
|
- ", seqno : " + seqno +
|
|
|
- ", lastInBlock : " + lastPacketInBlock +
|
|
|
- ", dataLen : " + dataLen);
|
|
|
- }
|
|
|
-
|
|
|
- gotEOS = true;
|
|
|
- }
|
|
|
-
|
|
|
- if ( bytesToRead == 0 ) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- return bytesToRead;
|
|
|
- }
|
|
|
-
|
|
|
- private BlockReader( String file, long blockId, DataInputStream in,
|
|
|
- DataChecksum checksum, boolean verifyChecksum,
|
|
|
- long startOffset, long firstChunkOffset,
|
|
|
- long bytesToRead,
|
|
|
- Socket dnSock ) {
|
|
|
- super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
|
|
|
- 1, verifyChecksum,
|
|
|
- checksum.getChecksumSize() > 0? checksum : null,
|
|
|
- checksum.getBytesPerChecksum(),
|
|
|
- checksum.getChecksumSize());
|
|
|
-
|
|
|
- this.dnSock = dnSock;
|
|
|
- this.in = in;
|
|
|
- this.checksum = checksum;
|
|
|
- this.startOffset = Math.max( startOffset, 0 );
|
|
|
-
|
|
|
- // The total number of bytes that we need to transfer from the DN is
|
|
|
- // the amount that the user wants (bytesToRead), plus the padding at
|
|
|
- // the beginning in order to chunk-align. Note that the DN may elect
|
|
|
- // to send more than this amount if the read ends mid-chunk.
|
|
|
- this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
|
|
|
-
|
|
|
- this.firstChunkOffset = firstChunkOffset;
|
|
|
- lastChunkOffset = firstChunkOffset;
|
|
|
- lastChunkLen = -1;
|
|
|
-
|
|
|
- bytesPerChecksum = this.checksum.getBytesPerChecksum();
|
|
|
- checksumSize = this.checksum.getChecksumSize();
|
|
|
- }
|
|
|
-
|
|
|
- public static BlockReader newBlockReader(Socket sock, String file, long blockId, BlockAccessToken accessToken,
|
|
|
- long genStamp, long startOffset, long len, int bufferSize) throws IOException {
|
|
|
- return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
|
|
|
- true);
|
|
|
- }
|
|
|
-
|
|
|
- /** Java Doc required */
|
|
|
- public static BlockReader newBlockReader( Socket sock, String file, long blockId,
|
|
|
- BlockAccessToken accessToken,
|
|
|
- long genStamp,
|
|
|
- long startOffset, long len,
|
|
|
- int bufferSize, boolean verifyChecksum)
|
|
|
- throws IOException {
|
|
|
- return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset,
|
|
|
- len, bufferSize, verifyChecksum, "");
|
|
|
+ public static class DFSDataInputStream extends FSDataInputStream {
|
|
|
+ public DFSDataInputStream(DFSInputStream in)
|
|
|
+ throws IOException {
|
|
|
+ super(in);
|
|
|
}
|
|
|
-
|
|
|
- public static BlockReader newBlockReader( Socket sock, String file,
|
|
|
- long blockId,
|
|
|
- BlockAccessToken accessToken,
|
|
|
- long genStamp,
|
|
|
- long startOffset, long len,
|
|
|
- int bufferSize, boolean verifyChecksum,
|
|
|
- String clientName)
|
|
|
- throws IOException {
|
|
|
- // in and out will be closed when sock is closed (by the caller)
|
|
|
- DataTransferProtocol.Sender.opReadBlock(
|
|
|
- new DataOutputStream(new BufferedOutputStream(
|
|
|
- NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
|
|
|
- blockId, genStamp, startOffset, len, clientName, accessToken);
|
|
|
-
|
|
|
- //
|
|
|
- // Get bytes in block, set streams
|
|
|
- //
|
|
|
-
|
|
|
- DataInputStream in = new DataInputStream(
|
|
|
- new BufferedInputStream(NetUtils.getInputStream(sock),
|
|
|
- bufferSize));
|
|
|
-
|
|
|
- DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
|
|
|
- if (status != SUCCESS) {
|
|
|
- if (status == ERROR_ACCESS_TOKEN) {
|
|
|
- throw new InvalidAccessTokenException(
|
|
|
- "Got access token error for OP_READ_BLOCK, self="
|
|
|
- + sock.getLocalSocketAddress() + ", remote="
|
|
|
- + sock.getRemoteSocketAddress() + ", for file " + file
|
|
|
- + ", for block " + blockId + "_" + genStamp);
|
|
|
- } else {
|
|
|
- throw new IOException("Got error for OP_READ_BLOCK, self="
|
|
|
- + sock.getLocalSocketAddress() + ", remote="
|
|
|
- + sock.getRemoteSocketAddress() + ", for file " + file
|
|
|
- + ", for block " + blockId + "_" + genStamp);
|
|
|
- }
|
|
|
- }
|
|
|
- DataChecksum checksum = DataChecksum.newDataChecksum( in );
|
|
|
- //Warning when we get CHECKSUM_NULL?
|
|
|
|
|
|
- // Read the first chunk offset.
|
|
|
- long firstChunkOffset = in.readLong();
|
|
|
-
|
|
|
- if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
|
|
|
- firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
|
|
|
- throw new IOException("BlockReader: error in first chunk offset (" +
|
|
|
- firstChunkOffset + ") startOffset is " +
|
|
|
- startOffset + " for file " + file);
|
|
|
- }
|
|
|
-
|
|
|
- return new BlockReader( file, blockId, in, checksum, verifyChecksum,
|
|
|
- startOffset, firstChunkOffset, len,
|
|
|
- sock );
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public synchronized void close() throws IOException {
|
|
|
- startOffset = -1;
|
|
|
- checksum = null;
|
|
|
- // in will be closed when its Socket is closed.
|
|
|
- }
|
|
|
-
|
|
|
- /** kind of like readFully(). Only reads as much as possible.
|
|
|
- * And allows use of protected readFully().
|
|
|
- */
|
|
|
- public int readAll(byte[] buf, int offset, int len) throws IOException {
|
|
|
- return readFully(this, buf, offset, len);
|
|
|
- }
|
|
|
-
|
|
|
- /* When the reader reaches end of a block and there are no checksum
|
|
|
- * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that
|
|
|
- * checksum was verified and there was no error.
|
|
|
- */
|
|
|
- void checksumOk(Socket sock) {
|
|
|
- try {
|
|
|
- OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
|
|
|
- CHECKSUM_OK.writeOutputStream(out);
|
|
|
- out.flush();
|
|
|
- } catch (IOException e) {
|
|
|
- // its ok not to be able to send this.
|
|
|
- LOG.debug("Could not write to datanode " + sock.getInetAddress() +
|
|
|
- ": " + e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /****************************************************************
|
|
|
- * DFSInputStream provides bytes from a named file. It handles
|
|
|
- * negotiation of the namenode and various datanodes as necessary.
|
|
|
- ****************************************************************/
|
|
|
- class DFSInputStream extends FSInputStream {
|
|
|
- private Socket s = null;
|
|
|
- private boolean closed = false;
|
|
|
-
|
|
|
- private String src;
|
|
|
- private long prefetchSize = 10 * defaultBlockSize;
|
|
|
- private BlockReader blockReader = null;
|
|
|
- private boolean verifyChecksum;
|
|
|
- private LocatedBlocks locatedBlocks = null;
|
|
|
- private long lastBlockBeingWrittenLength = 0;
|
|
|
- private DatanodeInfo currentNode = null;
|
|
|
- private Block currentBlock = null;
|
|
|
- private long pos = 0;
|
|
|
- private long blockEnd = -1;
|
|
|
-
|
|
|
- /**
|
|
|
- * This variable tracks the number of failures since the start of the
|
|
|
- * most recent user-facing operation. That is to say, it should be reset
|
|
|
- * whenever the user makes a call on this stream, and if at any point
|
|
|
- * during the retry logic, the failure count exceeds a threshold,
|
|
|
- * the errors will be thrown back to the operation.
|
|
|
- *
|
|
|
- * Specifically this counts the number of times the client has gone
|
|
|
- * back to the namenode to get a new list of block locations, and is
|
|
|
- * capped at maxBlockAcquireFailures
|
|
|
- */
|
|
|
- private int failures = 0;
|
|
|
- private int timeWindow = 3000; // wait time window (in msec) if BlockMissingException is caught
|
|
|
-
|
|
|
- /* XXX Use of CocurrentHashMap is temp fix. Need to fix
|
|
|
- * parallel accesses to DFSInputStream (through ptreads) properly */
|
|
|
- private ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
|
|
|
- new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
|
|
|
- private int buffersize = 1;
|
|
|
-
|
|
|
- private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
|
|
|
-
|
|
|
- void addToDeadNodes(DatanodeInfo dnInfo) {
|
|
|
- deadNodes.put(dnInfo, dnInfo);
|
|
|
- }
|
|
|
-
|
|
|
- DFSInputStream(String src, int buffersize, boolean verifyChecksum
|
|
|
- ) throws IOException {
|
|
|
- this.verifyChecksum = verifyChecksum;
|
|
|
- this.buffersize = buffersize;
|
|
|
- this.src = src;
|
|
|
- prefetchSize = conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, prefetchSize);
|
|
|
- timeWindow = conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
|
|
|
- openInfo();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Grab the open-file info from namenode
|
|
|
- */
|
|
|
- synchronized void openInfo() throws IOException {
|
|
|
- LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("newInfo = " + newInfo);
|
|
|
- }
|
|
|
- if (newInfo == null) {
|
|
|
- throw new IOException("Cannot open filename " + src);
|
|
|
- }
|
|
|
-
|
|
|
- if (locatedBlocks != null) {
|
|
|
- Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
|
|
|
- Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
|
|
|
- while (oldIter.hasNext() && newIter.hasNext()) {
|
|
|
- if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
|
|
|
- throw new IOException("Blocklist for " + src + " has changed!");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- this.locatedBlocks = newInfo;
|
|
|
- this.lastBlockBeingWrittenLength = 0;
|
|
|
- if (!locatedBlocks.isLastBlockComplete()) {
|
|
|
- final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
|
|
|
- if (last != null) {
|
|
|
- final long len = readBlockLength(last);
|
|
|
- last.getBlock().setNumBytes(len);
|
|
|
- this.lastBlockBeingWrittenLength = len;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- this.currentNode = null;
|
|
|
- }
|
|
|
-
|
|
|
- /** Read the block length from one of the datanodes. */
|
|
|
- private long readBlockLength(LocatedBlock locatedblock) throws IOException {
|
|
|
- if (locatedblock == null || locatedblock.getLocations().length == 0) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- for(DatanodeInfo datanode : locatedblock.getLocations()) {
|
|
|
- try {
|
|
|
- final ClientDatanodeProtocol cdp = createClientDatanodeProtocolProxy(
|
|
|
- datanode, conf);
|
|
|
- final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
|
|
|
- if (n >= 0) {
|
|
|
- return n;
|
|
|
- }
|
|
|
- }
|
|
|
- catch(IOException ioe) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Faild to getReplicaVisibleLength from datanode "
|
|
|
- + datanode + " for block " + locatedblock.getBlock(), ioe);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- throw new IOException("Cannot obtain block length for " + locatedblock);
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized long getFileLength() {
|
|
|
- return locatedBlocks == null? 0:
|
|
|
- locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Returns the datanode from which the stream is currently reading.
|
|
|
*/
|
|
|
public DatanodeInfo getCurrentDatanode() {
|
|
|
- return currentNode;
|
|
|
+ return ((DFSInputStream)in).getCurrentDatanode();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Returns the block containing the target position.
|
|
|
*/
|
|
|
public Block getCurrentBlock() {
|
|
|
- return currentBlock;
|
|
|
+ return ((DFSInputStream)in).getCurrentBlock();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Return collection of blocks that has already been located.
|
|
|
*/
|
|
|
synchronized List<LocatedBlock> getAllBlocks() throws IOException {
|
|
|
- return getBlockRange(0, this.getFileLength());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Get block at the specified position.
|
|
|
- * Fetch it from the namenode if not cached.
|
|
|
- *
|
|
|
- * @param offset
|
|
|
- * @param updatePosition whether to update current position
|
|
|
- * @return located block
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private synchronized LocatedBlock getBlockAt(long offset,
|
|
|
- boolean updatePosition) throws IOException {
|
|
|
- assert (locatedBlocks != null) : "locatedBlocks is null";
|
|
|
-
|
|
|
- final LocatedBlock blk;
|
|
|
-
|
|
|
- //check offset
|
|
|
- if (offset < 0 || offset >= getFileLength()) {
|
|
|
- throw new IOException("offset < 0 || offset > getFileLength(), offset="
|
|
|
- + offset
|
|
|
- + ", updatePosition=" + updatePosition
|
|
|
- + ", locatedBlocks=" + locatedBlocks);
|
|
|
- }
|
|
|
- else if (offset >= locatedBlocks.getFileLength()) {
|
|
|
- // offset to the portion of the last block,
|
|
|
- // which is not known to the name-node yet;
|
|
|
- // getting the last block
|
|
|
- blk = locatedBlocks.getLastLocatedBlock();
|
|
|
- }
|
|
|
- else {
|
|
|
- // search cached blocks first
|
|
|
- int targetBlockIdx = locatedBlocks.findBlock(offset);
|
|
|
- if (targetBlockIdx < 0) { // block is not cached
|
|
|
- targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
|
|
|
- // fetch more blocks
|
|
|
- LocatedBlocks newBlocks;
|
|
|
- newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
|
|
|
- assert (newBlocks != null) : "Could not find target position " + offset;
|
|
|
- locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
|
|
|
- }
|
|
|
- blk = locatedBlocks.get(targetBlockIdx);
|
|
|
- }
|
|
|
-
|
|
|
- // update current position
|
|
|
- if (updatePosition) {
|
|
|
- this.pos = offset;
|
|
|
- this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
|
|
|
- this.currentBlock = blk.getBlock();
|
|
|
- }
|
|
|
- return blk;
|
|
|
- }
|
|
|
-
|
|
|
- /** Fetch a block from namenode and cache it */
|
|
|
- private synchronized void fetchBlockAt(long offset) throws IOException {
|
|
|
- int targetBlockIdx = locatedBlocks.findBlock(offset);
|
|
|
- if (targetBlockIdx < 0) { // block is not cached
|
|
|
- targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
|
|
|
- }
|
|
|
- // fetch blocks
|
|
|
- LocatedBlocks newBlocks;
|
|
|
- newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
|
|
|
- if (newBlocks == null) {
|
|
|
- throw new IOException("Could not find target position " + offset);
|
|
|
- }
|
|
|
- locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Get blocks in the specified range.
|
|
|
- * Fetch them from the namenode if not cached.
|
|
|
- *
|
|
|
- * @param offset
|
|
|
- * @param length
|
|
|
- * @return consequent segment of located blocks
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private synchronized List<LocatedBlock> getBlockRange(long offset,
|
|
|
- long length)
|
|
|
- throws IOException {
|
|
|
- final List<LocatedBlock> blocks;
|
|
|
- if (locatedBlocks.isLastBlockComplete()) {
|
|
|
- blocks = getFinalizedBlockRange(offset, length);
|
|
|
- }
|
|
|
- else {
|
|
|
- if (length + offset > locatedBlocks.getFileLength()) {
|
|
|
- length = locatedBlocks.getFileLength() - offset;
|
|
|
- }
|
|
|
- blocks = getFinalizedBlockRange(offset, length);
|
|
|
- blocks.add(locatedBlocks.getLastLocatedBlock());
|
|
|
- }
|
|
|
- return blocks;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Get blocks in the specified range.
|
|
|
- * Includes only the complete blocks.
|
|
|
- * Fetch them from the namenode if not cached.
|
|
|
- */
|
|
|
- private synchronized List<LocatedBlock> getFinalizedBlockRange(
|
|
|
- long offset, long length) throws IOException {
|
|
|
- assert (locatedBlocks != null) : "locatedBlocks is null";
|
|
|
- List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
|
|
|
- // search cached blocks first
|
|
|
- int blockIdx = locatedBlocks.findBlock(offset);
|
|
|
- if (blockIdx < 0) { // block is not cached
|
|
|
- blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
|
|
|
- }
|
|
|
- long remaining = length;
|
|
|
- long curOff = offset;
|
|
|
- while(remaining > 0) {
|
|
|
- LocatedBlock blk = null;
|
|
|
- if(blockIdx < locatedBlocks.locatedBlockCount())
|
|
|
- blk = locatedBlocks.get(blockIdx);
|
|
|
- if (blk == null || curOff < blk.getStartOffset()) {
|
|
|
- LocatedBlocks newBlocks;
|
|
|
- newBlocks = callGetBlockLocations(namenode, src, curOff, remaining);
|
|
|
- locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
|
|
|
- continue;
|
|
|
- }
|
|
|
- assert curOff >= blk.getStartOffset() : "Block not found";
|
|
|
- blockRange.add(blk);
|
|
|
- long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
|
|
|
- remaining -= bytesRead;
|
|
|
- curOff += bytesRead;
|
|
|
- blockIdx++;
|
|
|
- }
|
|
|
- return blockRange;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Open a DataInputStream to a DataNode so that it can be read from.
|
|
|
- * We get block ID and the IDs of the destinations at startup, from the namenode.
|
|
|
- */
|
|
|
- private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
|
|
|
- if (target >= getFileLength()) {
|
|
|
- throw new IOException("Attempted to read past end of file");
|
|
|
- }
|
|
|
-
|
|
|
- if ( blockReader != null ) {
|
|
|
- blockReader.close();
|
|
|
- blockReader = null;
|
|
|
- }
|
|
|
-
|
|
|
- if (s != null) {
|
|
|
- s.close();
|
|
|
- s = null;
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- // Connect to best DataNode for desired Block, with potential offset
|
|
|
- //
|
|
|
- DatanodeInfo chosenNode = null;
|
|
|
- int refetchToken = 1; // only need to get a new access token once
|
|
|
-
|
|
|
- while (true) {
|
|
|
- //
|
|
|
- // Compute desired block
|
|
|
- //
|
|
|
- LocatedBlock targetBlock = getBlockAt(target, true);
|
|
|
- assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
|
|
|
- long offsetIntoBlock = target - targetBlock.getStartOffset();
|
|
|
-
|
|
|
- DNAddrPair retval = chooseDataNode(targetBlock);
|
|
|
- chosenNode = retval.info;
|
|
|
- InetSocketAddress targetAddr = retval.addr;
|
|
|
-
|
|
|
- try {
|
|
|
- s = socketFactory.createSocket();
|
|
|
- NetUtils.connect(s, targetAddr, socketTimeout);
|
|
|
- s.setSoTimeout(socketTimeout);
|
|
|
- Block blk = targetBlock.getBlock();
|
|
|
- BlockAccessToken accessToken = targetBlock.getAccessToken();
|
|
|
-
|
|
|
- blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
|
|
|
- accessToken,
|
|
|
- blk.getGenerationStamp(),
|
|
|
- offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
|
|
|
- buffersize, verifyChecksum, clientName);
|
|
|
- return chosenNode;
|
|
|
- } catch (IOException ex) {
|
|
|
- if (ex instanceof InvalidAccessTokenException && refetchToken > 0) {
|
|
|
- LOG.info("Will fetch a new access token and retry, "
|
|
|
- + "access token was invalid when connecting to " + targetAddr
|
|
|
- + " : " + ex);
|
|
|
- /*
|
|
|
- * Get a new access token and retry. Retry is needed in 2 cases. 1)
|
|
|
- * When both NN and DN re-started while DFSClient holding a cached
|
|
|
- * access token. 2) In the case that NN fails to update its
|
|
|
- * access key at pre-set interval (by a wide margin) and
|
|
|
- * subsequently restarts. In this case, DN re-registers itself with
|
|
|
- * NN and receives a new access key, but DN will delete the old
|
|
|
- * access key from its memory since it's considered expired based on
|
|
|
- * the estimated expiration date.
|
|
|
- */
|
|
|
- refetchToken--;
|
|
|
- fetchBlockAt(target);
|
|
|
- } else {
|
|
|
- LOG.info("Failed to connect to " + targetAddr
|
|
|
- + ", add to deadNodes and continue", ex);
|
|
|
- // Put chosen node into dead list, continue
|
|
|
- addToDeadNodes(chosenNode);
|
|
|
- }
|
|
|
- if (s != null) {
|
|
|
- try {
|
|
|
- s.close();
|
|
|
- } catch (IOException iex) {
|
|
|
- }
|
|
|
- }
|
|
|
- s = null;
|
|
|
- }
|
|
|
- }
|
|
|
+ return ((DFSInputStream)in).getAllBlocks();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Close it down!
|
|
|
+ * @return The visible length of the file.
|
|
|
*/
|
|
|
- @Override
|
|
|
- public synchronized void close() throws IOException {
|
|
|
- if (closed) {
|
|
|
- return;
|
|
|
- }
|
|
|
- checkOpen();
|
|
|
-
|
|
|
- if ( blockReader != null ) {
|
|
|
- blockReader.close();
|
|
|
- blockReader = null;
|
|
|
- }
|
|
|
-
|
|
|
- if (s != null) {
|
|
|
- s.close();
|
|
|
- s = null;
|
|
|
- }
|
|
|
- super.close();
|
|
|
- closed = true;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public synchronized int read() throws IOException {
|
|
|
- int ret = read( oneByteBuf, 0, 1 );
|
|
|
- return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
|
|
|
- }
|
|
|
-
|
|
|
- /* This is a used by regular read() and handles ChecksumExceptions.
|
|
|
- * name readBuffer() is chosen to imply similarity to readBuffer() in
|
|
|
- * ChecksuFileSystem
|
|
|
- */
|
|
|
- private synchronized int readBuffer(byte buf[], int off, int len)
|
|
|
- throws IOException {
|
|
|
- IOException ioe;
|
|
|
-
|
|
|
- /* we retry current node only once. So this is set to true only here.
|
|
|
- * Intention is to handle one common case of an error that is not a
|
|
|
- * failure on datanode or client : when DataNode closes the connection
|
|
|
- * since client is idle. If there are other cases of "non-errors" then
|
|
|
- * then a datanode might be retried by setting this to true again.
|
|
|
- */
|
|
|
- boolean retryCurrentNode = true;
|
|
|
-
|
|
|
- while (true) {
|
|
|
- // retry as many times as seekToNewSource allows.
|
|
|
- try {
|
|
|
- return blockReader.read(buf, off, len);
|
|
|
- } catch ( ChecksumException ce ) {
|
|
|
- LOG.warn("Found Checksum error for " + currentBlock + " from " +
|
|
|
- currentNode.getName() + " at " + ce.getPos());
|
|
|
- reportChecksumFailure(src, currentBlock, currentNode);
|
|
|
- ioe = ce;
|
|
|
- retryCurrentNode = false;
|
|
|
- } catch ( IOException e ) {
|
|
|
- if (!retryCurrentNode) {
|
|
|
- LOG.warn("Exception while reading from " + currentBlock +
|
|
|
- " of " + src + " from " + currentNode + ": " +
|
|
|
- StringUtils.stringifyException(e));
|
|
|
- }
|
|
|
- ioe = e;
|
|
|
- }
|
|
|
- boolean sourceFound = false;
|
|
|
- if (retryCurrentNode) {
|
|
|
- /* possibly retry the same node so that transient errors don't
|
|
|
- * result in application level failures (e.g. Datanode could have
|
|
|
- * closed the connection because the client is idle for too long).
|
|
|
- */
|
|
|
- sourceFound = seekToBlockSource(pos);
|
|
|
- } else {
|
|
|
- addToDeadNodes(currentNode);
|
|
|
- sourceFound = seekToNewSource(pos);
|
|
|
- }
|
|
|
- if (!sourceFound) {
|
|
|
- throw ioe;
|
|
|
- }
|
|
|
- retryCurrentNode = false;
|
|
|
- }
|
|
|
+ public long getVisibleLength() throws IOException {
|
|
|
+ return ((DFSInputStream)in).getFileLength();
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Read the entire buffer.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized int read(byte buf[], int off, int len) throws IOException {
|
|
|
- checkOpen();
|
|
|
- if (closed) {
|
|
|
- throw new IOException("Stream closed");
|
|
|
- }
|
|
|
- failures = 0;
|
|
|
- if (pos < getFileLength()) {
|
|
|
- int retries = 2;
|
|
|
- while (retries > 0) {
|
|
|
- try {
|
|
|
- if (pos > blockEnd) {
|
|
|
- currentNode = blockSeekTo(pos);
|
|
|
- }
|
|
|
- int realLen = Math.min(len, (int) (blockEnd - pos + 1));
|
|
|
- int result = readBuffer(buf, off, realLen);
|
|
|
-
|
|
|
- if (result >= 0) {
|
|
|
- pos += result;
|
|
|
- } else {
|
|
|
- // got a EOS from reader though we expect more data on it.
|
|
|
- throw new IOException("Unexpected EOS from the reader");
|
|
|
- }
|
|
|
- if (stats != null && result != -1) {
|
|
|
- stats.incrementBytesRead(result);
|
|
|
- }
|
|
|
- return result;
|
|
|
- } catch (ChecksumException ce) {
|
|
|
- throw ce;
|
|
|
- } catch (IOException e) {
|
|
|
- if (retries == 1) {
|
|
|
- LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
|
|
|
- }
|
|
|
- blockEnd = -1;
|
|
|
- if (currentNode != null) { addToDeadNodes(currentNode); }
|
|
|
- if (--retries == 0) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private DNAddrPair chooseDataNode(LocatedBlock block)
|
|
|
- throws IOException {
|
|
|
- while (true) {
|
|
|
- DatanodeInfo[] nodes = block.getLocations();
|
|
|
- try {
|
|
|
- DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
|
|
|
- InetSocketAddress targetAddr =
|
|
|
- NetUtils.createSocketAddr(chosenNode.getName());
|
|
|
- return new DNAddrPair(chosenNode, targetAddr);
|
|
|
- } catch (IOException ie) {
|
|
|
- String blockInfo = block.getBlock() + " file=" + src;
|
|
|
- if (failures >= maxBlockAcquireFailures) {
|
|
|
- throw new BlockMissingException(src, "Could not obtain block: " + blockInfo,
|
|
|
- block.getStartOffset());
|
|
|
- }
|
|
|
-
|
|
|
- if (nodes == null || nodes.length == 0) {
|
|
|
- LOG.info("No node available for block: " + blockInfo);
|
|
|
- }
|
|
|
- LOG.info("Could not obtain block " + block.getBlock()
|
|
|
- + " from any node: " + ie
|
|
|
- + ". Will get new block locations from namenode and retry...");
|
|
|
- try {
|
|
|
- // Introducing a random factor to the wait time before another retry.
|
|
|
- // The wait time is dependent on # of failures and a random factor.
|
|
|
- // At the first time of getting a BlockMissingException, the wait time
|
|
|
- // is a random number between 0..3000 ms. If the first retry
|
|
|
- // still fails, we will wait 3000 ms grace period before the 2nd retry.
|
|
|
- // Also at the second retry, the waiting window is expanded to 6000 ms
|
|
|
- // alleviating the request rate from the server. Similarly the 3rd retry
|
|
|
- // will wait 6000ms grace period before retry and the waiting window is
|
|
|
- // expanded to 9000ms.
|
|
|
- double waitTime = timeWindow * failures + // grace period for the last round of attempt
|
|
|
- timeWindow * (failures + 1) * r.nextDouble(); // expanding time window for each failure
|
|
|
- LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
|
|
|
- Thread.sleep((long)waitTime);
|
|
|
- } catch (InterruptedException iex) {
|
|
|
- }
|
|
|
- deadNodes.clear(); //2nd option is to remove only nodes[blockId]
|
|
|
- openInfo();
|
|
|
- block = getBlockAt(block.getStartOffset(), false);
|
|
|
- failures++;
|
|
|
- continue;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void fetchBlockByteRange(LocatedBlock block, long start,
|
|
|
- long end, byte[] buf, int offset) throws IOException {
|
|
|
- //
|
|
|
- // Connect to best DataNode for desired Block, with potential offset
|
|
|
- //
|
|
|
- Socket dn = null;
|
|
|
- int refetchToken = 1; // only need to get a new access token once
|
|
|
-
|
|
|
- while (true) {
|
|
|
- // cached block locations may have been updated by chooseDataNode()
|
|
|
- // or fetchBlockAt(). Always get the latest list of locations at the
|
|
|
- // start of the loop.
|
|
|
- block = getBlockAt(block.getStartOffset(), false);
|
|
|
- DNAddrPair retval = chooseDataNode(block);
|
|
|
- DatanodeInfo chosenNode = retval.info;
|
|
|
- InetSocketAddress targetAddr = retval.addr;
|
|
|
- BlockReader reader = null;
|
|
|
-
|
|
|
- try {
|
|
|
- dn = socketFactory.createSocket();
|
|
|
- NetUtils.connect(dn, targetAddr, socketTimeout);
|
|
|
- dn.setSoTimeout(socketTimeout);
|
|
|
- BlockAccessToken accessToken = block.getAccessToken();
|
|
|
-
|
|
|
- int len = (int) (end - start + 1);
|
|
|
-
|
|
|
- reader = BlockReader.newBlockReader(dn, src,
|
|
|
- block.getBlock().getBlockId(),
|
|
|
- accessToken,
|
|
|
- block.getBlock().getGenerationStamp(),
|
|
|
- start, len, buffersize,
|
|
|
- verifyChecksum, clientName);
|
|
|
- int nread = reader.readAll(buf, offset, len);
|
|
|
- if (nread != len) {
|
|
|
- throw new IOException("truncated return from reader.read(): " +
|
|
|
- "excpected " + len + ", got " + nread);
|
|
|
- }
|
|
|
- return;
|
|
|
- } catch (ChecksumException e) {
|
|
|
- LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
|
|
|
- src + " at " + block.getBlock() + ":" +
|
|
|
- e.getPos() + " from " + chosenNode.getName());
|
|
|
- reportChecksumFailure(src, block.getBlock(), chosenNode);
|
|
|
- } catch (IOException e) {
|
|
|
- if (e instanceof InvalidAccessTokenException && refetchToken > 0) {
|
|
|
- LOG.info("Will get a new access token and retry, "
|
|
|
- + "access token was invalid when connecting to " + targetAddr
|
|
|
- + " : " + e);
|
|
|
- refetchToken--;
|
|
|
- fetchBlockAt(block.getStartOffset());
|
|
|
- continue;
|
|
|
- } else {
|
|
|
- LOG.warn("Failed to connect to " + targetAddr + " for file " + src
|
|
|
- + " for block " + block.getBlock() + ":"
|
|
|
- + StringUtils.stringifyException(e));
|
|
|
- }
|
|
|
- } finally {
|
|
|
- IOUtils.closeStream(reader);
|
|
|
- IOUtils.closeSocket(dn);
|
|
|
- }
|
|
|
- // Put chosen node into dead list, continue
|
|
|
- addToDeadNodes(chosenNode);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Read bytes starting from the specified position.
|
|
|
- *
|
|
|
- * @param position start read from this position
|
|
|
- * @param buffer read buffer
|
|
|
- * @param offset offset into buffer
|
|
|
- * @param length number of bytes to read
|
|
|
- *
|
|
|
- * @return actual number of bytes read
|
|
|
- */
|
|
|
- @Override
|
|
|
- public int read(long position, byte[] buffer, int offset, int length)
|
|
|
- throws IOException {
|
|
|
- // sanity checks
|
|
|
- checkOpen();
|
|
|
- if (closed) {
|
|
|
- throw new IOException("Stream closed");
|
|
|
- }
|
|
|
- failures = 0;
|
|
|
- long filelen = getFileLength();
|
|
|
- if ((position < 0) || (position >= filelen)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- int realLen = length;
|
|
|
- if ((position + length) > filelen) {
|
|
|
- realLen = (int)(filelen - position);
|
|
|
- }
|
|
|
-
|
|
|
- // determine the block and byte range within the block
|
|
|
- // corresponding to position and realLen
|
|
|
- List<LocatedBlock> blockRange = getBlockRange(position, realLen);
|
|
|
- int remaining = realLen;
|
|
|
- for (LocatedBlock blk : blockRange) {
|
|
|
- long targetStart = position - blk.getStartOffset();
|
|
|
- long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
|
|
|
- fetchBlockByteRange(blk, targetStart,
|
|
|
- targetStart + bytesToRead - 1, buffer, offset);
|
|
|
- remaining -= bytesToRead;
|
|
|
- position += bytesToRead;
|
|
|
- offset += bytesToRead;
|
|
|
- }
|
|
|
- assert remaining == 0 : "Wrong number of bytes read.";
|
|
|
- if (stats != null) {
|
|
|
- stats.incrementBytesRead(realLen);
|
|
|
- }
|
|
|
- return realLen;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public long skip(long n) throws IOException {
|
|
|
- if ( n > 0 ) {
|
|
|
- long curPos = getPos();
|
|
|
- long fileLen = getFileLength();
|
|
|
- if( n+curPos > fileLen ) {
|
|
|
- n = fileLen - curPos;
|
|
|
- }
|
|
|
- seek(curPos+n);
|
|
|
- return n;
|
|
|
- }
|
|
|
- return n < 0 ? -1 : 0;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Seek to a new arbitrary location
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized void seek(long targetPos) throws IOException {
|
|
|
- if (targetPos > getFileLength()) {
|
|
|
- throw new IOException("Cannot seek after EOF");
|
|
|
- }
|
|
|
- boolean done = false;
|
|
|
- if (pos <= targetPos && targetPos <= blockEnd) {
|
|
|
- //
|
|
|
- // If this seek is to a positive position in the current
|
|
|
- // block, and this piece of data might already be lying in
|
|
|
- // the TCP buffer, then just eat up the intervening data.
|
|
|
- //
|
|
|
- int diff = (int)(targetPos - pos);
|
|
|
- if (diff <= TCP_WINDOW_SIZE) {
|
|
|
- try {
|
|
|
- pos += blockReader.skip(diff);
|
|
|
- if (pos == targetPos) {
|
|
|
- done = true;
|
|
|
- }
|
|
|
- } catch (IOException e) {//make following read to retry
|
|
|
- LOG.debug("Exception while seek to " + targetPos + " from "
|
|
|
- + currentBlock +" of " + src + " from " + currentNode +
|
|
|
- ": " + StringUtils.stringifyException(e));
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (!done) {
|
|
|
- pos = targetPos;
|
|
|
- blockEnd = -1;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Same as {@link #seekToNewSource(long)} except that it does not exclude
|
|
|
- * the current datanode and might connect to the same node.
|
|
|
- */
|
|
|
- private synchronized boolean seekToBlockSource(long targetPos)
|
|
|
- throws IOException {
|
|
|
- currentNode = blockSeekTo(targetPos);
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Seek to given position on a node other than the current node. If
|
|
|
- * a node other than the current node is found, then returns true.
|
|
|
- * If another node could not be found, then returns false.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized boolean seekToNewSource(long targetPos) throws IOException {
|
|
|
- boolean markedDead = deadNodes.containsKey(currentNode);
|
|
|
- addToDeadNodes(currentNode);
|
|
|
- DatanodeInfo oldNode = currentNode;
|
|
|
- DatanodeInfo newNode = blockSeekTo(targetPos);
|
|
|
- if (!markedDead) {
|
|
|
- /* remove it from deadNodes. blockSeekTo could have cleared
|
|
|
- * deadNodes and added currentNode again. Thats ok. */
|
|
|
- deadNodes.remove(oldNode);
|
|
|
- }
|
|
|
- if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
|
|
|
- currentNode = newNode;
|
|
|
- return true;
|
|
|
- } else {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized long getPos() throws IOException {
|
|
|
- return pos;
|
|
|
- }
|
|
|
-
|
|
|
- /** Return the size of the remaining available bytes
|
|
|
- * if the size is less than or equal to {@link Integer#MAX_VALUE},
|
|
|
- * otherwise, return {@link Integer#MAX_VALUE}.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized int available() throws IOException {
|
|
|
- if (closed) {
|
|
|
- throw new IOException("Stream closed");
|
|
|
- }
|
|
|
-
|
|
|
- final long remaining = getFileLength() - pos;
|
|
|
- return remaining <= Integer.MAX_VALUE? (int)remaining: Integer.MAX_VALUE;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * We definitely don't support marks
|
|
|
- */
|
|
|
- @Override
|
|
|
- public boolean markSupported() {
|
|
|
- return false;
|
|
|
- }
|
|
|
- @Override
|
|
|
- public void mark(int readLimit) {
|
|
|
- }
|
|
|
- @Override
|
|
|
- public void reset() throws IOException {
|
|
|
- throw new IOException("Mark/reset not supported");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * The Hdfs implementation of {@link FSDataInputStream}
|
|
|
- */
|
|
|
- public static class DFSDataInputStream extends FSDataInputStream {
|
|
|
- public DFSDataInputStream(DFSInputStream in)
|
|
|
- throws IOException {
|
|
|
- super(in);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns the datanode from which the stream is currently reading.
|
|
|
- */
|
|
|
- public DatanodeInfo getCurrentDatanode() {
|
|
|
- return ((DFSInputStream)in).getCurrentDatanode();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns the block containing the target position.
|
|
|
- */
|
|
|
- public Block getCurrentBlock() {
|
|
|
- return ((DFSInputStream)in).getCurrentBlock();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return collection of blocks that has already been located.
|
|
|
- */
|
|
|
- synchronized List<LocatedBlock> getAllBlocks() throws IOException {
|
|
|
- return ((DFSInputStream)in).getAllBlocks();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * @return The visible length of the file.
|
|
|
- */
|
|
|
- public long getVisibleLength() throws IOException {
|
|
|
- return ((DFSInputStream)in).getFileLength();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /****************************************************************
|
|
|
- * DFSOutputStream creates files from a stream of bytes.
|
|
|
- *
|
|
|
- * The client application writes data that is cached internally by
|
|
|
- * this stream. Data is broken up into packets, each packet is
|
|
|
- * typically 64K in size. A packet comprises of chunks. Each chunk
|
|
|
- * is typically 512 bytes and has an associated checksum with it.
|
|
|
- *
|
|
|
- * When a client application fills up the currentPacket, it is
|
|
|
- * enqueued into dataQueue. The DataStreamer thread picks up
|
|
|
- * packets from the dataQueue, sends it to the first datanode in
|
|
|
- * the pipeline and moves it from the dataQueue to the ackQueue.
|
|
|
- * The ResponseProcessor receives acks from the datanodes. When an
|
|
|
- * successful ack for a packet is received from all datanodes, the
|
|
|
- * ResponseProcessor removes the corresponding packet from the
|
|
|
- * ackQueue.
|
|
|
- *
|
|
|
- * In case of error, all outstanding packets and moved from
|
|
|
- * ackQueue. A new pipeline is setup by eliminating the bad
|
|
|
- * datanode from the original pipeline. The DataStreamer now
|
|
|
- * starts sending packets from the dataQueue.
|
|
|
- ****************************************************************/
|
|
|
- class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
- private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
|
|
|
- private Socket s;
|
|
|
- // closed is accessed by different threads under different locks.
|
|
|
- private volatile boolean closed = false;
|
|
|
-
|
|
|
- private String src;
|
|
|
- private final long blockSize;
|
|
|
- private final DataChecksum checksum;
|
|
|
- // both dataQueue and ackQueue are protected by dataQueue lock
|
|
|
- private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
|
|
|
- private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
|
|
|
- private Packet currentPacket = null;
|
|
|
- private DataStreamer streamer;
|
|
|
- private long currentSeqno = 0;
|
|
|
- private long bytesCurBlock = 0; // bytes writen in current block
|
|
|
- private int packetSize = 0; // write packet size, including the header.
|
|
|
- private int chunksPerPacket = 0;
|
|
|
- private volatile IOException lastException = null;
|
|
|
- private long artificialSlowdown = 0;
|
|
|
- private long lastFlushOffset = -1; // offset when flush was invoked
|
|
|
- //persist blocks on namenode
|
|
|
- private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
|
|
|
- private volatile boolean appendChunk = false; // appending to existing partial block
|
|
|
- private long initialFileSize = 0; // at time of file open
|
|
|
- private Progressable progress;
|
|
|
-
|
|
|
- private class Packet {
|
|
|
- ByteBuffer buffer; // only one of buf and buffer is non-null
|
|
|
- byte[] buf;
|
|
|
- long seqno; // sequencenumber of buffer in block
|
|
|
- long offsetInBlock; // offset in block
|
|
|
- boolean lastPacketInBlock; // is this the last packet in block?
|
|
|
- int numChunks; // number of chunks currently in packet
|
|
|
- int maxChunks; // max chunks in packet
|
|
|
- int dataStart;
|
|
|
- int dataPos;
|
|
|
- int checksumStart;
|
|
|
- int checksumPos;
|
|
|
- private static final long HEART_BEAT_SEQNO = -1L;
|
|
|
-
|
|
|
- /**
|
|
|
- * create a heartbeat packet
|
|
|
- */
|
|
|
- Packet() {
|
|
|
- this.lastPacketInBlock = false;
|
|
|
- this.numChunks = 0;
|
|
|
- this.offsetInBlock = 0;
|
|
|
- this.seqno = HEART_BEAT_SEQNO;
|
|
|
-
|
|
|
- buffer = null;
|
|
|
- int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
|
|
|
- buf = new byte[packetSize];
|
|
|
-
|
|
|
- checksumStart = dataStart = packetSize;
|
|
|
- checksumPos = checksumStart;
|
|
|
- dataPos = dataStart;
|
|
|
- maxChunks = 0;
|
|
|
- }
|
|
|
-
|
|
|
- // create a new packet
|
|
|
- Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
|
|
|
- this.lastPacketInBlock = false;
|
|
|
- this.numChunks = 0;
|
|
|
- this.offsetInBlock = offsetInBlock;
|
|
|
- this.seqno = currentSeqno;
|
|
|
- currentSeqno++;
|
|
|
-
|
|
|
- buffer = null;
|
|
|
- buf = new byte[pktSize];
|
|
|
-
|
|
|
- checksumStart = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
|
|
|
- checksumPos = checksumStart;
|
|
|
- dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
|
|
|
- dataPos = dataStart;
|
|
|
- maxChunks = chunksPerPkt;
|
|
|
- }
|
|
|
-
|
|
|
- void writeData(byte[] inarray, int off, int len) {
|
|
|
- if ( dataPos + len > buf.length) {
|
|
|
- throw new BufferOverflowException();
|
|
|
- }
|
|
|
- System.arraycopy(inarray, off, buf, dataPos, len);
|
|
|
- dataPos += len;
|
|
|
- }
|
|
|
-
|
|
|
- void writeChecksum(byte[] inarray, int off, int len) {
|
|
|
- if (checksumPos + len > dataStart) {
|
|
|
- throw new BufferOverflowException();
|
|
|
- }
|
|
|
- System.arraycopy(inarray, off, buf, checksumPos, len);
|
|
|
- checksumPos += len;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns ByteBuffer that contains one full packet, including header.
|
|
|
- */
|
|
|
- ByteBuffer getBuffer() {
|
|
|
- /* Once this is called, no more data can be added to the packet.
|
|
|
- * setting 'buf' to null ensures that.
|
|
|
- * This is called only when the packet is ready to be sent.
|
|
|
- */
|
|
|
- if (buffer != null) {
|
|
|
- return buffer;
|
|
|
- }
|
|
|
-
|
|
|
- //prepare the header and close any gap between checksum and data.
|
|
|
-
|
|
|
- int dataLen = dataPos - dataStart;
|
|
|
- int checksumLen = checksumPos - checksumStart;
|
|
|
-
|
|
|
- if (checksumPos != dataStart) {
|
|
|
- /* move the checksum to cover the gap.
|
|
|
- * This can happen for the last packet.
|
|
|
- */
|
|
|
- System.arraycopy(buf, checksumStart, buf,
|
|
|
- dataStart - checksumLen , checksumLen);
|
|
|
- }
|
|
|
-
|
|
|
- int pktLen = SIZE_OF_INTEGER + dataLen + checksumLen;
|
|
|
-
|
|
|
- //normally dataStart == checksumPos, i.e., offset is zero.
|
|
|
- buffer = ByteBuffer.wrap(buf, dataStart - checksumPos,
|
|
|
- DataNode.PKT_HEADER_LEN + pktLen);
|
|
|
- buf = null;
|
|
|
- buffer.mark();
|
|
|
-
|
|
|
- /* write the header and data length.
|
|
|
- * The format is described in comment before DataNode.BlockSender
|
|
|
- */
|
|
|
- buffer.putInt(pktLen); // pktSize
|
|
|
- buffer.putLong(offsetInBlock);
|
|
|
- buffer.putLong(seqno);
|
|
|
- buffer.put((byte) ((lastPacketInBlock) ? 1 : 0));
|
|
|
- //end of pkt header
|
|
|
- buffer.putInt(dataLen); // actual data length, excluding checksum.
|
|
|
-
|
|
|
- buffer.reset();
|
|
|
- return buffer;
|
|
|
- }
|
|
|
-
|
|
|
- // get the packet's last byte's offset in the block
|
|
|
- long getLastByteOffsetBlock() {
|
|
|
- return offsetInBlock + dataPos - dataStart;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Check if this packet is a heart beat packet
|
|
|
- * @return true if the sequence number is HEART_BEAT_SEQNO
|
|
|
- */
|
|
|
- private boolean isHeartbeatPacket() {
|
|
|
- return seqno == HEART_BEAT_SEQNO;
|
|
|
- }
|
|
|
-
|
|
|
- public String toString() {
|
|
|
- return "packet seqno:" + this.seqno +
|
|
|
- " offsetInBlock:" + this.offsetInBlock +
|
|
|
- " lastPacketInBlock:" + this.lastPacketInBlock +
|
|
|
- " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- // The DataStreamer class is responsible for sending data packets to the
|
|
|
- // datanodes in the pipeline. It retrieves a new blockid and block locations
|
|
|
- // from the namenode, and starts streaming packets to the pipeline of
|
|
|
- // Datanodes. Every packet has a sequence number associated with
|
|
|
- // it. When all the packets for a block are sent out and acks for each
|
|
|
- // if them are received, the DataStreamer closes the current block.
|
|
|
- //
|
|
|
- class DataStreamer extends Daemon {
|
|
|
- private volatile boolean streamerClosed = false;
|
|
|
- private Block block; // its length is number of bytes acked
|
|
|
- private BlockAccessToken accessToken;
|
|
|
- private DataOutputStream blockStream;
|
|
|
- private DataInputStream blockReplyStream;
|
|
|
- private ResponseProcessor response = null;
|
|
|
- private volatile DatanodeInfo[] nodes = null; // list of targets for current block
|
|
|
- private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
|
|
|
- volatile boolean hasError = false;
|
|
|
- volatile int errorIndex = -1;
|
|
|
- private BlockConstructionStage stage; // block construction stage
|
|
|
- private long bytesSent = 0; // number of bytes that've been sent
|
|
|
-
|
|
|
- /**
|
|
|
- * Default construction for file create
|
|
|
- */
|
|
|
- private DataStreamer() {
|
|
|
- stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Construct a data streamer for append
|
|
|
- * @param lastBlock last block of the file to be appended
|
|
|
- * @param stat status of the file to be appended
|
|
|
- * @param bytesPerChecksum number of bytes per checksum
|
|
|
- * @throws IOException if error occurs
|
|
|
- */
|
|
|
- private DataStreamer(LocatedBlock lastBlock, FileStatus stat,
|
|
|
- int bytesPerChecksum) throws IOException {
|
|
|
- stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
|
|
|
- block = lastBlock.getBlock();
|
|
|
- bytesSent = block.getNumBytes();
|
|
|
- accessToken = lastBlock.getAccessToken();
|
|
|
- long usedInLastBlock = stat.getLen() % blockSize;
|
|
|
- int freeInLastBlock = (int)(blockSize - usedInLastBlock);
|
|
|
-
|
|
|
- // calculate the amount of free space in the pre-existing
|
|
|
- // last crc chunk
|
|
|
- int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
|
|
|
- int freeInCksum = bytesPerChecksum - usedInCksum;
|
|
|
-
|
|
|
- // if there is space in the last block, then we have to
|
|
|
- // append to that block
|
|
|
- if (freeInLastBlock == blockSize) {
|
|
|
- throw new IOException("The last block for file " +
|
|
|
- src + " is full.");
|
|
|
- }
|
|
|
-
|
|
|
- if (usedInCksum > 0 && freeInCksum > 0) {
|
|
|
- // if there is space in the last partial chunk, then
|
|
|
- // setup in such a way that the next packet will have only
|
|
|
- // one chunk that fills up the partial chunk.
|
|
|
- //
|
|
|
- computePacketChunkSize(0, freeInCksum);
|
|
|
- resetChecksumChunk(freeInCksum);
|
|
|
- appendChunk = true;
|
|
|
- } else {
|
|
|
- // if the remaining space in the block is smaller than
|
|
|
- // that expected size of of a packet, then create
|
|
|
- // smaller size packet.
|
|
|
- //
|
|
|
- computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock),
|
|
|
- bytesPerChecksum);
|
|
|
- }
|
|
|
-
|
|
|
- // setup pipeline to append to the last block XXX retries??
|
|
|
- nodes = lastBlock.getLocations();
|
|
|
- errorIndex = -1; // no errors yet.
|
|
|
- if (nodes.length < 1) {
|
|
|
- throw new IOException("Unable to retrieve blocks locations " +
|
|
|
- " for last block " + block +
|
|
|
- "of file " + src);
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Initialize for data streaming
|
|
|
- */
|
|
|
- private void initDataStreaming() {
|
|
|
- this.setName("DataStreamer for file " + src +
|
|
|
- " block " + block);
|
|
|
- response = new ResponseProcessor(nodes);
|
|
|
- response.start();
|
|
|
- stage = BlockConstructionStage.DATA_STREAMING;
|
|
|
- }
|
|
|
-
|
|
|
- private void endBlock() {
|
|
|
- LOG.debug("Closing old block " + block);
|
|
|
- this.setName("DataStreamer for file " + src);
|
|
|
- closeResponder();
|
|
|
- closeStream();
|
|
|
- nodes = null;
|
|
|
- stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
|
|
- }
|
|
|
-
|
|
|
- /*
|
|
|
- * streamer thread is the only thread that opens streams to datanode,
|
|
|
- * and closes them. Any error recovery is also done by this thread.
|
|
|
- */
|
|
|
- public void run() {
|
|
|
- long lastPacket = System.currentTimeMillis();
|
|
|
- while (!streamerClosed && clientRunning) {
|
|
|
-
|
|
|
- // if the Responder encountered an error, shutdown Responder
|
|
|
- if (hasError && response != null) {
|
|
|
- try {
|
|
|
- response.close();
|
|
|
- response.join();
|
|
|
- response = null;
|
|
|
- } catch (InterruptedException e) {
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- Packet one = null;
|
|
|
-
|
|
|
- try {
|
|
|
- // process datanode IO errors if any
|
|
|
- boolean doSleep = false;
|
|
|
- if (hasError && errorIndex>=0) {
|
|
|
- doSleep = processDatanodeError();
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (dataQueue) {
|
|
|
- // wait for a packet to be sent.
|
|
|
- long now = System.currentTimeMillis();
|
|
|
- while ((!streamerClosed && !hasError && clientRunning
|
|
|
- && dataQueue.size() == 0 &&
|
|
|
- (stage != BlockConstructionStage.DATA_STREAMING ||
|
|
|
- stage == BlockConstructionStage.DATA_STREAMING &&
|
|
|
- now - lastPacket < socketTimeout/2)) || doSleep ) {
|
|
|
- long timeout = socketTimeout/2 - (now-lastPacket);
|
|
|
- timeout = timeout <= 0 ? 1000 : timeout;
|
|
|
- timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
|
|
|
- timeout : 1000;
|
|
|
- try {
|
|
|
- dataQueue.wait(timeout);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- }
|
|
|
- doSleep = false;
|
|
|
- now = System.currentTimeMillis();
|
|
|
- }
|
|
|
- if (streamerClosed || hasError || !clientRunning) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- // get packet to be sent.
|
|
|
- if (dataQueue.isEmpty()) {
|
|
|
- one = new Packet(); // heartbeat packet
|
|
|
- } else {
|
|
|
- one = dataQueue.getFirst(); // regular data packet
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // get new block from namenode.
|
|
|
- if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
|
|
|
- LOG.debug("Allocating new block");
|
|
|
- nodes = nextBlockOutputStream(src);
|
|
|
- initDataStreaming();
|
|
|
- } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
|
|
|
- LOG.debug("Append to block " + block);
|
|
|
- setupPipelineForAppendOrRecovery();
|
|
|
- initDataStreaming();
|
|
|
- }
|
|
|
-
|
|
|
- long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
|
|
|
- if (lastByteOffsetInBlock > blockSize) {
|
|
|
- throw new IOException("BlockSize " + blockSize +
|
|
|
- " is smaller than data size. " +
|
|
|
- " Offset of packet in block " +
|
|
|
- lastByteOffsetInBlock +
|
|
|
- " Aborting file " + src);
|
|
|
- }
|
|
|
-
|
|
|
- if (one.lastPacketInBlock) {
|
|
|
- // wait for all data packets have been successfully acked
|
|
|
- synchronized (dataQueue) {
|
|
|
- while (!streamerClosed && !hasError &&
|
|
|
- ackQueue.size() != 0 && clientRunning) {
|
|
|
- try {
|
|
|
- // wait for acks to arrive from datanodes
|
|
|
- dataQueue.wait(1000);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (streamerClosed || hasError || !clientRunning) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- stage = BlockConstructionStage.PIPELINE_CLOSE;
|
|
|
- }
|
|
|
-
|
|
|
- // send the packet
|
|
|
- ByteBuffer buf = one.getBuffer();
|
|
|
-
|
|
|
- synchronized (dataQueue) {
|
|
|
- // move packet from dataQueue to ackQueue
|
|
|
- if (!one.isHeartbeatPacket()) {
|
|
|
- dataQueue.removeFirst();
|
|
|
- ackQueue.addLast(one);
|
|
|
- dataQueue.notifyAll();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("DataStreamer block " + block +
|
|
|
- " sending packet " + one);
|
|
|
- }
|
|
|
-
|
|
|
- // write out data to remote datanode
|
|
|
- blockStream.write(buf.array(), buf.position(), buf.remaining());
|
|
|
- blockStream.flush();
|
|
|
- lastPacket = System.currentTimeMillis();
|
|
|
-
|
|
|
- if (one.isHeartbeatPacket()) { //heartbeat packet
|
|
|
- }
|
|
|
-
|
|
|
- // update bytesSent
|
|
|
- long tmpBytesSent = one.getLastByteOffsetBlock();
|
|
|
- if (bytesSent < tmpBytesSent) {
|
|
|
- bytesSent = tmpBytesSent;
|
|
|
- }
|
|
|
-
|
|
|
- if (streamerClosed || hasError || !clientRunning) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- // Is this block full?
|
|
|
- if (one.lastPacketInBlock) {
|
|
|
- // wait for the close packet has been acked
|
|
|
- synchronized (dataQueue) {
|
|
|
- while (!streamerClosed && !hasError &&
|
|
|
- ackQueue.size() != 0 && clientRunning) {
|
|
|
- dataQueue.wait(1000);// wait for acks to arrive from datanodes
|
|
|
- }
|
|
|
- }
|
|
|
- if (streamerClosed || hasError || !clientRunning) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- endBlock();
|
|
|
- }
|
|
|
- if (progress != null) { progress.progress(); }
|
|
|
-
|
|
|
- // This is used by unit test to trigger race conditions.
|
|
|
- if (artificialSlowdown != 0 && clientRunning) {
|
|
|
- Thread.sleep(artificialSlowdown);
|
|
|
- }
|
|
|
- } catch (Throwable e) {
|
|
|
- LOG.warn("DataStreamer Exception: " +
|
|
|
- StringUtils.stringifyException(e));
|
|
|
- if (e instanceof IOException) {
|
|
|
- setLastException((IOException)e);
|
|
|
- }
|
|
|
- hasError = true;
|
|
|
- if (errorIndex == -1) { // not a datanode error
|
|
|
- streamerClosed = true;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- closeInternal();
|
|
|
- }
|
|
|
-
|
|
|
- private void closeInternal() {
|
|
|
- closeResponder(); // close and join
|
|
|
- closeStream();
|
|
|
- streamerClosed = true;
|
|
|
- closed = true;
|
|
|
- synchronized (dataQueue) {
|
|
|
- dataQueue.notifyAll();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /*
|
|
|
- * close both streamer and DFSOutputStream, should be called only
|
|
|
- * by an external thread and only after all data to be sent has
|
|
|
- * been flushed to datanode.
|
|
|
- *
|
|
|
- * Interrupt this data streamer if force is true
|
|
|
- *
|
|
|
- * @param force if this data stream is forced to be closed
|
|
|
- */
|
|
|
- void close(boolean force) {
|
|
|
- streamerClosed = true;
|
|
|
- synchronized (dataQueue) {
|
|
|
- dataQueue.notifyAll();
|
|
|
- }
|
|
|
- if (force) {
|
|
|
- this.interrupt();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void closeResponder() {
|
|
|
- if (response != null) {
|
|
|
- try {
|
|
|
- response.close();
|
|
|
- response.join();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- } finally {
|
|
|
- response = null;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void closeStream() {
|
|
|
- if (blockStream != null) {
|
|
|
- try {
|
|
|
- blockStream.close();
|
|
|
- } catch (IOException e) {
|
|
|
- } finally {
|
|
|
- blockStream = null;
|
|
|
- }
|
|
|
- }
|
|
|
- if (blockReplyStream != null) {
|
|
|
- try {
|
|
|
- blockReplyStream.close();
|
|
|
- } catch (IOException e) {
|
|
|
- } finally {
|
|
|
- blockReplyStream = null;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- // Processes reponses from the datanodes. A packet is removed
|
|
|
- // from the ackQueue when its response arrives.
|
|
|
- //
|
|
|
- private class ResponseProcessor extends Daemon {
|
|
|
-
|
|
|
- private volatile boolean responderClosed = false;
|
|
|
- private DatanodeInfo[] targets = null;
|
|
|
- private boolean isLastPacketInBlock = false;
|
|
|
-
|
|
|
- ResponseProcessor (DatanodeInfo[] targets) {
|
|
|
- this.targets = targets;
|
|
|
- }
|
|
|
-
|
|
|
- public void run() {
|
|
|
-
|
|
|
- this.setName("ResponseProcessor for block " + block);
|
|
|
- PipelineAck ack = new PipelineAck();
|
|
|
-
|
|
|
- while (!responderClosed && clientRunning && !isLastPacketInBlock) {
|
|
|
- // process responses from datanodes.
|
|
|
- try {
|
|
|
- // read an ack from the pipeline
|
|
|
- ack.readFields(blockReplyStream);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("DFSClient " + ack);
|
|
|
- }
|
|
|
-
|
|
|
- long seqno = ack.getSeqno();
|
|
|
- // processes response status from datanodes.
|
|
|
- for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
|
|
|
- final DataTransferProtocol.Status reply = ack.getReply(i);
|
|
|
- if (reply != SUCCESS) {
|
|
|
- errorIndex = i; // first bad datanode
|
|
|
- throw new IOException("Bad response " + reply +
|
|
|
- " for block " + block +
|
|
|
- " from datanode " +
|
|
|
- targets[i].getName());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- assert seqno != PipelineAck.UNKOWN_SEQNO :
|
|
|
- "Ack for unkown seqno should be a failed ack: " + ack;
|
|
|
- if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- // a success ack for a data packet
|
|
|
- Packet one = null;
|
|
|
- synchronized (dataQueue) {
|
|
|
- one = ackQueue.getFirst();
|
|
|
- }
|
|
|
- if (one.seqno != seqno) {
|
|
|
- throw new IOException("Responseprocessor: Expecting seqno " +
|
|
|
- " for block " + block +
|
|
|
- one.seqno + " but received " + seqno);
|
|
|
- }
|
|
|
- isLastPacketInBlock = one.lastPacketInBlock;
|
|
|
- // update bytesAcked
|
|
|
- block.setNumBytes(one.getLastByteOffsetBlock());
|
|
|
-
|
|
|
- synchronized (dataQueue) {
|
|
|
- ackQueue.removeFirst();
|
|
|
- dataQueue.notifyAll();
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- if (!responderClosed) {
|
|
|
- if (e instanceof IOException) {
|
|
|
- setLastException((IOException)e);
|
|
|
- }
|
|
|
- hasError = true;
|
|
|
- errorIndex = errorIndex==-1 ? 0 : errorIndex;
|
|
|
- synchronized (dataQueue) {
|
|
|
- dataQueue.notifyAll();
|
|
|
- }
|
|
|
- LOG.warn("DFSOutputStream ResponseProcessor exception " +
|
|
|
- " for block " + block +
|
|
|
- StringUtils.stringifyException(e));
|
|
|
- responderClosed = true;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- void close() {
|
|
|
- responderClosed = true;
|
|
|
- this.interrupt();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // If this stream has encountered any errors so far, shutdown
|
|
|
- // threads and mark stream as closed. Returns true if we should
|
|
|
- // sleep for a while after returning from this call.
|
|
|
- //
|
|
|
- private boolean processDatanodeError() throws IOException {
|
|
|
- if (response != null) {
|
|
|
- LOG.info("Error Recovery for block " + block +
|
|
|
- " waiting for responder to exit. ");
|
|
|
- return true;
|
|
|
- }
|
|
|
- closeStream();
|
|
|
-
|
|
|
- // move packets from ack queue to front of the data queue
|
|
|
- synchronized (dataQueue) {
|
|
|
- dataQueue.addAll(0, ackQueue);
|
|
|
- ackQueue.clear();
|
|
|
- }
|
|
|
-
|
|
|
- boolean doSleep = setupPipelineForAppendOrRecovery();
|
|
|
-
|
|
|
- if (!streamerClosed && clientRunning) {
|
|
|
- if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
|
|
|
- synchronized (dataQueue) {
|
|
|
- dataQueue.remove(); // remove the end of block packet
|
|
|
- dataQueue.notifyAll();
|
|
|
- }
|
|
|
- endBlock();
|
|
|
- } else {
|
|
|
- initDataStreaming();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return doSleep;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * Open a DataOutputStream to a DataNode pipeline so that
|
|
|
- * it can be written to.
|
|
|
- * This happens when a file is appended or data streaming fails
|
|
|
- * It keeps on trying until a pipeline is setup
|
|
|
- */
|
|
|
- private boolean setupPipelineForAppendOrRecovery() throws IOException {
|
|
|
- // check number of datanodes
|
|
|
- if (nodes == null || nodes.length == 0) {
|
|
|
- String msg = "Could not get block locations. " + "Source file \""
|
|
|
- + src + "\" - Aborting...";
|
|
|
- LOG.warn(msg);
|
|
|
- setLastException(new IOException(msg));
|
|
|
- streamerClosed = true;
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- boolean success = false;
|
|
|
- long newGS = 0L;
|
|
|
- while (!success && !streamerClosed && clientRunning) {
|
|
|
- boolean isRecovery = hasError;
|
|
|
- // remove bad datanode from list of datanodes.
|
|
|
- // If errorIndex was not set (i.e. appends), then do not remove
|
|
|
- // any datanodes
|
|
|
- //
|
|
|
- if (errorIndex >= 0) {
|
|
|
- StringBuilder pipelineMsg = new StringBuilder();
|
|
|
- for (int j = 0; j < nodes.length; j++) {
|
|
|
- pipelineMsg.append(nodes[j].getName());
|
|
|
- if (j < nodes.length - 1) {
|
|
|
- pipelineMsg.append(", ");
|
|
|
- }
|
|
|
- }
|
|
|
- if (nodes.length <= 1) {
|
|
|
- lastException = new IOException("All datanodes " + pipelineMsg
|
|
|
- + " are bad. Aborting...");
|
|
|
- streamerClosed = true;
|
|
|
- return false;
|
|
|
- }
|
|
|
- LOG.warn("Error Recovery for block " + block +
|
|
|
- " in pipeline " + pipelineMsg +
|
|
|
- ": bad datanode " + nodes[errorIndex].getName());
|
|
|
- DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
|
|
|
- System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
|
|
|
- System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
|
|
|
- newnodes.length-errorIndex);
|
|
|
- nodes = newnodes;
|
|
|
- this.hasError = false;
|
|
|
- lastException = null;
|
|
|
- errorIndex = -1;
|
|
|
- }
|
|
|
-
|
|
|
- // get a new generation stamp and an access token
|
|
|
- LocatedBlock lb = namenode.updateBlockForPipeline(block, clientName);
|
|
|
- newGS = lb.getBlock().getGenerationStamp();
|
|
|
- accessToken = lb.getAccessToken();
|
|
|
-
|
|
|
- // set up the pipeline again with the remaining nodes
|
|
|
- success = createBlockOutputStream(nodes, newGS, isRecovery);
|
|
|
- }
|
|
|
-
|
|
|
- if (success) {
|
|
|
- // update pipeline at the namenode
|
|
|
- Block newBlock = new Block(
|
|
|
- block.getBlockId(), block.getNumBytes(), newGS);
|
|
|
- namenode.updatePipeline(clientName, block, newBlock, nodes);
|
|
|
- // update client side generation stamp
|
|
|
- block = newBlock;
|
|
|
- }
|
|
|
- return false; // do not sleep, continue processing
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Open a DataOutputStream to a DataNode so that it can be written to.
|
|
|
- * This happens when a file is created and each time a new block is allocated.
|
|
|
- * Must get block ID and the IDs of the destinations from the namenode.
|
|
|
- * Returns the list of target datanodes.
|
|
|
- */
|
|
|
- private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
|
|
|
- LocatedBlock lb = null;
|
|
|
- boolean retry = false;
|
|
|
- DatanodeInfo[] nodes = null;
|
|
|
- int count = conf.getInt("dfs.client.block.write.retries", 3);
|
|
|
- boolean success = false;
|
|
|
- do {
|
|
|
- hasError = false;
|
|
|
- lastException = null;
|
|
|
- errorIndex = -1;
|
|
|
- retry = false;
|
|
|
- success = false;
|
|
|
-
|
|
|
- long startTime = System.currentTimeMillis();
|
|
|
- DatanodeInfo[] w = excludedNodes.toArray(
|
|
|
- new DatanodeInfo[excludedNodes.size()]);
|
|
|
- lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
|
|
|
- block = lb.getBlock();
|
|
|
- block.setNumBytes(0);
|
|
|
- accessToken = lb.getAccessToken();
|
|
|
- nodes = lb.getLocations();
|
|
|
-
|
|
|
- //
|
|
|
- // Connect to first DataNode in the list.
|
|
|
- //
|
|
|
- success = createBlockOutputStream(nodes, 0L, false);
|
|
|
-
|
|
|
- if (!success) {
|
|
|
- LOG.info("Abandoning block " + block);
|
|
|
- namenode.abandonBlock(block, src, clientName);
|
|
|
- block = null;
|
|
|
-
|
|
|
- LOG.debug("Excluding datanode " + nodes[errorIndex]);
|
|
|
- excludedNodes.add(nodes[errorIndex]);
|
|
|
-
|
|
|
- // Connection failed. Let's wait a little bit and retry
|
|
|
- retry = true;
|
|
|
- try {
|
|
|
- if (System.currentTimeMillis() - startTime > 5000) {
|
|
|
- LOG.info("Waiting to find target node: " + nodes[0].getName());
|
|
|
- }
|
|
|
- //TODO fix this timout. Extract it o a constant, maybe make it available from conf
|
|
|
- Thread.sleep(6000);
|
|
|
- } catch (InterruptedException iex) {
|
|
|
- }
|
|
|
- }
|
|
|
- } while (retry && --count >= 0);
|
|
|
-
|
|
|
- if (!success) {
|
|
|
- throw new IOException("Unable to create new block.");
|
|
|
- }
|
|
|
- return nodes;
|
|
|
- }
|
|
|
-
|
|
|
- // connects to the first datanode in the pipeline
|
|
|
- // Returns true if success, otherwise return failure.
|
|
|
- //
|
|
|
- private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
|
|
|
- boolean recoveryFlag) {
|
|
|
- DataTransferProtocol.Status pipelineStatus = SUCCESS;
|
|
|
- String firstBadLink = "";
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- for (int i = 0; i < nodes.length; i++) {
|
|
|
- LOG.debug("pipeline = " + nodes[i].getName());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // persist blocks on namenode on next flush
|
|
|
- persistBlocks.set(true);
|
|
|
-
|
|
|
- try {
|
|
|
- LOG.debug("Connecting to " + nodes[0].getName());
|
|
|
- InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
|
|
|
- s = socketFactory.createSocket();
|
|
|
- int timeoutValue = (socketTimeout > 0) ? (HdfsConstants.READ_TIMEOUT_EXTENSION
|
|
|
- * nodes.length + socketTimeout) : 0;
|
|
|
- NetUtils.connect(s, target, timeoutValue);
|
|
|
- s.setSoTimeout(timeoutValue);
|
|
|
- s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
|
|
- LOG.debug("Send buf size " + s.getSendBufferSize());
|
|
|
- long writeTimeout = (datanodeWriteTimeout > 0) ?
|
|
|
- (HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
|
|
|
- datanodeWriteTimeout) : 0;
|
|
|
-
|
|
|
- //
|
|
|
- // Xmit header info to datanode
|
|
|
- //
|
|
|
- DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
|
|
- NetUtils.getOutputStream(s, writeTimeout),
|
|
|
- DataNode.SMALL_BUFFER_SIZE));
|
|
|
- blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
|
|
|
-
|
|
|
- // send the request
|
|
|
- DataTransferProtocol.Sender.opWriteBlock(out,
|
|
|
- block.getBlockId(), block.getGenerationStamp(),
|
|
|
- nodes.length, recoveryFlag?stage.getRecoveryStage():stage, newGS,
|
|
|
- block.getNumBytes(), bytesSent, clientName, null, nodes, accessToken);
|
|
|
- checksum.writeHeader(out);
|
|
|
- out.flush();
|
|
|
-
|
|
|
- // receive ack for connect
|
|
|
- pipelineStatus = DataTransferProtocol.Status.read(blockReplyStream);
|
|
|
- firstBadLink = Text.readString(blockReplyStream);
|
|
|
- if (pipelineStatus != SUCCESS) {
|
|
|
- if (pipelineStatus == ERROR_ACCESS_TOKEN) {
|
|
|
- throw new InvalidAccessTokenException(
|
|
|
- "Got access token error for connect ack with firstBadLink as "
|
|
|
- + firstBadLink);
|
|
|
- } else {
|
|
|
- throw new IOException("Bad connect ack with firstBadLink as "
|
|
|
- + firstBadLink);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- blockStream = out;
|
|
|
- return true; // success
|
|
|
-
|
|
|
- } catch (IOException ie) {
|
|
|
-
|
|
|
- LOG.info("Exception in createBlockOutputStream " + ie);
|
|
|
-
|
|
|
- // find the datanode that matches
|
|
|
- if (firstBadLink.length() != 0) {
|
|
|
- for (int i = 0; i < nodes.length; i++) {
|
|
|
- if (nodes[i].getName().equals(firstBadLink)) {
|
|
|
- errorIndex = i;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- errorIndex = 0;
|
|
|
- }
|
|
|
- hasError = true;
|
|
|
- setLastException(ie);
|
|
|
- blockReplyStream = null;
|
|
|
- return false; // error
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private LocatedBlock locateFollowingBlock(long start,
|
|
|
- DatanodeInfo[] excludedNodes) throws IOException {
|
|
|
- int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
|
|
|
- long sleeptime = 400;
|
|
|
- while (true) {
|
|
|
- long localstart = System.currentTimeMillis();
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- return namenode.addBlock(src, clientName, block, excludedNodes);
|
|
|
- } catch (RemoteException e) {
|
|
|
- IOException ue =
|
|
|
- e.unwrapRemoteException(FileNotFoundException.class,
|
|
|
- AccessControlException.class,
|
|
|
- NSQuotaExceededException.class,
|
|
|
- DSQuotaExceededException.class);
|
|
|
- if (ue != e) {
|
|
|
- throw ue; // no need to retry these exceptions
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- if (NotReplicatedYetException.class.getName().
|
|
|
- equals(e.getClassName())) {
|
|
|
- if (retries == 0) {
|
|
|
- throw e;
|
|
|
- } else {
|
|
|
- --retries;
|
|
|
- LOG.info(StringUtils.stringifyException(e));
|
|
|
- if (System.currentTimeMillis() - localstart > 5000) {
|
|
|
- LOG.info("Waiting for replication for "
|
|
|
- + (System.currentTimeMillis() - localstart) / 1000
|
|
|
- + " seconds");
|
|
|
- }
|
|
|
- try {
|
|
|
- LOG.warn("NotReplicatedYetException sleeping " + src
|
|
|
- + " retries left " + retries);
|
|
|
- Thread.sleep(sleeptime);
|
|
|
- sleeptime *= 2;
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- throw e;
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- Block getBlock() {
|
|
|
- return block;
|
|
|
- }
|
|
|
-
|
|
|
- DatanodeInfo[] getNodes() {
|
|
|
- return nodes;
|
|
|
- }
|
|
|
-
|
|
|
- BlockAccessToken getAccessToken() {
|
|
|
- return accessToken;
|
|
|
- }
|
|
|
-
|
|
|
- private void setLastException(IOException e) {
|
|
|
- if (lastException == null) {
|
|
|
- lastException = e;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void isClosed() throws IOException {
|
|
|
- if (closed) {
|
|
|
- IOException e = lastException;
|
|
|
- throw e != null ? e : new IOException("DFSOutputStream is closed");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- // returns the list of targets, if any, that is being currently used.
|
|
|
- //
|
|
|
- synchronized DatanodeInfo[] getPipeline() {
|
|
|
- if (streamer == null) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- DatanodeInfo[] currentNodes = streamer.getNodes();
|
|
|
- if (currentNodes == null) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- DatanodeInfo[] value = new DatanodeInfo[currentNodes.length];
|
|
|
- for (int i = 0; i < currentNodes.length; i++) {
|
|
|
- value[i] = currentNodes[i];
|
|
|
- }
|
|
|
- return value;
|
|
|
- }
|
|
|
-
|
|
|
- private DFSOutputStream(String src, long blockSize, Progressable progress,
|
|
|
- int bytesPerChecksum) throws IOException {
|
|
|
- super(new PureJavaCrc32(), bytesPerChecksum, 4);
|
|
|
- this.src = src;
|
|
|
- this.blockSize = blockSize;
|
|
|
- this.progress = progress;
|
|
|
- if (progress != null) {
|
|
|
- LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
|
|
|
- }
|
|
|
-
|
|
|
- if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
|
|
|
- throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
|
|
|
- ") and blockSize(" + blockSize +
|
|
|
- ") do not match. " + "blockSize should be a " +
|
|
|
- "multiple of io.bytes.per.checksum");
|
|
|
-
|
|
|
- }
|
|
|
- checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
|
|
|
- bytesPerChecksum);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Create a new output stream to the given DataNode.
|
|
|
- * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long)
|
|
|
- */
|
|
|
- DFSOutputStream(String src, FsPermission masked, EnumSet<CreateFlag> flag,
|
|
|
- boolean createParent, short replication, long blockSize, Progressable progress,
|
|
|
- int buffersize, int bytesPerChecksum) throws IOException {
|
|
|
- this(src, blockSize, progress, bytesPerChecksum);
|
|
|
-
|
|
|
- computePacketChunkSize(writePacketSize, bytesPerChecksum);
|
|
|
-
|
|
|
- try {
|
|
|
- namenode.create(
|
|
|
- src, masked, clientName, new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize);
|
|
|
- } catch(RemoteException re) {
|
|
|
- throw re.unwrapRemoteException(AccessControlException.class,
|
|
|
- FileAlreadyExistsException.class,
|
|
|
- FileNotFoundException.class,
|
|
|
- NSQuotaExceededException.class,
|
|
|
- DSQuotaExceededException.class);
|
|
|
- }
|
|
|
- streamer = new DataStreamer();
|
|
|
- streamer.start();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Create a new output stream to the given DataNode.
|
|
|
- * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
|
|
|
- */
|
|
|
- DFSOutputStream(String src, int buffersize, Progressable progress,
|
|
|
- LocatedBlock lastBlock, FileStatus stat,
|
|
|
- int bytesPerChecksum) throws IOException {
|
|
|
- this(src, stat.getBlockSize(), progress, bytesPerChecksum);
|
|
|
- initialFileSize = stat.getLen(); // length of file when opened
|
|
|
-
|
|
|
- //
|
|
|
- // The last partial block of the file has to be filled.
|
|
|
- //
|
|
|
- if (lastBlock != null) {
|
|
|
- // indicate that we are appending to an existing block
|
|
|
- bytesCurBlock = lastBlock.getBlockSize();
|
|
|
- streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
|
|
|
- } else {
|
|
|
- computePacketChunkSize(writePacketSize, bytesPerChecksum);
|
|
|
- streamer = new DataStreamer();
|
|
|
- }
|
|
|
- streamer.start();
|
|
|
- }
|
|
|
-
|
|
|
- private void computePacketChunkSize(int psize, int csize) {
|
|
|
- int chunkSize = csize + checksum.getChecksumSize();
|
|
|
- int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
|
|
|
- chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
|
|
|
- packetSize = n + chunkSize*chunksPerPacket;
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("computePacketChunkSize: src=" + src +
|
|
|
- ", chunkSize=" + chunkSize +
|
|
|
- ", chunksPerPacket=" + chunksPerPacket +
|
|
|
- ", packetSize=" + packetSize);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void queuePacket(Packet packet) {
|
|
|
- synchronized (dataQueue) {
|
|
|
- dataQueue.addLast(packet);
|
|
|
- dataQueue.notifyAll();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void waitAndQueuePacket(Packet packet) throws IOException {
|
|
|
- synchronized (dataQueue) {
|
|
|
- // If queue is full, then wait till we have enough space
|
|
|
- while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
|
|
|
- try {
|
|
|
- dataQueue.wait();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- }
|
|
|
- }
|
|
|
- isClosed();
|
|
|
- queuePacket(packet);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // @see FSOutputSummer#writeChunk()
|
|
|
- @Override
|
|
|
- protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
|
|
|
- throws IOException {
|
|
|
- checkOpen();
|
|
|
- isClosed();
|
|
|
-
|
|
|
- int cklen = checksum.length;
|
|
|
- int bytesPerChecksum = this.checksum.getBytesPerChecksum();
|
|
|
- if (len > bytesPerChecksum) {
|
|
|
- throw new IOException("writeChunk() buffer size is " + len +
|
|
|
- " is larger than supported bytesPerChecksum " +
|
|
|
- bytesPerChecksum);
|
|
|
- }
|
|
|
- if (checksum.length != this.checksum.getChecksumSize()) {
|
|
|
- throw new IOException("writeChunk() checksum size is supposed to be " +
|
|
|
- this.checksum.getChecksumSize() +
|
|
|
- " but found to be " + checksum.length);
|
|
|
- }
|
|
|
-
|
|
|
- if (currentPacket == null) {
|
|
|
- currentPacket = new Packet(packetSize, chunksPerPacket,
|
|
|
- bytesCurBlock);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
|
|
|
- currentPacket.seqno +
|
|
|
- ", src=" + src +
|
|
|
- ", packetSize=" + packetSize +
|
|
|
- ", chunksPerPacket=" + chunksPerPacket +
|
|
|
- ", bytesCurBlock=" + bytesCurBlock);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- currentPacket.writeChecksum(checksum, 0, cklen);
|
|
|
- currentPacket.writeData(b, offset, len);
|
|
|
- currentPacket.numChunks++;
|
|
|
- bytesCurBlock += len;
|
|
|
-
|
|
|
- // If packet is full, enqueue it for transmission
|
|
|
- //
|
|
|
- if (currentPacket.numChunks == currentPacket.maxChunks ||
|
|
|
- bytesCurBlock == blockSize) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("DFSClient writeChunk packet full seqno=" +
|
|
|
- currentPacket.seqno +
|
|
|
- ", src=" + src +
|
|
|
- ", bytesCurBlock=" + bytesCurBlock +
|
|
|
- ", blockSize=" + blockSize +
|
|
|
- ", appendChunk=" + appendChunk);
|
|
|
- }
|
|
|
- waitAndQueuePacket(currentPacket);
|
|
|
- currentPacket = null;
|
|
|
-
|
|
|
- // If the reopened file did not end at chunk boundary and the above
|
|
|
- // write filled up its partial chunk. Tell the summer to generate full
|
|
|
- // crc chunks from now on.
|
|
|
- if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
|
|
|
- appendChunk = false;
|
|
|
- resetChecksumChunk(bytesPerChecksum);
|
|
|
- }
|
|
|
-
|
|
|
- if (!appendChunk) {
|
|
|
- int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
|
|
|
- computePacketChunkSize(psize, bytesPerChecksum);
|
|
|
- }
|
|
|
- //
|
|
|
- // if encountering a block boundary, send an empty packet to
|
|
|
- // indicate the end of block and reset bytesCurBlock.
|
|
|
- //
|
|
|
- if (bytesCurBlock == blockSize) {
|
|
|
- currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
|
|
|
- bytesCurBlock);
|
|
|
- currentPacket.lastPacketInBlock = true;
|
|
|
- waitAndQueuePacket(currentPacket);
|
|
|
- currentPacket = null;
|
|
|
- bytesCurBlock = 0;
|
|
|
- lastFlushOffset = -1;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- @Deprecated
|
|
|
- public synchronized void sync() throws IOException {
|
|
|
- hflush();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * flushes out to all replicas of the block.
|
|
|
- * The data is in the buffers of the DNs
|
|
|
- * but not neccessary on the DN's OS buffers.
|
|
|
- *
|
|
|
- * It is a synchronous operation. When it returns,
|
|
|
- * it gurantees that flushed data become visible to new readers.
|
|
|
- * It is not guaranteed that data has been flushed to
|
|
|
- * persistent store on the datanode.
|
|
|
- * Block allocations are persisted on namenode.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized void hflush() throws IOException {
|
|
|
- checkOpen();
|
|
|
- isClosed();
|
|
|
- try {
|
|
|
- /* Record current blockOffset. This might be changed inside
|
|
|
- * flushBuffer() where a partial checksum chunk might be flushed.
|
|
|
- * After the flush, reset the bytesCurBlock back to its previous value,
|
|
|
- * any partial checksum chunk will be sent now and in next packet.
|
|
|
- */
|
|
|
- long saveOffset = bytesCurBlock;
|
|
|
-
|
|
|
- // flush checksum buffer, but keep checksum buffer intact
|
|
|
- flushBuffer(true);
|
|
|
-
|
|
|
- LOG.debug("DFSClient flush() : saveOffset " + saveOffset +
|
|
|
- " bytesCurBlock " + bytesCurBlock +
|
|
|
- " lastFlushOffset " + lastFlushOffset);
|
|
|
-
|
|
|
- // Flush only if we haven't already flushed till this offset.
|
|
|
- if (lastFlushOffset != bytesCurBlock) {
|
|
|
-
|
|
|
- // record the valid offset of this flush
|
|
|
- lastFlushOffset = bytesCurBlock;
|
|
|
-
|
|
|
- // wait for all packets to be sent and acknowledged
|
|
|
- flushInternal();
|
|
|
- } else {
|
|
|
- // just discard the current packet since it is already been sent.
|
|
|
- currentPacket = null;
|
|
|
- }
|
|
|
-
|
|
|
- // Restore state of stream. Record the last flush offset
|
|
|
- // of the last full chunk that was flushed.
|
|
|
- //
|
|
|
- bytesCurBlock = saveOffset;
|
|
|
-
|
|
|
- // If any new blocks were allocated since the last flush,
|
|
|
- // then persist block locations on namenode.
|
|
|
- //
|
|
|
- if (persistBlocks.getAndSet(false)) {
|
|
|
- namenode.fsync(src, clientName);
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- lastException = new IOException("IOException flush:" + e);
|
|
|
- closeThreads(true);
|
|
|
- throw e;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * The expected semantics is all data have flushed out to all replicas
|
|
|
- * and all replicas have done posix fsync equivalent - ie the OS has
|
|
|
- * flushed it to the disk device (but the disk may have it in its cache).
|
|
|
- *
|
|
|
- * Right now by default it is implemented as hflush
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized void hsync() throws IOException {
|
|
|
- hflush();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Waits till all existing data is flushed and confirmations
|
|
|
- * received from datanodes.
|
|
|
- */
|
|
|
- private synchronized void flushInternal() throws IOException {
|
|
|
- checkOpen();
|
|
|
- isClosed();
|
|
|
- //
|
|
|
- // If there is data in the current buffer, send it across
|
|
|
- //
|
|
|
- if (currentPacket != null) {
|
|
|
- queuePacket(currentPacket);
|
|
|
- currentPacket = null;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (dataQueue) {
|
|
|
- while (!closed && dataQueue.size() + ackQueue.size() > 0) {
|
|
|
- try {
|
|
|
- dataQueue.wait();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- }
|
|
|
- }
|
|
|
- isClosed();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Aborts this output stream and releases any system
|
|
|
- * resources associated with this stream.
|
|
|
- */
|
|
|
- synchronized void abort() throws IOException {
|
|
|
- if (closed) {
|
|
|
- return;
|
|
|
- }
|
|
|
- streamer.setLastException(new IOException("Lease timeout of " +
|
|
|
- (hdfsTimeout/1000) + " seconds expired."));
|
|
|
- closeThreads(true);
|
|
|
- }
|
|
|
-
|
|
|
- // shutdown datastreamer and responseprocessor threads.
|
|
|
- // interrupt datastreamer if force is true
|
|
|
- private void closeThreads(boolean force) throws IOException {
|
|
|
- try {
|
|
|
- streamer.close(force);
|
|
|
- streamer.join();
|
|
|
- if (s != null) {
|
|
|
- s.close();
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- throw new IOException("Failed to shutdown streamer");
|
|
|
- } finally {
|
|
|
- streamer = null;
|
|
|
- s = null;
|
|
|
- closed = true;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Closes this output stream and releases any system
|
|
|
- * resources associated with this stream.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public synchronized void close() throws IOException {
|
|
|
- if (closed) {
|
|
|
- IOException e = lastException;
|
|
|
- if (e == null)
|
|
|
- return;
|
|
|
- else
|
|
|
- throw e;
|
|
|
- }
|
|
|
-
|
|
|
- try {
|
|
|
- flushBuffer(); // flush from all upper layers
|
|
|
-
|
|
|
- if (currentPacket != null) {
|
|
|
- waitAndQueuePacket(currentPacket);
|
|
|
- }
|
|
|
-
|
|
|
- if (bytesCurBlock != 0) {
|
|
|
- // send an empty packet to mark the end of the block
|
|
|
- currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
|
|
|
- bytesCurBlock);
|
|
|
- currentPacket.lastPacketInBlock = true;
|
|
|
- }
|
|
|
-
|
|
|
- flushInternal(); // flush all data to Datanodes
|
|
|
- // get last block before destroying the streamer
|
|
|
- Block lastBlock = streamer.getBlock();
|
|
|
- closeThreads(false);
|
|
|
- completeFile(lastBlock);
|
|
|
- leasechecker.remove(src);
|
|
|
- } finally {
|
|
|
- closed = true;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // should be called holding (this) lock since setTestFilename() may
|
|
|
- // be called during unit tests
|
|
|
- private void completeFile(Block last) throws IOException {
|
|
|
- long localstart = System.currentTimeMillis();
|
|
|
- boolean fileComplete = false;
|
|
|
- while (!fileComplete) {
|
|
|
- fileComplete = namenode.complete(src, clientName, last);
|
|
|
- if (!fileComplete) {
|
|
|
- if (!clientRunning ||
|
|
|
- (hdfsTimeout > 0 &&
|
|
|
- localstart + hdfsTimeout < System.currentTimeMillis())) {
|
|
|
- String msg = "Unable to close file because dfsclient " +
|
|
|
- " was unable to contact the HDFS servers." +
|
|
|
- " clientRunning " + clientRunning +
|
|
|
- " hdfsTimeout " + hdfsTimeout;
|
|
|
- LOG.info(msg);
|
|
|
- throw new IOException(msg);
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(400);
|
|
|
- if (System.currentTimeMillis() - localstart > 5000) {
|
|
|
- LOG.info("Could not complete file " + src + " retrying...");
|
|
|
- }
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- void setArtificialSlowdown(long period) {
|
|
|
- artificialSlowdown = period;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void setChunksPerPacket(int value) {
|
|
|
- chunksPerPacket = Math.min(chunksPerPacket, value);
|
|
|
- packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
|
|
|
- (checksum.getBytesPerChecksum() +
|
|
|
- checksum.getChecksumSize()) * chunksPerPacket;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void setTestFilename(String newname) {
|
|
|
- src = newname;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns the size of a file as it was when this stream was opened
|
|
|
- */
|
|
|
- long getInitialLen() {
|
|
|
- return initialFileSize;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns the access token currently used by streamer, for testing only
|
|
|
- */
|
|
|
- BlockAccessToken getAccessToken() {
|
|
|
- return streamer.getAccessToken();
|
|
|
- }
|
|
|
-
|
|
|
}
|
|
|
|
|
|
void reportChecksumFailure(String file, Block blk, DatanodeInfo dn) {
|