Skip to main content
deleted 10 characters in body; edited title
Source Link
Jamal
  • 35.2k
  • 13
  • 134
  • 238

Asynchronous Object Destruction Serviceobject destruction service

Inspired by the recent Stack OverflowStack Overflow question “Safely release a resource on a different thread”, I wrote the following destruction_service<T>. Please refer to the class template's DocString for more information (and let me know in your review if it is insufficient).

destruction_service.hxxdestruction_service.hxx

syncio.hxxsyncio.hxx

main.cxxmain.cxx

Asynchronous Object Destruction Service

Inspired by the recent Stack Overflow question “Safely release a resource on a different thread”, I wrote the following destruction_service<T>. Please refer to the class template's DocString for more information (and let me know in your review if it is insufficient).

destruction_service.hxx

syncio.hxx

main.cxx

Asynchronous object destruction service

Inspired by the recent Stack Overflow question “Safely release a resource on a different thread”, I wrote the following destruction_service<T>. Please refer to the class template's DocString for more information (and let me know in your review if it is insufficient).

destruction_service.hxx

syncio.hxx

main.cxx

Source Link
5gon12eder
  • 4.3k
  • 14
  • 29

Asynchronous Object Destruction Service

Inspired by the recent Stack Overflow question “Safely release a resource on a different thread”, I wrote the following destruction_service<T>. Please refer to the class template's DocString for more information (and let me know in your review if it is insufficient).

destruction_service.hxx

/**
 * @file destruction_service.hxx
 *
 * @brief
 *         Provides the `my::destruction_service` class template for
 *         asynchronous object destruction.
 *
 */

#ifndef DESTRUCTION_SERVICE_HXX
#define DESTRUCTION_SERVICE_HXX

#include <cassert>             // assert
#include <condition_variable>  // std::condition_variable
#include <mutex>               // std::mutex, std::unique_lock
#include <thread>              // std::thread
#include <type_traits>         // std::enable_if_t, std::is_nothrow_{move_constructible,destructible}
#include <utility>             // std::move
#include <vector>              // std::vector

#include "syncio.hxx"          // syncio::print_log


namespace my
{

  /**
   * @brief
   *         An asynchronous object destruction service.
   *
   * An instance of this class owns a worker thread that asynchronously
   * destroys objects.  This might be useful for threads that have high
   * responsiveness requirements and objects with (potentially) long-running
   * destructors.
   *
   * To-be-destroyed objects can be scheduled for destruction by means of the
   * `schedule_destruction` member function.  It takes its argument by rvalue
   * reference assuming that destructing a moved-away-from object is
   * considerably cheaper.  If you are using this service to destroy smart
   * pointers, be aware that only scheduling the *last* instance of a shared
   * pointer will cause the managed object to be destroyed asynchronously.  `T`
   * is required to have a non-throwing move constructor and its destructor
   * must not throw either.
   *
   * If you have to destruct objects of different types, consider using a
   * polymorphic wrapper to perform type erasure.  The default is
   * non-polymorphic, however, because type erasure requires additional dynamic
   * memory allocation which is obnoxious for destruction.  Also, smart
   * pointers already provide some degree of polymorphism, provided that all
   * types you need to destruct derive from a common base class that has a
   * `virtual` destructor.
   *
   * @tparam T
   *         type of object to be destroyed
   *
   */
  template
  <
    typename T,
    typename = std::enable_if_t<std::is_nothrow_move_constructible<T>::value>,
    typename = std::enable_if_t<std::is_nothrow_destructible<T>::value>
  >
  class destruction_service final
  {

  public:

    /** @brief Type that is destructed by this service. */
    using object_type = T;

  private:

    /** @brief Worker thread destroying objects. */
    std::thread worker_ {};

    /** @brief Mutex to protect the object queue. */
    mutable std::mutex mutex_ {};

    /** @brief Condition variable to signal changes to the object queue. */
    mutable std::condition_variable condvar_ {};

    /** @brief Object queue of to-be-destructed items. */
    std::vector<object_type> queue_ {};

    /** @brief Signal that no more objects will be scheduled for destruction. */
    bool done_ {};

  public:

    /**
     * @brief
     *         Creates a new destruction service with its own worker thread.
     *
     */
    destruction_service()
    {
      this->worker_ = std::thread {&destruction_service::do_work_, this};
    }

    /**
     * @brief
     *         Destroys all remaining objects and then shuts down the service.
     *
     */
    ~destruction_service() noexcept
    {
      syncio::print_log(__PRETTY_FUNCTION__);
      auto lck = this->lock_queue_();
      this->done_ = true;
      lck.unlock();
      this->condvar_.notify_all();
      if (this->worker_.joinable())
        this->worker_.join();
      assert(this->queue_.empty());
    }

    /**
     * @brief
     *         Schedules an object for asynchronous destruction.
     *
     * This function may safely be called concurrently.
     *
     * @param object
     *         object to be destructed
     *
     */
    void
    schedule_destruction(object_type&& object)
    {
      auto lck = this->lock_queue_();
      this->queue_.push_back(std::move(object));
      lck.unlock();
      this->condvar_.notify_all();
    }

  private:

    /**
     * @brief
     *         Worker thread main loop.
     *
     * Until the `done_` flag is not set, this function waits on the object
     * queue and removes and destructs any items in it.  It is guaranteeds that
     * the queue be empty after this function returns unless items were added
     * after the `done_` flag has been set.
     *
     */
    void
    do_work_()
    {
      syncio::print_log("destruction thread");
      auto things = std::vector<object_type> {};
      for (auto stop = false; true; stop = this->is_done_())
        {
          auto lck = this->lock_queue_();
          while (this->queue_.empty() && !this->done_)
            this->condvar_.wait(lck);
          this->queue_.swap(things);
          lck.unlock();
          syncio::print_log("about to destroy ", things.size(), " objects...");
          things.clear();
          if (stop)
            break;
        }
      assert(this->queue_.empty());
      assert(things.empty());
      syncio::print_log("good bye");
    }

    /**
     * @brief
     *         Fetches the value of the `done_` member in a synchronized
     *         manner.
     *
     * @returns
     *         value of the `done_` member variable
     *
     */
    bool
    is_done_() const noexcept
    {
      auto lck = this->lock_queue_();
      return this->done_;
    }

    /**
     * @brief
     *         Returns a lock for the object queue.
     *
     * @returns
     *         lock for the object queue
     *
     */
    std::unique_lock<std::mutex>
    lock_queue_() const noexcept
    {
      return std::unique_lock<std::mutex> {this->mutex_};
    }

  };  // struct destruction_service

}  // namespace my


#endif  // #ifndef DESTRUCTION_SERVICE_HXX

A few design decisions (please critique if you disagree):

  • The type of objects that can be destructed is not erased for the reasons mentioned in the DocString. This also means that I cannot use queue poisoning in the destructor but instead have to use the done_ flag. I'm somewhat uncomfortable with the resulting control logic of the main loop in the do_work_ function.
  • The schedule_destruction function might have to resize a std::vector while holding to a lock. I normally wouldn't do this but in this case, it seemed preferable to have the main thread execute as fast as possible. I don't care if the destruction (worker) thread is blocked for a short while. Its work isn't very urgent anyway. It is also assumed that reallocations will be infrequent (see next item).
  • The object queue is implemented by a pair of std::vectors that grow dynamically as needed but never shrink. I thought about using a std::deque instead to avoid potentially keeping large amounts of memory that are no longer needed after a burst of submissions but the extra capacity of the std::vectors reducing the number of memory allocations in the critical section convinced me that this is the lesser evil. On the other hand, resizing a std::deque is a constant-time operation, while resizing a std::vector is linear.

For completeness, I'll also show the syncio.hxx header that provides the thread-safe syncio::print_log function and a small example usage in main.cxx. Feel free to comment about these files as well if you want to but please focus on the destruction_service.hxx file.

syncio.hxx

/**
 * @file syncio.hxx
 *
 * @brief
 *         Simple synchronized (thread-safe) I/O functions.
 *
 * The functions provided by this header are only thread-safe if they are not
 * mixed with other I/O functions.  Synchronization happens via an internal
 * mutex.  Locking is pessimistic, using the same mutex for all streams,
 * assuming that all streams might interleave.
 *
 */

#ifndef SYNCIO_HXX
#define SYNCIO_HXX

#include <iostream>  // std::ostream, std::clog, std::endl
#include <mutex>     // std::mutex, std::lock_guard
#include <thread>    // std::this_thread::get_id


namespace syncio
{

  /** @brief Internal private `namespace`. */
  namespace syncio_hxx
  {

    /**
     * @brief
     *         Returns a reference to the singleton instance of the I/O mutex.
     *
     * @returns
     *         reference to the mutex protecting I/O operations
     *
     */
    std::mutex&
    get_iomutex() noexcept
    {
      static std::mutex iomutex {};
      return iomutex;
    }

    /**
     * @brief
     *         Helper struct to print a variadic number of arguments.
     *
     * @tparam ...
     *         ignored
     *
     */
    template <typename...>
    struct printer_helper;

    /**
     * @brief
     *         Specialization for one or more arguments.
     *
     * @tparam HeadT
     *         type of the first argument
     *
     * @tparam TailTs...
     *         types of the remaining arguments (if any)
     *
     */
    template <typename HeadT, typename... TailTs>
    struct printer_helper<HeadT, TailTs...>
    {

      /**
       * @brief
       *         Inserts all arguments followed by a newline into the given
       *         stream and flushes it.
       *
       * @param os
       *         stream to print to
       *
       * @param head
       *         first argument to print
       *
       * @param tail...
       *         remaining arguments to print
       *
       */
      static void
      print(std::ostream& os, const HeadT& head, const TailTs&... tail)
      {
        os << head;
        printer_helper<TailTs...>::print(os, tail...);
      }

    };

    /**
     * @brief
     *         Specialization for zero arguments.
     *
     */
    template <>
    struct printer_helper<>
    {

      /**
       * @brief
       *         Inserts a newline into the given stream and flushes it.
       *
       * @param os
       *         stream to print to
       *
       */
      static void
      print(std::ostream& os)
      {
        os << std::endl;
      }

    };

  }  // namespace syncio_hxx

  /**
   * @brief
   *         Prints a message to the given stream in a synchronized
   *         (thread-safe) manner.
   *
   * The arguments are inserted as if by successively streaming them with
   * `operator<<`.  After the last item, a newline is inserted and the stream
   * flushed.
   *
   * @param os
   *         stream to print to
   *
   * @param items...
   *         items to make up the message
   *
   */
  template <typename... Ts>
  void
  print(std::ostream& os, const Ts&... items)
  {
    std::lock_guard<std::mutex> guard {syncio_hxx::get_iomutex()};
    syncio_hxx::printer_helper<Ts...>::print(os, items...);
  }

  /**
   * @brief
   *         Prints a log message to `std::log` in a synchronized (thread-safe)
   *         manner.
   *
   * The arguments are prefixed with the ID of the current thread and
   * terminated by a newline.  The stream is flushed after printing the
   * message.
   *
   * @param items...
   *         items to make up the log message
   *
   */
  template <typename... Ts>
  void
  print_log(const Ts&... items)
  {
    print(std::clog, "[", std::this_thread::get_id(), "] ", items...);
  }

}  // namespace syncio


#endif  // #ifndef SYNCIO_HXX

main.cxx

/**
 * @file main.cxx
 *
 * @brief
 *         Example usage of the `my::destruction_service`.
 *
 */


#include <atomic>                   // std::atomic_int
#include <thread>                   // std::this_thread::{get_id,yield}
#include <utility>                  // std::exchange

#include "destruction_service.hxx"  // my::destruction_service
#include "syncio.hxx"               // syncio::print_log


namespace /* anonymous */
{

  /** @brief Number of `example` objects created thus far. */
  std::atomic_int example_count {};

  /** @brief ID of the main thread. */
  std::thread::id main_thread_id {};

  /**
   * @brief
   *         A dummy class.
   *
   * The destructor if this class `assert()`s that it is invoked on a thread
   * different from the main thread.  This is not the case for destructing
   * moved-away-from objects.
   *
   */
  class example
  {

  private:

    /** @brief ID of this object. */
    int id_ {-1};

  public:

    /**
     * @brief
     *         Creates a new object with a new ID.
     *
     */
    example() : id_ {example_count.fetch_add(1)}
    {
      syncio::print_log("creating object ", this->id_);
      std::this_thread::yield();
    }

    /**
     * @brief
     *         Creates a copy of another object with the same ID.
     *
     * @param other
     *         object to copy
     *
     */
    example(const example& other) : id_ {other.id_}
    {
    }

    /**
     * @brief
     *         Creates a copy of another object, stealing its ID.
     *
     * The moved-away-from object will get a negative ID.
     *
     * @param other
     *         object to move away from
     *
     */
    example(example&& other) noexcept : id_ {std::exchange(other.id_, -1)}
    {
    }

    /**
     * @brief
     *         Asserts that the object is either in a moved-away-from state or
     *         the current thread is different from the main thread.
     *
     */
    ~example() noexcept
    {
      syncio::print_log("destroying object ", this->id_);
      assert(this->id_ < 0 || std::this_thread::get_id() != main_thread_id);
      std::this_thread::yield();
    }

  };  // struct example

}  // namespace /* anonymous */


/**
 * @brief
 *         Launches a `my::destruction_service` and submits a dozen `example`
 *         objects to it.
 *
 */
int
main()
{
  main_thread_id = std::this_thread::get_id();
  syncio::print_log("main thread");
  my::destruction_service<example> destructor {};
  for (int i = 0; i < 12; ++i)
    {
      auto thing = example {};
      destructor.schedule_destruction(std::move(thing));
    }
}

I'm interested in feedback of any kind but in particular:

  • Correctness, especially concurrency bugs. It took me a while to get this into what I believe is now race and dead-lock free.
  • Compliance with the C++14 standard and making good use of its features.
  • Design of the interface.
  • Performance issues.