
HDFS-7017. Implement OutputStream for libhdfs3. Contributed by Zhanwei Wang.

Haohui Mai committed 10 years ago (commit 4bacfb5e71)

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystem.cc

@@ -180,8 +180,7 @@ Status FileSystem::connect(const std::string &uri, const std::string &username,
                            const std::string &token) {
     AuthMethod auth;
     std::string principal;
-
-    THROW(HdfsIOException, "FileSystem: already connected.");
+    CHECK_PARAMETER(!impl, EIO, "FileSystem: already connected.");
 
     try {
         SessionConfig sconf(*conf.impl);

+ 5 - 19
hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemImpl.cc

@@ -97,7 +97,7 @@ static const std::string CanonicalizePath(const std::string &path) {
 FileSystemImpl::FileSystemImpl(const FileSystemKey &key, const Config &c)
     : conf(c),
       key(key),
-      openedOutputStream(0),
+      leaseRenewer(this),
       nn(NULL),
       sconf(*c.impl),
       user(key.getUser()) {
@@ -715,19 +715,13 @@ bool FileSystemImpl::getListing(const std::string &src,
     return nn->getListing(src, startAfter, needLocation, dl);
 }
 
-bool FileSystemImpl::renewLease() {
+void FileSystemImpl::renewLease() {
     if (!nn) {
         THROW(HdfsIOException, "FileSystemImpl: not connected.");
     }
 
-    // protected by LeaseRenewer's lock
-    if (0 == openedOutputStream) {
-        return false;
-    }
-
     try {
         nn->renewLease(clientName);
-        return true;
     } catch (const HdfsException &e) {
         LOG(LOG_ERROR,
             "Failed to renew lease for filesystem which client name "
@@ -739,22 +733,14 @@ bool FileSystemImpl::renewLease() {
             "%s, since:\n%s",
             getClientName(), e.what());
     }
-
-    return false;
 }
 
 void FileSystemImpl::registerOpenedOutputStream() {
-    // protected by LeaseRenewer's lock
-    ++openedOutputStream;
+    leaseRenewer.StartRenew();
 }
 
-bool FileSystemImpl::unregisterOpenedOutputStream() {
-    // protected by LeaseRenewer's lock
-    if (openedOutputStream > 0) {
-        --openedOutputStream;
-    }
-
-    return openedOutputStream == 0;
+void FileSystemImpl::unregisterOpenedOutputStream() {
+    leaseRenewer.StopRenew();
 }
 }
 }

+ 4 - 5
hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/FileSystemImpl.h

@@ -25,6 +25,7 @@
 #include "FileStatus.h"
 #include "FileSystemKey.h"
 #include "FileSystemStats.h"
+#include "LeaseRenewer.h"
 #include "server/LocatedBlocks.h"
 #include "server/Namenode.h"
 #include "SessionConfig.h"
@@ -416,7 +417,7 @@ public:
     /**
      * unregister the output stream from the filesystem when it is closed.
      */
-    bool unregisterOpenedOutputStream();
+    void unregisterOpenedOutputStream();
 
     /**
      * Get the configuration used in filesystem.
@@ -448,10 +449,8 @@ public:
 
     /**
      * To renew the lease.
-     *
-     * @return return false if the filesystem no long needs to renew lease.
      */
-    bool renewLease();
+    void renewLease();
 
 private:
     FileSystemImpl(const FileSystemImpl &other);
@@ -459,7 +458,7 @@ private:
 
     Config conf;
     FileSystemKey key;
-    int openedOutputStream;
+    LeaseRenewer leaseRenewer;
     mutex mutWorkingDir;
     Namenode *nn;
     SessionConfig sconf;

+ 130 - 0
hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LeaseRenewer.cc

@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DateTime.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "FileSystemImpl.h"
+#include "LeaseRenewer.h"
+#include "Logger.h"
+
+#include <string>
+
+#define DEFAULT_LEASE_RENEW_INTERVAL (60 * 1000)
+
+using std::map;
+using std::string;
+
+namespace hdfs {
+namespace internal {
+
+LeaseRenewer::LeaseRenewer()
+    : stop(true),
+      filesystem(NULL),
+      interval(DEFAULT_LEASE_RENEW_INTERVAL),
+      openedOutputStream(0) {
+}
+
+LeaseRenewer::LeaseRenewer(FileSystemImpl *fs)
+    : stop(true),
+      filesystem(fs),
+      interval(DEFAULT_LEASE_RENEW_INTERVAL),
+      openedOutputStream(0) {
+}
+
+LeaseRenewer::~LeaseRenewer() {
+    stop = true;
+    cond.notify_all();
+
+    if (worker.joinable()) {
+        worker.join();
+    }
+}
+
+int LeaseRenewer::getInterval() const {
+    return interval;
+}
+
+void LeaseRenewer::setInterval(int interval) {
+    this->interval = interval;
+}
+
+void LeaseRenewer::StartRenew() {
+    lock_guard<mutex> lock(mut);
+
+    ++openedOutputStream;
+
+    if (stop && openedOutputStream > 0) {
+        if (worker.joinable()) {
+            worker.join();
+        }
+
+        stop = false;
+        CREATE_THREAD(worker, bind(&LeaseRenewer::renewer, this));
+    }
+}
+
+void LeaseRenewer::StopRenew() {
+    lock_guard<mutex> lock(mut);
+    assert(openedOutputStream > 0);
+    openedOutputStream -= 1;
+    /*
+     * Do not exit the lease renew thread immediately when openedOutputStream
+     * drops to 0; it will idle for at most "interval" milliseconds before
+     * exiting.
+     */
+}
+
+void LeaseRenewer::renewer() {
+    assert(stop == false);
+
+    while (!stop) {
+        try {
+            unique_lock<mutex> lock(mut);
+            cond.wait_for(lock, milliseconds(interval));
+
+            if (stop || openedOutputStream <= 0) {
+                break;
+            }
+
+            filesystem->renewLease();
+            continue;
+        } catch (const std::bad_alloc &e) {
+            try {
+                LOG(LOG_ERROR,
+                    "Lease renewer will exit due to out of memory");
+            } catch (...) {
+                /*
+                 * We really can do nothing more here; ignore the error so
+                 * that an exception in this background thread does not kill
+                 * the process.
+                 */
+            }
+
+            break;
+        } catch (const std::exception &e) {
+            LOG(LOG_ERROR,
+                "Lease renewer will exit due to an unexpected exception: %s",
+                e.what());
+            break;
+        }
+    }
+
+    stop = true;
+}
+}
+}
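
For readers who are not familiar with the Thread/Atomic wrapper types used above, here is a minimal standalone sketch of the same start/stop renewal pattern using only standard C++11 primitives. StartRenew/StopRenew, the lazy worker exit, and the 60-second default mirror the LeaseRenewer above; the renew callback, the class name, and the short interval in main() are illustrative assumptions, not part of this commit.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>

class SimpleLeaseRenewer {
public:
    explicit SimpleLeaseRenewer(std::function<void()> renew,
                                std::chrono::milliseconds interval =
                                    std::chrono::milliseconds(60 * 1000))
        : renew_(std::move(renew)), interval_(interval) {}

    ~SimpleLeaseRenewer() {
        stop_ = true;
        cond_.notify_all();
        if (worker_.joinable()) {
            worker_.join();
        }
    }

    // Called when an output stream is opened: start the worker lazily.
    void StartRenew() {
        std::lock_guard<std::mutex> lock(mut_);
        ++openedOutputStream_;
        if (stop_ && openedOutputStream_ > 0) {
            if (worker_.joinable()) {
                worker_.join();  // reap a worker that already exited
            }
            stop_ = false;
            worker_ = std::thread(&SimpleLeaseRenewer::renewLoop, this);
        }
    }

    // Called when an output stream is closed; the worker exits lazily,
    // idling for at most one interval before noticing the count is zero.
    void StopRenew() {
        std::lock_guard<std::mutex> lock(mut_);
        --openedOutputStream_;
    }

private:
    void renewLoop() {
        while (!stop_) {
            std::unique_lock<std::mutex> lock(mut_);
            cond_.wait_for(lock, interval_);
            if (stop_ || openedOutputStream_ <= 0) {
                break;
            }
            renew_();  // stands in for FileSystemImpl::renewLease()
        }
        stop_ = true;
    }

    std::function<void()> renew_;
    std::chrono::milliseconds interval_;
    std::atomic<bool> stop_{true};
    int openedOutputStream_{0};
    std::condition_variable cond_;
    std::mutex mut_;
    std::thread worker_;
};

int main() {
    SimpleLeaseRenewer renewer([] { std::cout << "renewLease()\n"; },
                               std::chrono::milliseconds(100));
    renewer.StartRenew();
    std::this_thread::sleep_for(std::chrono::milliseconds(350));
    renewer.StopRenew();  // worker idles at most one interval, then exits
    return 0;
}

As in the patch above, StopRenew never joins the worker: closing the last stream simply lets the renewal thread wake up one more time and terminate on its own.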

+ 60 - 0
hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LeaseRenewer.h

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_CLIENT_LEASE_RENEW_H_
+#define _HDFS_LIBHDFS3_CLIENT_LEASE_RENEW_H_
+
+#include "Atomic.h"
+#include "DateTime.h"
+#include "SharedPtr.h"
+#include "Thread.h"
+
+#include <map>
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+class FileSystemImpl;
+
+class LeaseRenewer {
+public:
+    LeaseRenewer(FileSystemImpl *fs);
+    ~LeaseRenewer();
+    int getInterval() const;
+    void setInterval(int interval);
+    void StartRenew();
+    void StopRenew();
+
+private:
+    LeaseRenewer();  // for unit test
+    LeaseRenewer(const LeaseRenewer &other);
+    LeaseRenewer &operator=(const LeaseRenewer &other);
+    void renewer();
+
+    atomic<bool> stop;
+    condition_variable cond;
+    FileSystemImpl *filesystem;
+    int interval;
+    int openedOutputStream;
+    mutex mut;
+    thread worker;
+};
+}
+}
+#endif /* _HDFS_LIBHDFS3_CLIENT_LEASE_RENEW_H_ */

+ 106 - 0
hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/OutputStream.cc

@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Atomic.h"
+#include "FileSystemImpl.h"
+#include "OutputStream.h"
+#include "OutputStreamImpl.h"
+#include "SharedPtr.h"
+#include "StatusInternal.h"
+
+#include <stdint.h>
+
+using namespace hdfs::internal;
+
+namespace hdfs {
+
+OutputStream::OutputStream() {
+    impl = new internal::OutputStreamImpl;
+}
+
+OutputStream::~OutputStream() {
+    delete impl;
+}
+
+Status OutputStream::open(FileSystem &fs, const std::string &path, int flag,
+                          const Permission permission, bool createParent,
+                          int replication, int64_t blockSize) {
+    CHECK_PARAMETER(fs.impl, EIO, "FileSystem: not connected.");
+
+    try {
+        impl->open(fs.impl, path.c_str(), flag, permission, createParent,
+                   replication, blockSize);
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status OutputStream::append(const char *buf, uint32_t size) {
+    try {
+        impl->append(buf, size);
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status OutputStream::flush() {
+    try {
+        impl->flush();
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status OutputStream::tell(int64_t *output) {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        *output = impl->tell();
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status OutputStream::sync() {
+    try {
+        impl->sync();
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status OutputStream::close() {
+    try {
+        impl->close();
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+}

+ 137 - 0
hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/OutputStream.h

@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAM_H_
+#define _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAM_H_
+
+#include "FileSystem.h"
+#include "Status.h"
+
+namespace hdfs {
+
+namespace internal {
+class OutputStreamImpl;
+}
+
+/**
+ * Use the CreateFlag as follows:
+ * <ol>
+ * <li> CREATE - to create a file if it does not exist,
+ * else throw FileAlreadyExists.</li>
+ * <li> APPEND - to append to a file if it exists,
+ * else throw FileNotFoundException.</li>
+ * <li> OVERWRITE - to truncate a file if it exists,
+ * else throw FileNotFoundException.</li>
+ * <li> CREATE|APPEND - to create a file if it does not exist,
+ * else append to an existing file.</li>
+ * <li> CREATE|OVERWRITE - to create a file if it does not exist,
+ * else overwrite an existing file.</li>
+ * <li> SyncBlock - to force closed blocks to the disk device.
+ * In addition {@link OutputStream::sync()} should be called after each write,
+ * if true synchronous behavior is required.</li>
+ * </ol>
+ *
+ * The following combinations are not valid and will result in
+ * {@link InvalidParameter}:
+ * <ol>
+ * <li> APPEND|OVERWRITE</li>
+ * <li> CREATE|APPEND|OVERWRITE</li>
+ * </ol>
+ */
+enum CreateFlag {
+    Create = 0x01,
+    Overwrite = 0x02,
+    Append = 0x04,
+    SyncBlock = 0x08
+};
+
+using hdfs::internal::shared_ptr;
+
+/**
+ * An output stream used to write data to HDFS.
+ */
+class OutputStream {
+public:
+    /**
+     * Construct a new OutputStream.
+     */
+    OutputStream();
+    /**
+     * Destroy an OutputStream instance.
+     */
+    ~OutputStream();
+
+    /**
+     * To create or append a file.
+     * @param fs hdfs file system.
+     * @param path the file path.
+     * @param flag creation flag, can be Create, Append or Create|Overwrite.
+     * @param permission create a new file with given permission.
+     * @param createParent if the parent does not exist, create it.
+     * @param replication create a file with given number of replication.
+     * @param blockSize  create a file with given block size.
+     * @return the result status of this operation
+     */
+    Status open(FileSystem &fs, const std::string &path, int flag = Create,
+                const Permission permission = Permission(0644),
+                bool createParent = false, int replication = 0,
+                int64_t blockSize = 0);
+
+    /**
+     * To append data to file.
+     * @param buf the data used to append.
+     * @param size the data size.
+     * @return the result status of this operation
+     */
+    Status append(const char *buf, uint32_t size);
+
+    /**
+     * Flush all data in the buffer and wait for acknowledgement.
+     * Will block until all acks are received.
+     * @return the result status of this operation
+     */
+    Status flush();
+
+    /**
+     * Flush all data in buffer and let the Datanode sync the data.
+     * @return the result status of this operation
+     */
+    Status sync();
+
+    /**
+     * return the current file length.
+     * @param output the pointer of the output parameter
+     * @return the result status of this operation
+     */
+    Status tell(int64_t *output);
+
+    /**
+     * close the stream.
+     * @return the result status of this operation
+     */
+    Status close();
+
+private:
+    OutputStream(const OutputStream &other);
+    OutputStream &operator=(const OutputStream &other);
+
+    internal::OutputStreamImpl *impl;
+};
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAM_H_ */
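
A hypothetical end-to-end usage sketch of the public API declared above (not part of this commit). FileSystem::connect() and all OutputStream methods are taken from this patch; the Config default constructor, the FileSystem(conf) constructor, and the Status::ok() accessor are assumptions made only for illustration, as none of them appear in this diff.

#include "FileSystem.h"
#include "OutputStream.h"

#include <stdint.h>
#include <string>

using hdfs::FileSystem;
using hdfs::OutputStream;
using hdfs::Status;

int main() {
    hdfs::Config conf;                  // assumed default constructor
    FileSystem fs(conf);                // assumed constructor
    Status s = fs.connect("hdfs://localhost:8020", "hdfs", "");
    if (!s.ok()) return 1;              // ok() is assumed, not shown above

    OutputStream out;
    // Create the file, truncating it if it already exists (see CreateFlag).
    s = out.open(fs, "/tmp/example.txt", hdfs::Create | hdfs::Overwrite);
    if (!s.ok()) return 1;

    const std::string data = "hello libhdfs3\n";
    s = out.append(data.c_str(), data.size());
    if (!s.ok()) return 1;

    s = out.sync();                     // flush and ask datanodes to sync
    if (!s.ok()) return 1;

    int64_t length = 0;
    s = out.tell(&length);              // current file length (write cursor)
    if (!s.ok()) return 1;

    return out.close().ok() ? 0 : 1;    // close completes the file
}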

+ 542 - 0
hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/OutputStreamImpl.cc

@@ -0,0 +1,542 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Atomic.h"
+#include "DateTime.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "FileSystemImpl.h"
+#include "HWCrc32c.h"
+#include "LeaseRenewer.h"
+#include "Logger.h"
+#include "OutputStream.h"
+#include "OutputStreamImpl.h"
+#include "Packet.h"
+#include "PacketHeader.h"
+#include "SWCrc32c.h"
+
+#include <cassert>
+#include <inttypes.h>
+
+using std::string;
+
+namespace hdfs {
+namespace internal {
+
+OutputStreamImpl::OutputStreamImpl()
+    : closed(true),
+      isAppend(false),
+      syncBlock(false),
+      checksumSize(0),
+      chunkSize(0),
+      chunksPerPacket(0),
+      closeTimeout(0),
+      heartBeatInterval(0),
+      packetSize(0),
+      position(0),
+      replication(0),
+      blockSize(0),
+      bytesWritten(0),
+      cursor(0),
+      lastFlushed(0),
+      nextSeqNo(0) {
+    if (HWCrc32c::available()) {
+        checksum = shared_ptr<Checksum>(new HWCrc32c());
+    } else {
+        checksum = shared_ptr<Checksum>(new SWCrc32c());
+    }
+
+    checksumSize = sizeof(int32_t);
+    lastSend = steady_clock::now();
+#ifdef MOCK
+    stub = NULL;
+#endif
+}
+
+OutputStreamImpl::~OutputStreamImpl() {
+    if (!closed) {
+        try {
+            close();
+        } catch (...) {
+        }
+    }
+}
+
+void OutputStreamImpl::checkStatus() {
+    if (closed) {
+        THROW(HdfsIOException, "OutputStreamImpl: stream is not opened.");
+    }
+    lock_guard<mutex> lock(mut);
+
+    if (lastError != exception_ptr()) {
+        rethrow_exception(lastError);
+    }
+}
+
+void OutputStreamImpl::setError(const exception_ptr &error) {
+    try {
+        lock_guard<mutex> lock(mut);
+        lastError = error;
+    } catch (...) {
+    }
+}
+
+void OutputStreamImpl::open(shared_ptr<FileSystemImpl> fs, const char *path,
+                            int flag, const Permission &permission,
+                            bool createParent, int replication,
+                            int64_t blockSize) {
+    if (NULL == path || 0 == strlen(path) || replication < 0 || blockSize < 0) {
+        THROW(InvalidParameter, "Invalid parameter.");
+    }
+
+    if (!(flag == Create || flag == (Create | SyncBlock) || flag == Overwrite ||
+          flag == (Overwrite | SyncBlock) || flag == Append ||
+          flag == (Append | SyncBlock) || flag == (Create | Overwrite) ||
+          flag == (Create | Overwrite | SyncBlock) ||
+          flag == (Create | Append) || flag == (Create | Append | SyncBlock))) {
+        THROW(InvalidParameter, "Invalid flag.");
+    }
+
+    try {
+        openInternal(fs, path, flag, permission, createParent, replication,
+                     blockSize);
+    } catch (...) {
+        reset();
+        throw;
+    }
+}
+
+void OutputStreamImpl::computePacketChunkSize() {
+    int chunkSizeWithChecksum = chunkSize + checksumSize;
+    static const int packetHeaderSize = PacketHeader::GetPkgHeaderSize();
+    chunksPerPacket =
+        (packetSize - packetHeaderSize + chunkSizeWithChecksum - 1) /
+        chunkSizeWithChecksum;
+    chunksPerPacket = chunksPerPacket > 1 ? chunksPerPacket : 1;
+    packetSize = chunksPerPacket * chunkSizeWithChecksum + packetHeaderSize;
+    buffer.resize(chunkSize);
+}
+
+void OutputStreamImpl::initAppend() {
+    FileStatus fileInfo;
+    fileInfo = filesystem->getFileStatus(this->path.c_str());
+    lastBlock = filesystem->append(this->path);
+    closed = false;
+
+    try {
+        this->blockSize = fileInfo.getBlockSize();
+        cursor = fileInfo.getLength();
+
+        if (lastBlock) {
+            isAppend = true;
+            bytesWritten = lastBlock->getNumBytes();
+            int64_t usedInLastBlock = fileInfo.getLength() % blockSize;
+            int64_t freeInLastBlock = blockSize - usedInLastBlock;
+
+            if (freeInLastBlock == this->blockSize) {
+                THROW(HdfsIOException,
+                      "OutputStreamImpl: the last block for file %s is full.",
+                      this->path.c_str());
+            }
+
+            int usedInCksum = cursor % chunkSize;
+            int freeInCksum = chunkSize - usedInCksum;
+
+            if (usedInCksum > 0 && freeInCksum > 0) {
+                /*
+                 * if there is space in the last partial chunk, then
+                 * setup in such a way that the next packet will have only
+                 * one chunk that fills up the partial chunk.
+                 */
+                packetSize = 0;
+                chunkSize = freeInCksum;
+            } else {
+                /*
+                 * if the remaining space in the block is smaller than
+                 * the expected size of a packet, then create a
+                 * smaller packet.
+                 */
+                packetSize = packetSize < freeInLastBlock
+                                 ? packetSize
+                                 : static_cast<int>(freeInLastBlock);
+            }
+        }
+    } catch (...) {
+        reset();
+        throw;
+    }
+
+    computePacketChunkSize();
+}
+
+void OutputStreamImpl::openInternal(shared_ptr<FileSystemImpl> fs,
+                                    const char *path, int flag,
+                                    const Permission &permission,
+                                    bool createParent, int replication,
+                                    int64_t blockSize) {
+    filesystem = fs;
+    this->path = fs->getStandardPath(path);
+    this->replication = replication;
+    this->blockSize = blockSize;
+    syncBlock = flag & SyncBlock;
+    conf = shared_ptr<SessionConfig>(new SessionConfig(fs->getConf()));
+    LOG(DEBUG2, "open file %s for %s", this->path.c_str(),
+        (flag & Append ? "append" : "write"));
+
+    if (0 == replication) {
+        this->replication = conf->getDefaultReplica();
+    } else {
+        this->replication = replication;
+    }
+
+    if (0 == blockSize) {
+        this->blockSize = conf->getDefaultBlockSize();
+    } else {
+        this->blockSize = blockSize;
+    }
+
+    chunkSize = conf->getDefaultChunkSize();
+    packetSize = conf->getDefaultPacketSize();
+    heartBeatInterval = conf->getHeartBeatInterval();
+    closeTimeout = conf->getCloseFileTimeout();
+
+    if (packetSize < chunkSize) {
+        THROW(InvalidParameter,
+              "OutputStreamImpl: packet size %d is less than the "
+              "chunk size %d.",
+              packetSize, chunkSize);
+    }
+
+    if (0 != this->blockSize % chunkSize) {
+        THROW(InvalidParameter, "OutputStreamImpl: block size %" PRId64
+                                " is not a multiple of chunk size %d.",
+              this->blockSize, chunkSize);
+    }
+
+    try {
+        if (flag & Append) {
+            initAppend();
+            filesystem->registerOpenedOutputStream();
+            return;
+        }
+    } catch (const FileNotFoundException &e) {
+        if (!(flag & Create)) {
+            throw;
+        }
+    }
+
+    assert((flag & Create) || (flag & Overwrite));
+    fs->create(this->path, permission, flag, createParent, this->replication,
+               this->blockSize);
+    closed = false;
+    computePacketChunkSize();
+    filesystem->registerOpenedOutputStream();
+}
+
+void OutputStreamImpl::append(const char *buf, uint32_t size) {
+    if (NULL == buf) {
+        THROW(InvalidParameter, "Invalid parameter.");
+    }
+
+    checkStatus();
+
+    try {
+        appendInternal(buf, size);
+    } catch (...) {
+        setError(current_exception());
+        throw;
+    }
+}
+
+void OutputStreamImpl::appendInternal(const char *buf, uint32_t size) {
+    int64_t todo = size;
+
+    while (todo > 0) {
+        int batch = buffer.size() - position;
+        batch = batch < todo ? batch : static_cast<int>(todo);
+
+        /*
+         * bypass buffer.
+         */
+        if (0 == position && todo >= static_cast<int64_t>(buffer.size())) {
+            checksum->update(buf + size - todo, batch);
+            appendChunkToPacket(buf + size - todo, batch);
+            bytesWritten += batch;
+            checksum->reset();
+        } else {
+            checksum->update(buf + size - todo, batch);
+            memcpy(&buffer[position], buf + size - todo, batch);
+            position += batch;
+
+            if (position == static_cast<int>(buffer.size())) {
+                appendChunkToPacket(&buffer[0], buffer.size());
+                bytesWritten += buffer.size();
+                checksum->reset();
+                position = 0;
+            }
+        }
+
+        todo -= batch;
+
+        if (currentPacket &&
+            (currentPacket->isFull() || bytesWritten == blockSize)) {
+            sendPacket(currentPacket);
+
+            if (isAppend) {
+                isAppend = false;
+                chunkSize = conf->getDefaultChunkSize();
+                packetSize = conf->getDefaultPacketSize();
+                computePacketChunkSize();
+            }
+
+            if (bytesWritten == blockSize) {
+                closePipeline();
+            }
+        }
+    }
+
+    cursor += size;
+}
+
+void OutputStreamImpl::appendChunkToPacket(const char *buf, int size) {
+    assert(NULL != buf && size > 0);
+
+    if (!currentPacket) {
+        currentPacket = shared_ptr<Packet>(
+            new Packet(packetSize, chunksPerPacket, bytesWritten, nextSeqNo++,
+                       checksumSize));
+    }
+
+    currentPacket->addChecksum(checksum->getValue());
+    currentPacket->addData(buf, size);
+    currentPacket->increaseNumChunks();
+}
+
+void OutputStreamImpl::sendPacket(shared_ptr<Packet> packet) {
+    if (!pipeline) {
+        setupPipeline();
+    }
+
+    pipeline->send(currentPacket);
+    currentPacket.reset();
+    lastSend = steady_clock::now();
+}
+
+void OutputStreamImpl::setupPipeline() {
+    assert(currentPacket);
+#ifdef MOCK
+    pipeline = stub->getPipeline();
+#else
+    pipeline = shared_ptr<Pipeline>(new Pipeline(
+        isAppend, path.c_str(), *conf, filesystem, CHECKSUM_TYPE_CRC32C,
+        conf->getDefaultChunkSize(), replication,
+        currentPacket->getOffsetInBlock(), lastBlock));
+#endif
+    lastSend = steady_clock::now();
+}
+
+void OutputStreamImpl::flush() {
+    LOG(DEBUG3, "flush file %s at offset %" PRId64, path.c_str(), cursor);
+    checkStatus();
+
+    try {
+        flushInternal(false);
+    } catch (...) {
+        setError(current_exception());
+        throw;
+    }
+}
+
+void OutputStreamImpl::flushInternal(bool needSync) {
+    if (lastFlushed == cursor && !needSync) {
+        return;
+    } else {
+        lastFlushed = cursor;
+    }
+
+    if (position > 0) {
+        appendChunkToPacket(&buffer[0], position);
+    }
+
+    /*
+     * If the pipeline and currentPacket are both NULL, the pipeline has
+     * already been closed and there is no more data left in the
+     * buffer/packet; the data was already synced when the pipeline was
+     * closed.
+     */
+    if (!currentPacket && needSync && pipeline) {
+        currentPacket = shared_ptr<Packet>(
+            new Packet(packetSize, chunksPerPacket, bytesWritten, nextSeqNo++,
+                       checksumSize));
+    }
+
+    lock_guard<mutex> lock(mut);
+
+    if (currentPacket) {
+        currentPacket->setSyncFlag(needSync);
+        sendPacket(currentPacket);
+    }
+
+    if (pipeline) {
+        pipeline->flush();
+    }
+}
+
+int64_t OutputStreamImpl::tell() {
+    checkStatus();
+    return cursor;
+}
+
+void OutputStreamImpl::sync() {
+    LOG(DEBUG3, "sync file %s at offset %" PRId64, path.c_str(), cursor);
+    checkStatus();
+
+    try {
+        flushInternal(true);
+    } catch (...) {
+        setError(current_exception());
+        throw;
+    }
+}
+
+void OutputStreamImpl::completeFile() {
+    steady_clock::time_point start = steady_clock::now();
+
+    while (true) {
+        try {
+            bool success;
+            success = filesystem->complete(path, lastBlock.get());
+
+            if (success) {
+                return;
+            }
+        } catch (HdfsIOException &e) {
+            NESTED_THROW(HdfsIOException,
+                         "OutputStreamImpl: failed to complete file %s.",
+                         path.c_str());
+        }
+        if (closeTimeout > 0) {
+            steady_clock::time_point end = steady_clock::now();
+
+            if (ToMilliSeconds(start, end) >= closeTimeout) {
+                THROW(HdfsIOException,
+                      "OutputStreamImpl: timeout when completing file %s, "
+                      "timeout interval %d ms.",
+                      path.c_str(), closeTimeout);
+            }
+        }
+        try {
+            sleep_for(milliseconds(400));
+        } catch (...) {
+        }
+    }
+}
+
+void OutputStreamImpl::closePipeline() {
+    lock_guard<mutex> lock(mut);
+    if (!pipeline) {
+        return;
+    }
+    if (currentPacket) {
+        sendPacket(currentPacket);
+    }
+    currentPacket = shared_ptr<Packet>(new Packet(
+        packetSize, chunksPerPacket, bytesWritten, nextSeqNo++, checksumSize));
+    if (syncBlock) {
+        currentPacket->setSyncFlag(syncBlock);
+    }
+    lastBlock = pipeline->close(currentPacket);
+    assert(lastBlock);
+    currentPacket.reset();
+    pipeline.reset();
+    filesystem->fsync(path);
+    bytesWritten = 0;
+}
+
+void OutputStreamImpl::close() {
+    exception_ptr e;
+
+    if (closed) {
+        return;
+    }
+
+    try {
+        // pipeline may be broken
+        if (!lastError) {
+            if (lastFlushed != cursor && position > 0) {
+                appendChunkToPacket(&buffer[0], position);
+            }
+
+            if (lastFlushed != cursor && currentPacket) {
+                sendPacket(currentPacket);
+            }
+
+            closePipeline();
+            completeFile();
+        }
+    } catch (...) {
+        e = current_exception();
+        LOG(LOG_ERROR, "OutputStreamImpl: failed to close file %s, %s",
+            path.c_str(), GetExceptionDetail(e));
+    }
+
+    filesystem->unregisterOpenedOutputStream();
+    LOG(DEBUG3, "close file %s for write with length %" PRId64, path.c_str(),
+        cursor);
+    reset();
+
+    if (e) {
+        rethrow_exception(e);
+    }
+}
+
+void OutputStreamImpl::reset() {
+    blockSize = 0;
+    bytesWritten = 0;
+    checksum->reset();
+    chunkSize = 0;
+    chunksPerPacket = 0;
+    closed = true;
+    closeTimeout = 0;
+    conf.reset();
+    currentPacket.reset();
+    cursor = 0;
+    filesystem.reset();
+    heartBeatInterval = 0;
+    isAppend = false;
+    lastBlock.reset();
+    lastError = exception_ptr();
+    lastFlushed = 0;
+    nextSeqNo = 0;
+    packetSize = 0;
+    path.clear();
+    pipeline.reset();
+    position = 0;
+    replication = 0;
+    syncBlock = false;
+}
+
+std::string OutputStreamImpl::toString() {
+    if (path.empty()) {
+        return string("OutputStream (not opened)");
+    } else {
+        return string("OutputStream for path ") + path;
+    }
+}
+}
+}
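
As a quick sanity check of the packet geometry, the arithmetic in computePacketChunkSize() rounds the configured packet size up to a whole number of checksum-carrying chunks plus the packet header. The standalone sketch below reproduces that calculation; the 512-byte chunk and 64 KiB packet are common HDFS defaults, the 4-byte checksum matches sizeof(int32_t) in the constructor above, and the 25-byte header merely stands in for PacketHeader::GetPkgHeaderSize(), whose real value is not shown in this diff.

#include <algorithm>
#include <cstdio>

int main() {
    int chunkSize = 512;         // assumed conf->getDefaultChunkSize()
    int checksumSize = 4;        // sizeof(int32_t), as in the constructor
    int packetSize = 64 * 1024;  // assumed conf->getDefaultPacketSize()
    int packetHeaderSize = 25;   // assumed PacketHeader::GetPkgHeaderSize()

    // Same rounding as computePacketChunkSize().
    int chunkSizeWithChecksum = chunkSize + checksumSize;  // 516
    int chunksPerPacket =
        (packetSize - packetHeaderSize + chunkSizeWithChecksum - 1) /
        chunkSizeWithChecksum;                             // 127
    chunksPerPacket = std::max(chunksPerPacket, 1);
    packetSize = chunksPerPacket * chunkSizeWithChecksum + packetHeaderSize;

    // Prints: chunksPerPacket=127 packetSize=65557 payload=65024
    std::printf("chunksPerPacket=%d packetSize=%d payload=%d\n",
                chunksPerPacket, packetSize, chunksPerPacket * chunkSize);
    return 0;
}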

+ 167 - 0
hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/OutputStreamImpl.h

@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_
+#define _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_
+
+#include "Atomic.h"
+#include "Checksum.h"
+#include "DateTime.h"
+#include "ExceptionInternal.h"
+#include "FileSystem.h"
+#include "Permission.h"
+#include "Pipeline.h"
+#include "SessionConfig.h"
+#include "SharedPtr.h"
+#include "Thread.h"
+#include "server/LocatedBlock.h"
+#ifdef MOCK
+#include "PipelineStub.h"
+#endif
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * An output stream used to write data to hdfs.
+ */
+class OutputStreamImpl {
+public:
+    OutputStreamImpl();
+
+    ~OutputStreamImpl();
+
+    /**
+     * To create or append a file.
+     * @param fs hdfs file system.
+     * @param path the file path.
+     * @param flag creation flag, can be Create, Append or Create|Overwrite.
+     * @param permission create a new file with given permission.
+     * @param createParent if the parent does not exist, create it.
+     * @param replication create a file with given number of replication.
+     * @param blockSize  create a file with given block size.
+     */
+    void open(shared_ptr<FileSystemImpl> fs, const char *path, int flag,
+              const Permission &permission, bool createParent, int replication,
+              int64_t blockSize);
+
+    /**
+     * To append data to file.
+     * @param buf the data used to append.
+     * @param size the data size.
+     */
+    void append(const char *buf, uint32_t size);
+
+    /**
+     * Flush all data in the buffer and wait for acknowledgement.
+     * Will block until all acks are received.
+     */
+    void flush();
+
+    /**
+     * return the current file length.
+     * @return current file length.
+     */
+    int64_t tell();
+
+    /**
+     * Flush all data in buffer and let the Datanode sync the data.
+     */
+    void sync();
+
+    /**
+     * close the stream.
+     */
+    void close();
+
+    /**
+     * Output a readable string of this output stream.
+     */
+    std::string toString();
+
+    /**
+     * Keep the last error of this stream.
+     * @param error the error to be kept.
+     */
+    void setError(const exception_ptr &error);
+
+private:
+    void appendChunkToPacket(const char *buf, int size);
+    void appendInternal(const char *buf, uint32_t size);
+    void checkStatus();
+    void closePipeline();
+    void completeFile();
+    void computePacketChunkSize();
+    void flushInternal(bool needSync);
+    // void heartBeatSenderRoutine();
+    void initAppend();
+    void openInternal(shared_ptr<FileSystemImpl> fs, const char *path, int flag,
+                      const Permission &permission, bool createParent,
+                      int replication, int64_t blockSize);
+    void reset();
+    void sendPacket(shared_ptr<Packet> packet);
+    void setupPipeline();
+
+private:
+    OutputStreamImpl(const OutputStreamImpl &other);
+    OutputStreamImpl &operator=(const OutputStreamImpl &other);
+
+    // atomic<bool> heartBeatStop;
+    bool closed;
+    bool isAppend;
+    bool syncBlock;
+    // condition_variable condHeartBeatSender;
+    exception_ptr lastError;
+    int checksumSize;
+    int chunkSize;
+    int chunksPerPacket;
+    int closeTimeout;
+    int heartBeatInterval;
+    int packetSize;
+    int position;  // cursor in buffer
+    int replication;
+    int64_t blockSize;  // max size of block
+
+    // The number of bytes written to the current block so far.  This does
+    // not include the data still in the chunk buffer.
+    int64_t bytesWritten;
+
+    int64_t cursor;       // cursor in file.
+    int64_t lastFlushed;  // the position last flushed
+    int64_t nextSeqNo;
+    mutex mut;
+    shared_ptr<Checksum> checksum;
+    shared_ptr<FileSystemImpl> filesystem;
+    shared_ptr<LocatedBlock> lastBlock;
+    shared_ptr<Packet> currentPacket;
+    shared_ptr<Pipeline> pipeline;
+    shared_ptr<SessionConfig> conf;
+    std::string path;
+    std::vector<char> buffer;
+    steady_clock::time_point lastSend;
+// thread heartBeatSender;
+
+#ifdef MOCK
+private:
+    Hdfs::Mock::PipelineStub *stub;
+#endif
+};
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_ */

+ 840 - 0
hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Pipeline.cc

@@ -0,0 +1,840 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DateTime.h"
+#include "Pipeline.h"
+#include "Logger.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "FileSystemImpl.h"
+#include "DataTransferProtocolSender.h"
+#include "datatransfer.pb.h"
+
+#include <inttypes.h>
+#include <vector>
+
+using std::vector;
+
+using namespace ::hadoop::common;
+using namespace ::hadoop::hdfs;
+
+namespace hdfs {
+namespace internal {
+
+Pipeline::Pipeline(bool append, const char *path, const SessionConfig &conf,
+                   shared_ptr<FileSystemImpl> filesystem, int checksumType,
+                   int chunkSize, int replication, int64_t bytesSent,
+                   shared_ptr<LocatedBlock> lastBlock)
+    : checksumType(checksumType),
+      chunkSize(chunkSize),
+      errorIndex(-1),
+      replication(replication),
+      bytesAcked(bytesSent),
+      bytesSent(bytesSent),
+      filesystem(filesystem),
+      lastBlock(lastBlock),
+      path(path) {
+    canAddDatanode = conf.canAddDatanode();
+    blockWriteRetry = conf.getBlockWriteRetry();
+    connectTimeout = conf.getOutputConnTimeout();
+    readTimeout = conf.getOutputReadTimeout();
+    writeTimeout = conf.getOutputWriteTimeout();
+    clientName = filesystem->getClientName();
+
+    if (append) {
+        LOG(DEBUG2,
+            "create pipeline for file %s to append to %s at "
+            "position %" PRId64,
+            path, lastBlock->toString().c_str(), lastBlock->getNumBytes());
+        stage = PIPELINE_SETUP_APPEND;
+        assert(lastBlock);
+        nodes = lastBlock->getLocations();
+        storageIDs = lastBlock->getStorageIDs();
+        buildForAppendOrRecovery(false);
+        stage = DATA_STREAMING;
+    } else {
+        LOG(DEBUG2, "create pipeline for file %s to write to a new block",
+            path);
+        stage = PIPELINE_SETUP_CREATE;
+        buildForNewBlock();
+        stage = DATA_STREAMING;
+    }
+}
+
+int Pipeline::findNewDatanode(const std::vector<DatanodeInfo> &original) {
+    if (nodes.size() != original.size() + 1) {
+        THROW(HdfsIOException,
+              "Failed to acquire a datanode for block "
+              "%s from namenode.",
+              lastBlock->toString().c_str());
+    }
+
+    for (size_t i = 0; i < nodes.size(); i++) {
+        size_t j = 0;
+
+        for (; j < original.size() && !(nodes[i] == original[j]); j++)
+            ;
+
+        if (j == original.size()) {
+            return i;
+        }
+    }
+
+    THROW(HdfsIOException, "Cannot add new datanode for block %s.",
+          lastBlock->toString().c_str());
+}
+
+void Pipeline::transfer(const ExtendedBlock &blk, const DatanodeInfo &src,
+                        const vector<DatanodeInfo> &targets,
+                        const Token &token) {
+    shared_ptr<Socket> so(new TcpSocketImpl);
+    shared_ptr<BufferedSocketReader> in(new BufferedSocketReaderImpl(*so));
+    so->connect(src.getIpAddr().c_str(), src.getXferPort(), connectTimeout);
+    DataTransferProtocolSender sender(*so, writeTimeout, src.formatAddress());
+    sender.transferBlock(blk, token, clientName.c_str(), targets);
+    int size;
+    size = in->readVarint32(readTimeout);
+    std::vector<char> buf(size);
+    in->readFully(&buf[0], size, readTimeout);
+    BlockOpResponseProto resp;
+
+    if (!resp.ParseFromArray(&buf[0], size)) {
+        THROW(HdfsIOException,
+              "cannot parse datanode response from %s "
+              "for block %s.",
+              src.formatAddress().c_str(), lastBlock->toString().c_str());
+    }
+
+    if (::hadoop::hdfs::Status::SUCCESS != resp.status()) {
+        THROW(HdfsIOException,
+              "Failed to transfer block to a new datanode "
+              "%s for block %s.",
+              targets[0].formatAddress().c_str(),
+              lastBlock->toString().c_str());
+    }
+}
+
+bool Pipeline::addDatanodeToPipeline(
+    const vector<DatanodeInfo> &excludedNodes) {
+    try {
+        /*
+         * get a new datanode
+         */
+        std::vector<DatanodeInfo> original = nodes;
+        shared_ptr<LocatedBlock> lb;
+        lb = filesystem->getAdditionalDatanode(path, *lastBlock, nodes,
+                                               storageIDs, excludedNodes, 1);
+        nodes = lb->getLocations();
+        storageIDs = lb->getStorageIDs();
+
+        /*
+         * failed to add new datanode into pipeline.
+         */
+        if (original.size() == nodes.size()) {
+            LOG(LOG_ERROR,
+                "Failed to add new datanode into pipeline for block: %s "
+                "file %s.",
+                lastBlock->toString().c_str(), path.c_str());
+        } else {
+            /*
+             * find the new datanode
+             */
+            int d = findNewDatanode(original);
+            /*
+             * in case the block transfer fails.
+             */
+            errorIndex = d;
+            /*
+             * transfer replica
+             */
+            DatanodeInfo &src = d == 0 ? nodes[1] : nodes[d - 1];
+            std::vector<DatanodeInfo> targets;
+            targets.push_back(nodes[d]);
+            LOG(INFO, "Replicate block %s from %s to %s for file %s.",
+                lastBlock->toString().c_str(), src.formatAddress().c_str(),
+                targets[0].formatAddress().c_str(), path.c_str());
+            transfer(*lastBlock, src, targets, lb->getToken());
+            errorIndex = -1;
+            return true;
+        }
+    } catch (const HdfsCanceled &e) {
+        throw;
+    } catch (const HdfsFileSystemClosed &e) {
+        throw;
+    } catch (const SafeModeException &e) {
+        throw;
+    } catch (const HdfsException &e) {
+        LOG(LOG_ERROR,
+            "Failed to add a new datanode into pipeline for block: "
+            "%s file %s.\n%s",
+            lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e));
+    }
+
+    return false;
+}
+
+void Pipeline::checkPipelineWithReplicas() {
+    if (static_cast<int>(nodes.size()) < replication) {
+        std::stringstream ss;
+        int size = nodes.size();
+
+        for (int i = 0; i < size - 1; ++i) {
+            ss << nodes[i].formatAddress() << ", ";
+        }
+
+        if (nodes.empty()) {
+            ss << "Empty";
+        } else {
+            ss << nodes.back().formatAddress();
+        }
+
+        LOG(WARNING,
+            "the number of nodes in the pipeline is %d [%s], less than the "
+            "expected number of replicas %d for block %s file %s",
+            static_cast<int>(nodes.size()), ss.str().c_str(), replication,
+            lastBlock->toString().c_str(), path.c_str());
+    }
+}
+
+void Pipeline::buildForAppendOrRecovery(bool recovery) {
+    int64_t gs = 0;
+    int retry = blockWriteRetry;
+    exception_ptr lastException;
+    std::vector<DatanodeInfo> excludedNodes;
+    shared_ptr<LocatedBlock> lb;
+
+    do {
+        /*
+         * Remove bad datanode from list of datanodes.
+         * If errorIndex was not set (i.e. appends), then do not remove
+         * any datanodes
+         */
+        if (errorIndex >= 0) {
+            assert(lastBlock);
+            LOG(LOG_ERROR,
+                "Pipeline: node %s is invalid and removed from "
+                "pipeline when %s block %s for file %s, stage = %s.",
+                nodes[errorIndex].formatAddress().c_str(),
+                (recovery ? "recovery" : "append to"),
+                lastBlock->toString().c_str(), path.c_str(),
+                StageToString(stage));
+            excludedNodes.push_back(nodes[errorIndex]);
+            nodes.erase(nodes.begin() + errorIndex);
+
+            if (!storageIDs.empty()) {
+                storageIDs.erase(storageIDs.begin() + errorIndex);
+            }
+
+            if (nodes.empty()) {
+                THROW(HdfsIOException,
+                      "Build pipeline to %s block %s failed: all datanodes "
+                      "are bad.",
+                      (recovery ? "recovery" : "append to"),
+                      lastBlock->toString().c_str());
+            }
+
+            errorIndex = -1;
+        }
+
+        try {
+            gs = 0;
+
+            /*
+             * Check if the number of datanodes in pipeline satisfy the
+             * replication requirement,
+             * add new datanode if not
+             */
+            if (stage != PIPELINE_SETUP_CREATE && stage != PIPELINE_CLOSE &&
+                static_cast<int>(nodes.size()) < replication &&
+                canAddDatanode) {
+                if (!addDatanodeToPipeline(excludedNodes)) {
+                    THROW(HdfsIOException,
+                          "Failed to add new datanode "
+                          "to pipeline for block: %s file %s, set "
+                          "\"output.replace-datanode-on-failure\" to "
+                          "\"false\" to disable this feature.",
+                          lastBlock->toString().c_str(), path.c_str());
+                }
+            }
+
+            if (errorIndex >= 0) {
+                continue;
+            }
+
+            checkPipelineWithReplicas();
+            /*
+             * Update generation stamp and access token
+             */
+            lb = filesystem->updateBlockForPipeline(*lastBlock);
+            gs = lb->getGenerationStamp();
+            /*
+             * Try to build pipeline
+             */
+            createBlockOutputStream(lb->getToken(), gs, recovery);
+            /*
+             * everything is ok, reset errorIndex.
+             */
+            errorIndex = -1;
+            lastException = exception_ptr();
+            break;  // break on success
+        } catch (const HdfsInvalidBlockToken &e) {
+            lastException = current_exception();
+            recovery = true;
+            LOG(LOG_ERROR,
+                "Pipeline: Failed to build pipeline for block %s file %s, "
+                "new generation stamp is %" PRId64 ",\n%s",
+                lastBlock->toString().c_str(), path.c_str(), gs,
+                GetExceptionDetail(e));
+            LOG(INFO, "Try to recover pipeline for block %s file %s.",
+                lastBlock->toString().c_str(), path.c_str());
+        } catch (const HdfsTimeoutException &e) {
+            lastException = current_exception();
+            recovery = true;
+            LOG(LOG_ERROR,
+                "Pipeline: Failed to build pipeline for block %s file %s, "
+                "new generation stamp is %" PRId64 ",\n%s",
+                lastBlock->toString().c_str(), path.c_str(), gs,
+                GetExceptionDetail(e));
+            LOG(INFO, "Try to recover pipeline for block %s file %s.",
+                lastBlock->toString().c_str(), path.c_str());
+        } catch (const HdfsIOException &e) {
+            lastException = current_exception();
+            /*
+             * Set the recovery flag to true in case creating a pipeline
+             * for append fails.
+             */
+            recovery = true;
+            LOG(LOG_ERROR,
+                "Pipeline: Failed to build pipeline for block %s file %s, "
+                "new generation stamp is %" PRId64 ",\n%s",
+                lastBlock->toString().c_str(), path.c_str(), gs,
+                GetExceptionDetail(e));
+            LOG(INFO, "Try to recover pipeline for block %s file %s.",
+                lastBlock->toString().c_str(), path.c_str());
+        }
+
+        /*
+         * We don't know what happened; no datanode reported a failure.
+         * Reduce the retry count to avoid an infinite loop.
+         * It may be caused by an RPC call throwing HdfsIOException.
+         */
+        if (errorIndex < 0) {
+            --retry;
+        }
+    } while (retry > 0);
+
+    if (lastException) {
+        rethrow_exception(lastException);
+    }
+
+    /*
+     * Update pipeline at the namenode, non-idempotent RPC call.
+     */
+    lb->setPoolId(lastBlock->getPoolId());
+    lb->setBlockId(lastBlock->getBlockId());
+    lb->setLocations(nodes);
+    lb->setNumBytes(lastBlock->getNumBytes());
+    lb->setOffset(lastBlock->getOffset());
+    filesystem->updatePipeline(*lastBlock, *lb, nodes, storageIDs);
+    lastBlock = lb;
+}
+
+void Pipeline::locateNextBlock(const std::vector<DatanodeInfo> &excludedNodes) {
+    milliseconds sleeptime(400);
+    int retry = blockWriteRetry;
+
+    while (true) {
+        try {
+            lastBlock =
+                filesystem->addBlock(path, lastBlock.get(), excludedNodes);
+            assert(lastBlock);
+            return;
+        } catch (const NotReplicatedYetException &e) {
+            LOG(DEBUG1,
+                "Got NotReplicatedYetException when trying to addBlock "
+                "for block %s, already retried %d times, max retry %d times",
+                lastBlock->toString().c_str(), blockWriteRetry - retry,
+                blockWriteRetry);
+
+            if (retry--) {
+                try {
+                    sleep_for(sleeptime);
+                } catch (...) {
+                }
+
+                sleeptime *= 2;
+            } else {
+                throw;
+            }
+        }
+    }
+}
+
+static std::string FormatExcludedNodes(
+    const std::vector<DatanodeInfo> &excludedNodes) {
+    std::stringstream ss;
+    ss << "[";
+    int size = excludedNodes.size();
+
+    for (int i = 0; i < size - 1; ++i) {
+        ss << excludedNodes[i].formatAddress() << ", ";
+    }
+
+    if (excludedNodes.empty()) {
+        ss << "Empty";
+    } else {
+        ss << excludedNodes.back().formatAddress();
+    }
+
+    ss << "]";
+    return ss.str();
+}
+
+void Pipeline::buildForNewBlock() {
+    int retryAllocNewBlock = 0, retry = blockWriteRetry;
+    LocatedBlock lb;
+    std::vector<DatanodeInfo> excludedNodes;
+    shared_ptr<LocatedBlock> block = lastBlock;
+
+    do {
+        errorIndex = -1;
+        lastBlock = block;
+
+        try {
+            locateNextBlock(excludedNodes);
+            lastBlock->setNumBytes(0);
+            nodes = lastBlock->getLocations();
+        } catch (const HdfsRpcException &e) {
+            const char *lastBlockName =
+                lastBlock ? lastBlock->toString().c_str() : "Null";
+            LOG(LOG_ERROR,
+                "Failed to allocate a new empty block for file %s, last "
+                "block %s, excluded nodes %s.\n%s",
+                path.c_str(), lastBlockName,
+                FormatExcludedNodes(excludedNodes).c_str(),
+                GetExceptionDetail(e));
+
+            if (retryAllocNewBlock > blockWriteRetry) {
+                throw;
+            }
+
+            LOG(INFO,
+                "Retry to allocate a new empty block for file %s, last "
+                "block %s, excluded nodes %s.",
+                path.c_str(), lastBlockName,
+                FormatExcludedNodes(excludedNodes).c_str());
+            ++retryAllocNewBlock;
+            continue;
+        } catch (const HdfsException &e) {
+            const char *lastBlockName =
+                lastBlock ? lastBlock->toString().c_str() : "Null";
+            LOG(LOG_ERROR,
+                "Failed to allocate a new empty block for file %s, "
+                "last block %s, excluded nodes %s.\n%s",
+                path.c_str(), lastBlockName,
+                FormatExcludedNodes(excludedNodes).c_str(),
+                GetExceptionDetail(e));
+            throw;
+        }
+
+        retryAllocNewBlock = 0;
+        checkPipelineWithReplicas();
+
+        if (nodes.empty()) {
+            THROW(HdfsIOException,
+                  "No datanode is available to create a pipeline for "
+                  "block %s file %s.",
+                  lastBlock->toString().c_str(), path.c_str());
+        }
+
+        try {
+            createBlockOutputStream(lastBlock->getToken(), 0, false);
+            break;  // break on success
+        } catch (const HdfsInvalidBlockToken &e) {
+            LOG(LOG_ERROR,
+                "Failed to setup the pipeline for new block %s file %s.\n%s",
+                lastBlock->toString().c_str(), path.c_str(),
+                GetExceptionDetail(e));
+        } catch (const HdfsTimeoutException &e) {
+            LOG(LOG_ERROR,
+                "Failed to setup the pipeline for new block %s file %s.\n%s",
+                lastBlock->toString().c_str(), path.c_str(),
+                GetExceptionDetail(e));
+        } catch (const HdfsIOException &e) {
+            LOG(LOG_ERROR,
+                "Failed to setup the pipeline for new block %s file %s.\n%s",
+                lastBlock->toString().c_str(), path.c_str(),
+                GetExceptionDetail(e));
+        }
+
+        LOG(INFO, "Abandoning block: %s for file %s.",
+            lastBlock->toString().c_str(), path.c_str());
+
+        try {
+            filesystem->abandonBlock(*lastBlock, path);
+        } catch (const HdfsException &e) {
+            LOG(LOG_ERROR,
+                "Failed to abandon useless block %s for file %s.\n%s",
+                lastBlock->toString().c_str(), path.c_str(),
+                GetExceptionDetail(e));
+            throw;
+        }
+
+        if (errorIndex >= 0) {
+            LOG(INFO,
+                "Excluding invalid datanode: %s for block %s for "
+                "file %s",
+                nodes[errorIndex].formatAddress().c_str(),
+                lastBlock->toString().c_str(), path.c_str());
+            excludedNodes.push_back(nodes[errorIndex]);
+        } else {
+            /*
+             * We don't know what happened; no datanode reported a failure.
+             * Reduce the retry count to avoid an infinite loop.
+             */
+            --retry;
+        }
+    } while (retry);
+}
+
+/*
+ * The bad link node must be either empty or an "IP:PORT" pair.
+ */
+void Pipeline::checkBadLinkFormat(const std::string &n) {
+    std::string node = n;
+
+    if (node.empty()) {
+        return;
+    }
+
+    do {
+        const char *host = &node[0], *port;
+        size_t pos = node.find_last_of(":");
+
+        if (pos == node.npos || pos + 1 == node.length()) {
+            break;
+        }
+
+        node[pos] = 0;
+        port = &node[pos + 1];
+        struct addrinfo hints, *addrs;
+        memset(&hints, 0, sizeof(hints));
+        hints.ai_family = PF_UNSPEC;
+        hints.ai_socktype = SOCK_STREAM;
+        hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
+        int p;
+        char *end;
+        p = strtol(port, &end, 0);
+
+        if (p >= 65536 || p <= 0 || end != port + strlen(port)) {
+            break;
+        }
+
+        if (getaddrinfo(host, port, &hints, &addrs)) {
+            break;
+        }
+
+        freeaddrinfo(addrs);
+        return;
+    } while (0);
+
+    LOG(FATAL, "Cannot parse the firstBadLink string %s", n.c_str());
+    THROW(HdfsException, "Cannot parse the firstBadLink string %s.", n.c_str());
+}
+
+void Pipeline::createBlockOutputStream(const Token &token, int64_t gs,
+                                       bool recovery) {
+    std::string firstBadLink;
+    exception_ptr lastError;
+    bool needWrapException = true;
+
+    try {
+        sock = shared_ptr<Socket>(new TcpSocketImpl);
+        reader = shared_ptr<BufferedSocketReader>(
+            new BufferedSocketReaderImpl(*sock));
+        sock->connect(nodes[0].getIpAddr().c_str(), nodes[0].getXferPort(),
+                      connectTimeout);
+        std::vector<DatanodeInfo> targets;
+
+        for (size_t i = 1; i < nodes.size(); ++i) {
+            targets.push_back(nodes[i]);
+        }
+
+        DataTransferProtocolSender sender(*sock, writeTimeout,
+                                          nodes[0].formatAddress());
+        sender.writeBlock(*lastBlock, token, clientName.c_str(), targets,
+                          (recovery ? (stage | 0x1) : stage), nodes.size(),
+                          lastBlock->getNumBytes(), bytesSent, gs, checksumType,
+                          chunkSize);
+        int size = reader->readVarint32(readTimeout);
+        std::vector<char> buf(size);
+        reader->readFully(&buf[0], size, readTimeout);
+        BlockOpResponseProto resp;
+
+        if (!resp.ParseFromArray(&buf[0], size)) {
+            THROW(HdfsIOException,
+                  "Cannot parse datanode response from %s "
+                  "for block %s.",
+                  nodes[0].formatAddress().c_str(),
+                  lastBlock->toString().c_str());
+        }
+
+        ::hadoop::hdfs::Status pipelineStatus = resp.status();
+        firstBadLink = resp.firstbadlink();
+
+        if (::hadoop::hdfs::Status::SUCCESS != pipelineStatus) {
+            needWrapException = false;
+
+            if (::hadoop::hdfs::Status::ERROR_ACCESS_TOKEN == pipelineStatus) {
+                THROW(HdfsInvalidBlockToken,
+                      "Got access token error for connect ack with "
+                      "firstBadLink as %s for block %s",
+                      firstBadLink.c_str(), lastBlock->toString().c_str());
+            } else {
+                THROW(HdfsIOException,
+                      "Bad connect ack with firstBadLink "
+                      "as %s for block %s",
+                      firstBadLink.c_str(), lastBlock->toString().c_str());
+            }
+        }
+
+        return;
+    } catch (...) {
+        errorIndex = 0;
+        lastError = current_exception();
+    }
+
+    checkBadLinkFormat(firstBadLink);
+
+    if (!firstBadLink.empty()) {
+        for (size_t i = 0; i < nodes.size(); ++i) {
+            if (nodes[i].getXferAddr() == firstBadLink) {
+                errorIndex = i;
+                break;
+            }
+        }
+    }
+
+    assert(lastError);
+
+    if (!needWrapException) {
+        rethrow_exception(lastError);
+    }
+
+    try {
+        rethrow_exception(lastError);
+    } catch (const HdfsException &e) {
+        NESTED_THROW(HdfsIOException,
+                     "Cannot create block output stream for block %s, "
+                     "recovery flag: %s, with last generation stamp %" PRId64 ".",
+                     lastBlock->toString().c_str(),
+                     (recovery ? "true" : "false"), gs);
+    }
+}
+
+void Pipeline::resend() {
+    assert(stage != PIPELINE_CLOSE);
+
+    for (size_t i = 0; i < packets.size(); ++i) {
+        ConstPacketBuffer b = packets[i]->getBuffer();
+        sock->writeFully(b.getBuffer(), b.getSize(), writeTimeout);
+        int64_t tmp = packets[i]->getLastByteOffsetBlock();
+        bytesSent = bytesSent > tmp ? bytesSent : tmp;
+    }
+}
+
+void Pipeline::send(shared_ptr<Packet> packet) {
+    ConstPacketBuffer buffer = packet->getBuffer();
+
+    if (!packet->isHeartbeat()) {
+        packets.push_back(packet);
+    }
+
+    bool failover = false;
+
+    do {
+        try {
+            if (failover) {
+                resend();
+            } else {
+                assert(sock);
+                sock->writeFully(buffer.getBuffer(), buffer.getSize(),
+                                 writeTimeout);
+                int64_t tmp = packet->getLastByteOffsetBlock();
+                bytesSent = bytesSent > tmp ? bytesSent : tmp;
+            }
+
+            checkResponse(false);
+            return;
+        } catch (const HdfsIOException &e) {
+            if (errorIndex < 0) {
+                errorIndex = 0;
+            }
+
+            sock.reset();
+        }
+
+        buildForAppendOrRecovery(true);
+        failover = true;
+
+        if (stage == PIPELINE_CLOSE) {
+            assert(packets.size() == 1 && packets[0]->isLastPacketInBlock());
+            packets.clear();
+            break;
+        }
+    } while (true);
+}
+
+void Pipeline::processAck(PipelineAck &ack) {
+    assert(!ack.isInvalid());
+    int64_t seqno = ack.getSeqno();
+
+    if (HEART_BEAT_SEQNO == seqno) {
+        return;
+    }
+
+    assert(!packets.empty());
+    Packet &packet = *packets[0];
+
+    if (ack.isSuccess()) {
+        if (packet.getSeqno() != seqno) {
+            THROW(HdfsIOException,
+                  "processAck: pipeline ack expecting seqno %" PRId64
+                  " but received %" PRId64 " for block %s.",
+                  packet.getSeqno(), seqno, lastBlock->toString().c_str());
+        }
+
+        int64_t tmp = packet.getLastByteOffsetBlock();
+        bytesAcked = tmp > bytesAcked ? tmp : bytesAcked;
+        assert(lastBlock);
+        lastBlock->setNumBytes(bytesAcked);
+
+        if (packet.isLastPacketInBlock()) {
+            sock.reset();
+        }
+
+        packets[0].reset();
+        packets.pop_front();
+    } else {
+        for (int i = ack.getNumOfReplies() - 1; i >= 0; --i) {
+            if (::hadoop::hdfs::Status::SUCCESS != ack.getReply(i)) {
+                errorIndex = i;
+                /*
+                 * handle block token expiry the same as HdfsIOException.
+                 */
+                THROW(HdfsIOException,
+                      "processAck: ack report error at node: %s for block %s.",
+                      nodes[i].formatAddress().c_str(),
+                      lastBlock->toString().c_str());
+            }
+        }
+    }
+}
+
+void Pipeline::processResponse() {
+    PipelineAck ack;
+    std::vector<char> buf;
+    int size = reader->readVarint32(readTimeout);
+    ack.reset();
+    buf.resize(size);
+    reader->readFully(&buf[0], size, readTimeout);
+    ack.readFrom(&buf[0], size);
+
+    if (ack.isInvalid()) {
+        THROW(HdfsIOException,
+              "processResponse: received an invalid packet ack "
+              "for block %s",
+              lastBlock->toString().c_str());
+    }
+
+    processAck(ack);
+}
+
+void Pipeline::checkResponse(bool wait) {
+    int timeout = wait ? readTimeout : 0;
+    bool readable = reader->poll(timeout);
+
+    if (readable) {
+        processResponse();
+    } else if (wait) {
+        THROW(HdfsIOException,
+              "Timed out reading response from datanode "
+              "%s for block %s.",
+              nodes[0].formatAddress().c_str(), lastBlock->toString().c_str());
+    }
+}
+
+void Pipeline::flush() {
+    waitForAcks();
+}
+
+void Pipeline::waitForAcks() {
+    bool failover = false;
+
+    while (!packets.empty()) {
+        try {
+            if (failover) {
+                resend();
+            }
+            checkResponse(true);
+            failover = false;
+        } catch (const HdfsIOException &e) {
+            if (errorIndex < 0) {
+                errorIndex = 0;
+            }
+
+            LOG(LOG_ERROR,
+                "Failed to flush pipeline on datanode %s for block "
+                "%s file %s.\n%s",
+                nodes[errorIndex].formatAddress().c_str(),
+                lastBlock->toString().c_str(), path.c_str(),
+                GetExceptionDetail(e));
+            LOG(INFO, "Rebuild pipeline to flush for block %s file %s.",
+                lastBlock->toString().c_str(), path.c_str());
+            sock.reset();
+            failover = true;
+        }
+
+        if (failover) {
+            buildForAppendOrRecovery(true);
+
+            if (stage == PIPELINE_CLOSE) {
+                assert(packets.size() == 1 &&
+                       packets[0]->isLastPacketInBlock());
+                packets.clear();
+                break;
+            }
+        }
+    }
+}
+
+shared_ptr<LocatedBlock> Pipeline::close(shared_ptr<Packet> lastPacket) {
+    waitForAcks();
+    lastPacket->setLastPacketInBlock(true);
+    stage = PIPELINE_CLOSE;
+    send(lastPacket);
+    waitForAcks();
+    sock.reset();
+    lastBlock->setNumBytes(bytesAcked);
+    LOG(DEBUG2, "close pipeline for file %s, block %s with length %" PRId64,
+        path.c_str(), lastBlock->toString().c_str(), lastBlock->getNumBytes());
+    return lastBlock;
+}
+}
+}
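
The recovery path in Pipeline::createBlockOutputStream above derives the recovery variant of a stage with (recovery ? (stage | 0x1) : stage). That expression only works because of the stage numbering declared in Pipeline.h below, where each regular stage sits on an even value immediately followed by its recovery stage. A minimal sketch of that assumption follows; the name getRecoveryStage is taken from the enum comment, and the helper itself is not part of this patch:

    #include <cassert>
    #include "Pipeline.h"  // for BlockConstructionStage

    using namespace hdfs::internal;

    // Hypothetical helper mirroring "(recovery ? (stage | 0x1) : stage)".
    // Valid only while regular stages stay on even values with the matching
    // recovery stage right after them; PIPELINE_SETUP_CREATE has no recovery
    // counterpart.
    static inline BlockConstructionStage getRecoveryStage(
        BlockConstructionStage stage) {
        assert(stage != PIPELINE_SETUP_CREATE && 0 == (stage & 0x1));
        return static_cast<BlockConstructionStage>(stage | 0x1);
    }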

+ 169 - 0
hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Pipeline.h

@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_
+#define _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_
+
+#include "Packet.h"
+#include "PipelineAck.h"
+#include "SessionConfig.h"
+#include "SharedPtr.h"
+#include "Thread.h"
+#include "network/BufferedSocketReader.h"
+#include "network/TcpSocket.h"
+#include "server/DatanodeInfo.h"
+#include "server/LocatedBlock.h"
+#include "server/Namenode.h"
+
+#include <deque>
+#include <vector>
+
+namespace hdfs {
+namespace internal {
+
+class FileSystemImpl;
+
+enum BlockConstructionStage {
+    /**
+     * The enumerators are always listed as a regular stage immediately
+     * followed by its recovery stage.
+     * Changing this order will break getRecoveryStage.
+     */
+    // pipeline set up for block append
+    PIPELINE_SETUP_APPEND = 0,
+    // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
+    PIPELINE_SETUP_APPEND_RECOVERY = 1,
+    // data streaming
+    DATA_STREAMING = 2,
+    // pipeline setup for failed data streaming recovery
+    PIPELINE_SETUP_STREAMING_RECOVERY = 3,
+    // close the block and pipeline
+    PIPELINE_CLOSE = 4,
+    // Recover a failed PIPELINE_CLOSE
+    PIPELINE_CLOSE_RECOVERY = 5,
+    // pipeline set up for block creation
+    PIPELINE_SETUP_CREATE = 6
+};
+
+static inline const char *StageToString(BlockConstructionStage stage) {
+    switch (stage) {
+        case PIPELINE_SETUP_APPEND:
+            return "PIPELINE_SETUP_APPEND";
+
+        case PIPELINE_SETUP_APPEND_RECOVERY:
+            return "PIPELINE_SETUP_APPEND_RECOVERY";
+
+        case DATA_STREAMING:
+            return "DATA_STREAMING";
+
+        case PIPELINE_SETUP_STREAMING_RECOVERY:
+            return "PIPELINE_SETUP_STREAMING_RECOVERY";
+
+        case PIPELINE_CLOSE:
+            return "PIPELINE_CLOSE";
+
+        case PIPELINE_CLOSE_RECOVERY:
+            return "PIPELINE_CLOSE_RECOVERY";
+
+        case PIPELINE_SETUP_CREATE:
+            return "PIPELINE_SETUP_CREATE";
+
+        default:
+            return "UNKNOWN STAGE";
+    }
+}
+
+class Packet;
+class OutputStreamImpl;
+
+/**
+ * The write pipeline to the datanodes: setup, data transfer, close, and failover.
+ */
+class Pipeline {
+public:
+    /**
+     * Construct and set up the pipeline, either for a new block or for
+     * append, depending on the append flag.
+     */
+    Pipeline(bool append, const char *path, const SessionConfig &conf,
+             shared_ptr<FileSystemImpl> filesystem, int checksumType,
+             int chunkSize, int replication, int64_t bytesSent,
+             shared_ptr<LocatedBlock> lastBlock);
+
+    /**
+     * Send all pending data and wait for all acks.
+     */
+    void flush();
+
+    /**
+     * Send the last packet and close the pipeline.
+     */
+    shared_ptr<LocatedBlock> close(shared_ptr<Packet> lastPacket);
+
+    /**
+     * Send a packet, retrying on error until the failure is fatal.
+     * @param packet the packet to send.
+     */
+    void send(shared_ptr<Packet> packet);
+
+private:
+    bool addDatanodeToPipeline(const std::vector<DatanodeInfo> &excludedNodes);
+    void buildForAppendOrRecovery(bool recovery);
+    void buildForNewBlock();
+    void checkPipelineWithReplicas();
+    void checkResponse(bool wait);
+    void createBlockOutputStream(const Token &token, int64_t gs, bool recovery);
+    void locateNextBlock(const std::vector<DatanodeInfo> &excludedNodes);
+    void processAck(PipelineAck &ack);
+    void processResponse();
+    void resend();
+    void waitForAcks();
+    void transfer(const ExtendedBlock &blk, const DatanodeInfo &src,
+                  const std::vector<DatanodeInfo> &targets, const Token &token);
+    int findNewDatanode(const std::vector<DatanodeInfo> &original);
+    static void checkBadLinkFormat(const std::string &node);
+
+private:
+    Pipeline(const Pipeline &other);
+    Pipeline &operator=(const Pipeline &other);
+
+    BlockConstructionStage stage;
+    bool canAddDatanode;
+    int blockWriteRetry;
+    int checksumType;
+    int chunkSize;
+    int connectTimeout;
+    int errorIndex;
+    int readTimeout;
+    int replication;
+    int writeTimeout;
+    int64_t bytesAcked;  // number of bytes acknowledged by the pipeline.
+    int64_t bytesSent;   // number of bytes sent into the pipeline.
+    shared_ptr<BufferedSocketReader> reader;
+    shared_ptr<FileSystemImpl> filesystem;
+    shared_ptr<LocatedBlock> lastBlock;
+    shared_ptr<Socket> sock;
+    std::deque<shared_ptr<Packet> > packets;
+    std::string clientName;
+    std::string path;
+    std::vector<DatanodeInfo> nodes;
+    std::vector<std::string> storageIDs;
+};
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_ */
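
The header above exposes only three entry points to callers: send, flush, and close. A hedged sketch of how an output stream might drive them; the function drainToPipeline and its arguments are illustrative only, since the real caller in this change, OutputStreamImpl, is not shown in this hunk:

    #include <deque>
    #include "Packet.h"
    #include "Pipeline.h"

    using namespace hdfs::internal;

    // Illustrative driver: push every queued packet through the pipeline,
    // wait for the outstanding acks, then close the block with the given
    // last packet.
    void drainToPipeline(Pipeline &pipeline,
                         std::deque<shared_ptr<Packet> > &queued,
                         shared_ptr<Packet> lastPacket) {
        while (!queued.empty()) {
            pipeline.send(queued.front());  // retries internally until fatal
            queued.pop_front();
        }

        pipeline.flush();  // wait for acks of all packets already sent
        shared_ptr<LocatedBlock> block = pipeline.close(lastPacket);
        (void)block;  // carries the acknowledged length, see Pipeline::close
    }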

+ 80 - 0
hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PipelineAck.h

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_CLIENT_PIPELINEACK_H_
+#define _HDFS_LIBHDFS3_CLIENT_PIPELINEACK_H_
+
+#include "datatransfer.pb.h"
+
+namespace hdfs {
+namespace internal {
+
+class PipelineAck {
+public:
+    PipelineAck() : invalid(true) {
+    }
+
+    PipelineAck(const char *buf, int size) : invalid(false) {
+        readFrom(buf, size);
+    }
+
+    bool isInvalid() {
+        return invalid;
+    }
+
+    int getNumOfReplies() {
+        return proto.status_size();
+    }
+
+    int64_t getSeqno() {
+        return proto.seqno();
+    }
+
+    ::hadoop::hdfs::Status getReply(int i) {
+        return proto.status(i);
+    }
+
+    bool isSuccess() {
+        int size = proto.status_size();
+
+        for (int i = 0; i < size; ++i) {
+            if (::hadoop::hdfs::Status::SUCCESS != proto.status(i)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    void readFrom(const char *buf, int size) {
+        invalid = !proto.ParseFromArray(buf, size);
+    }
+
+    void reset() {
+        proto.Clear();
+        invalid = true;
+    }
+
+private:
+    ::hadoop::hdfs::PipelineAckProto proto;
+    bool invalid;
+};
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_PIPELINEACK_H_ */
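
Pipeline::processResponse earlier in this patch decodes a datanode ack by reading a varint32 length prefix, filling a buffer, and handing it to PipelineAck::readFrom. A standalone sketch of that decode path, assuming only the BufferedSocketReader interface already used in Pipeline.cc; the function name readOneAck is illustrative:

    #include <vector>
    #include "PipelineAck.h"
    #include "network/BufferedSocketReader.h"

    using namespace hdfs::internal;

    // Read one length-prefixed PipelineAckProto from the socket and parse it.
    // The caller must check isInvalid() before trusting the contents, exactly
    // as Pipeline::processResponse does.
    static PipelineAck readOneAck(BufferedSocketReader &reader, int readTimeout) {
        int size = reader.readVarint32(readTimeout);
        std::vector<char> buf(size);
        reader.readFully(&buf[0], size, readTimeout);
        PipelineAck ack;
        ack.readFrom(&buf[0], size);
        return ack;
    }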