1
0

tools_common.cc 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements. See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership. The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing,
  software distributed under the License is distributed on an
  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  KIND, either express or implied. See the License for the
  specific language governing permissions and limitations
  under the License.
*/
  17. #include "tools_common.h"
  18. namespace hdfs {
  19. std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, bool max_timeout) {
  20. hdfs::Options options;
  21. //Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
  22. hdfs::ConfigurationLoader loader;
  23. //Loading default config files core-site.xml and hdfs-site.xml from the config path
  24. hdfs::optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
  25. //TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
  26. if(config){
  27. //Loading options from the config
  28. options = config->GetOptions();
  29. }
  30. if(max_timeout){
  31. //TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
  32. options.rpc_timeout = std::numeric_limits<int>::max();
  33. }
  34. IoService * io_service = IoService::New();
  35. //Wrapping fs into a shared pointer to guarantee deletion
  36. std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options));
  37. if (!fs) {
  38. std::cerr << "Could not create FileSystem object. " << std::endl;
  39. exit(EXIT_FAILURE);
  40. }
  41. Status status;
  42. //Check if the user supplied the host
  43. if(!uri.get_host().empty()){
  44. //If port is supplied we use it, otherwise we use the empty string so that it will be looked up in configs.
  45. std::string port = (uri.get_port()) ? std::to_string(uri.get_port().value()) : "";
  46. status = fs->Connect(uri.get_host(), port);
  47. if (!status.ok()) {
  48. std::cerr << "Could not connect to " << uri.get_host() << ":" << port << ". " << status.ToString() << std::endl;
  49. exit(EXIT_FAILURE);
  50. }
  51. } else {
  52. status = fs->ConnectToDefaultFs();
  53. if (!status.ok()) {
  54. if(!options.defaultFS.get_host().empty()){
  55. std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
  56. } else {
  57. std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
  58. }
  59. exit(EXIT_FAILURE);
  60. }
  61. }
  62. return fs;
  63. }
  //Size in bytes of the scratch buffer below (1 MB)
  #define BUF_SIZE (1024 * 1024)
  //File-local scratch buffer that readFile fills one chunk at a time
  static char input_buffer[BUF_SIZE];
  66. void readFile(std::shared_ptr<hdfs::FileSystem> fs, std::string path, off_t offset, std::FILE* dst_file, bool to_delete) {
  67. ssize_t total_bytes_read = 0;
  68. size_t last_bytes_read = 0;
  69. hdfs::FileHandle *file_raw = nullptr;
  70. hdfs::Status status = fs->Open(path, &file_raw);
  71. if (!status.ok()) {
  72. std::cerr << "Could not open file " << path << ". " << status.ToString() << std::endl;
  73. exit(EXIT_FAILURE);
  74. }
  75. //wrapping file_raw into a unique pointer to guarantee deletion
  76. std::unique_ptr<hdfs::FileHandle> file(file_raw);
  77. do{
  78. //Reading file chunks
  79. status = file->PositionRead(input_buffer, sizeof(input_buffer), offset, &last_bytes_read);
  80. if(status.ok()) {
  81. //Writing file chunks to stdout
  82. fwrite(input_buffer, last_bytes_read, 1, dst_file);
  83. total_bytes_read += last_bytes_read;
  84. offset += last_bytes_read;
  85. } else {
  86. if(status.is_invalid_offset()){
  87. //Reached the end of the file
  88. if(to_delete) {
  89. //Deleting the file (recursive set to false)
  90. hdfs::Status status = fs->Delete(path, false);
  91. if (!status.ok()) {
  92. std::cerr << "Error deleting the source file: " << path
  93. << " " << status.ToString() << std::endl;
  94. exit(EXIT_FAILURE);
  95. }
  96. }
  97. break;
  98. } else {
  99. std::cerr << "Error reading the file: " << status.ToString() << std::endl;
  100. exit(EXIT_FAILURE);
  101. }
  102. }
  103. } while (last_bytes_read > 0);
  104. return;
  105. }
  106. }