|
@@ -744,9 +744,10 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
* Read/write data from/to the DataXceiveServer.
|
|
|
*/
|
|
|
public void run() {
|
|
|
+ DataInputStream in=null;
|
|
|
try {
|
|
|
- DataInputStream in = new DataInputStream(
|
|
|
- new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
|
|
|
+ in = new DataInputStream(
|
|
|
+ new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
|
|
|
short version = in.readShort();
|
|
|
if ( version != DATA_TRANFER_VERSION ) {
|
|
|
throw new IOException( "Version Mismatch" );
|
|
@@ -770,12 +771,10 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
} catch (Throwable t) {
|
|
|
LOG.error("DataXceiver: " + StringUtils.stringifyException(t));
|
|
|
} finally {
|
|
|
- try {
|
|
|
- xceiverCount.decr();
|
|
|
- LOG.debug("Number of active connections is: "+xceiverCount);
|
|
|
- s.close();
|
|
|
- } catch (IOException ie2) {
|
|
|
- }
|
|
|
+ xceiverCount.decr();
|
|
|
+ LOG.debug("Number of active connections is: "+xceiverCount);
|
|
|
+ IOUtils.closeStream(in);
|
|
|
+ IOUtils.closeSocket(s);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -793,11 +792,23 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
long startOffset = in.readLong();
|
|
|
long length = in.readLong();
|
|
|
-
|
|
|
+
|
|
|
+ // send the block
|
|
|
+ DataOutputStream out = new DataOutputStream(
|
|
|
+ new BufferedOutputStream(s.getOutputStream(), BUFFER_SIZE));
|
|
|
+ BlockSender blockSender = null;
|
|
|
try {
|
|
|
- //XXX Buffered output stream?
|
|
|
- long read = sendBlock(s, block, startOffset, length, null );
|
|
|
- myMetrics.readBytes((int)read);
|
|
|
+ try {
|
|
|
+ blockSender = new BlockSender(block, startOffset, length, true, true);
|
|
|
+ } catch(IOException e) {
|
|
|
+ out.writeShort(OP_STATUS_ERROR);
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+
|
|
|
+ out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status
|
|
|
+ long read = blockSender.sendBlock(out, null); // send data
|
|
|
+
|
|
|
+ myMetrics.readBytes((int) read);
|
|
|
myMetrics.readBlocks(1);
|
|
|
LOG.info("Served block " + block + " to " + s.getInetAddress());
|
|
|
} catch ( SocketException ignored ) {
|
|
@@ -808,14 +819,18 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
* Earlier version shutdown() datanode if there is disk error.
|
|
|
*/
|
|
|
LOG.warn( "Got exception while serving " + block + " to " +
|
|
|
- s.getInetAddress() + ": " +
|
|
|
+ s.getInetAddress() + ":\n" +
|
|
|
StringUtils.stringifyException(ioe) );
|
|
|
throw ioe;
|
|
|
+ } finally {
|
|
|
+ IOUtils.closeStream(out);
|
|
|
+ IOUtils.closeStream(blockSender);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Write a block to disk.
|
|
|
+ *
|
|
|
* @param in The stream to read from
|
|
|
* @throws IOException
|
|
|
*/
|
|
@@ -823,62 +838,45 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
//
|
|
|
// Read in the header
|
|
|
//
|
|
|
- DataOutputStream reply = new DataOutputStream(s.getOutputStream());
|
|
|
- DataOutputStream out = null;
|
|
|
- DataOutputStream checksumOut = null;
|
|
|
- Socket mirrorSock = null;
|
|
|
- DataOutputStream mirrorOut = null;
|
|
|
- DataInputStream mirrorIn = null;
|
|
|
-
|
|
|
+ Block block = new Block(in.readLong(), 0);
|
|
|
+ int numTargets = in.readInt();
|
|
|
+ if (numTargets < 0) {
|
|
|
+ throw new IOException("Mislabelled incoming datastream.");
|
|
|
+ }
|
|
|
+ DatanodeInfo targets[] = new DatanodeInfo[numTargets];
|
|
|
+ for (int i = 0; i < targets.length; i++) {
|
|
|
+ DatanodeInfo tmp = new DatanodeInfo();
|
|
|
+ tmp.readFields(in);
|
|
|
+ targets[i] = tmp;
|
|
|
+ }
|
|
|
+
|
|
|
+ short opStatus = OP_STATUS_SUCCESS; // write operation status
|
|
|
+ DataOutputStream mirrorOut = null; // stream to next target
|
|
|
+ Socket mirrorSock = null; // socket to next target
|
|
|
+ BlockReceiver blockReceiver = null; // responsible for data handling
|
|
|
try {
|
|
|
- /* We need an estimate for block size to check if the
|
|
|
- * disk partition has enough space. For now we just increment
|
|
|
- * FSDataset.reserved by configured dfs.block.size
|
|
|
- * Other alternative is to include the block size in the header
|
|
|
- * sent by DFSClient.
|
|
|
- */
|
|
|
- Block block = new Block( in.readLong(), 0 );
|
|
|
- int numTargets = in.readInt();
|
|
|
- if ( numTargets < 0 ) {
|
|
|
- throw new IOException("Mislabelled incoming datastream.");
|
|
|
- }
|
|
|
- DatanodeInfo targets[] = new DatanodeInfo[numTargets];
|
|
|
- for (int i = 0; i < targets.length; i++) {
|
|
|
- DatanodeInfo tmp = new DatanodeInfo();
|
|
|
- tmp.readFields(in);
|
|
|
- targets[i] = tmp;
|
|
|
- }
|
|
|
-
|
|
|
- DataChecksum checksum = DataChecksum.newDataChecksum( in );
|
|
|
+ // open a block receiver and check if the block does not exist
|
|
|
+ blockReceiver = new BlockReceiver(block, in,
|
|
|
+ s.getRemoteSocketAddress().toString());
|
|
|
|
|
|
- //
|
|
|
- // Open local disk out
|
|
|
- //
|
|
|
- FSDataset.BlockWriteStreams streams = data.writeToBlock( block );
|
|
|
- out = new DataOutputStream(
|
|
|
- new BufferedOutputStream(streams.dataOut, BUFFER_SIZE));
|
|
|
- checksumOut = new DataOutputStream(
|
|
|
- new BufferedOutputStream(streams.checksumOut, BUFFER_SIZE));
|
|
|
-
|
|
|
- InetSocketAddress mirrorTarget = null;
|
|
|
- String mirrorNode = null;
|
|
|
//
|
|
|
// Open network conn to backup machine, if
|
|
|
// appropriate
|
|
|
//
|
|
|
if (targets.length > 0) {
|
|
|
+ InetSocketAddress mirrorTarget = null;
|
|
|
+ String mirrorNode = null;
|
|
|
// Connect to backup machine
|
|
|
mirrorNode = targets[0].getName();
|
|
|
mirrorTarget = createSocketAddr(mirrorNode);
|
|
|
+ mirrorSock = new Socket();
|
|
|
try {
|
|
|
- mirrorSock = new Socket();
|
|
|
mirrorSock.connect(mirrorTarget, READ_TIMEOUT);
|
|
|
- mirrorSock.setSoTimeout(READ_TIMEOUT);
|
|
|
+ mirrorSock.setSoTimeout(numTargets*READ_TIMEOUT);
|
|
|
mirrorOut = new DataOutputStream(
|
|
|
new BufferedOutputStream(mirrorSock.getOutputStream(),
|
|
|
BUFFER_SIZE));
|
|
|
- mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
|
|
|
- //Copied from DFSClient.java!
|
|
|
+ // Write header: Copied from DFSClient.java!
|
|
|
mirrorOut.writeShort( DATA_TRANFER_VERSION );
|
|
|
mirrorOut.write( OP_WRITE_BLOCK );
|
|
|
mirrorOut.writeLong( block.getBlockId() );
|
|
@@ -886,196 +884,73 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
for ( int i = 1; i < targets.length; i++ ) {
|
|
|
targets[i].write( mirrorOut );
|
|
|
}
|
|
|
- checksum.writeHeader( mirrorOut );
|
|
|
- myMetrics.replicatedBlocks(1);
|
|
|
- } catch (IOException ie) {
|
|
|
- if (mirrorOut != null) {
|
|
|
- LOG.info("Exception connecting to mirror " + mirrorNode
|
|
|
- + "\n" + StringUtils.stringifyException(ie));
|
|
|
- mirrorOut = null;
|
|
|
- }
|
|
|
+ } catch (IOException e) {
|
|
|
+ IOUtils.closeStream(mirrorOut);
|
|
|
+ mirrorOut = null;
|
|
|
+ IOUtils.closeSocket(mirrorSock);
|
|
|
+ mirrorSock = null;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- // XXX The following code is similar on both sides...
|
|
|
-
|
|
|
- int bytesPerChecksum = checksum.getBytesPerChecksum();
|
|
|
- int checksumSize = checksum.getChecksumSize();
|
|
|
- byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
|
|
|
- long blockLen = 0;
|
|
|
- long lastOffset = 0;
|
|
|
- long lastLen = 0;
|
|
|
- short status = -1;
|
|
|
- boolean headerWritten = false;
|
|
|
-
|
|
|
- while ( true ) {
|
|
|
- // Read one data chunk in each loop.
|
|
|
-
|
|
|
- long offset = lastOffset + lastLen;
|
|
|
- int len = in.readInt();
|
|
|
- if ( len < 0 || len > bytesPerChecksum ) {
|
|
|
- LOG.warn( "Got wrong length during writeBlock(" +
|
|
|
- block + ") from " + s.getRemoteSocketAddress() +
|
|
|
- " at offset " + offset + ": " + len +
|
|
|
- " expected <= " + bytesPerChecksum );
|
|
|
- status = OP_STATUS_ERROR;
|
|
|
- break;
|
|
|
- }
|
|
|
|
|
|
- in.readFully( buf, 0, len + checksumSize );
|
|
|
-
|
|
|
- if ( len > 0 && checksumSize > 0 ) {
|
|
|
- /*
|
|
|
- * Verification is not included in the initial design.
|
|
|
- * For now, it at least catches some bugs. Later, we can
|
|
|
- * include this after showing that it does not affect
|
|
|
- * performance much.
|
|
|
- */
|
|
|
- checksum.update( buf, 0, len );
|
|
|
-
|
|
|
- if ( ! checksum.compare( buf, len ) ) {
|
|
|
- throw new IOException( "Unexpected checksum mismatch " +
|
|
|
- "while writing " + block +
|
|
|
- " from " +
|
|
|
- s.getRemoteSocketAddress() );
|
|
|
- }
|
|
|
-
|
|
|
- checksum.reset();
|
|
|
- }
|
|
|
+ String mirrorAddr = (mirrorSock == null) ? null :
|
|
|
+ mirrorSock.getRemoteSocketAddress().toString();
|
|
|
+ blockReceiver.receiveBlock(mirrorOut, mirrorAddr, null);
|
|
|
|
|
|
- // First write to remote node before writing locally.
|
|
|
- if (mirrorOut != null) {
|
|
|
- try {
|
|
|
- mirrorOut.writeInt( len );
|
|
|
- mirrorOut.write( buf, 0, len + checksumSize );
|
|
|
- if (len == 0) {
|
|
|
- mirrorOut.flush();
|
|
|
- }
|
|
|
- } catch (IOException ioe) {
|
|
|
- LOG.info( "Exception writing to mirror " + mirrorNode +
|
|
|
- "\n" + StringUtils.stringifyException(ioe) );
|
|
|
- //
|
|
|
- // If stream-copy fails, continue
|
|
|
- // writing to disk. We shouldn't
|
|
|
- // interrupt client write.
|
|
|
- //
|
|
|
- mirrorOut = null;
|
|
|
- }
|
|
|
- }
|
|
|
+ /*
|
|
|
+ * Informing the name node could take a long long time! Should we wait
|
|
|
+ * till namenode is informed before responding with success to the
|
|
|
+ * client? For now we don't.
|
|
|
+ */
|
|
|
+ synchronized (receivedBlockList) {
|
|
|
+ receivedBlockList.add(block);
|
|
|
+ receivedBlockList.notifyAll();
|
|
|
+ }
|
|
|
|
|
|
+ String msg = "Received block " + block + " from " +
|
|
|
+ s.getRemoteSocketAddress();
|
|
|
+
|
|
|
+ /* read response from next target in the pipeline.
|
|
|
+ * ignore the response for now. Will fix it in HADOOP-1927
|
|
|
+ */
|
|
|
+ if( mirrorSock != null ) {
|
|
|
+ short result = OP_STATUS_ERROR;
|
|
|
+ DataInputStream mirrorIn = null;
|
|
|
try {
|
|
|
- if ( !headerWritten ) {
|
|
|
- // First DATA_CHUNK.
|
|
|
- // Write the header even if checksumSize is 0.
|
|
|
- checksumOut.writeShort( FSDataset.METADATA_VERSION );
|
|
|
- checksum.writeHeader( checksumOut );
|
|
|
- headerWritten = true;
|
|
|
- }
|
|
|
-
|
|
|
- if ( len > 0 ) {
|
|
|
- out.write( buf, 0, len );
|
|
|
- // Write checksum
|
|
|
- checksumOut.write( buf, len, checksumSize );
|
|
|
- myMetrics.wroteBytes( len );
|
|
|
- } else {
|
|
|
- /* Should we sync() files here? It can add many millisecs of
|
|
|
- * latency. We did not sync before HADOOP-1134 either.
|
|
|
- */
|
|
|
- out.close();
|
|
|
- out = null;
|
|
|
- checksumOut.close();
|
|
|
- checksumOut = null;
|
|
|
- }
|
|
|
-
|
|
|
- } catch (IOException iex) {
|
|
|
- checkDiskError(iex);
|
|
|
- throw iex;
|
|
|
+ mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
|
|
|
+ result = mirrorIn.readShort();
|
|
|
+ } catch (IOException ignored) {
|
|
|
+ } finally {
|
|
|
+ IOUtils.closeStream(mirrorIn);
|
|
|
}
|
|
|
-
|
|
|
- if ( len == 0 ) {
|
|
|
-
|
|
|
- // We already have one successful write here. Should we
|
|
|
- // wait for response from next target? We will skip for now.
|
|
|
|
|
|
- block.setNumBytes( blockLen );
|
|
|
-
|
|
|
- //Does this fsync()?
|
|
|
- data.finalizeBlock( block );
|
|
|
- myMetrics.wroteBlocks(1);
|
|
|
-
|
|
|
- status = OP_STATUS_SUCCESS;
|
|
|
-
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- if ( lastLen > 0 && lastLen != bytesPerChecksum ) {
|
|
|
- LOG.warn( "Got wrong length during writeBlock(" +
|
|
|
- block + ") from " + s.getRemoteSocketAddress() +
|
|
|
- " : " + " got " + lastLen + " instead of " +
|
|
|
- bytesPerChecksum );
|
|
|
- status = OP_STATUS_ERROR;
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- lastOffset = offset;
|
|
|
- lastLen = len;
|
|
|
- blockLen += len;
|
|
|
+ msg += " and " + (( result != OP_STATUS_SUCCESS ) ?
|
|
|
+ "failed to mirror to " : " mirrored to ") +
|
|
|
+ mirrorAddr;
|
|
|
}
|
|
|
- // done with reading the data.
|
|
|
-
|
|
|
- if ( status == OP_STATUS_SUCCESS ) {
|
|
|
- /* Informing the name node could take a long long time!
|
|
|
- Should we wait till namenode is informed before responding
|
|
|
- with success to the client? For now we don't.
|
|
|
- */
|
|
|
- synchronized ( receivedBlockList ) {
|
|
|
- receivedBlockList.add( block );
|
|
|
- receivedBlockList.notifyAll();
|
|
|
- }
|
|
|
-
|
|
|
- String msg = "Received block " + block + " from " +
|
|
|
- s.getInetAddress();
|
|
|
-
|
|
|
- if ( mirrorOut != null ) {
|
|
|
- //Wait for the remote reply
|
|
|
- mirrorOut.flush();
|
|
|
- short result = OP_STATUS_ERROR;
|
|
|
- try {
|
|
|
- result = mirrorIn.readShort();
|
|
|
- } catch ( IOException ignored ) {}
|
|
|
|
|
|
- msg += " and " + (( result != OP_STATUS_SUCCESS ) ?
|
|
|
- "failed to mirror to " : " mirrored to ") +
|
|
|
- mirrorTarget;
|
|
|
-
|
|
|
- mirrorOut = null;
|
|
|
- }
|
|
|
-
|
|
|
- LOG.info(msg);
|
|
|
- }
|
|
|
-
|
|
|
- if ( status >= 0 ) {
|
|
|
- try {
|
|
|
- reply.writeShort( status );
|
|
|
- reply.flush();
|
|
|
- } catch ( IOException ignored ) {}
|
|
|
- }
|
|
|
-
|
|
|
+ LOG.info(msg);
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ opStatus = OP_STATUS_ERROR;
|
|
|
+ throw ioe;
|
|
|
} finally {
|
|
|
+ // send back reply
|
|
|
+ DataOutputStream reply = new DataOutputStream(s.getOutputStream());
|
|
|
try {
|
|
|
- if ( out != null )
|
|
|
- out.close();
|
|
|
- if ( checksumOut != null )
|
|
|
- checksumOut.close();
|
|
|
- if ( mirrorSock != null )
|
|
|
- mirrorSock.close();
|
|
|
- } catch (IOException iex) {
|
|
|
- shutdown();
|
|
|
- throw iex;
|
|
|
+ reply.writeShort(opStatus);
|
|
|
+ reply.flush();
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()
|
|
|
+ + "for writing block " + block );
|
|
|
+ LOG.warn(StringUtils.stringifyException(ioe));
|
|
|
}
|
|
|
+ // close all opened streams
|
|
|
+ IOUtils.closeStream(reply);
|
|
|
+ IOUtils.closeStream(mirrorOut);
|
|
|
+ IOUtils.closeSocket(mirrorSock);
|
|
|
+ IOUtils.closeStream(blockReceiver);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Reads the metadata and sends the data in one 'DATA_CHUNK'
|
|
|
* @param in
|
|
@@ -1113,173 +988,401 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /* An interface to throttle the block transfers */
|
|
|
+ private interface Throttler {
|
|
|
+ /** Given the numOfBytes sent/received since last time throttle was called,
|
|
|
+ * make the current thread sleep if I/O rate is too fast
|
|
|
+ * compared to the given bandwidth
|
|
|
+ *
|
|
|
+ * @param numOfBytes
|
|
|
+ * number of bytes sent/received since last time throttle was called
|
|
|
+ */
|
|
|
+ void throttle(int numOfBytes);
|
|
|
+ }
|
|
|
|
|
|
- /** sendBlock() is used to read block and its metadata and stream
|
|
|
- * the data to either a client or to another datanode.
|
|
|
- * If argument targets is null, then it is assumed to be replying
|
|
|
- * to a client request (OP_BLOCK_READ). Otherwise, we are replicating
|
|
|
- * to another datanode.
|
|
|
- *
|
|
|
- * returns total bytes reads, including crc.
|
|
|
- */
|
|
|
- long sendBlock(Socket sock, Block block,
|
|
|
- long startOffset, long length, DatanodeInfo targets[] )
|
|
|
- throws IOException {
|
|
|
- DataOutputStream out = new DataOutputStream(
|
|
|
- new BufferedOutputStream(sock.getOutputStream(),
|
|
|
- BUFFER_SIZE));
|
|
|
- RandomAccessFile blockInFile = null;
|
|
|
- DataInputStream blockIn = null;
|
|
|
- DataInputStream checksumIn = null;
|
|
|
- long totalRead = 0;
|
|
|
-
|
|
|
- /* XXX This will affect inter datanode transfers during
|
|
|
- * a CRC upgrade. There should not be any replication
|
|
|
- * during crc upgrade since we are in safe mode, right?
|
|
|
- */
|
|
|
- boolean corruptChecksumOk = targets == null;
|
|
|
+ private class BlockSender implements java.io.Closeable {
|
|
|
+ private Block block; // the block to read from
|
|
|
+ private DataInputStream blockIn; // data strean
|
|
|
+ private DataInputStream checksumIn; // checksum datastream
|
|
|
+ private DataChecksum checksum; // checksum stream
|
|
|
+ private long offset; // starting position to read
|
|
|
+ private long endOffset; // ending position
|
|
|
+ private byte buf[]; // buffer to store data read from the block file & crc
|
|
|
+ private int bytesPerChecksum; // chunk size
|
|
|
+ private int checksumSize; // checksum size
|
|
|
+ private boolean corruptChecksumOk; // if need to verify checksum
|
|
|
+ private boolean chunkOffsetOK; // if need to send chunk offset
|
|
|
+
|
|
|
+ private Throttler throttler;
|
|
|
+ private DataOutputStream out;
|
|
|
+
|
|
|
+ BlockSender(Block block, long startOffset, long length,
|
|
|
+ boolean corruptChecksumOk, boolean chunkOffsetOK) throws IOException {
|
|
|
+ RandomAccessFile blockInFile = null;
|
|
|
|
|
|
- try {
|
|
|
- File blockFile = data.getBlockFile( block );
|
|
|
- blockInFile = new RandomAccessFile(blockFile, "r");
|
|
|
-
|
|
|
- File checksumFile = FSDataset.getMetaFile( blockFile );
|
|
|
- DataChecksum checksum = null;
|
|
|
-
|
|
|
- if ( !corruptChecksumOk || checksumFile.exists() ) {
|
|
|
- checksumIn = new DataInputStream(
|
|
|
- new BufferedInputStream(new FileInputStream(checksumFile),
|
|
|
- BUFFER_SIZE));
|
|
|
-
|
|
|
- //read and handle the common header here. For now just a version
|
|
|
- short version = checksumIn.readShort();
|
|
|
- if ( version != FSDataset.METADATA_VERSION ) {
|
|
|
- LOG.warn( "Wrong version (" + version +
|
|
|
- ") for metadata file for " + block + " ignoring ..." );
|
|
|
+ try {
|
|
|
+ this.block = block;
|
|
|
+ this.chunkOffsetOK = chunkOffsetOK;
|
|
|
+ this.corruptChecksumOk = corruptChecksumOk;
|
|
|
+ File blockFile = data.getBlockFile(block);
|
|
|
+ blockInFile = new RandomAccessFile(blockFile, "r");
|
|
|
+
|
|
|
+ File checksumFile = FSDataset.getMetaFile(blockFile);
|
|
|
+
|
|
|
+ if (!corruptChecksumOk || checksumFile.exists()) {
|
|
|
+ checksumIn = new DataInputStream(new BufferedInputStream(
|
|
|
+ new FileInputStream(checksumFile), BUFFER_SIZE));
|
|
|
+
|
|
|
+ // read and handle the common header here. For now just a version
|
|
|
+ short version = checksumIn.readShort();
|
|
|
+ if (version != FSDataset.METADATA_VERSION) {
|
|
|
+ LOG.warn("Wrong version (" + version + ") for metadata file for "
|
|
|
+ + block + " ignoring ...");
|
|
|
+ }
|
|
|
+ checksum = DataChecksum.newDataChecksum(checksumIn);
|
|
|
+ } else {
|
|
|
+ LOG.warn("Could not find metadata file for " + block);
|
|
|
+ // This only decides the buffer size. Use BUFFER_SIZE?
|
|
|
+ checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
|
|
|
+ 16 * 1024);
|
|
|
}
|
|
|
- checksum = DataChecksum.newDataChecksum( checksumIn ) ;
|
|
|
- } else {
|
|
|
- LOG.warn( "Could not find metadata file for " + block );
|
|
|
- // This only decides the buffer size. Use BUFFER_SIZE?
|
|
|
- checksum = DataChecksum.newDataChecksum( DataChecksum.CHECKSUM_NULL,
|
|
|
- 16*1024 );
|
|
|
- }
|
|
|
|
|
|
- int bytesPerChecksum = checksum.getBytesPerChecksum();
|
|
|
- int checksumSize = checksum.getChecksumSize();
|
|
|
-
|
|
|
- if (length < 0) {
|
|
|
- length = data.getLength(block);
|
|
|
- }
|
|
|
+ bytesPerChecksum = checksum.getBytesPerChecksum();
|
|
|
+ checksumSize = checksum.getChecksumSize();
|
|
|
+
|
|
|
+ if (length < 0) {
|
|
|
+ length = data.getLength(block);
|
|
|
+ }
|
|
|
|
|
|
- long endOffset = data.getLength( block );
|
|
|
- if ( startOffset < 0 || startOffset > endOffset ||
|
|
|
- (length + startOffset) > endOffset ) {
|
|
|
- String msg = " Offset " + startOffset + " and length " + length +
|
|
|
- " don't match block " + block + " ( blockLen " +
|
|
|
- endOffset + " )";
|
|
|
- LOG.warn( "sendBlock() : " + msg );
|
|
|
- if ( targets != null ) {
|
|
|
+ endOffset = data.getLength(block);
|
|
|
+ if (startOffset < 0 || startOffset > endOffset
|
|
|
+ || (length + startOffset) > endOffset) {
|
|
|
+ String msg = " Offset " + startOffset + " and length " + length
|
|
|
+ + " don't match block " + block + " ( blockLen " + endOffset + " )";
|
|
|
+ LOG.warn("sendBlock() : " + msg);
|
|
|
throw new IOException(msg);
|
|
|
- } else {
|
|
|
- out.writeShort( OP_STATUS_ERROR_INVALID );
|
|
|
- return totalRead;
|
|
|
}
|
|
|
+
|
|
|
+ buf = new byte[bytesPerChecksum + checksumSize];
|
|
|
+ offset = (startOffset - (startOffset % bytesPerChecksum));
|
|
|
+ if (length >= 0) {
|
|
|
+ // Make sure endOffset points to end of a checksumed chunk.
|
|
|
+ long tmpLen = startOffset + length + (startOffset - offset);
|
|
|
+ if (tmpLen % bytesPerChecksum != 0) {
|
|
|
+ tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
|
|
|
+ }
|
|
|
+ if (tmpLen < endOffset) {
|
|
|
+ endOffset = tmpLen;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // seek to the right offsets
|
|
|
+ if (offset > 0) {
|
|
|
+ long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
|
|
|
+ blockInFile.seek(offset);
|
|
|
+ if (checksumSkip > 0) {
|
|
|
+ // Should we use seek() for checksum file as well?
|
|
|
+ IOUtils.skipFully(checksumIn, checksumSkip);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ blockIn = new DataInputStream(new BufferedInputStream(
|
|
|
+ new FileInputStream(blockInFile.getFD()), BUFFER_SIZE));
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ IOUtils.closeStream(this);
|
|
|
+ IOUtils.closeStream(blockInFile);
|
|
|
+ throw ioe;
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
|
|
|
- long offset = (startOffset - (startOffset % bytesPerChecksum));
|
|
|
- if ( length >= 0 ) {
|
|
|
- // Make sure endOffset points to end of a checksumed chunk.
|
|
|
- long tmpLen = startOffset + length + (startOffset - offset);
|
|
|
- if ( tmpLen % bytesPerChecksum != 0 ) {
|
|
|
- tmpLen += ( bytesPerChecksum - tmpLen % bytesPerChecksum );
|
|
|
+ // close opened files
|
|
|
+ public void close() throws IOException {
|
|
|
+ IOException ioe = null;
|
|
|
+ // close checksum file
|
|
|
+ if(checksumIn!=null) {
|
|
|
+ try {
|
|
|
+ checksumIn.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ ioe = e;
|
|
|
+ }
|
|
|
+ checksumIn = null;
|
|
|
+ }
|
|
|
+ // close data file
|
|
|
+ if(blockIn!=null) {
|
|
|
+ try {
|
|
|
+ blockIn.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ ioe = e;
|
|
|
}
|
|
|
- if ( tmpLen < endOffset ) {
|
|
|
- endOffset = tmpLen;
|
|
|
+ blockIn = null;
|
|
|
+ }
|
|
|
+ // throw IOException if there is any
|
|
|
+ if(ioe!= null) {
|
|
|
+ throw ioe;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private int sendChunk()
|
|
|
+ throws IOException {
|
|
|
+ int len = (int) Math.min(endOffset - offset, bytesPerChecksum);
|
|
|
+ if (len == 0)
|
|
|
+ return 0;
|
|
|
+ blockIn.readFully(buf, 0, len);
|
|
|
+
|
|
|
+ if (checksumSize > 0 && checksumIn != null) {
|
|
|
+ try {
|
|
|
+ checksumIn.readFully(buf, len, checksumSize);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn(" Could not read checksum for data at offset " + offset
|
|
|
+ + " for block " + block + " got : "
|
|
|
+ + StringUtils.stringifyException(e));
|
|
|
+ IOUtils.closeStream(checksumIn);
|
|
|
+ checksumIn = null;
|
|
|
+ if (corruptChecksumOk) {
|
|
|
+ // Just fill the array with zeros.
|
|
|
+ Arrays.fill(buf, len, len + checksumSize, (byte) 0);
|
|
|
+ } else {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // seek to the right offsets
|
|
|
- if ( offset > 0 ) {
|
|
|
- long checksumSkip = ( offset / bytesPerChecksum ) * checksumSize ;
|
|
|
- blockInFile.seek(offset);
|
|
|
- if (checksumSkip > 0) {
|
|
|
- //Should we use seek() for checksum file as well?
|
|
|
- IOUtils.skipFully(checksumIn, checksumSkip);
|
|
|
+ out.writeInt(len);
|
|
|
+ out.write(buf, 0, len + checksumSize);
|
|
|
+
|
|
|
+ if (throttler != null) { // rebalancing so throttle
|
|
|
+ throttler.throttle(len + checksumSize);
|
|
|
+ }
|
|
|
+
|
|
|
+ return len;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * sendBlock() is used to read block and its metadata and stream the data to
|
|
|
+ * either a client or to another datanode.
|
|
|
+ *
|
|
|
+ * @param out stream to which the block is written to
|
|
|
+ * returns total bytes reads, including crc.
|
|
|
+ */
|
|
|
+ long sendBlock(DataOutputStream out, Throttler throttler)
|
|
|
+ throws IOException {
|
|
|
+ if( out == null ) {
|
|
|
+ throw new IOException( "out stream is null" );
|
|
|
+ }
|
|
|
+ this.out = out;
|
|
|
+ this.throttler = throttler;
|
|
|
+
|
|
|
+ long totalRead = 0;
|
|
|
+ try {
|
|
|
+ checksum.writeHeader(out);
|
|
|
+ if ( chunkOffsetOK ) {
|
|
|
+ out.writeLong( offset );
|
|
|
+ }
|
|
|
+
|
|
|
+ while (endOffset > offset) {
|
|
|
+ // Write one data chunk per loop.
|
|
|
+ long len = sendChunk();
|
|
|
+ offset += len;
|
|
|
+ totalRead += len + checksumSize;
|
|
|
}
|
|
|
+ out.writeInt(0); // mark the end of block
|
|
|
+ out.flush();
|
|
|
+ } finally {
|
|
|
+ close();
|
|
|
}
|
|
|
-
|
|
|
- blockIn = new DataInputStream(new BufferedInputStream(
|
|
|
- new FileInputStream(blockInFile.getFD()),
|
|
|
- BUFFER_SIZE));
|
|
|
-
|
|
|
- if ( targets != null ) {
|
|
|
+
|
|
|
+ return totalRead;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /* A class that receives a block and wites to its own disk, meanwhile
|
|
|
+ * may copies it to another site. If a throttler is provided,
|
|
|
+ * streaming throttling is also supported.
|
|
|
+ * */
|
|
|
+ private class BlockReceiver implements java.io.Closeable {
|
|
|
+ private Block block; // the block to receive
|
|
|
+ private DataInputStream in = null; // from where data are read
|
|
|
+ private DataChecksum checksum; // from where chunks of a block can be read
|
|
|
+ private DataOutputStream out = null; // to block file at local disk
|
|
|
+ private DataOutputStream checksumOut = null; // to crc file at local disk
|
|
|
+ private int bytesPerChecksum;
|
|
|
+ private int checksumSize;
|
|
|
+ private byte buf[];
|
|
|
+ private long offset;
|
|
|
+ final private String inAddr;
|
|
|
+ private String mirrorAddr;
|
|
|
+ private DataOutputStream mirrorOut;
|
|
|
+ private Throttler throttler;
|
|
|
+ private int lastLen = -1;
|
|
|
+ private int curLen = -1;
|
|
|
+
|
|
|
+ BlockReceiver(Block block, DataInputStream in, String inAddr)
|
|
|
+ throws IOException {
|
|
|
+ try{
|
|
|
+ this.block = block;
|
|
|
+ this.in = in;
|
|
|
+ this.inAddr = inAddr;
|
|
|
+ this.checksum = DataChecksum.newDataChecksum(in);
|
|
|
+ this.bytesPerChecksum = checksum.getBytesPerChecksum();
|
|
|
+ this.checksumSize = checksum.getChecksumSize();
|
|
|
+ this.buf = new byte[bytesPerChecksum + checksumSize];
|
|
|
+
|
|
|
//
|
|
|
- // Header info
|
|
|
+ // Open local disk out
|
|
|
//
|
|
|
- out.writeShort( DATA_TRANFER_VERSION );
|
|
|
- out.writeByte( OP_WRITE_BLOCK );
|
|
|
- out.writeLong( block.getBlockId() );
|
|
|
- out.writeInt(targets.length-1);
|
|
|
- for (int i = 1; i < targets.length; i++) {
|
|
|
- targets[i].write( out );
|
|
|
+ FSDataset.BlockWriteStreams streams = data.writeToBlock(block);
|
|
|
+ this.out = new DataOutputStream(new BufferedOutputStream(
|
|
|
+ streams.dataOut, BUFFER_SIZE));
|
|
|
+ this.checksumOut = new DataOutputStream(new BufferedOutputStream(
|
|
|
+ streams.checksumOut, BUFFER_SIZE));
|
|
|
+ } catch(IOException ioe) {
|
|
|
+ IOUtils.closeStream(this);
|
|
|
+ throw ioe;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // close files
|
|
|
+ public void close() throws IOException {
|
|
|
+ IOException ioe = null;
|
|
|
+ // close checksum file
|
|
|
+ try {
|
|
|
+ if (checksumOut != null) {
|
|
|
+ checksumOut.close();
|
|
|
+ checksumOut = null;
|
|
|
+ }
|
|
|
+ } catch(IOException e) {
|
|
|
+ ioe = e;
|
|
|
+ }
|
|
|
+ // close block file
|
|
|
+ try {
|
|
|
+ if (out != null) {
|
|
|
+ out.close();
|
|
|
+ out = null;
|
|
|
}
|
|
|
- } else {
|
|
|
- out.writeShort( OP_STATUS_SUCCESS );
|
|
|
+ } catch (IOException e) {
|
|
|
+ ioe = e;
|
|
|
+ }
|
|
|
+ // disk check
|
|
|
+ if(ioe != null) {
|
|
|
+ checkDiskError(ioe);
|
|
|
+ throw ioe;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /* receive a chunk: write it to disk & mirror it to another stream */
|
|
|
+ private void receiveChunk( int len ) throws IOException {
|
|
|
+ if (len <= 0 || len > bytesPerChecksum) {
|
|
|
+ throw new IOException("Got wrong length during writeBlock(" + block
|
|
|
+ + ") from " + inAddr + " at offset " + offset + ": " + len
|
|
|
+ + " expected <= " + bytesPerChecksum);
|
|
|
}
|
|
|
|
|
|
- checksum.writeHeader( out );
|
|
|
-
|
|
|
- if ( targets == null ) {
|
|
|
- out.writeLong( offset );
|
|
|
+ if (lastLen > 0 && lastLen != bytesPerChecksum) {
|
|
|
+ throw new IOException("Got wrong length during receiveBlock(" + block
|
|
|
+ + ") from " + inAddr + " : " + " got " + lastLen + " instead of "
|
|
|
+ + bytesPerChecksum);
|
|
|
}
|
|
|
-
|
|
|
- while ( endOffset >= offset ) {
|
|
|
- // Write one data chunk per loop.
|
|
|
- int len = (int) Math.min( endOffset - offset, bytesPerChecksum );
|
|
|
- if ( len > 0 ) {
|
|
|
- blockIn.readFully( buf, 0, len );
|
|
|
- totalRead += len;
|
|
|
-
|
|
|
- if ( checksumSize > 0 && checksumIn != null ) {
|
|
|
- try {
|
|
|
- checksumIn.readFully( buf, len, checksumSize );
|
|
|
- totalRead += checksumSize;
|
|
|
- } catch ( IOException e ) {
|
|
|
- LOG.warn( " Could not read checksum for data at offset " +
|
|
|
- offset + " for block " + block + " got : " +
|
|
|
- StringUtils.stringifyException(e) );
|
|
|
- IOUtils.closeStream( checksumIn );
|
|
|
- checksumIn = null;
|
|
|
- if ( corruptChecksumOk ) {
|
|
|
- // Just fill the array with zeros.
|
|
|
- Arrays.fill( buf, len, len + checksumSize, (byte)0 );
|
|
|
- } else {
|
|
|
- throw e;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+
|
|
|
+ lastLen = curLen;
|
|
|
+ curLen = len;
|
|
|
+
|
|
|
+ in.readFully(buf, 0, len + checksumSize);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Verification is not included in the initial design. For now, it at
|
|
|
+ * least catches some bugs. Later, we can include this after showing that
|
|
|
+ * it does not affect performance much.
|
|
|
+ */
|
|
|
+ checksum.update(buf, 0, len);
|
|
|
+
|
|
|
+ if (!checksum.compare(buf, len)) {
|
|
|
+ throw new IOException("Unexpected checksum mismatch "
|
|
|
+ + "while writing " + block + " from " + inAddr);
|
|
|
+ }
|
|
|
+
|
|
|
+ checksum.reset();
|
|
|
+
|
|
|
+ // First write to remote node before writing locally.
|
|
|
+ if (mirrorOut != null) {
|
|
|
+ try {
|
|
|
+ mirrorOut.writeInt(len);
|
|
|
+ mirrorOut.write(buf, 0, len + checksumSize);
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.info("Exception writing to mirror " + mirrorAddr + "\n"
|
|
|
+ + StringUtils.stringifyException(ioe));
|
|
|
+ //
|
|
|
+ // If stream-copy fails, continue
|
|
|
+ // writing to disk. We shouldn't
|
|
|
+ // interrupt client write.
|
|
|
+ //
|
|
|
+ mirrorOut = null;
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- out.writeInt( len );
|
|
|
- out.write( buf, 0, len + checksumSize );
|
|
|
-
|
|
|
- if ( offset == endOffset ) {
|
|
|
- out.flush();
|
|
|
- // We are not waiting for response from target.
|
|
|
- break;
|
|
|
+ try {
|
|
|
+ out.write(buf, 0, len);
|
|
|
+ // Write checksum
|
|
|
+ checksumOut.write(buf, len, checksumSize);
|
|
|
+ myMetrics.wroteBytes(len);
|
|
|
+ } catch (IOException iex) {
|
|
|
+ checkDiskError(iex);
|
|
|
+ throw iex;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (throttler != null) { // throttle I/O
|
|
|
+ throttler.throttle(len + checksumSize);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void receiveBlock(DataOutputStream mirrorOut,
|
|
|
+ String mirrorAddr, Throttler throttler) throws IOException {
|
|
|
+
|
|
|
+ this.mirrorOut = mirrorOut;
|
|
|
+ this.mirrorAddr = mirrorAddr;
|
|
|
+ this.throttler = throttler;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * We need an estimate for block size to check if the disk partition has
|
|
|
+ * enough space. For now we just increment FSDataset.reserved by
|
|
|
+ * configured dfs.block.size Other alternative is to include the block
|
|
|
+ * size in the header sent by DFSClient.
|
|
|
+ */
|
|
|
+
|
|
|
+ try {
|
|
|
+ // write data chunk header
|
|
|
+ checksumOut.writeShort(FSDataset.METADATA_VERSION);
|
|
|
+ checksum.writeHeader(checksumOut);
|
|
|
+ if (mirrorOut != null) {
|
|
|
+ checksum.writeHeader(mirrorOut);
|
|
|
+ this.mirrorAddr = mirrorAddr;
|
|
|
+ }
|
|
|
+
|
|
|
+ int len = in.readInt();
|
|
|
+ while (len != 0) {
|
|
|
+ receiveChunk( len );
|
|
|
+ offset += len;
|
|
|
+ len = in.readInt();
|
|
|
+ }
|
|
|
+
|
|
|
+ // flush the mirror out
|
|
|
+ if (mirrorOut != null) {
|
|
|
+ mirrorOut.writeInt(0); // mark the end of the stream
|
|
|
+ mirrorOut.flush();
|
|
|
}
|
|
|
- offset += len;
|
|
|
+
|
|
|
+ // close the block/crc files
|
|
|
+ close();
|
|
|
+
|
|
|
+ // Finalize the block. Does this fsync()?
|
|
|
+ block.setNumBytes(offset);
|
|
|
+ data.finalizeBlock(block);
|
|
|
+ myMetrics.wroteBlocks(1);
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ IOUtils.closeStream(this);
|
|
|
+ throw ioe;
|
|
|
}
|
|
|
- } finally {
|
|
|
- IOUtils.closeStream( blockInFile );
|
|
|
- IOUtils.closeStream( checksumIn );
|
|
|
- IOUtils.closeStream( blockIn );
|
|
|
- IOUtils.closeStream( out );
|
|
|
}
|
|
|
-
|
|
|
- return totalRead;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1305,21 +1408,40 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
public void run() {
|
|
|
xmitsInProgress++;
|
|
|
Socket sock = null;
|
|
|
+ DataOutputStream out = null;
|
|
|
+ BlockSender blockSender = null;
|
|
|
|
|
|
try {
|
|
|
InetSocketAddress curTarget =
|
|
|
createSocketAddr(targets[0].getName());
|
|
|
sock = new Socket();
|
|
|
sock.connect(curTarget, READ_TIMEOUT);
|
|
|
- sock.setSoTimeout(READ_TIMEOUT);
|
|
|
- sendBlock( sock, b, 0, -1, targets );
|
|
|
- LOG.info( "Transmitted block " + b + " to " + curTarget );
|
|
|
-
|
|
|
- } catch ( IOException ie ) {
|
|
|
- LOG.warn( "Failed to transfer " + b + " to " +
|
|
|
- targets[0].getName() + " got " +
|
|
|
- StringUtils.stringifyException( ie ) );
|
|
|
+ sock.setSoTimeout(targets.length*READ_TIMEOUT);
|
|
|
+
|
|
|
+ out = new DataOutputStream(new BufferedOutputStream(
|
|
|
+ sock.getOutputStream(), BUFFER_SIZE));
|
|
|
+ blockSender = new BlockSender(b, 0, -1, false, false);
|
|
|
+
|
|
|
+ //
|
|
|
+ // Header info
|
|
|
+ //
|
|
|
+ out.writeShort(DATA_TRANFER_VERSION);
|
|
|
+ out.writeByte(OP_WRITE_BLOCK);
|
|
|
+ out.writeLong(b.getBlockId());
|
|
|
+ // write targets
|
|
|
+ out.writeInt(targets.length - 1);
|
|
|
+ for (int i = 1; i < targets.length; i++) {
|
|
|
+ targets[i].write(out);
|
|
|
+ }
|
|
|
+ // send data & checksum
|
|
|
+ blockSender.sendBlock(out, null);
|
|
|
+ LOG.info("Transmitted block " + b + " to " + curTarget);
|
|
|
+ } catch (IOException ie) {
|
|
|
+ LOG.warn("Failed to transfer " + b + " to " + targets[0].getName()
|
|
|
+ + " got " + StringUtils.stringifyException(ie));
|
|
|
} finally {
|
|
|
+ IOUtils.closeStream(blockSender);
|
|
|
+ IOUtils.closeStream(out);
|
|
|
IOUtils.closeSocket(sock);
|
|
|
xmitsInProgress--;
|
|
|
}
|