123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- */
- #include "tools_common.h"
- namespace hdfs {
- std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, bool max_timeout) {
- hdfs::Options options;
- //Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
- hdfs::ConfigurationLoader loader;
- //Loading default config files core-site.xml and hdfs-site.xml from the config path
- hdfs::optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
- //TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
- if(config){
- //Loading options from the config
- options = config->GetOptions();
- }
- if(max_timeout){
- //TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
- options.rpc_timeout = std::numeric_limits<int>::max();
- }
- IoService * io_service = IoService::New();
- //Wrapping fs into a shared pointer to guarantee deletion
- std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options));
- if (!fs) {
- std::cerr << "Could not create FileSystem object. " << std::endl;
- exit(EXIT_FAILURE);
- }
- Status status;
- //Check if the user supplied the host
- if(!uri.get_host().empty()){
- //If port is supplied we use it, otherwise we use the empty string so that it will be looked up in configs.
- std::string port = (uri.get_port()) ? std::to_string(uri.get_port().value()) : "";
- status = fs->Connect(uri.get_host(), port);
- if (!status.ok()) {
- std::cerr << "Could not connect to " << uri.get_host() << ":" << port << ". " << status.ToString() << std::endl;
- exit(EXIT_FAILURE);
- }
- } else {
- status = fs->ConnectToDefaultFs();
- if (!status.ok()) {
- if(!options.defaultFS.get_host().empty()){
- std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
- } else {
- std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
- }
- exit(EXIT_FAILURE);
- }
- }
- return fs;
- }
//Size in bytes of the read buffer below: 1 MB.
//A typed constant instead of a preprocessor macro, so the name obeys scoping rules.
constexpr size_t BUF_SIZE = 1048576;
//File-local scratch buffer that readFile fills with one chunk per read
static char input_buffer[BUF_SIZE];
- void readFile(std::shared_ptr<hdfs::FileSystem> fs, std::string path, off_t offset, std::FILE* dst_file, bool to_delete) {
- ssize_t total_bytes_read = 0;
- size_t last_bytes_read = 0;
- hdfs::FileHandle *file_raw = nullptr;
- hdfs::Status status = fs->Open(path, &file_raw);
- if (!status.ok()) {
- std::cerr << "Could not open file " << path << ". " << status.ToString() << std::endl;
- exit(EXIT_FAILURE);
- }
- //wrapping file_raw into a unique pointer to guarantee deletion
- std::unique_ptr<hdfs::FileHandle> file(file_raw);
- do{
- //Reading file chunks
- status = file->PositionRead(input_buffer, sizeof(input_buffer), offset, &last_bytes_read);
- if(status.ok()) {
- //Writing file chunks to stdout
- fwrite(input_buffer, last_bytes_read, 1, dst_file);
- total_bytes_read += last_bytes_read;
- offset += last_bytes_read;
- } else {
- if(status.is_invalid_offset()){
- //Reached the end of the file
- if(to_delete) {
- //Deleting the file (recursive set to false)
- hdfs::Status status = fs->Delete(path, false);
- if (!status.ok()) {
- std::cerr << "Error deleting the source file: " << path
- << " " << status.ToString() << std::endl;
- exit(EXIT_FAILURE);
- }
- }
- break;
- } else {
- std::cerr << "Error reading the file: " << status.ToString() << std::endl;
- exit(EXIT_FAILURE);
- }
- }
- } while (last_bytes_read > 0);
- return;
- }
- }
|