|
@@ -80,6 +80,13 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
return new InetSocketAddress(host, port);
|
|
|
}
|
|
|
|
|
|
+ private static String stringifyException(Exception e) {
|
|
|
+ StringWriter stm = new StringWriter();
|
|
|
+ PrintWriter wrt = new PrintWriter(stm);
|
|
|
+ e.printStackTrace(wrt);
|
|
|
+ wrt.close();
|
|
|
+ return stm.toString();
|
|
|
+ }
|
|
|
|
|
|
private static Vector subThreadList = null;
|
|
|
DatanodeProtocol namenode;
|
|
@@ -91,7 +98,6 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
Daemon dataXceiveServer = null;
|
|
|
long blockReportInterval;
|
|
|
private long datanodeStartupPeriod;
|
|
|
- private Configuration fConf;
|
|
|
|
|
|
/**
|
|
|
* Create the DataNode given a configuration and a dataDir.
|
|
@@ -160,10 +166,8 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
* forever calling remote NameNode functions.
|
|
|
*/
|
|
|
public void offerService() throws Exception {
|
|
|
- long wakeups = 0;
|
|
|
long lastHeartbeat = 0, lastBlockReport = 0;
|
|
|
long sendStart = System.currentTimeMillis();
|
|
|
- int heartbeatsSent = 0;
|
|
|
LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
|
|
|
|
|
|
//
|
|
@@ -321,275 +325,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
try {
|
|
|
byte op = (byte) in.read();
|
|
|
if (op == OP_WRITE_BLOCK) {
|
|
|
- //
|
|
|
- // Read in the header
|
|
|
- //
|
|
|
- DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
|
|
|
- try {
|
|
|
- boolean shouldReportBlock = in.readBoolean();
|
|
|
- Block b = new Block();
|
|
|
- b.readFields(in);
|
|
|
- int numTargets = in.readInt();
|
|
|
- if (numTargets <= 0) {
|
|
|
- throw new IOException("Mislabelled incoming datastream.");
|
|
|
- }
|
|
|
- DatanodeInfo targets[] = new DatanodeInfo[numTargets];
|
|
|
- for (int i = 0; i < targets.length; i++) {
|
|
|
- DatanodeInfo tmp = new DatanodeInfo();
|
|
|
- tmp.readFields(in);
|
|
|
- targets[i] = tmp;
|
|
|
- }
|
|
|
- byte encodingType = (byte) in.read();
|
|
|
- long len = in.readLong();
|
|
|
-
|
|
|
- //
|
|
|
- // Make sure curTarget is equal to this machine
|
|
|
- //
|
|
|
- DatanodeInfo curTarget = targets[0];
|
|
|
-
|
|
|
- //
|
|
|
- // Track all the places we've successfully written the block
|
|
|
- //
|
|
|
- Vector mirrors = new Vector();
|
|
|
-
|
|
|
- //
|
|
|
- // Open local disk out
|
|
|
- //
|
|
|
- DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));
|
|
|
- InetSocketAddress mirrorTarget = null;
|
|
|
- try {
|
|
|
- //
|
|
|
- // Open network conn to backup machine, if
|
|
|
- // appropriate
|
|
|
- //
|
|
|
- DataInputStream in2 = null;
|
|
|
- DataOutputStream out2 = null;
|
|
|
- if (targets.length > 1) {
|
|
|
- // Connect to backup machine
|
|
|
- mirrorTarget = createSocketAddr(targets[1].getName().toString());
|
|
|
- try {
|
|
|
- Socket s2 = new Socket();
|
|
|
- s2.connect(mirrorTarget, READ_TIMEOUT);
|
|
|
- s2.setSoTimeout(READ_TIMEOUT);
|
|
|
- out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));
|
|
|
- in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));
|
|
|
-
|
|
|
- // Write connection header
|
|
|
- out2.write(OP_WRITE_BLOCK);
|
|
|
- out2.writeBoolean(shouldReportBlock);
|
|
|
- b.write(out2);
|
|
|
- out2.writeInt(targets.length - 1);
|
|
|
- for (int i = 1; i < targets.length; i++) {
|
|
|
- targets[i].write(out2);
|
|
|
- }
|
|
|
- out2.write(encodingType);
|
|
|
- out2.writeLong(len);
|
|
|
- } catch (IOException ie) {
|
|
|
- if (out2 != null) {
|
|
|
- try {
|
|
|
- out2.close();
|
|
|
- in2.close();
|
|
|
- } catch (IOException out2close) {
|
|
|
- } finally {
|
|
|
- out2 = null;
|
|
|
- in2 = null;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- // Process incoming data, copy to disk and
|
|
|
- // maybe to network.
|
|
|
- //
|
|
|
- try {
|
|
|
- boolean anotherChunk = len != 0;
|
|
|
- byte buf[] = new byte[BUFFER_SIZE];
|
|
|
-
|
|
|
- while (anotherChunk) {
|
|
|
- while (len > 0) {
|
|
|
- int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len));
|
|
|
- if (bytesRead < 0) {
|
|
|
- throw new EOFException("EOF reading from "+s.toString());
|
|
|
- }
|
|
|
- if (bytesRead > 0) {
|
|
|
- try {
|
|
|
- out.write(buf, 0, bytesRead);
|
|
|
- } catch (IOException iex) {
|
|
|
- shutdown();
|
|
|
- throw iex;
|
|
|
- }
|
|
|
- if (out2 != null) {
|
|
|
- try {
|
|
|
- out2.write(buf, 0, bytesRead);
|
|
|
- } catch (IOException out2e) {
|
|
|
- //
|
|
|
- // If stream-copy fails, continue
|
|
|
- // writing to disk. We shouldn't
|
|
|
- // interrupt client write.
|
|
|
- //
|
|
|
- try {
|
|
|
- out2.close();
|
|
|
- in2.close();
|
|
|
- } catch (IOException out2close) {
|
|
|
- } finally {
|
|
|
- out2 = null;
|
|
|
- in2 = null;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- len -= bytesRead;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (encodingType == RUNLENGTH_ENCODING) {
|
|
|
- anotherChunk = false;
|
|
|
- } else if (encodingType == CHUNKED_ENCODING) {
|
|
|
- len = in.readLong();
|
|
|
- if (out2 != null) {
|
|
|
- out2.writeLong(len);
|
|
|
- }
|
|
|
- if (len == 0) {
|
|
|
- anotherChunk = false;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (out2 == null) {
|
|
|
- LOG.info("Received block " + b + " from " + s.getInetAddress());
|
|
|
- } else {
|
|
|
- out2.flush();
|
|
|
- long complete = in2.readLong();
|
|
|
- if (complete != WRITE_COMPLETE) {
|
|
|
- LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);
|
|
|
- }
|
|
|
- LocatedBlock newLB = new LocatedBlock();
|
|
|
- newLB.readFields(in2);
|
|
|
- DatanodeInfo mirrorsSoFar[] = newLB.getLocations();
|
|
|
- for (int k = 0; k < mirrorsSoFar.length; k++) {
|
|
|
- mirrors.add(mirrorsSoFar[k]);
|
|
|
- }
|
|
|
- LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget);
|
|
|
- }
|
|
|
- } finally {
|
|
|
- if (out2 != null) {
|
|
|
- out2.close();
|
|
|
- in2.close();
|
|
|
- }
|
|
|
- }
|
|
|
- } finally {
|
|
|
- try {
|
|
|
- out.close();
|
|
|
- } catch (IOException iex) {
|
|
|
- shutdown();
|
|
|
- throw iex;
|
|
|
- }
|
|
|
- }
|
|
|
- data.finalizeBlock(b);
|
|
|
-
|
|
|
- //
|
|
|
- // Tell the namenode that we've received this block
|
|
|
- // in full, if we've been asked to. This is done
|
|
|
- // during NameNode-directed block transfers, but not
|
|
|
- // client writes.
|
|
|
- //
|
|
|
- if (shouldReportBlock) {
|
|
|
- synchronized (receivedBlockList) {
|
|
|
- receivedBlockList.add(b);
|
|
|
- receivedBlockList.notifyAll();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- // Tell client job is done, and reply with
|
|
|
- // the new LocatedBlock.
|
|
|
- //
|
|
|
- reply.writeLong(WRITE_COMPLETE);
|
|
|
- mirrors.add(curTarget);
|
|
|
- LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));
|
|
|
- newLB.write(reply);
|
|
|
- } finally {
|
|
|
- reply.close();
|
|
|
- }
|
|
|
+ writeBlock(in);
|
|
|
} else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK) {
|
|
|
- //
|
|
|
- // Read in the header
|
|
|
- //
|
|
|
- Block b = new Block();
|
|
|
- b.readFields(in);
|
|
|
-
|
|
|
- long toSkip = 0;
|
|
|
- if (op == OP_READSKIP_BLOCK) {
|
|
|
- toSkip = in.readLong();
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- // Open reply stream
|
|
|
- //
|
|
|
- DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
|
|
|
- try {
|
|
|
- //
|
|
|
- // Write filelen of -1 if error
|
|
|
- //
|
|
|
- if (! data.isValidBlock(b)) {
|
|
|
- out.writeLong(-1);
|
|
|
- } else {
|
|
|
- //
|
|
|
- // Get blockdata from disk
|
|
|
- //
|
|
|
- long len = data.getLength(b);
|
|
|
- DataInputStream in2 = new DataInputStream(data.getBlockData(b));
|
|
|
- out.writeLong(len);
|
|
|
-
|
|
|
- if (op == OP_READSKIP_BLOCK) {
|
|
|
- if (toSkip > len) {
|
|
|
- toSkip = len;
|
|
|
- }
|
|
|
- long amtSkipped = 0;
|
|
|
- try {
|
|
|
- amtSkipped = in2.skip(toSkip);
|
|
|
- } catch (IOException iex) {
|
|
|
- shutdown();
|
|
|
- throw iex;
|
|
|
- }
|
|
|
- out.writeLong(amtSkipped);
|
|
|
- }
|
|
|
-
|
|
|
- byte buf[] = new byte[BUFFER_SIZE];
|
|
|
- try {
|
|
|
- int bytesRead = 0;
|
|
|
- try {
|
|
|
- bytesRead = in2.read(buf);
|
|
|
- } catch (IOException iex) {
|
|
|
- shutdown();
|
|
|
- throw iex;
|
|
|
- }
|
|
|
- while (bytesRead >= 0) {
|
|
|
- out.write(buf, 0, bytesRead);
|
|
|
- len -= bytesRead;
|
|
|
- try {
|
|
|
- bytesRead = in2.read(buf);
|
|
|
- } catch (IOException iex) {
|
|
|
- shutdown();
|
|
|
- throw iex;
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (SocketException se) {
|
|
|
- // This might be because the reader
|
|
|
- // closed the stream early
|
|
|
- } finally {
|
|
|
- try {
|
|
|
- in2.close();
|
|
|
- } catch (IOException iex) {
|
|
|
- shutdown();
|
|
|
- throw iex;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- LOG.info("Served block " + b + " to " + s.getInetAddress());
|
|
|
- } finally {
|
|
|
- out.close();
|
|
|
- }
|
|
|
+ readBlock(in, op);
|
|
|
} else {
|
|
|
while (op >= 0) {
|
|
|
System.out.println("Faulty op: " + op);
|
|
@@ -609,6 +347,326 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Read a block from the disk
|
|
|
+ * @param in The stream to read from
|
|
|
+ * @param op OP_READ_BLOCK or OP_READ_SKIPBLOCK
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void readBlock(DataInputStream in, byte op) throws IOException {
|
|
|
+ //
|
|
|
+ // Read in the header
|
|
|
+ //
|
|
|
+ Block b = new Block();
|
|
|
+ b.readFields(in);
|
|
|
+
|
|
|
+ long toSkip = 0;
|
|
|
+ if (op == OP_READSKIP_BLOCK) {
|
|
|
+ toSkip = in.readLong();
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // Open reply stream
|
|
|
+ //
|
|
|
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
|
|
|
+ try {
|
|
|
+ //
|
|
|
+ // Write filelen of -1 if error
|
|
|
+ //
|
|
|
+ if (! data.isValidBlock(b)) {
|
|
|
+ out.writeLong(-1);
|
|
|
+ } else {
|
|
|
+ //
|
|
|
+ // Get blockdata from disk
|
|
|
+ //
|
|
|
+ long len = data.getLength(b);
|
|
|
+ DataInputStream in2 = new DataInputStream(data.getBlockData(b));
|
|
|
+ out.writeLong(len);
|
|
|
+
|
|
|
+ if (op == OP_READSKIP_BLOCK) {
|
|
|
+ if (toSkip > len) {
|
|
|
+ toSkip = len;
|
|
|
+ }
|
|
|
+ long amtSkipped = 0;
|
|
|
+ try {
|
|
|
+ amtSkipped = in2.skip(toSkip);
|
|
|
+ } catch (IOException iex) {
|
|
|
+ shutdown();
|
|
|
+ throw iex;
|
|
|
+ }
|
|
|
+ out.writeLong(amtSkipped);
|
|
|
+ }
|
|
|
+
|
|
|
+ byte buf[] = new byte[BUFFER_SIZE];
|
|
|
+ try {
|
|
|
+ int bytesRead = 0;
|
|
|
+ try {
|
|
|
+ bytesRead = in2.read(buf);
|
|
|
+ } catch (IOException iex) {
|
|
|
+ shutdown();
|
|
|
+ throw iex;
|
|
|
+ }
|
|
|
+ while (bytesRead >= 0) {
|
|
|
+ out.write(buf, 0, bytesRead);
|
|
|
+ len -= bytesRead;
|
|
|
+ try {
|
|
|
+ bytesRead = in2.read(buf);
|
|
|
+ } catch (IOException iex) {
|
|
|
+ shutdown();
|
|
|
+ throw iex;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (SocketException se) {
|
|
|
+ // This might be because the reader
|
|
|
+ // closed the stream early
|
|
|
+ } finally {
|
|
|
+ try {
|
|
|
+ in2.close();
|
|
|
+ } catch (IOException iex) {
|
|
|
+ shutdown();
|
|
|
+ throw iex;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LOG.info("Served block " + b + " to " + s.getInetAddress());
|
|
|
+ } finally {
|
|
|
+ out.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Write a block to disk.
|
|
|
+ * @param in The stream to read from
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void writeBlock(DataInputStream in) throws IOException {
|
|
|
+ //
|
|
|
+ // Read in the header
|
|
|
+ //
|
|
|
+ DataOutputStream reply =
|
|
|
+ new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
|
|
|
+ try {
|
|
|
+ boolean shouldReportBlock = in.readBoolean();
|
|
|
+ Block b = new Block();
|
|
|
+ b.readFields(in);
|
|
|
+ int numTargets = in.readInt();
|
|
|
+ if (numTargets <= 0) {
|
|
|
+ throw new IOException("Mislabelled incoming datastream.");
|
|
|
+ }
|
|
|
+ DatanodeInfo targets[] = new DatanodeInfo[numTargets];
|
|
|
+ for (int i = 0; i < targets.length; i++) {
|
|
|
+ DatanodeInfo tmp = new DatanodeInfo();
|
|
|
+ tmp.readFields(in);
|
|
|
+ targets[i] = tmp;
|
|
|
+ }
|
|
|
+ byte encodingType = (byte) in.read();
|
|
|
+ long len = in.readLong();
|
|
|
+
|
|
|
+ //
|
|
|
+ // Make sure curTarget is equal to this machine
|
|
|
+ //
|
|
|
+ DatanodeInfo curTarget = targets[0];
|
|
|
+
|
|
|
+ //
|
|
|
+ // Track all the places we've successfully written the block
|
|
|
+ //
|
|
|
+ Vector mirrors = new Vector();
|
|
|
+
|
|
|
+ //
|
|
|
+ // Open local disk out
|
|
|
+ //
|
|
|
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));
|
|
|
+ InetSocketAddress mirrorTarget = null;
|
|
|
+ String mirrorNode = null;
|
|
|
+ try {
|
|
|
+ //
|
|
|
+ // Open network conn to backup machine, if
|
|
|
+ // appropriate
|
|
|
+ //
|
|
|
+ DataInputStream in2 = null;
|
|
|
+ DataOutputStream out2 = null;
|
|
|
+ if (targets.length > 1) {
|
|
|
+ // Connect to backup machine
|
|
|
+ mirrorNode = targets[1].getName().toString();
|
|
|
+ mirrorTarget = createSocketAddr(mirrorNode);
|
|
|
+ try {
|
|
|
+ Socket s2 = new Socket();
|
|
|
+ s2.connect(mirrorTarget, READ_TIMEOUT);
|
|
|
+ s2.setSoTimeout(READ_TIMEOUT);
|
|
|
+ out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));
|
|
|
+ in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));
|
|
|
+
|
|
|
+ // Write connection header
|
|
|
+ out2.write(OP_WRITE_BLOCK);
|
|
|
+ out2.writeBoolean(shouldReportBlock);
|
|
|
+ b.write(out2);
|
|
|
+ out2.writeInt(targets.length - 1);
|
|
|
+ for (int i = 1; i < targets.length; i++) {
|
|
|
+ targets[i].write(out2);
|
|
|
+ }
|
|
|
+ out2.write(encodingType);
|
|
|
+ out2.writeLong(len);
|
|
|
+ } catch (IOException ie) {
|
|
|
+ if (out2 != null) {
|
|
|
+ LOG.info("Exception connecting to mirror " + mirrorNode
|
|
|
+ + "\n" + stringifyException(ie));
|
|
|
+ try {
|
|
|
+ out2.close();
|
|
|
+ in2.close();
|
|
|
+ } catch (IOException out2close) {
|
|
|
+ } finally {
|
|
|
+ out2 = null;
|
|
|
+ in2 = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // Process incoming data, copy to disk and
|
|
|
+ // maybe to network.
|
|
|
+ //
|
|
|
+ boolean anotherChunk = len != 0;
|
|
|
+ byte buf[] = new byte[BUFFER_SIZE];
|
|
|
+
|
|
|
+ while (anotherChunk) {
|
|
|
+ while (len > 0) {
|
|
|
+ int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len));
|
|
|
+ if (bytesRead < 0) {
|
|
|
+ throw new EOFException("EOF reading from "+s.toString());
|
|
|
+ }
|
|
|
+ if (bytesRead > 0) {
|
|
|
+ try {
|
|
|
+ out.write(buf, 0, bytesRead);
|
|
|
+ } catch (IOException iex) {
|
|
|
+ shutdown();
|
|
|
+ throw iex;
|
|
|
+ }
|
|
|
+ if (out2 != null) {
|
|
|
+ try {
|
|
|
+ out2.write(buf, 0, bytesRead);
|
|
|
+ } catch (IOException out2e) {
|
|
|
+ LOG.info("Exception writing to mirror " + mirrorNode
|
|
|
+ + "\n" + stringifyException(out2e));
|
|
|
+ //
|
|
|
+ // If stream-copy fails, continue
|
|
|
+ // writing to disk. We shouldn't
|
|
|
+ // interrupt client write.
|
|
|
+ //
|
|
|
+ try {
|
|
|
+ out2.close();
|
|
|
+ in2.close();
|
|
|
+ } catch (IOException out2close) {
|
|
|
+ } finally {
|
|
|
+ out2 = null;
|
|
|
+ in2 = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ len -= bytesRead;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (encodingType == RUNLENGTH_ENCODING) {
|
|
|
+ anotherChunk = false;
|
|
|
+ } else if (encodingType == CHUNKED_ENCODING) {
|
|
|
+ len = in.readLong();
|
|
|
+ if (out2 != null) {
|
|
|
+ try {
|
|
|
+ out2.writeLong(len);
|
|
|
+ } catch (IOException ie) {
|
|
|
+ LOG.info("Exception writing to mirror " + mirrorNode
|
|
|
+ + "\n" + stringifyException(ie));
|
|
|
+ try {
|
|
|
+ out2.close();
|
|
|
+ in2.close();
|
|
|
+ } catch (IOException ie2) {
|
|
|
+ // NOTHING
|
|
|
+ } finally {
|
|
|
+ out2 = null;
|
|
|
+ in2 = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (len == 0) {
|
|
|
+ anotherChunk = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (out2 != null) {
|
|
|
+ try {
|
|
|
+ out2.flush();
|
|
|
+ long complete = in2.readLong();
|
|
|
+ if (complete != WRITE_COMPLETE) {
|
|
|
+ LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);
|
|
|
+ }
|
|
|
+ LocatedBlock newLB = new LocatedBlock();
|
|
|
+ newLB.readFields(in2);
|
|
|
+ in2.close();
|
|
|
+ out2.close();
|
|
|
+ DatanodeInfo mirrorsSoFar[] = newLB.getLocations();
|
|
|
+ for (int k = 0; k < mirrorsSoFar.length; k++) {
|
|
|
+ mirrors.add(mirrorsSoFar[k]);
|
|
|
+ }
|
|
|
+ } catch (IOException ie) {
|
|
|
+ LOG.info("Exception writing to mirror " + mirrorNode
|
|
|
+ + "\n" + stringifyException(ie));
|
|
|
+ try {
|
|
|
+ out2.close();
|
|
|
+ in2.close();
|
|
|
+ } catch (IOException ie2) {
|
|
|
+ // NOTHING
|
|
|
+ } finally {
|
|
|
+ out2 = null;
|
|
|
+ in2 = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (out2 == null) {
|
|
|
+ LOG.info("Received block " + b + " from " +
|
|
|
+ s.getInetAddress());
|
|
|
+ } else {
|
|
|
+ LOG.info("Received block " + b + " from " +
|
|
|
+ s.getInetAddress() +
|
|
|
+ " and mirrored to " + mirrorTarget);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ try {
|
|
|
+ out.close();
|
|
|
+ } catch (IOException iex) {
|
|
|
+ shutdown();
|
|
|
+ throw iex;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ data.finalizeBlock(b);
|
|
|
+
|
|
|
+ //
|
|
|
+ // Tell the namenode that we've received this block
|
|
|
+ // in full, if we've been asked to. This is done
|
|
|
+ // during NameNode-directed block transfers, but not
|
|
|
+ // client writes.
|
|
|
+ //
|
|
|
+ if (shouldReportBlock) {
|
|
|
+ synchronized (receivedBlockList) {
|
|
|
+ receivedBlockList.add(b);
|
|
|
+ receivedBlockList.notifyAll();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // Tell client job is done, and reply with
|
|
|
+ // the new LocatedBlock.
|
|
|
+ //
|
|
|
+ reply.writeLong(WRITE_COMPLETE);
|
|
|
+ mirrors.add(curTarget);
|
|
|
+ LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));
|
|
|
+ newLB.write(reply);
|
|
|
+ } finally {
|
|
|
+ reply.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|