Tuesday, November 17, 2015

Lock-free multi-threading task system with boost::lockfree::queue.

I want to rewrite a task system, replacing old, buggy code that used only the standard library with new code built on boost::lockfree::queue. So I wrote this experimental code:

#include <iostream>
#include <string>
#include <functional>
#include <chrono>
#include <thread>
#include <atomic>
#include <cstdint>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>
#include <boost/lockfree/queue.hpp>

struct task_system
{
  using functor_type = std::function< auto () -> void >;
  using functor_pointer_type = functor_type*;

  enum class state_type: std::uint_least8_t
  { prepare
  , run
  , exit
  };

  boost::lockfree::queue< functor_pointer_type > q;

  std::atomic< state_type > state;

  task_system( const std::size_t capacity = 8192 )
    : q( capacity )
    , state( state_type::prepare )
  { }

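  auto push_task( functor_type&& f ) -> bool
  {
    // The queue stores raw pointers, so move (not copy) the functor to the heap.
    auto* p = new functor_type( std::move( f ) );
    // push() returns false when the task could not be enqueued; do not leak it then.
    if ( q.push( p ) )
      return true;
    delete p;
    return false;
  }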

  auto run( const std::size_t number_of_workers = 4 )
  {
    std::vector< std::thread > workers;
    workers.reserve( number_of_workers );

    for ( auto n = static_cast<std::size_t>( 0 ); n < number_of_workers; ++n )
      workers.emplace_back( std::thread( [this]{ worker(); } ) );

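    // Move from prepare to run, but never overwrite an exit request that arrived first.
    auto expected = state_type::prepare;
    state.compare_exchange_strong( expected, state_type::run );

    for ( auto& worker : workers )
      worker.join();
  }

  auto worker( ) -> void
  {
    // Run the task, then free the functor allocated in push_task.
    const auto processor = [] ( auto* pf )
    {
      (*pf)();
      delete pf;
    };

    // Wait until run() leaves the prepare state.
    while ( state == state_type::prepare )
      std::this_thread::sleep_for( std::chrono::nanoseconds(1) );

    // Busy-poll the queue until exit is requested.
    while ( state == state_type::run )
      q.consume_all( processor );

    // Free tasks that are still queued at exit, without running them.
    q.consume_all( [] ( auto* pf ) { delete pf; } );
  }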
};

auto main() -> int
{
  boost::lockfree::queue< decltype( std::this_thread::get_id() ) > results( 8192 );

  auto ts = std::make_shared<task_system>();
  auto t = std::thread( [=]{ ts->run(); } );

  for ( const auto start = std::chrono::steady_clock::now(); std::chrono::steady_clock::now() - start < std::chrono::milliseconds( 20 ); )
  {
    ts->push_task( [&]{ results.push( std::this_thread::get_id() ); std::this_thread::sleep_for( std::chrono::milliseconds(10) ); } );
    std::this_thread::sleep_for( std::chrono::milliseconds(1) );
  }

  ts->state = task_system::state_type::exit;

  t.join();

  std::cout << std::hex;
  const auto consumed = results.consume_all( []( const auto& v ){ std::cout << v << "\n"; } );
  std::cout << std::to_string( consumed ) << " consumed. empty: " << std::boolalpha << results.empty();
}

Result:

7f80a1a71700
7f80a0a6f700
7f80a1270700
7f80a2272700
7f80a1a71700
7f80a0a6f700
7f80a1270700
7f80a2272700
7f80a1a71700
7f80a0a6f700
7f80a1270700
7f80a2272700
7f80a1a71700
7f80a0a6f700
14 consumed. empty: true

It’s nice :)

Note:

boost::lockfree::queue<T> requires T to be trivial (its documentation asks for a trivial assignment operator and a trivial destructor), so T = std::function< auto () -> void > cannot be used directly. Of course, you can use T = auto (*)() -> void if you do not need stateful lambda expressions. But I need stateful lambdas now, so I use a pointer trick to keep the queue easy to use. Of course, this is experimental code; in production code I will hide new / delete and the other implementation details as private.
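To make the note concrete, here is a small sketch that checks the three candidate element types with standard type traits only. The helper fits_lockfree_queue is a hypothetical name of mine, not part of Boost, and it only approximates the documented requirements; the real check is whatever boost::lockfree::queue asserts at compile time.

```cpp
#include <functional>
#include <type_traits>

// Hypothetical helper (not part of Boost): approximates the documented
// requirements of boost::lockfree::queue<T> with standard type traits.
template <class T>
constexpr bool fits_lockfree_queue()
{
  return std::is_copy_constructible<T>::value
      && std::is_trivially_copy_assignable<T>::value
      && std::is_trivially_destructible<T>::value;
}

using functor_type = std::function<void()>;
using function_pointer_type = void (*)();

// std::function owns its target, so it is not trivial.
static_assert(!fits_lockfree_queue<functor_type>(), "std::function is not trivial");
// A plain function pointer is trivial ...
static_assert(fits_lockfree_queue<function_pointer_type>(), "function pointer is trivial");
// ... and so is a pointer to std::function, which is the pointer trick.
static_assert(fits_lockfree_queue<functor_type*>(), "pointer to std::function is trivial");

// A captureless lambda converts to a function pointer; a capturing one does not.
inline bool lambda_conversions()
{
  int counter = 0;
  auto stateless = [] {};
  auto stateful = [&counter] { ++counter; };
  return std::is_convertible<decltype(stateless), function_pointer_type>::value
      && !std::is_convertible<decltype(stateful), function_pointer_type>::value;
}
```

The last function is why the function pointer route is not enough for me: as soon as a lambda captures something, it no longer converts to auto (*)() -> void.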

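(I somehow ended up writing in English for the first time in a while, so here is a quick recap, originally in Japanese.)

boost::lockfree::queue<T> requires T to be trivial, so std::function cannot be put in as-is. If you do not handle stateful lambda expressions, you can just put in a function pointer type, but I want to use stateful lambdas. So I used a pointer trick to keep it easy to use. Of course, this is experimental code; in a product I would hide new / delete and the rest as private.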
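As a rough idea of what hiding new / delete could look like, here is a minimal sketch, not the code I will actually ship. The names task_queue and locked_queue are hypothetical. To keep the sketch free of Boost, locked_queue is a mutex-based stand-in with the same push / pop shape, so it is not lock-free; a real version would hold a boost::lockfree::queue of pointers constructed with a capacity instead.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Stand-in for boost::lockfree::queue<T*>: same push/pop shape, but guarded
// by a mutex, so it is NOT lock-free. It only keeps the sketch Boost-free.
template <class T>
class locked_queue
{
  std::mutex m;
  std::queue<T> q;

public:
  bool push(const T& v)
  {
    std::lock_guard<std::mutex> lock(m);
    q.push(v);
    return true;
  }

  bool pop(T& v)
  {
    std::lock_guard<std::mutex> lock(m);
    if (q.empty())
      return false;
    v = q.front();
    q.pop();
    return true;
  }
};

// Hypothetical wrapper: callers pass std::function values, while the raw
// pointers and new / delete stay private.
class task_queue
{
  using functor_type = std::function<void()>;
  locked_queue<functor_type*> q;

public:
  task_queue() = default;
  task_queue(const task_queue&) = delete;
  task_queue& operator=(const task_queue&) = delete;

  // Free tasks that were queued but never run.
  ~task_queue()
  {
    functor_type* raw = nullptr;
    while (q.pop(raw))
      delete raw;
  }

  bool push(functor_type f)
  {
    std::unique_ptr<functor_type> p(new functor_type(std::move(f)));
    if (!q.push(p.get()))
      return false;  // p frees the functor when the push fails
    p.release();     // the queue owns the pointer from here on
    return true;
  }

  // Run one queued task; returns false when the queue is empty.
  bool run_one()
  {
    functor_type* raw = nullptr;
    if (!q.pop(raw))
      return false;
    assert(raw != nullptr);
    std::unique_ptr<functor_type> p(raw);  // freed even if the task throws
    (*p)();
    return true;
  }
};
```

With this shape, a caller writes something like tq.push( [&]{ ... } ) and a worker loops on tq.run_one(), and neither side ever touches a raw pointer.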
