hdfs_ioservice.cc 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "hdfs_ioservice.h"
  19. #include <thread>
  20. #include <mutex>
  21. #include <vector>
  22. #include "common/logging.h"
  23. namespace hdfs {
  24. IoService::~IoService() {}
  25. IoService *IoService::New() {
  26. return new IoServiceImpl();
  27. }
  28. std::shared_ptr<IoService> IoService::MakeShared() {
  29. return std::make_shared<IoServiceImpl>();
  30. }
  31. unsigned int IoServiceImpl::InitDefaultWorkers() {
  32. LOG_TRACE(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers@" << this << " called.");
  33. unsigned int logical_thread_count = std::thread::hardware_concurrency();
  34. #ifndef DISABLE_CONCURRENT_WORKERS
  35. if(logical_thread_count < 1) {
  36. LOG_WARN(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers did not detect any logical processors. Defaulting to 1 worker thread.");
  37. } else {
  38. LOG_DEBUG(kRPC, << "IoServiceImpl::InitDefaultWorkers detected " << logical_thread_count << " logical threads and will spawn a worker for each.");
  39. }
  40. #else
  41. if(logical_thread_count > 0) {
  42. LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers: " << logical_thread_count << " threads available. Concurrent workers are disabled so 1 worker thread will be used");
  43. }
  44. logical_thread_count = 1;
  45. #endif
  46. return InitWorkers(logical_thread_count);
  47. }
  48. unsigned int IoServiceImpl::InitWorkers(unsigned int thread_count) {
  49. #ifdef DISABLED_CONCURRENT_WORKERS
  50. LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitWorkers: " << thread_count << " threads specified but concurrent workers are disabled so 1 will be used");
  51. thread_count = 1;
  52. #endif
  53. unsigned int created_threads = 0;
  54. for(unsigned int i=0; i<thread_count; i++) {
  55. bool created = AddWorkerThread();
  56. if(created) {
  57. created_threads++;
  58. } else {
  59. LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers failed to create a worker thread");
  60. }
  61. }
  62. if(created_threads != thread_count) {
  63. LOG_WARN(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers attempted to create "
  64. << thread_count << " but only created " << created_threads
  65. << " worker threads. Make sure this process has adequate resources.");
  66. }
  67. return created_threads;
  68. }
  69. bool IoServiceImpl::AddWorkerThread() {
  70. mutex_guard state_lock(state_lock_);
  71. auto async_worker = [this]() {
  72. this->ThreadStartHook();
  73. this->Run();
  74. this->ThreadExitHook();
  75. };
  76. worker_threads_.push_back(WorkerPtr( new std::thread(async_worker)) );
  77. return true;
  78. }
  79. void IoServiceImpl::ThreadStartHook() {
  80. mutex_guard state_lock(state_lock_);
  81. LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " starting");
  82. }
  83. void IoServiceImpl::ThreadExitHook() {
  84. mutex_guard state_lock(state_lock_);
  85. LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " exiting");
  86. }
  87. void IoServiceImpl::PostTask(std::function<void(void)>& asyncTask) {
  88. io_service_.post(asyncTask);
  89. }
  90. void IoServiceImpl::WorkerDeleter::operator()(std::thread *t) {
  91. // It is far too easy to destroy the filesystem (and thus the threadpool)
  92. // from within one of the worker threads, leading to a deadlock. Let's
  93. // provide some explicit protection.
  94. if(t->get_id() == std::this_thread::get_id()) {
  95. LOG_ERROR(kAsyncRuntime, << "FileSystemImpl::WorkerDeleter::operator(treadptr="
  96. << t << ") : FATAL: Attempted to destroy a thread pool"
  97. "from within a callback of the thread pool!");
  98. }
  99. t->join();
  100. delete t;
  101. }
  102. // As long as this just forwards to an asio::io_service method it doesn't need a lock
  103. void IoServiceImpl::Run() {
  104. // The IoService executes callbacks provided by library users in the context of worker threads,
  105. // there is no way of preventing those callbacks from throwing but we can at least prevent them
  106. // from escaping this library and crashing the process.
  107. // As recommended in http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
  108. asio::io_service::work work(io_service_);
  109. while(true)
  110. {
  111. try
  112. {
  113. io_service_.run();
  114. break;
  115. } catch (const std::exception & e) {
  116. LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker thread: " << e.what());
  117. } catch (...) {
  118. LOG_WARN(kFileSystem, << "Unexpected value not derived from std::exception in libhdfspp worker thread");
  119. }
  120. }
  121. }
  122. unsigned int IoServiceImpl::get_worker_thread_count() {
  123. mutex_guard state_lock(state_lock_);
  124. return worker_threads_.size();
  125. }
  126. }