hdfs_chgrp.cc 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements. See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership. The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing,
  software distributed under the License is distributed on an
  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  KIND, either express or implied. See the License for the
  specific language governing permissions and limitations
  under the License.
*/
  17. #include <google/protobuf/stubs/common.h>
  18. #include <unistd.h>
  19. #include <future>
  20. #include "tools_common.h"
  /**
   * Writes the hdfs_chgrp help text (synopsis, description, options and
   * examples) to standard output and flushes the stream.
   */
  void usage(){
    // One entry per output line; an empty entry yields a blank separator line.
    static const char *const kHelpLines[] = {
      "Usage: hdfs_chgrp [OPTION] GROUP FILE",
      "",
      "Change the group association of each FILE to GROUP.",
      "The user must be the owner of files. Additional information is in the Permissions Guide:",
      "https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html",
      "",
      " -R operate on files and directories recursively",
      " -h display this help and exit",
      "",
      "Examples:",
      "hdfs_chgrp -R new_group hdfs://localhost.localdomain:8020/dir/file",
      "hdfs_chgrp new_group /dir/file",
    };

    for (const char *help_line : kHelpLines) {
      std::cout << help_line << '\n';
    }
    std::cout.flush();
  }
  36. struct SetOwnerState {
  37. const std::string username;
  38. const std::string groupname;
  39. const std::function<void(const hdfs::Status &)> handler;
  40. //The request counter is incremented once every time SetOwner async call is made
  41. uint64_t request_counter;
  42. //This boolean will be set when find returns the last result
  43. bool find_is_done;
  44. //Final status to be returned
  45. hdfs::Status status;
  46. //Shared variables will need protection with a lock
  47. std::mutex lock;
  48. SetOwnerState(const std::string & username_, const std::string & groupname_,
  49. const std::function<void(const hdfs::Status &)> & handler_,
  50. uint64_t request_counter_, bool find_is_done_)
  51. : username(username_),
  52. groupname(groupname_),
  53. handler(handler_),
  54. request_counter(request_counter_),
  55. find_is_done(find_is_done_),
  56. status(),
  57. lock() {
  58. }
  59. };
  60. int main(int argc, char *argv[]) {
  61. //We should have 3 or 4 parameters
  62. if (argc != 3 && argc != 4) {
  63. usage();
  64. exit(EXIT_FAILURE);
  65. }
  66. bool recursive = false;
  67. int input;
  68. //Using GetOpt to read in the values
  69. opterr = 0;
  70. while ((input = getopt(argc, argv, "Rh")) != -1) {
  71. switch (input)
  72. {
  73. case 'R':
  74. recursive = 1;
  75. break;
  76. case 'h':
  77. usage();
  78. exit(EXIT_SUCCESS);
  79. case '?':
  80. if (isprint(optopt))
  81. std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl;
  82. else
  83. std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl;
  84. usage();
  85. exit(EXIT_FAILURE);
  86. default:
  87. exit(EXIT_FAILURE);
  88. }
  89. }
  90. std::string group = argv[optind];
  91. //Owner stays the same, just group association changes.
  92. std::string owner = "";
  93. std::string uri_path = argv[optind + 1];
  94. //Building a URI object from the given uri_path
  95. hdfs::optional<hdfs::URI> uri = hdfs::URI::parse_from_string(uri_path);
  96. if (!uri) {
  97. std::cerr << "Malformed URI: " << uri_path << std::endl;
  98. exit(EXIT_FAILURE);
  99. }
  100. std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri.value(), true);
  101. if (!fs) {
  102. std::cerr << "Could not connect the file system. " << std::endl;
  103. exit(EXIT_FAILURE);
  104. }
  105. /* wrap async FileSystem::SetOwner with promise to make it a blocking call */
  106. std::shared_ptr<std::promise<hdfs::Status>> promise = std::make_shared<std::promise<hdfs::Status>>();
  107. std::future<hdfs::Status> future(promise->get_future());
  108. auto handler = [promise](const hdfs::Status &s) {
  109. promise->set_value(s);
  110. };
  111. if(!recursive){
  112. fs->SetOwner(uri->get_path(), owner, group, handler);
  113. }
  114. else {
  115. //Allocating shared state, which includes:
  116. //username and groupname to be set, handler to be called, request counter, and a boolean to keep track if find is done
  117. std::shared_ptr<SetOwnerState> state = std::make_shared<SetOwnerState>(owner, group, handler, 0, false);
  118. // Keep requesting more from Find until we process the entire listing. Call handler when Find is done and reques counter is 0.
  119. // Find guarantees that the handler will only be called once at a time so we do not need locking in handlerFind.
  120. auto handlerFind = [fs, state](const hdfs::Status &status_find, const std::vector<hdfs::StatInfo> & stat_infos, bool has_more_results) -> bool {
  121. //For each result returned by Find we call async SetOwner with the handler below.
  122. //SetOwner DOES NOT guarantee that the handler will only be called once at a time, so we DO need locking in handlerSetOwner.
  123. auto handlerSetOwner = [state](const hdfs::Status &status_set_owner) {
  124. std::lock_guard<std::mutex> guard(state->lock);
  125. //Decrement the counter once since we are done with this async call
  126. if (!status_set_owner.ok() && state->status.ok()){
  127. //We make sure we set state->status only on the first error.
  128. state->status = status_set_owner;
  129. }
  130. state->request_counter--;
  131. if(state->request_counter == 0 && state->find_is_done){
  132. state->handler(state->status); //exit
  133. }
  134. };
  135. if(!stat_infos.empty() && state->status.ok()) {
  136. for (hdfs::StatInfo const& s : stat_infos) {
  137. //Launch an asynchronous call to SetOwner for every returned result
  138. state->request_counter++;
  139. fs->SetOwner(s.full_path, state->username, state->groupname, handlerSetOwner);
  140. }
  141. }
  142. //Lock this section because handlerSetOwner might be accessing the same
  143. //shared variables simultaneously
  144. std::lock_guard<std::mutex> guard(state->lock);
  145. if (!status_find.ok() && state->status.ok()){
  146. //We make sure we set state->status only on the first error.
  147. state->status = status_find;
  148. }
  149. if(!has_more_results){
  150. state->find_is_done = true;
  151. if(state->request_counter == 0){
  152. state->handler(state->status); //exit
  153. }
  154. return false;
  155. }
  156. return true;
  157. };
  158. //Asynchronous call to Find
  159. fs->Find(uri->get_path(), "*", hdfs::FileSystem::GetDefaultFindMaxDepth(), handlerFind);
  160. }
  161. /* block until promise is set */
  162. hdfs::Status status = future.get();
  163. if (!status.ok()) {
  164. std::cerr << "Error: " << status.ToString() << std::endl;
  165. exit(EXIT_FAILURE);
  166. }
  167. // Clean up static data and prevent valgrind memory leaks
  168. google::protobuf::ShutdownProtobufLibrary();
  169. return 0;
  170. }