#include <vector>
#include <vector>
#include <list>
#include <thread>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <algorithm>
#include <utility>
#include <exception>
#include <cassert>
#include <cstring>
#include <iterator>
template<typename T>
struct invoke_on_destruct
{
private:
T &m_t;
bool m_enabled;
public:
invoke_on_destruct( T &t ) :
m_t( t ), m_enabled( true )
{
}
~invoke_on_destruct()
{
if( m_enabled )
m_t();
}
void invoke_and_disable()
{
m_t();
m_enabled = false;
}
};
struct sort_exception : public std::exception
{
};
template<typename InputIt, typename P = std::less<typename std::iterator_traits<InputIt>::value_type>>
class parallel_merge_sort
{
public:
parallel_merge_sort( P const &p = P() );
~parallel_merge_sort();
void sort( InputIt itBegin, size_t n, std::size_t minThreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
typedef typename std::iterator_traits<InputIt>::value_type value_type;
typedef typename std::vector<value_type> buffer_type;
typedef typename buffer_type::iterator buffer_iterator;
struct pool_thread
{
enum CMD : int { CMD_STOP = -1, CMD_NONE = 0, CMD_SORT = 1 };
enum RSP : int { RSP_ERR = -1, RSP_NONE = 0, RSP_SUCCESS = 1 };
std::thread m_thrd;m_thread;
std::mutex m_mtx;
std::condition_variable m_sigInitiate;
CMD m_cmd;
value_type buffer_iterator *m_pBegin;m_itBegin;
std::size_t m_n;
std::condition_variable m_sigResponse;
RSP m_rsp;
std::vector<value_type> m_sortBuf;
pool_thread( parallel_merge_sort *pPMS );
~pool_thread();
void sort_thread( parallel_merge_sort *pPMS );
static std::size_t calc_buffer_size( size_t n );
};
P m_p;
std::size_t m_minThreaded;
unsigned m_hardwareConcurrency;m_maxRightThreads;
std::vector<value_type>buffer_type m_callerSortBuf;
std::mutex m_mtxPool;
std::list<pool_thread> m_standbyThreads;
std::list<pool_thread> m_activeThreads;
template<typename InputIt2>
void threaded_sort( InputIt2 itBegin, std::size_t n, value_typebuffer_iterator *pSortBufitSortBuf );
template<typename InputIt2>
void unthreaded_sort( InputIt2 itBegin, std::size_t n, value_typebuffer_iterator *pSortBufitSortBuf );
template<typename OutputIt>
void merge_back( OutputIt itUp, value_typebuffer_iterator *pLeftitLeft, value_typebuffer_iterator *pLeftEnditLeftEnd, value_typebuffer_iterator *pRightitRight, value_typebuffer_iterator *pRightEnditRightEnd );
};
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::parallel_merge_sort( P const &p ) :
m_p( p ),
m_hardwareConcurrency( std::thread::hardware_concurrency() )
{
unsigned hc = std::thread::hardware_concurrency();
m_maxRightThreads = hc != 0 ? (hc - 1) : 0;
}
template<typename InputIt, typename P>
void parallel_merge_sort<InputIt, P>::sort( InputIt itBegin, size_t n, std::size_t minThreaded )
{
size_t const MIN_SIZE = 2;
if( n <=< 1MIN_SIZE )
return;
if( (m_minThreaded = minThreaded) < 4(2 * MIN_SIZE) )
m_minThreaded = 4;2 * MIN_SIZE;
try
{
std::size_t s = pool_thread::calc_buffer_size( n );
if( m_callerSortBuf.size() < s )
m_callerSortBuf.resize( s );
threaded_sort( itBegin, n, m_callerSortBuf.begin() );
}
catch( ... )
{
throw sort_exception();
}
threaded_sort( itBegin, n, &m_callerSortBuf[0] );
}
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::~parallel_merge_sort()
{
assert(m_activeThreads.beginsize() == m_activeThreads.end()0);
}
template<typename InputIt, typename P>
inline
std::size_t parallel_merge_sort<InputIt, P>::pool_thread::calc_buffer_size( std::size_t n )
{
for( std::size_t rest = n, right; rest > 2; rest = right )
right = rest - (rest / 2),
n += right,
rest = right;
return n;
}
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::pool_thread::~pool_thread()
{
using namespace std;
unique_lock<mutex> threadLock( m_mtx );
m_cmd = pool_thread::CMD_STOP;
m_sigInitiate.notify_one();
threadLock.unlock();
m_thrdm_thread.join();
}
template<typename InputIt, typename P>
template<typename InputIt2>
void parallel_merge_sort<InputIt, P>::threaded_sort( InputIt2 itBegin, std::size_t n, value_typebuffer_iterator *pSortBufitSortBuf )
{
using namespace std;
unique_lock<mutex> poolLock( m_mtxPool );
if( n < m_minThreaded || (m_standbyThreads.sizeempty() == 0 && m_activeThreads.size() >= (m_hardwareConcurrency - 1)m_maxRightThreads) )
{
poolLock.unlock();
unthreaded_sort( itBegin, n, pSortBufitSortBuf );
return;
}
typedef typename list<pool_thread>::iterator pt_it;
pt_it itPT;
pool_thread *pPT;
size_t left = n / 2,
right = n - left;
if( !m_standbyThreads.sizeempty() )
{
pt_it itPTScan;
size_t optimalSize = pool_thread::calc_buffer_size( right ),
bestFit = (size_t)(ptrdiff_t)-1,
size;
for( itPT = m_standbyThreads.end(), itPTScan = m_standbyThreads.begin();
itPTScan != m_standbyThreads.end(); ++itPTScan )
if( (size = itPTScan->m_sortBuf.size()) >= optimalSize && size < bestFit )
itPT = itPTScan,
bestFit = size;
if( itPT == m_standbyThreads.end() )
itPT = --m_standbyThreads.end();
m_activeThreads.splice( m_activeThreads.end(), m_standbyThreads, itPT );
poolLock.unlock();
pPT = &*itPT;
}
else
m_activeThreads.emplace_back( this ),
itPT = --m_activeThreads.end(),
pPT = &*itPT,
poolLock.unlock();
auto pushThreadBack = [&poolLock, &itPT, this]()
{
poolLock.lock();
m_standbyThreads.splice( m_standbyThreads.end(), m_activeThreads, itPT );
};
invoke_on_destruct<decltype(pushThreadBack)> autoPushBackThread( pushThreadBack );
value_typebuffer_iterator *pMoveToitMoveTo = pSortBuf;itSortBuf;
for( InputIt2 itMoveFrom = itBegin, itEnd = itMoveFrom + n; itMoveFrom != itEnd; *pMoveTo++*itMoveTo = move( *itMoveFrom ), ++itMoveTo, ++itMoveFrom );
value_typebuffer_iterator *pLeft itLeft = pSortBufitSortBuf,
*pRight itRight = pLeftitLeft + left;
unique_lock<mutex> threadLock( pPT->m_mtx );
pPT->m_cmd = pool_thread::CMD_SORT;
pPT->m_rsp = pool_thread::RSP_NONE;
pPT->m_pBegin>m_itBegin = pRight;itRight;
pPT->m_n = right;
pPT->m_sigInitiate.notify_one();
threadLock.unlock();
auto waitForThread = [&threadLock, pPT]()
{
threadLock.lock();
while( pPT->m_rsp == pool_thread::RSP_NONE )
pPT->m_sigResponse.wait( threadLock );
assert(pPT->m_rsp == pool_thread::RSP_SUCCESS || pPT->m_rsp == pool_thread::RSP_ERR);
};
invoke_on_destruct<decltype(waitForThread)> autoWaitForThread( waitForThread );
threaded_sort( pLeftitLeft, left, pSortBufitSortBuf + n );
autoWaitForThread.invoke_and_disable();
if( pPT->m_rsp == pool_thread::RSP_ERR )
throw sort_exception();
threadLock.unlock();
merge_back( itBegin, pLeftitLeft, pLeftitLeft + left, pRightitRight, pRightitRight + right );
}
template<typename InputIt, typename P>
template<typename InputIt2>
void parallel_merge_sort<InputIt, P>::unthreaded_sort( InputIt2 itBegin, std::size_t n, value_typebuffer_iterator *pSortBufitSortBuf )
{
assert(n >= 2);
using namespace std;
if( n == 2 )
{
if( m_p( itBegin[1], itBegin[0] ) )
{
value_type temp( move( itBegin[0] ) );
itBegin[0] = move( itBegin[1] );
itBegin[1] = move( temp );
}
return;
}
value_typebuffer_iterator *pMoveToitMoveTo = pSortBuf;itSortBuf;
for( InputIt2 itMoveFrom = itBegin, itEnd = itMoveFrom + n; itMoveFrom != itEnd; *pMoveTo++*itMoveTo = move( *itMoveFrom ), ++itMoveTo, ++itMoveFrom );
size_t left = n / 2,
right = n - left;
value_typebuffer_iterator *pLeftitLeft = pSortBufitSortBuf,
*pRight itRight = pSortBufitLeft + left;
if( left >= 2 )
unthreaded_sort( pLeftitLeft, left, pSortBufitSortBuf + n );
if( right >= 2 )
unthreaded_sort( pRightitRight, right, pSortBufitSortBuf + n );
merge_back( itBegin, pLeftitLeft, pLeftitLeft + left, pRightitRight, pRightitRight + right );
}
template<typename InputIt, typename P>
template<typename OutputIt>
inline
void parallel_merge_sort<InputIt, P>::merge_back( OutputIt itUp, value_typebuffer_iterator *pLeftitLeft, value_typebuffer_iterator *pLeftEnditLeftEnd, value_typebuffer_iterator *pRightitRight, value_typebuffer_iterator *pRightEnditRightEnd )
{
assert(pLeftitLeft !=< pLeftEnditLeftEnd && pRightitRight !=< pRightEnditRightEnd);
using namespace std;
for( ; ; )
if( m_p( *pLeft*itLeft, *pRight*itRight ) )
{
*itUp = move( *pLeft++*itLeft );
++itUp;++itUp, ++itLeft;
if( pLeftitLeft == pLeftEnditLeftEnd )
{
for( ; pRightitRight != pRightEnd;itRightEnd; *itUp = move( *pRight++*itRight ), ++itUp, ++itRight );
break;
}
}
else
{
*itUp = move( *pRight++*itRight );
++itUp;++itUp, ++itRight;
if( pRightitRight == pRightEnditRightEnd )
{
for( ; pLeftitLeft != pLeftEnd;itLeftEnd; *itUp = move( *pLeft++*itRight ), ++itUp, ++itLeft );
break;
}
}
}
template<typename InputIt, typename P>
std::size_t parallel_merge_sort<InputIt, P>::get_buffer_size()
{
std::size_t s = 0;
for( pool_thread &pt : m_standbyThreads )
s += pt.m_sortBuf.capacity();
return s + m_callerSortBuf.capacity();
}
template<typename InputIt, typename P>
void parallel_merge_sort<InputIt, P>::empty_buffers()
{
for( pool_thread &pt : m_standbyThreads )
pt.m_sortBuf.clear(),
pt.m_sortBuf.shrink_to_fit();
m_callerSortBuf.clear();
m_callerSortBuf.shrink_to_fit();
}
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::pool_thread::pool_thread( parallel_merge_sort *pPMS ) :
m_mtx(),
m_sigInitiate(),
m_cmd( pool_thread::CMD_NONE ),
m_thrdm_thread( std::thread( []( pool_thread *pPT, parallel_merge_sort *pPMS ) -> void { pPT->sort_thread( pPMS ); }, this, pPMS ) )
{
}
template<typename InputIt, typename P>
void parallel_merge_sort<InputIt, P>::pool_thread::sort_thread( parallel_merge_sort *pPMS )
{
using namespace std;
for( ; ; )
{
unique_lock<mutex> threadLock( m_mtx );
while( m_cmd == CMD_NONE )
m_sigInitiate.wait( threadLock );
if( m_cmd == CMD_STOP )
return;
assert(m_cmd == pool_thread::CMD_SORT);
m_cmd = CMD_NONE;
threadLock.unlock();
bool success;
try
{
size_t size = calc_buffer_size( m_n );
if( m_sortBuf.size() < size )
m_sortBuf.resize( size );
pPMS->threaded_sort( m_pBeginm_itBegin, m_n, &m_sortBuf[0]m_sortBuf.begin() );
success = true;
}
catch( ... )
{
success = false;
}
threadLock.lock();
m_rsp = success ? RSP_SUCCESS : RSP_ERR,
m_sigResponse.notify_one();
}
}
template<typename InputIt, typename P = std::less<typename std::iterator_traits<InputIt>::value_type>>
class ref_parallel_merge_sort
{
private:
struct ref
{
InputIt it;
};
struct ref_predicate
{
ref_predicate( P p );
bool operator ()( ref const &left, ref const &right );
P m_p;
};
public:
ref_parallel_merge_sort( P const &p = P() );
void sort( InputIt itBegin, size_t n, std::size_t maxUnthreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
parallel_merge_sort<ref, ref_predicate> m_sorter;
};
template<typename InputIt, typename P>
inline
ref_parallel_merge_sort<InputIt, P>::ref_predicate::ref_predicate( P p ) :
m_p ( p )
{
}
template<typename InputIt, typename P>
inline
bool ref_parallel_merge_sort<InputIt, P>::ref_predicate::operator ()( ref const &left, ref const &right )
{
return m_p( *left.it, *right.it );
}
template<typename InputIt, typename P>
inline
ref_parallel_merge_sort<InputIt, P>::ref_parallel_merge_sort( P const &p ) :
m_sorter( ref_predicate( p ) )
{
}
template<typename InputIt, typename P>
void ref_parallel_merge_sort<InputIt, P>::sort( InputIt itBegin, size_t n, std::size_t maxUnthreaded )
{
using namespace std;
try
{
typedef typename iterator_traits<InputIt>::value_type value_type;
vector<ref> refBuf;
InputIt it;
int i;
refBuf.resize( n );
;
for( i = 0, it = itBegin; i != n; refBuf[i].it = it, ++i, ++it );
m_sorter.sort( &refBuf[0], n, maxUnthreaded );
vector<value_type> reorderBuf;
reorderBuf.resize( n );
for( i = 0, it = itBegin; i != n; reorderBuf[i] = move( *it ), ), ++i, ++it );
for( i = 0, it = itBegin; i != n; *it = move( reorderBuf[i] ), ++i, ++it );
}
catch( ... )
{
throw sort_exception();
}
}
template<typename InputIt, typename P>
inline
std::size_t ref_parallel_merge_sort<InputIt, P>::get_buffer_size()
{
return m_sorter.get_buffer_size();
}
template<typename InputIt, typename P>
inline
void ref_parallel_merge_sort<InputIt, P>::empty_buffers()
{
m_sorter.empty_buffers();
}
#include <iostream>
#include <cstdlib>
#include <functional>
#include <random>
#include <cstdint>
#include <iterator>
#include <type_traits>
#if defined(_MSC_VER)
#include <Windows.h>
double get_usecs()
{
LONGLONG liTime;
GetSystemTimeAsFileTime( &(FILETIME &)liTime );
return (double)liTime / 10.0;
}
#elif defined(__unix__)
#include <sys/time.h>
double get_usecs()
{
timeval tv;
gettimeofday( &tv, nullptr );
return (double)tv.tv_sec * 1'000'000.0 + tv.tv_usec;
}
#elif
#error no OS-support for get_usecs()
#endif
using namespace std;
void fill_with_random( double *p, size_t n, unsigned seed = 0 )
{
default_random_engine re( seed );
uniform_real_distribution<double> distrib;
for( double *pEnd = p + n; p != pEnd; *p++ = distrib( re ) );
}
template<typename T, typename = typename template<typenameenable_if<is_unsigned<T>::value, T>::type>
string decimal_unsigned( T t );
int main()
{
typedef typename vector<double>::iterator it_type;
size_t const SIZE = (size_t)1024 * 1024 * 1024 / sizeof(double);
unsigned hc = thread::hardware_concurrency();
vector<double> v;
double t;
hc = hc ? hc : 1;
v.resize( SIZE );
parallel_merge_sort<it_type> sd;
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( v.begin(), SIZE, SIZE / hc );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds parallel" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( v.begin(), SIZE, SIZE );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds sequential" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
}
#include <sstream>
string decify_string( string const &s );
template<typename T>T, typename>
string decimal_unsigned( T t )
{
using namespace std;
ostringstream oss;
return move( decify_string( (oss << t, oss.str()) ) );
}
string decify_string( string const &s )
{
using namespace std;
ostringstream oss;
size_t length = s.length(),
head = length % 3,
segments = length / 3;
if( head == 0 && segments >= 1 )
head = 3,
--segments;
oss << s.substr( 0, head );
for( size_t i = head; i != length; i += 3 )
oss << "." << s.substr( i, 3 );
return move( oss.str() );
}
#include <vector>
#include <vector>
#include <list>
#include <thread>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <algorithm>
#include <utility>
#include <exception>
#include <cassert>
#include <cstring>
#include <iterator>
template<typename T>
struct invoke_on_destruct
{
private:
T &m_t;
bool m_enabled;
public:
invoke_on_destruct( T &t ) :
m_t( t ), m_enabled( true )
{
}
~invoke_on_destruct()
{
if( m_enabled )
m_t();
}
void invoke_and_disable()
{
m_t();
m_enabled = false;
}
};
struct sort_exception : public std::exception
{
};
template<typename InputIt, typename P = std::less<typename std::iterator_traits<InputIt>::value_type>>
class parallel_merge_sort
{
public:
parallel_merge_sort( P const &p = P() );
~parallel_merge_sort();
void sort( InputIt itBegin, size_t n, std::size_t minThreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
typedef typename std::iterator_traits<InputIt>::value_type value_type;
struct pool_thread
{
enum CMD : int { CMD_STOP = -1, CMD_NONE = 0, CMD_SORT = 1 };
enum RSP : int { RSP_ERR = -1, RSP_NONE = 0, RSP_SUCCESS = 1 };
std::thread m_thrd;
std::mutex m_mtx;
std::condition_variable m_sigInitiate;
CMD m_cmd;
value_type *m_pBegin;
std::size_t m_n;
std::condition_variable m_sigResponse;
RSP m_rsp;
std::vector<value_type> m_sortBuf;
pool_thread( parallel_merge_sort *pPMS );
~pool_thread();
void sort_thread( parallel_merge_sort *pPMS );
static std::size_t calc_buffer_size( size_t n );
};
P m_p;
std::size_t m_minThreaded;
unsigned m_hardwareConcurrency;
std::vector<value_type> m_callerSortBuf;
std::mutex m_mtxPool;
std::list<pool_thread> m_standbyThreads;
std::list<pool_thread> m_activeThreads;
template<typename InputIt2>
void threaded_sort( InputIt2 itBegin, std::size_t n, value_type *pSortBuf );
template<typename InputIt2>
void unthreaded_sort( InputIt2 itBegin, std::size_t n, value_type *pSortBuf );
template<typename OutputIt>
void merge_back( OutputIt itUp, value_type *pLeft, value_type *pLeftEnd, value_type *pRight, value_type *pRightEnd );
};
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::parallel_merge_sort( P const &p ) :
m_p( p ),
m_hardwareConcurrency( std::thread::hardware_concurrency() )
{
}
template<typename InputIt, typename P>
void parallel_merge_sort<InputIt, P>::sort( InputIt itBegin, size_t n, std::size_t minThreaded )
{
if( n <= 1 )
return;
if( (m_minThreaded = minThreaded) < 4 )
m_minThreaded = 4;
try
{
std::size_t s = pool_thread::calc_buffer_size( n );
if( m_callerSortBuf.size() < s )
m_callerSortBuf.resize( s );
}
catch( ... )
{
throw sort_exception();
}
threaded_sort( itBegin, n, &m_callerSortBuf[0] );
}
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::~parallel_merge_sort()
{
assert(m_activeThreads.begin() == m_activeThreads.end());
}
template<typename InputIt, typename P>
inline
std::size_t parallel_merge_sort<InputIt, P>::pool_thread::calc_buffer_size( std::size_t n )
{
for( std::size_t rest = n, right; rest > 2; rest = right )
right = rest - (rest / 2),
n += right;
return n;
}
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::pool_thread::~pool_thread()
{
using namespace std;
unique_lock<mutex> threadLock( m_mtx );
m_cmd = pool_thread::CMD_STOP;
m_sigInitiate.notify_one();
threadLock.unlock();
m_thrd.join();
}
template<typename InputIt, typename P>
template<typename InputIt2>
void parallel_merge_sort<InputIt, P>::threaded_sort( InputIt2 itBegin, std::size_t n, value_type *pSortBuf )
{
using namespace std;
unique_lock<mutex> poolLock( m_mtxPool );
if( n < m_minThreaded || (m_standbyThreads.size() == 0 && m_activeThreads.size() >= (m_hardwareConcurrency - 1)) )
{
poolLock.unlock();
unthreaded_sort( itBegin, n, pSortBuf );
return;
}
typedef typename list<pool_thread>::iterator pt_it;
pt_it itPT;
pool_thread *pPT;
size_t left = n / 2,
right = n - left;
if( !m_standbyThreads.size() )
{
pt_it itPTScan;
size_t optimalSize = pool_thread::calc_buffer_size( right ),
bestFit = (size_t)(ptrdiff_t)-1,
size;
for( itPT = m_standbyThreads.end(), itPTScan = m_standbyThreads.begin();
itPTScan != m_standbyThreads.end(); ++itPTScan )
if( (size = itPTScan->m_sortBuf.size()) >= optimalSize && size < bestFit )
itPT = itPTScan,
bestFit = size;
if( itPT == m_standbyThreads.end() )
itPT = --m_standbyThreads.end();
m_activeThreads.splice( m_activeThreads.end(), m_standbyThreads, itPT );
poolLock.unlock();
pPT = &*itPT;
}
else
m_activeThreads.emplace_back( this ),
itPT = --m_activeThreads.end(),
pPT = &*itPT,
poolLock.unlock();
auto pushThreadBack = [&poolLock, &itPT, this]()
{
poolLock.lock();
m_standbyThreads.splice( m_standbyThreads.end(), m_activeThreads, itPT );
};
invoke_on_destruct<decltype(pushThreadBack)> autoPushBackThread( pushThreadBack );
value_type *pMoveTo = pSortBuf;
for( InputIt2 itMoveFrom = itBegin, itEnd = itMoveFrom + n; itMoveFrom != itEnd; *pMoveTo++ = move( *itMoveFrom ), ++itMoveFrom );
value_type *pLeft = pSortBuf,
*pRight = pLeft + left;
unique_lock<mutex> threadLock( pPT->m_mtx );
pPT->m_cmd = pool_thread::CMD_SORT;
pPT->m_rsp = pool_thread::RSP_NONE;
pPT->m_pBegin = pRight;
pPT->m_n = right;
pPT->m_sigInitiate.notify_one();
threadLock.unlock();
auto waitForThread = [&threadLock, pPT]()
{
threadLock.lock();
while( pPT->m_rsp == pool_thread::RSP_NONE )
pPT->m_sigResponse.wait( threadLock );
assert(pPT->m_rsp == pool_thread::RSP_SUCCESS || pPT->m_rsp == pool_thread::RSP_ERR);
};
invoke_on_destruct<decltype(waitForThread)> autoWaitForThread( waitForThread );
threaded_sort( pLeft, left, pSortBuf + n );
autoWaitForThread.invoke_and_disable();
if( pPT->m_rsp == pool_thread::RSP_ERR )
throw sort_exception();
threadLock.unlock();
merge_back( itBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename InputIt, typename P>
template<typename InputIt2>
void parallel_merge_sort<InputIt, P>::unthreaded_sort( InputIt2 itBegin, std::size_t n, value_type *pSortBuf )
{
assert(n >= 2);
using namespace std;
if( n == 2 )
{
if( m_p( itBegin[1], itBegin[0] ) )
{
value_type temp( move( itBegin[0] ) );
itBegin[0] = move( itBegin[1] );
itBegin[1] = move( temp );
}
return;
}
value_type *pMoveTo = pSortBuf;
for( InputIt2 itMoveFrom = itBegin, itEnd = itMoveFrom + n; itMoveFrom != itEnd; *pMoveTo++ = move( *itMoveFrom ), ++itMoveFrom );
size_t left = n / 2,
right = n - left;
value_type *pLeft = pSortBuf,
*pRight = pSortBuf + left;
if( left >= 2 )
unthreaded_sort( pLeft, left, pSortBuf + n );
if( right >= 2 )
unthreaded_sort( pRight, right, pSortBuf + n );
merge_back( itBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename InputIt, typename P>
template<typename OutputIt>
inline
void parallel_merge_sort<InputIt, P>::merge_back( OutputIt itUp, value_type *pLeft, value_type *pLeftEnd, value_type *pRight, value_type *pRightEnd )
{
assert(pLeft != pLeftEnd && pRight != pRightEnd);
using namespace std;
for( ; ; )
if( m_p( *pLeft, *pRight ) )
{
*itUp = move( *pLeft++ );
++itUp;
if( pLeft == pLeftEnd )
{
for( ; pRight != pRightEnd; *itUp = move( *pRight++ ), ++itUp );
break;
}
}
else
{
*itUp = move( *pRight++ );
++itUp;
if( pRight == pRightEnd )
{
for( ; pLeft != pLeftEnd; *itUp = move( *pLeft++ ), ++itUp );
break;
}
}
}
template<typename InputIt, typename P>
std::size_t parallel_merge_sort<InputIt, P>::get_buffer_size()
{
std::size_t s = 0;
for( pool_thread &pt : m_standbyThreads )
s += pt.m_sortBuf.capacity();
return s + m_callerSortBuf.capacity();
}
template<typename InputIt, typename P>
void parallel_merge_sort<InputIt, P>::empty_buffers()
{
for( pool_thread &pt : m_standbyThreads )
pt.m_sortBuf.clear(),
pt.m_sortBuf.shrink_to_fit();
m_callerSortBuf.clear();
m_callerSortBuf.shrink_to_fit();
}
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::pool_thread::pool_thread( parallel_merge_sort *pPMS ) :
m_mtx(),
m_sigInitiate(),
m_cmd( pool_thread::CMD_NONE ),
m_thrd( std::thread( []( pool_thread *pPT, parallel_merge_sort *pPMS ) -> void { pPT->sort_thread( pPMS ); }, this, pPMS ) )
{
}
template<typename InputIt, typename P>
void parallel_merge_sort<InputIt, P>::pool_thread::sort_thread( parallel_merge_sort *pPMS )
{
using namespace std;
for( ; ; )
{
unique_lock<mutex> threadLock( m_mtx );
while( m_cmd == CMD_NONE )
m_sigInitiate.wait( threadLock );
if( m_cmd == CMD_STOP )
return;
assert(m_cmd == pool_thread::CMD_SORT);
m_cmd = CMD_NONE;
threadLock.unlock();
bool success;
try
{
size_t size = calc_buffer_size( m_n );
if( m_sortBuf.size() < size )
m_sortBuf.resize( size );
pPMS->threaded_sort( m_pBegin, m_n, &m_sortBuf[0] );
success = true;
}
catch( ... )
{
success = false;
}
threadLock.lock();
m_rsp = success ? RSP_SUCCESS : RSP_ERR,
m_sigResponse.notify_one();
}
}
template<typename InputIt, typename P = std::less<typename std::iterator_traits<InputIt>::value_type>>
class ref_parallel_merge_sort
{
private:
struct ref
{
InputIt it;
};
struct ref_predicate
{
ref_predicate( P p );
bool operator ()( ref const &left, ref const &right );
P m_p;
};
public:
ref_parallel_merge_sort( P const &p = P() );
void sort( InputIt itBegin, size_t n, std::size_t maxUnthreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
parallel_merge_sort<ref, ref_predicate> m_sorter;
};
template<typename InputIt, typename P>
inline
ref_parallel_merge_sort<InputIt, P>::ref_predicate::ref_predicate( P p ) :
m_p ( p )
{
}
template<typename InputIt, typename P>
inline
bool ref_parallel_merge_sort<InputIt, P>::ref_predicate::operator ()( ref const &left, ref const &right )
{
return m_p( *left.it, *right.it );
}
template<typename InputIt, typename P>
inline
ref_parallel_merge_sort<InputIt, P>::ref_parallel_merge_sort( P const &p ) :
m_sorter( ref_predicate( p ) )
{
}
template<typename InputIt, typename P>
void ref_parallel_merge_sort<InputIt, P>::sort( InputIt itBegin, size_t n, std::size_t maxUnthreaded )
{
using namespace std;
try
{
typedef typename iterator_traits<InputIt>::value_type value_type;
vector<ref> refBuf;
InputIt it;
int i;
refBuf.resize( n );
;
for( i = 0, it = itBegin; i != n; refBuf[i].it = it, ++i, ++it );
m_sorter.sort( &refBuf[0], n, maxUnthreaded );
vector<value_type> reorderBuf;
reorderBuf.resize( n );
for( i = 0, it = itBegin; i != n; reorderBuf[i] = move( *it ), ++i, ++it );
for( i = 0, it = itBegin; i != n; *it = move( reorderBuf[i] ), ++i, ++it );
}
catch( ... )
{
throw sort_exception();
}
}
template<typename InputIt, typename P>
inline
std::size_t ref_parallel_merge_sort<InputIt, P>::get_buffer_size()
{
return m_sorter.get_buffer_size();
}
template<typename InputIt, typename P>
inline
void ref_parallel_merge_sort<InputIt, P>::empty_buffers()
{
m_sorter.empty_buffers();
}
#include <iostream>
#include <cstdlib>
#include <functional>
#include <random>
#include <cstdint>
#include <iterator>
#if defined(_MSC_VER)
#include <Windows.h>
double get_usecs()
{
LONGLONG liTime;
GetSystemTimeAsFileTime( &(FILETIME &)liTime );
return (double)liTime / 10.0;
}
#elif defined(__unix__)
#include <sys/time.h>
double get_usecs()
{
timeval tv;
gettimeofday( &tv, nullptr );
return (double)tv.tv_sec * 1'000'000.0 + tv.tv_usec;
}
#elif
#error no OS-support for get_usecs()
#endif
using namespace std;
void fill_with_random( double *p, size_t n, unsigned seed = 0 )
{
default_random_engine re( seed );
uniform_real_distribution<double> distrib;
for( double *pEnd = p + n; p != pEnd; *p++ = distrib( re ) );
}
template<typename T>
string decimal_unsigned( T t );
int main()
{
typedef typename vector<double>::iterator it_type;
size_t const SIZE = (size_t)1024 * 1024 * 1024 / sizeof(double);
unsigned hc = thread::hardware_concurrency();
vector<double> v;
double t;
v.resize( SIZE );
parallel_merge_sort<it_type> sd;
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( v.begin(), SIZE, SIZE / hc );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds parallel" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( v.begin(), SIZE, SIZE );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds sequential" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
}
#include <sstream>
string decify_string( string const &s );
template<typename T>
string decimal_unsigned( T t )
{
using namespace std;
ostringstream oss;
return move( decify_string( (oss << t, oss.str()) ) );
}
string decify_string( string const &s )
{
using namespace std;
ostringstream oss;
size_t length = s.length(),
head = length % 3,
segments = length / 3;
if( head == 0 && segments >= 1 )
head = 3,
--segments;
oss << s.substr( 0, head );
for( size_t i = head; i != length; i += 3 )
oss << "." << s.substr( i, 3 );
return move( oss.str() );
}
#include <vector>
#include <list>
#include <thread>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <algorithm>
#include <utility>
#include <exception>
#include <cassert>
#include <iterator>
template<typename T>
struct invoke_on_destruct
{
private:
T &m_t;
bool m_enabled;
public:
invoke_on_destruct( T &t ) :
m_t( t ), m_enabled( true )
{
}
~invoke_on_destruct()
{
if( m_enabled )
m_t();
}
void invoke_and_disable()
{
m_t();
m_enabled = false;
}
};
struct sort_exception : public std::exception
{
};
template<typename InputIt, typename P = std::less<typename std::iterator_traits<InputIt>::value_type>>
class parallel_merge_sort
{
public:
parallel_merge_sort( P const &p = P() );
~parallel_merge_sort();
void sort( InputIt itBegin, size_t n, std::size_t minThreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
typedef typename std::iterator_traits<InputIt>::value_type value_type;
typedef typename std::vector<value_type> buffer_type;
typedef typename buffer_type::iterator buffer_iterator;
struct pool_thread
{
enum CMD : int { CMD_STOP = -1, CMD_NONE = 0, CMD_SORT = 1 };
enum RSP : int { RSP_ERR = -1, RSP_NONE = 0, RSP_SUCCESS = 1 };
std::thread m_thread;
std::mutex m_mtx;
std::condition_variable m_sigInitiate;
CMD m_cmd;
buffer_iterator m_itBegin;
std::size_t m_n;
std::condition_variable m_sigResponse;
RSP m_rsp;
std::vector<value_type> m_sortBuf;
pool_thread( parallel_merge_sort *pPMS );
~pool_thread();
void sort_thread( parallel_merge_sort *pPMS );
static std::size_t calc_buffer_size( size_t n );
};
P m_p;
std::size_t m_minThreaded;
unsigned m_maxRightThreads;
buffer_type m_callerSortBuf;
std::mutex m_mtxPool;
std::list<pool_thread> m_standbyThreads;
std::list<pool_thread> m_activeThreads;
template<typename InputIt2>
void threaded_sort( InputIt2 itBegin, std::size_t n, buffer_iterator itSortBuf );
template<typename InputIt2>
void unthreaded_sort( InputIt2 itBegin, std::size_t n, buffer_iterator itSortBuf );
template<typename OutputIt>
void merge_back( OutputIt itUp, buffer_iterator itLeft, buffer_iterator itLeftEnd, buffer_iterator itRight, buffer_iterator itRightEnd );
};
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::parallel_merge_sort( P const &p ) :
m_p( p )
{
unsigned hc = std::thread::hardware_concurrency();
m_maxRightThreads = hc != 0 ? (hc - 1) : 0;
}
template<typename InputIt, typename P>
void parallel_merge_sort<InputIt, P>::sort( InputIt itBegin, size_t n, std::size_t minThreaded )
{
size_t const MIN_SIZE = 2;
if( n < MIN_SIZE )
return;
if( (m_minThreaded = minThreaded) < (2 * MIN_SIZE) )
m_minThreaded = 2 * MIN_SIZE;
try
{
std::size_t s = pool_thread::calc_buffer_size( n );
if( m_callerSortBuf.size() < s )
m_callerSortBuf.resize( s );
threaded_sort( itBegin, n, m_callerSortBuf.begin() );
}
catch( ... )
{
throw sort_exception();
}
}
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::~parallel_merge_sort()
{
assert(m_activeThreads.size() == 0);
}
template<typename InputIt, typename P>
inline
std::size_t parallel_merge_sort<InputIt, P>::pool_thread::calc_buffer_size( std::size_t n )
{
for( std::size_t rest = n, right; rest > 2; )
right = rest - (rest / 2),
n += right,
rest = right;
return n;
}
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::pool_thread::~pool_thread()
{
using namespace std;
unique_lock<mutex> threadLock( m_mtx );
m_cmd = pool_thread::CMD_STOP;
m_sigInitiate.notify_one();
threadLock.unlock();
m_thread.join();
}
template<typename InputIt, typename P>
template<typename InputIt2>
void parallel_merge_sort<InputIt, P>::threaded_sort( InputIt2 itBegin, std::size_t n, buffer_iterator itSortBuf )
{
using namespace std;
unique_lock<mutex> poolLock( m_mtxPool );
if( n < m_minThreaded || (m_standbyThreads.empty() && m_activeThreads.size() >= m_maxRightThreads) )
{
poolLock.unlock();
unthreaded_sort( itBegin, n, itSortBuf );
return;
}
typedef typename list<pool_thread>::iterator pt_it;
pt_it itPT;
pool_thread *pPT;
size_t left = n / 2,
right = n - left;
if( !m_standbyThreads.empty() )
{
pt_it itPTScan;
size_t optimalSize = pool_thread::calc_buffer_size( right ),
bestFit = (size_t)(ptrdiff_t)-1,
size;
for( itPT = m_standbyThreads.end(), itPTScan = m_standbyThreads.begin();
itPTScan != m_standbyThreads.end(); ++itPTScan )
if( (size = itPTScan->m_sortBuf.size()) >= optimalSize && size < bestFit )
itPT = itPTScan,
bestFit = size;
if( itPT == m_standbyThreads.end() )
itPT = --m_standbyThreads.end();
m_activeThreads.splice( m_activeThreads.end(), m_standbyThreads, itPT );
poolLock.unlock();
pPT = &*itPT;
}
else
m_activeThreads.emplace_back( this ),
itPT = --m_activeThreads.end(),
pPT = &*itPT,
poolLock.unlock();
auto pushThreadBack = [&poolLock, &itPT, this]()
{
poolLock.lock();
m_standbyThreads.splice( m_standbyThreads.end(), m_activeThreads, itPT );
};
invoke_on_destruct<decltype(pushThreadBack)> autoPushBackThread( pushThreadBack );
buffer_iterator itMoveTo = itSortBuf;
for( InputIt2 itMoveFrom = itBegin, itEnd = itMoveFrom + n; itMoveFrom != itEnd; *itMoveTo = move( *itMoveFrom ), ++itMoveTo, ++itMoveFrom );
buffer_iterator itLeft = itSortBuf,
itRight = itLeft + left;
unique_lock<mutex> threadLock( pPT->m_mtx );
pPT->m_cmd = pool_thread::CMD_SORT;
pPT->m_rsp = pool_thread::RSP_NONE;
pPT->m_itBegin = itRight;
pPT->m_n = right;
pPT->m_sigInitiate.notify_one();
threadLock.unlock();
auto waitForThread = [&threadLock, pPT]()
{
threadLock.lock();
while( pPT->m_rsp == pool_thread::RSP_NONE )
pPT->m_sigResponse.wait( threadLock );
assert(pPT->m_rsp == pool_thread::RSP_SUCCESS || pPT->m_rsp == pool_thread::RSP_ERR);
};
invoke_on_destruct<decltype(waitForThread)> autoWaitForThread( waitForThread );
threaded_sort( itLeft, left, itSortBuf + n );
autoWaitForThread.invoke_and_disable();
if( pPT->m_rsp == pool_thread::RSP_ERR )
throw sort_exception();
threadLock.unlock();
merge_back( itBegin, itLeft, itLeft + left, itRight, itRight + right );
}
template<typename InputIt, typename P>
template<typename InputIt2>
void parallel_merge_sort<InputIt, P>::unthreaded_sort( InputIt2 itBegin, std::size_t n, buffer_iterator itSortBuf )
{
assert(n >= 2);
using namespace std;
if( n == 2 )
{
if( m_p( itBegin[1], itBegin[0] ) )
{
value_type temp( move( itBegin[0] ) );
itBegin[0] = move( itBegin[1] );
itBegin[1] = move( temp );
}
return;
}
buffer_iterator itMoveTo = itSortBuf;
for( InputIt2 itMoveFrom = itBegin, itEnd = itMoveFrom + n; itMoveFrom != itEnd; *itMoveTo = move( *itMoveFrom ), ++itMoveTo, ++itMoveFrom );
size_t left = n / 2,
right = n - left;
buffer_iterator itLeft = itSortBuf,
itRight = itLeft + left;
if( left >= 2 )
unthreaded_sort( itLeft, left, itSortBuf + n );
if( right >= 2 )
unthreaded_sort( itRight, right, itSortBuf + n );
merge_back( itBegin, itLeft, itLeft + left, itRight, itRight + right );
}
template<typename InputIt, typename P>
template<typename OutputIt>
inline
void parallel_merge_sort<InputIt, P>::merge_back( OutputIt itUp, buffer_iterator itLeft, buffer_iterator itLeftEnd, buffer_iterator itRight, buffer_iterator itRightEnd )
{
assert(itLeft < itLeftEnd && itRight < itRightEnd);
using namespace std;
for( ; ; )
if( m_p( *itLeft, *itRight ) )
{
*itUp = move( *itLeft );
++itUp, ++itLeft;
if( itLeft == itLeftEnd )
{
for( ; itRight != itRightEnd; *itUp = move( *itRight ), ++itUp, ++itRight );
break;
}
}
else
{
*itUp = move( *itRight );
++itUp, ++itRight;
if( itRight == itRightEnd )
{
for( ; itLeft != itLeftEnd; *itUp = move( *itRight ), ++itUp, ++itLeft );
break;
}
}
}
template<typename InputIt, typename P>
std::size_t parallel_merge_sort<InputIt, P>::get_buffer_size()
{
std::size_t s = 0;
for( pool_thread &pt : m_standbyThreads )
s += pt.m_sortBuf.capacity();
return s + m_callerSortBuf.capacity();
}
template<typename InputIt, typename P>
void parallel_merge_sort<InputIt, P>::empty_buffers()
{
for( pool_thread &pt : m_standbyThreads )
pt.m_sortBuf.clear(),
pt.m_sortBuf.shrink_to_fit();
m_callerSortBuf.clear();
m_callerSortBuf.shrink_to_fit();
}
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::pool_thread::pool_thread( parallel_merge_sort *pPMS ) :
m_mtx(),
m_sigInitiate(),
m_cmd( pool_thread::CMD_NONE ),
m_thread( std::thread( []( pool_thread *pPT, parallel_merge_sort *pPMS ) -> void { pPT->sort_thread( pPMS ); }, this, pPMS ) )
{
}
template<typename InputIt, typename P>
void parallel_merge_sort<InputIt, P>::pool_thread::sort_thread( parallel_merge_sort *pPMS )
{
using namespace std;
for( ; ; )
{
unique_lock<mutex> threadLock( m_mtx );
while( m_cmd == CMD_NONE )
m_sigInitiate.wait( threadLock );
if( m_cmd == CMD_STOP )
return;
assert(m_cmd == pool_thread::CMD_SORT);
m_cmd = CMD_NONE;
threadLock.unlock();
bool success;
try
{
size_t size = calc_buffer_size( m_n );
if( m_sortBuf.size() < size )
m_sortBuf.resize( size );
pPMS->threaded_sort( m_itBegin, m_n, m_sortBuf.begin() );
success = true;
}
catch( ... )
{
success = false;
}
threadLock.lock();
m_rsp = success ? RSP_SUCCESS : RSP_ERR,
m_sigResponse.notify_one();
}
}
template<typename InputIt, typename P = std::less<typename std::iterator_traits<InputIt>::value_type>>
class ref_parallel_merge_sort
{
private:
struct ref
{
InputIt it;
};
struct ref_predicate
{
ref_predicate( P p );
bool operator ()( ref const &left, ref const &right );
P m_p;
};
public:
ref_parallel_merge_sort( P const &p = P() );
void sort( InputIt itBegin, size_t n, std::size_t maxUnthreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
parallel_merge_sort<ref, ref_predicate> m_sorter;
};
template<typename InputIt, typename P>
inline
ref_parallel_merge_sort<InputIt, P>::ref_predicate::ref_predicate( P p ) :
m_p ( p )
{
}
template<typename InputIt, typename P>
inline
bool ref_parallel_merge_sort<InputIt, P>::ref_predicate::operator ()( ref const &left, ref const &right )
{
return m_p( *left.it, *right.it );
}
template<typename InputIt, typename P>
inline
ref_parallel_merge_sort<InputIt, P>::ref_parallel_merge_sort( P const &p ) :
m_sorter( ref_predicate( p ) )
{
}
template<typename InputIt, typename P>
void ref_parallel_merge_sort<InputIt, P>::sort( InputIt itBegin, size_t n, std::size_t maxUnthreaded )
{
using namespace std;
try
{
typedef typename iterator_traits<InputIt>::value_type value_type;
vector<ref> refBuf;
InputIt it;
int i;
refBuf.resize( n );
for( i = 0, it = itBegin; i != n; refBuf[i].it = it, ++i, ++it );
m_sorter.sort( &refBuf[0], n, maxUnthreaded );
vector<value_type> reorderBuf;
reorderBuf.resize( n );
for( i = 0, it = itBegin; i != n; reorderBuf[i] = move( *it ), ++i, ++it );
for( i = 0, it = itBegin; i != n; *it = move( reorderBuf[i] ), ++i, ++it );
}
catch( ... )
{
throw sort_exception();
}
}
template<typename InputIt, typename P>
inline
std::size_t ref_parallel_merge_sort<InputIt, P>::get_buffer_size()
{
return m_sorter.get_buffer_size();
}
template<typename InputIt, typename P>
inline
void ref_parallel_merge_sort<InputIt, P>::empty_buffers()
{
m_sorter.empty_buffers();
}
#include <iostream>
#include <cstdlib>
#include <functional>
#include <random>
#include <cstdint>
#include <iterator>
#include <type_traits>
#if defined(_MSC_VER)
#include <Windows.h>
double get_usecs()
{
LONGLONG liTime;
GetSystemTimeAsFileTime( &(FILETIME &)liTime );
return (double)liTime / 10.0;
}
#elif defined(__unix__)
#include <sys/time.h>
double get_usecs()
{
timeval tv;
gettimeofday( &tv, nullptr );
return (double)tv.tv_sec * 1'000'000.0 + tv.tv_usec;
}
#elif
#error no OS-support for get_usecs()
#endif
using namespace std;
void fill_with_random( double *p, size_t n, unsigned seed = 0 )
{
default_random_engine re( seed );
uniform_real_distribution<double> distrib;
for( double *pEnd = p + n; p != pEnd; *p++ = distrib( re ) );
}
template<typename T, typename = typename enable_if<is_unsigned<T>::value, T>::type>
string decimal_unsigned( T t );
int main()
{
typedef typename vector<double>::iterator it_type;
size_t const SIZE = (size_t)1024 * 1024 * 1024 / sizeof(double);
unsigned hc = thread::hardware_concurrency();
vector<double> v;
double t;
hc = hc ? hc : 1;
v.resize( SIZE );
parallel_merge_sort<it_type> sd;
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( v.begin(), SIZE, SIZE / hc );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds parallel" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( v.begin(), SIZE, SIZE );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds sequential" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
}
#include <sstream>
string decify_string( string const &s );
template<typename T, typename>
string decimal_unsigned( T t )
{
using namespace std;
ostringstream oss;
return move( decify_string( (oss << t, oss.str()) ) );
}
string decify_string( string const &s )
{
using namespace std;
ostringstream oss;
size_t length = s.length(),
head = length % 3,
segments = length / 3;
if( head == 0 && segments >= 1 )
head = 3,
--segments;
oss << s.substr( 0, head );
for( size_t i = head; i != length; i += 3 )
oss << "." << s.substr( i, 3 );
return move( oss.str() );
}
#include <vector>
#include <vector>
#include <list>
#include <thread>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <algorithm>
#include <utility>
#include <exception>
#include <cassert>
#include <cstring>
#include <iterator>
template<typename T>
struct invoke_on_destruct
{
private:
T &m_t;
bool m_enabled;
public:
invoke_on_destruct( T &t ) :
m_t( t ), m_enabled( true )
{
}
~invoke_on_destruct()
{
if( m_enabled )
m_t();
}
void invoke_and_disable()
{
m_t();
m_enabled = false;
}
};
struct sort_exception : public std::exception
{
};
template<typename TInputIt, typename P = std::less<T>>less<typename std::iterator_traits<InputIt>::value_type>>
class parallel_merge_sort
{
public:
parallel_merge_sort( P const &p = P() );
~parallel_merge_sort();
void sort( TInputIt *pBeginitBegin, size_t n, std::size_t minThreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
typedef typename std::iterator_traits<InputIt>::value_type value_type;
struct pool_thread
{
enum CMD : int { CMD_STOP = -1, CMD_NONE = 0, CMD_SORT = 1 };
enum RSP : int { RSP_ERR = -1, RSP_NONE = 0, RSP_SUCCESS = 1 };
std::unique_ptr<std::thread>thread m_thread; m_thrd;
std::mutex m_mtx;
std::condition_variable m_sigInitiate;
CMD m_cmd;
T value_type *m_pBegin;
std::size_t m_n;
std::condition_variable m_sigResponse;
RSP m_rsp;
std::vector<T> vector<value_type> m_sortBuf;
pool_thread( parallel_merge_sort *pPMS );
~pool_thread();
void sort_thread( parallel_merge_sort *pPMS );
static std::size_t get_buffer_sizecalc_buffer_size( size_t n );
};
P m_p;
std::size_t m_minThreaded;
unsigned m_hardwareConcurrency;
std::vector<T> vector<value_type> m_callerSortBuf;
std::mutex m_mtxPool;
std::list<pool_thread> m_standbyThreads;
std::list<pool_thread> m_activeThreads;
template<typename InputIt2>
void threaded_sort( TInputIt2 *pBeginitBegin, std::size_t n, Tvalue_type *pSortBuf );
template<typename InputIt2>
void unthreaded_sort( TInputIt2 *pBeginitBegin, std::size_t n, Tvalue_type *pSortBuf );
template<typename OutputIt>
void merge_back( TOutputIt *pUpitUp, Tvalue_type *pLeft, Tvalue_type *pLeftEnd, Tvalue_type *pRight, Tvalue_type *pRightEnd );
};
template<typename TInputIt, typename P>
parallel_merge_sort<Tparallel_merge_sort<InputIt, P>::parallel_merge_sort( P const &p ) :
m_p( p ),
m_hardwareConcurrency( std::thread::hardware_concurrency() )
{
}
template<typename TInputIt, typename P>
void parallel_merge_sort<Tparallel_merge_sort<InputIt, P>::sort( TInputIt *pBeginitBegin, size_t n, std::size_t minThreaded )
{
if( n <= 1 )
return;
if( (m_minThreaded = minThreaded) < 4 )
m_minThreaded = 4;
try
{
std::size_t s = pool_thread::get_buffer_sizecalc_buffer_size( n );
if( m_callerSortBuf.size() < s )
m_callerSortBuf.resize( s );
}
catch( ... )
{
throw sort_exception();
}
threaded_sort( pBeginitBegin, n, &m_callerSortBuf[0] );
}
template<typename TInputIt, typename P>
parallel_merge_sort<Tparallel_merge_sort<InputIt, P>::~parallel_merge_sort()
{
using namespace std;
lock_guard<mutex> poolLock( m_mtxPool );
for( pool_thread &pt : m_standbyThreads )
{
unique_lock<mutex> threadLock( pt.m_mtx );
pt.m_cmd = pool_thread::CMD_STOP;
pt.m_sigInitiate.notify_one();
threadLock.unlock();
pt.m_thread->join();
}
assert(m_activeThreads.begin() == m_activeThreads.end());
}
template<typename TInputIt, typename P>
inline
std::size_t parallel_merge_sort<Tparallel_merge_sort<InputIt, P>::pool_thread::get_buffer_sizecalc_buffer_size( std::size_t n )
{
for( std::size_t rest = n, right; rest > 2; rest = right )
right = rest - (rest / 2),
n += right;
return n;
}
template<typename TInputIt, typename P>
parallel_merge_sort<InputIt, P>::pool_thread::~pool_thread()
{
using namespace std;
unique_lock<mutex> threadLock( m_mtx );
m_cmd = pool_thread::CMD_STOP;
m_sigInitiate.notify_one();
threadLock.unlock();
m_thrd.join();
}
template<typename InputIt, typename P>
template<typename InputIt2>
void parallel_merge_sort<Tparallel_merge_sort<InputIt, P>::threaded_sort( TInputIt2 *pBeginitBegin, std::size_t n, Tvalue_type *pSortBuf )
{
using namespace std;
unique_lock<mutex> poolLock( m_mtxPool );
bool standbyEmpty;
if( n < m_minThreaded || ((standbyEmpty = m_standbyThreads.beginsize() == m_standbyThreads.end())0 && m_activeThreads.size() >= (m_hardwareConcurrency - 1)) )
{
poolLock.unlock();
unthreaded_sort( pBeginitBegin, n, pSortBuf );
return;
}
typedef typename list<pool_thread>::iterator itPT;pt_it;
pool_thread pt_it itPT;
pool_thread *pPT;
size_t left = n / 2,
right = n - left;
if( !standbyEmptym_standbyThreads.size() )
{
typenamept_it list<pool_thread>::iterator itPTSearch;itPTScan;
ptrdiff_t size_t soptimalSize = (ptrdiff_t)pool_thread::get_buffer_sizecalc_buffer_size( right ),
bestFit bestFit = (ptrdiff_t)((size_t)(ptrdiff_t)-1 >> 1),
diff;size;
for( itPT = m_standbyThreads.end(), itPTSearchitPTScan = m_standbyThreads.begin();
itPTSearchitPTScan != m_standbyThreads.end(); ++itPTSearch++itPTScan )
if( (diffsize = (ptrdiff_t)itPTSearchitPTScan->m_sortBuf.size() - s) >= 0optimalSize && diffsize < bestFit )
itPT = itPTSearchitPTScan,
bestFit = diff;size;
if( itPT == m_standbyThreads.end() )
itPT = --m_standbyThreads.end();
m_activeThreads.splice( m_activeThreads.end(), m_standbyThreads, itPT );
poolLock.unlock();
pPT = &*itPT;
}
else
m_activeThreads.emplace_back( this ),
itPT = --m_activeThreads.end(),
pPT = &*itPT,
poolLock.unlock();
for( T *pMoveFrom = pBegin, *pMoveTo = pSortBuf; pMoveFrom != (pBegin + n); *pMoveTo++ = move( *pMoveFrom++ ) );
T *pLeft = pSortBuf,
*pRight = pSortBuf + left;
auto pushThreadBack = [&poolLock, &itPT, this]()
{
poolLock.lock();
m_standbyThreads.splice( m_standbyThreads.end(), m_activeThreads, itPT );
};
invoke_on_destruct<decltype(pushThreadBack)> autoPushBackThread( pushThreadBack );
value_type *pMoveTo = pSortBuf;
for( InputIt2 itMoveFrom = itBegin, itEnd = itMoveFrom + n; itMoveFrom != itEnd; *pMoveTo++ = move( *itMoveFrom ), ++itMoveFrom );
value_type *pLeft = pSortBuf,
*pRight = pLeft + left;
unique_lock<mutex> threadLock( pPT->m_mtx );
pPT->m_cmd = pool_thread::CMD_SORT;
pPT->m_rsp = pool_thread::RSP_NONE;
pPT->m_pBegin = pRight;
pPT->m_n = right;
pPT->m_sigInitiate.notify_one();
threadLock.unlock();
auto waitForThread = [&threadLock, pPT]()
{
threadLock.lock();
while( pPT->m_rsp == pool_thread::RSP_NONE )
pPT->m_sigResponse.wait( threadLock );
assert(pPT->m_rsp == pool_thread::RSP_SUCCESS || pPT->m_rsp == pool_thread::RSP_ERR);
threadLock.unlock();
};
invoke_on_destruct<decltype(waitForThread)> autoWaitForThread( waitForThread );
threaded_sort( pLeft, left, pSortBuf + n );
autoWaitForThread.invoke_and_disable();
if( pPT->m_rsp == pool_thread::RSP_ERR )
throw sort_exception();
threadLock.unlock();
merge_back( pBeginitBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename TInputIt, typename P>
template<typename InputIt2>
void parallel_merge_sort<Tparallel_merge_sort<InputIt, P>::unthreaded_sort( TInputIt2 *pBeginitBegin, std::size_t n, Tvalue_type *pSortBuf )
{
assert(n >= 2);
using namespace std;
if( n == 2 )
{
if( m_p( pBegin[1]itBegin[1], pBegin[0]itBegin[0] ) )
{
Tvalue_type temp( move( pBegin[0]itBegin[0] ) );
pBegin[0]itBegin[0] = move( pBegin[1]itBegin[1] );
pBegin[1]itBegin[1] = move( temp );
}
return;
}
value_type *pMoveTo = pSortBuf;
for( TInputIt2 *pMoveFromitMoveFrom = pBeginitBegin, *pMoveToitEnd = pSortBuf;itMoveFrom pMoveFrom+ !=n; (pBeginitMoveFrom +!= n);itEnd; *pMoveTo++ = move( *pMoveFrom++*itMoveFrom ), ++itMoveFrom );
size_t left = n / 2,
right = n - left;
T value_type *pLeft = pSortBuf,
*pRight = pSortBuf + left;
if( left >= 2 )
unthreaded_sort( pLeft, left, pSortBuf + n );
if( right >= 2 )
unthreaded_sort( pRight, right, pSortBuf + n );
merge_back( pBeginitBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename TInputIt, typename P>
template<typename OutputIt>
inline
void parallel_merge_sort<Tparallel_merge_sort<InputIt, P>::merge_back( TOutputIt *pUpitUp, Tvalue_type *pLeft, Tvalue_type *pLeftEnd, Tvalue_type *pRight, Tvalue_type *pRightEnd )
{
assert(pLeft != pLeftEnd && pRight != pRightEnd);
using namespace std;
for( ; ; )
if( m_p( *pLeft, *pRight ) )
{
*pUp++*itUp = move( *pLeft++ );
++itUp;
if( pLeft == pLeftEnd )
{
for( ; pRight != pRightEnd; *pUp++*itUp = move( *pRight++ ), ++itUp );
break;
}
}
else
{
*pUp++*itUp = move( *pRight++ );
++itUp;
if( pRight == pRightEnd )
{
for( ; pLeft != pLeftEnd; *pUp++*itUp = move( *pLeft++ ), ++itUp );
break;
}
}
}
template<typename TInputIt, typename P>
std::size_t parallel_merge_sort<Tparallel_merge_sort<InputIt, P>::get_buffer_size()
{
std::size_t s = 0;
for( pool_thread &pt : m_standbyThreads )
s += pt.m_sortBuf.capacity();
return s + m_callerSortBuf.capacity();
}
template<typename TInputIt, typename P>
void parallel_merge_sort<Tparallel_merge_sort<InputIt, P>::empty_buffers()
{
for( pool_thread &pt : m_standbyThreads )
pt.m_sortBuf.clear(),
pt.m_sortBuf.shrink_to_fit();
m_callerSortBuf.clear();
m_callerSortBuf.shrink_to_fit();
}
template<typename TInputIt, typename P>
parallel_merge_sort<Tparallel_merge_sort<InputIt, P>::pool_thread::pool_thread( parallel_merge_sort *pPMS ) :
m_mtx(),
m_sigInitiate(),
m_cmd( pool_thread::CMD_NONE ),
m_thrd( std::thread( []( pool_thread *pPT, parallel_merge_sort *pPMS ) -> void { pPT->sort_thread( pPMS ); }, this, pPMS ) )
{
using namespace std;
m_cmd = pool_thread::CMD_NONE;
m_thread = move( unique_ptr<thread>( new thread( []( pool_thread *pPT, parallel_merge_sort *pPMS ) -> void { pPT->sort_thread( pPMS ); }, this, pPMS ) ) );
}
template<typename TInputIt, typename P>
void parallel_merge_sort<Tparallel_merge_sort<InputIt, P>::pool_thread::sort_thread( parallel_merge_sort *pPMS )
{
using namespace std;
for( ; ; )
{
{
unique_lock<mutex> threadLock( m_mtx );
while( m_cmd == CMD_NONE )
m_sigInitiate.wait( threadLock );
if( m_cmd == CMD_STOP )
return;
assert(m_cmd == pool_thread::CMD_SORT);
m_cmd = CMD_NONE;
}threadLock.unlock();
bool success;
try
{
size_t ssize = get_buffer_sizecalc_buffer_size( m_n );
if( m_sortBuf.size() < ssize )
m_sortBuf.resize( ssize );
pPMS->threaded_sort( m_pBegin, m_n, &m_sortBuf[0] );
success = true;
}
catch( ... )
{
success = false;
}
lock_guard<mutex> threadLock.lock( m_mtx );
m_rsp = success ? RSP_SUCCESS : RSP_ERR,
m_sigResponse.notify_one();
}
}
template<typename TInputIt, typename P = std::less<T>>less<typename std::iterator_traits<InputIt>::value_type>>
class ref_parallel_merge_sort
{
private:
struct ref
{
TInputIt *p;it;
};
struct ref_predicate
{
ref_predicate( P p );
bool operator ()( ref const &left, ref const &right );
P m_p;
};
public:
ref_parallel_merge_sort( P const &p = P() );
void sort( TInputIt *pBeginitBegin, size_t n, std::size_t maxUnthreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
ref_predicate m_p;
parallel_merge_sort<ref, ref_predicate> m_sorter;
};
template<typename TInputIt, typename P>
inline
ref_parallel_merge_sort<Tref_parallel_merge_sort<InputIt, P>::ref_predicate::ref_predicate( P p ) :
m_p ( p )
{
}
template<typename TInputIt, typename P>
inline
bool ref_parallel_merge_sort<Tref_parallel_merge_sort<InputIt, P>::ref_predicate::operator ()( ref const &left, ref const &right )
{
return m_p( *left.pit, *right.pit );
}
template<typename TInputIt, typename P>
inline
ref_parallel_merge_sort<Tref_parallel_merge_sort<InputIt, P>::ref_parallel_merge_sort( P const &p ) :
m_pm_sorter( ref_predicate( p ),
m_sorter( m_p )
{
}
template<typename TInputIt, typename P>
void ref_parallel_merge_sort<Tref_parallel_merge_sort<InputIt, P>::sort( TInputIt *pBeginitBegin, size_t n, std::size_t maxUnthreaded )
{
using namespace std;
try
{
typedef typename iterator_traits<InputIt>::value_type value_type;
vector<ref> refBuf;
InputIt it;
int i;
refBuf.resize( n );
;
for( size_t i = 0;0, it = itBegin; i != n; refBuf[i].pit = &pBegin[i]it, ++i, ++it );
m_sorter.sort( &refBuf[0], n, maxUnthreaded );
vector<T>vector<value_type> reorderBuf;
reorderBuf.resize( n );
for( size_t i = 0;0, it = itBegin; i != n; reorderBuf[i] = move( *refBuf[i].p*it ), ++i, ++it );
for( size_t i = 0;0, it = itBegin; i != n; pBegin[i]*it = move( reorderBuf[i] ), ++i, ++it );
}
catch( ... )
{
throw sort_exception();
}
}
template<typename InputIt, typename P>
inline
std::size_t ref_parallel_merge_sort<InputIt, P>::get_buffer_size()
{
return m_sorter.get_buffer_size();
}
template<typename InputIt, typename P>
inline
void ref_parallel_merge_sort<InputIt, P>::empty_buffers()
{
m_sorter.empty_buffers();
}
#include <iostream>
#include <cstdlib>
#include <functional>
#include <random>
#include <cstdint>
#include <iterator>
#if defined(_MSC_VER)
#include <Windows.h>
double get_usecs()
{
LONGLONG liTime;
GetSystemTimeAsFileTime( &(FILETIME &)liTime );
return (double)liTime / 10.0;
}
#elif defined(__unix__)
#include <sys/time.h>
double get_usecs()
{
timeval tv;
gettimeofday( &tv, nullptr );
return (double)tv.tv_sec * 1'000'000.0 + tv.tv_usec;
}
#elif
#error no OS-support for get_usecs()
#endif
using namespace std;
void fill_with_random( double *p, size_t n, unsigned seed = 0 )
{
default_random_engine re( seed );
uniform_real_distribution<double> distrib;
for( double *pEnd = p + n; p != pEnd; *p++ = distrib( re ) );
}
template<typename T>
string decimal_unsigned( T t );
int main()
{
typedef typename vector<double>::iterator it_type;
size_t const SIZE = (size_t)1024 * 1024 * 1024 / sizeof(double);
unsigned hc = thread::hardware_concurrency();
vector<double> v;
double t;
v.resize( SIZE );
parallel_merge_sort<double>parallel_merge_sort<it_type> sd;
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( &v[0]v.begin(), SIZE, SIZE / hc );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds parallel" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( &v[0]v.begin(), SIZE, SIZE );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds sequential" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
}
#include <sstream>
string decify_string( string const &s );
template<typename T>
string decimal_unsigned( T t )
{
using namespace std;
ostringstream oss;
return move( decify_string( (oss << t, oss.str()) ) );
}
string decify_string( string const &s )
{
using namespace std;
ostringstream oss;
size_t length = s.length(),
head = length % 3,
segments = length / 3;
if( head == 0 && segments >= 1 )
head = 3,
--segments;
oss << s.substr( 0, head );
for( size_t i = head; i != length; i += 3 )
oss << "." << s.substr( i, 3 );
return move( oss.str() );
}
#include <vector>
#include <vector>
#include <list>
#include <thread>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <algorithm>
#include <utility>
#include <exception>
#include <cassert>
#include <cstring>
template<typename T>
struct invoke_on_destruct
{
private:
T &m_t;
bool m_enabled;
public:
invoke_on_destruct( T &t ) :
m_t( t ), m_enabled( true )
{
}
~invoke_on_destruct()
{
if( m_enabled )
m_t();
}
void invoke_and_disable()
{
m_t();
m_enabled = false;
}
};
struct sort_exception : public std::exception
{
};
template<typename T, typename P = std::less<T>>
class parallel_merge_sort
{
public:
parallel_merge_sort( P const &p = P() );
~parallel_merge_sort();
void sort( T *pBegin, size_t n, std::size_t minThreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
struct pool_thread
{
enum CMD : int { CMD_STOP = -1, CMD_NONE = 0, CMD_SORT = 1 };
enum RSP : int { RSP_ERR = -1, RSP_NONE = 0, RSP_SUCCESS = 1 };
std::unique_ptr<std::thread> m_thread;
std::mutex m_mtx;
std::condition_variable m_sigInitiate;
CMD m_cmd;
T *m_pBegin;
std::size_t m_n;
std::condition_variable m_sigResponse;
RSP m_rsp;
std::vector<T> m_sortBuf;
pool_thread( parallel_merge_sort *pPMS );
void sort_thread( parallel_merge_sort *pPMS );
static std::size_t get_buffer_size( size_t n );
};
P m_p;
std::size_t m_minThreaded;
unsigned m_hardwareConcurrency;
std::vector<T> m_callerSortBuf;
std::mutex m_mtxPool;
std::list<pool_thread> m_standbyThreads;
std::list<pool_thread> m_activeThreads;
void threaded_sort( T *pBegin, std::size_t n, T *pSortBuf );
void unthreaded_sort( T *pBegin, std::size_t n, T *pSortBuf );
void merge_back( T *pUp, T *pLeft, T *pLeftEnd, T *pRight, T *pRightEnd );
};
template<typename T, typename P>
parallel_merge_sort<T, P>::parallel_merge_sort( P const &p ) :
m_p( p ),
m_hardwareConcurrency( std::thread::hardware_concurrency() )
{
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::sort( T *pBegin, size_t n, std::size_t minThreaded )
{
if( n <= 1 )
return;
if( (m_minThreaded = minThreaded) < 4 )
m_minThreaded = 4;
try
{
std::size_t s = pool_thread::get_buffer_size( n );
if( m_callerSortBuf.size() < s )
m_callerSortBuf.resize( s );
}
catch( ... )
{
throw sort_exception();
}
threaded_sort( pBegin, n, &m_callerSortBuf[0] );
}
template<typename T, typename P>
parallel_merge_sort<T, P>::~parallel_merge_sort()
{
using namespace std;
lock_guard<mutex> poolLock( m_mtxPool );
for( pool_thread &pt : m_standbyThreads )
{
unique_lock<mutex> threadLock( pt.m_mtx );
pt.m_cmd = pool_thread::CMD_STOP;
pt.m_sigInitiate.notify_one();
threadLock.unlock();
pt.m_thread->join();
}
assert(m_activeThreads.begin() == m_activeThreads.end());
}
template<typename T, typename P>
inline
std::size_t parallel_merge_sort<T, P>::pool_thread::get_buffer_size( std::size_t n )
{
for( std::size_t rest = n, right; rest > 2; rest = right )
right = rest - (rest / 2),
n += right;
return n;
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::threaded_sort( T *pBegin, std::size_t n, T *pSortBuf )
{
using namespace std;
unique_lock<mutex> poolLock( m_mtxPool );
bool standbyEmpty;
if( n < m_minThreaded || ((standbyEmpty = m_standbyThreads.begin() == m_standbyThreads.end()) && m_activeThreads.size() >= (m_hardwareConcurrency - 1)) )
{
poolLock.unlock();
unthreaded_sort( pBegin, n, pSortBuf );
return;
}
typename list<pool_thread>::iterator itPT;
pool_thread *pPT;
size_t left = n / 2,
right = n - left;
if( !standbyEmpty )
{
typename list<pool_thread>::iterator itPTSearch;
ptrdiff_t s = (ptrdiff_t)pool_thread::get_buffer_size( right ),
bestFit = (ptrdiff_t)((size_t)(ptrdiff_t)-1 >> 1),
diff;
for( itPT = m_standbyThreads.end(), itPTSearch = m_standbyThreads.begin();
itPTSearch != m_standbyThreads.end(); ++itPTSearch )
if( (diff = (ptrdiff_t)itPTSearch->m_sortBuf.size() - s) >= 0 && diff < bestFit )
itPT = itPTSearch,
bestFit = diff;
if( itPT == m_standbyThreads.end() )
itPT = --m_standbyThreads.end();
m_activeThreads.splice( m_activeThreads.end(), m_standbyThreads, itPT );
poolLock.unlock();
pPT = &*itPT;
}
else
m_activeThreads.emplace_back( this ),
itPT = --m_activeThreads.end(),
pPT = &*itPT,
poolLock.unlock();
for( T *pMoveFrom = pBegin, *pMoveTo = pSortBuf; pMoveFrom != (pBegin + n); *pMoveTo++ = move( *pMoveFrom++ ) );
T *pLeft = pSortBuf,
*pRight = pSortBuf + left;
auto pushThreadBack = [&poolLock, &itPT, this]()
{
poolLock.lock();
m_standbyThreads.splice( m_standbyThreads.end(), m_activeThreads, itPT );
};
invoke_on_destruct<decltype(pushThreadBack)> autoPushBackThread( pushThreadBack );
unique_lock<mutex> threadLock( pPT->m_mtx );
pPT->m_cmd = pool_thread::CMD_SORT;
pPT->m_rsp = pool_thread::RSP_NONE;
pPT->m_pBegin = pRight;
pPT->m_n = right;
pPT->m_sigInitiate.notify_one();
threadLock.unlock();
auto waitForThread = [&threadLock, pPT]()
{
threadLock.lock();
while( pPT->m_rsp == pool_thread::RSP_NONE )
pPT->m_sigResponse.wait( threadLock );
assert(pPT->m_rsp == pool_thread::RSP_SUCCESS || pPT->m_rsp == pool_thread::RSP_ERR);
threadLock.unlock();
};
invoke_on_destruct<decltype(waitForThread)> autoWaitForThread( waitForThread );
threaded_sort( pLeft, left, pSortBuf + n );
autoWaitForThread.invoke_and_disable();
if( pPT->m_rsp == pool_thread::RSP_ERR )
throw sort_exception();
merge_back( pBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::unthreaded_sort( T *pBegin, std::size_t n, T *pSortBuf )
{
assert(n >= 2);
using namespace std;
if( n == 2 )
{
if( m_p( pBegin[1], pBegin[0] ) )
{
T temp( move( pBegin[0] ) );
pBegin[0] = move( pBegin[1] );
pBegin[1] = move( temp );
}
return;
}
for( T *pMoveFrom = pBegin, *pMoveTo = pSortBuf; pMoveFrom != (pBegin + n); *pMoveTo++ = move( *pMoveFrom++ ) );
size_t left = n / 2,
right = n - left;
T *pLeft = pSortBuf,
*pRight = pSortBuf + left;
if( left >= 2 )
unthreaded_sort( pLeft, left, pSortBuf + n );
if( right >= 2 )
unthreaded_sort( pRight, right, pSortBuf + n );
merge_back( pBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename T, typename P>
inline
void parallel_merge_sort<T, P>::merge_back( T *pUp, T *pLeft, T *pLeftEnd, T *pRight, T *pRightEnd )
{
using namespace std;
for( ; ; )
if( m_p( *pLeft, *pRight ) )
{
*pUp++ = move( *pLeft++ );
if( pLeft == pLeftEnd )
{
for( ; pRight != pRightEnd; *pUp++ = move( *pRight++ ) );
break;
}
}
else
{
*pUp++ = move( *pRight++ );
if( pRight == pRightEnd )
{
for( ; pLeft != pLeftEnd; *pUp++ = move( *pLeft++ ) );
break;
}
}
}
template<typename T, typename P>
std::size_t parallel_merge_sort<T, P>::get_buffer_size()
{
std::size_t s = 0;
for( pool_thread &pt : m_standbyThreads )
s += pt.m_sortBuf.capacity();
return s + m_callerSortBuf.capacity();
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::empty_buffers()
{
for( pool_thread &pt : m_standbyThreads )
pt.m_sortBuf.clear(),
pt.m_sortBuf.shrink_to_fit();
m_callerSortBuf.clear();
m_callerSortBuf.shrink_to_fit();
}
template<typename T, typename P>
parallel_merge_sort<T, P>::pool_thread::pool_thread( parallel_merge_sort *pPMS )
{
using namespace std;
m_cmd = pool_thread::CMD_NONE;
m_thread = move( unique_ptr<thread>( new thread( []( pool_thread *pPT, parallel_merge_sort *pPMS ) -> void { pPT->sort_thread( pPMS ); }, this, pPMS ) ) );
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::pool_thread::sort_thread( parallel_merge_sort *pPMS )
{
using namespace std;
for( ; ; )
{
{
unique_lock<mutex> threadLock( m_mtx );
while( m_cmd == CMD_NONE )
m_sigInitiate.wait( threadLock );
if( m_cmd == CMD_STOP )
return;
assert(m_cmd == pool_thread::CMD_SORT);
m_cmd = CMD_NONE;
}
bool success;
try
{
size_t s = get_buffer_size( m_n );
if( m_sortBuf.size() < s )
m_sortBuf.resize( s );
pPMS->threaded_sort( m_pBegin, m_n, &m_sortBuf[0] );
success = true;
}
catch( ... )
{
success = false;
}
lock_guard<mutex> threadLock( m_mtx );
m_rsp = success ? RSP_SUCCESS : RSP_ERR,
m_sigResponse.notify_one();
}
}
template<typename T, typename P = std::less<T>>
class ref_parallel_merge_sort
{
private:
struct ref
{
T *p;
};
struct ref_predicate
{
ref_predicate( P p );
bool operator ()( ref const &left, ref const &right );
P m_p;
};
public:
ref_parallel_merge_sort( P const &p = P() );
void sort( T *pBegin, size_t n, std::size_t maxUnthreaded );
private:
ref_predicate m_p;
parallel_merge_sort<ref, ref_predicate> m_sorter;
};
template<typename T, typename P>
inline
ref_parallel_merge_sort<T, P>::ref_predicate::ref_predicate( P p ) :
m_p ( p )
{
}
template<typename T, typename P>
inline
bool ref_parallel_merge_sort<T, P>::ref_predicate::operator ()( ref const &left, ref const &right )
{
return m_p( *left.p, *right.p );
}
template<typename T, typename P>
inline
ref_parallel_merge_sort<T, P>::ref_parallel_merge_sort( P const &p ) :
m_p( p ),
m_sorter( m_p )
{
}
template<typename T, typename P>
void ref_parallel_merge_sort<T, P>::sort( T *pBegin, size_t n, std::size_t maxUnthreaded )
{
using namespace std;
try
{
vector<ref> refBuf;
refBuf.resize( n );
for( size_t i = 0; i != n; refBuf[i].p = &pBegin[i], ++i );
m_sorter.sort( &refBuf[0], n, maxUnthreaded );
vector<T> reorderBuf;
reorderBuf.resize( n );
for( size_t i = 0; i != n; reorderBuf[i] = move( *refBuf[i].p ), ++i );
for( size_t i = 0; i != n; pBegin[i] = move( reorderBuf[i] ), ++i );
}
catch( ... )
{
throw sort_exception();
}
}
#include <iostream>
#include <cstdlib>
#include <functional>
#include <random>
#include <cstdint>
#include <iterator>
#if defined(_MSC_VER)
#include <Windows.h>
double get_usecs()
{
LONGLONG liTime;
GetSystemTimeAsFileTime( &(FILETIME &)liTime );
return (double)liTime / 10.0;
}
#elif defined(__unix__)
#include <sys/time.h>
double get_usecs()
{
timeval tv;
gettimeofday( &tv, nullptr );
return (double)tv.tv_sec * 1'000'000.0 + tv.tv_usec;
}
#elif
#error no OS-support for get_usecs()
#endif
using namespace std;
void fill_with_random( double *p, size_t n, unsigned seed = 0 )
{
default_random_engine re( seed );
uniform_real_distribution<double> distrib;
for( double *pEnd = p + n; p != pEnd; *p++ = distrib( re ) );
}
template<typename T>
string decimal_unsigned( T t );
int main()
{
size_t const SIZE = (size_t)1024 * 1024 * 1024 / sizeof(double);
unsigned hc = thread::hardware_concurrency();
vector<double> v;
double t;
v.resize( SIZE );
parallel_merge_sort<double> sd;
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( &v[0], SIZE, SIZE / hc );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds parallel" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( &v[0], SIZE, SIZE );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds sequential" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
}
#include <sstream>
string decify_string( string const &s );
template<typename T>
string decimal_unsigned( T t )
{
using namespace std;
ostringstream oss;
return move( decify_string( (oss << t, oss.str()) ) );
}
string decify_string( string const &s )
{
using namespace std;
ostringstream oss;
size_t length = s.length(),
head = length % 3,
segments = length / 3;
if( head == 0 && segments >= 1 )
head = 3,
--segments;
oss << s.substr( 0, head );
for( size_t i = head; i != length; i += 3 )
oss << "." << s.substr( i, 3 );
return move( oss.str() );
}
#include <vector>
#include <vector>
#include <list>
#include <thread>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <algorithm>
#include <utility>
#include <exception>
#include <cassert>
#include <cstring>
#include <iterator>
template<typename T>
struct invoke_on_destruct
{
private:
T &m_t;
bool m_enabled;
public:
invoke_on_destruct( T &t ) :
m_t( t ), m_enabled( true )
{
}
~invoke_on_destruct()
{
if( m_enabled )
m_t();
}
void invoke_and_disable()
{
m_t();
m_enabled = false;
}
};
struct sort_exception : public std::exception
{
};
template<typename InputIt, typename P = std::less<typename std::iterator_traits<InputIt>::value_type>>
class parallel_merge_sort
{
public:
parallel_merge_sort( P const &p = P() );
~parallel_merge_sort();
void sort( InputIt itBegin, size_t n, std::size_t minThreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
typedef typename std::iterator_traits<InputIt>::value_type value_type;
struct pool_thread
{
enum CMD : int { CMD_STOP = -1, CMD_NONE = 0, CMD_SORT = 1 };
enum RSP : int { RSP_ERR = -1, RSP_NONE = 0, RSP_SUCCESS = 1 };
std::thread m_thrd;
std::mutex m_mtx;
std::condition_variable m_sigInitiate;
CMD m_cmd;
value_type *m_pBegin;
std::size_t m_n;
std::condition_variable m_sigResponse;
RSP m_rsp;
std::vector<value_type> m_sortBuf;
pool_thread( parallel_merge_sort *pPMS );
~pool_thread();
void sort_thread( parallel_merge_sort *pPMS );
static std::size_t calc_buffer_size( size_t n );
};
P m_p;
std::size_t m_minThreaded;
unsigned m_hardwareConcurrency;
std::vector<value_type> m_callerSortBuf;
std::mutex m_mtxPool;
std::list<pool_thread> m_standbyThreads;
std::list<pool_thread> m_activeThreads;
template<typename InputIt2>
void threaded_sort( InputIt2 itBegin, std::size_t n, value_type *pSortBuf );
template<typename InputIt2>
void unthreaded_sort( InputIt2 itBegin, std::size_t n, value_type *pSortBuf );
template<typename OutputIt>
void merge_back( OutputIt itUp, value_type *pLeft, value_type *pLeftEnd, value_type *pRight, value_type *pRightEnd );
};
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::parallel_merge_sort( P const &p ) :
m_p( p ),
m_hardwareConcurrency( std::thread::hardware_concurrency() )
{
}
template<typename InputIt, typename P>
void parallel_merge_sort<InputIt, P>::sort( InputIt itBegin, size_t n, std::size_t minThreaded )
{
if( n <= 1 )
return;
if( (m_minThreaded = minThreaded) < 4 )
m_minThreaded = 4;
try
{
std::size_t s = pool_thread::calc_buffer_size( n );
if( m_callerSortBuf.size() < s )
m_callerSortBuf.resize( s );
}
catch( ... )
{
throw sort_exception();
}
threaded_sort( itBegin, n, &m_callerSortBuf[0] );
}
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::~parallel_merge_sort()
{
assert(m_activeThreads.begin() == m_activeThreads.end());
}
template<typename InputIt, typename P>
inline
std::size_t parallel_merge_sort<InputIt, P>::pool_thread::calc_buffer_size( std::size_t n )
{
for( std::size_t rest = n, right; rest > 2; rest = right )
right = rest - (rest / 2),
n += right;
return n;
}
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::pool_thread::~pool_thread()
{
using namespace std;
unique_lock<mutex> threadLock( m_mtx );
m_cmd = pool_thread::CMD_STOP;
m_sigInitiate.notify_one();
threadLock.unlock();
m_thrd.join();
}
template<typename InputIt, typename P>
template<typename InputIt2>
void parallel_merge_sort<InputIt, P>::threaded_sort( InputIt2 itBegin, std::size_t n, value_type *pSortBuf )
{
using namespace std;
unique_lock<mutex> poolLock( m_mtxPool );
if( n < m_minThreaded || (m_standbyThreads.size() == 0 && m_activeThreads.size() >= (m_hardwareConcurrency - 1)) )
{
poolLock.unlock();
unthreaded_sort( itBegin, n, pSortBuf );
return;
}
typedef typename list<pool_thread>::iterator pt_it;
pt_it itPT;
pool_thread *pPT;
size_t left = n / 2,
right = n - left;
if( !m_standbyThreads.size() )
{
pt_it itPTScan;
size_t optimalSize = pool_thread::calc_buffer_size( right ),
bestFit = (size_t)(ptrdiff_t)-1,
size;
for( itPT = m_standbyThreads.end(), itPTScan = m_standbyThreads.begin();
itPTScan != m_standbyThreads.end(); ++itPTScan )
if( (size = itPTScan->m_sortBuf.size()) >= optimalSize && size < bestFit )
itPT = itPTScan,
bestFit = size;
if( itPT == m_standbyThreads.end() )
itPT = --m_standbyThreads.end();
m_activeThreads.splice( m_activeThreads.end(), m_standbyThreads, itPT );
poolLock.unlock();
pPT = &*itPT;
}
else
m_activeThreads.emplace_back( this ),
itPT = --m_activeThreads.end(),
pPT = &*itPT,
poolLock.unlock();
auto pushThreadBack = [&poolLock, &itPT, this]()
{
poolLock.lock();
m_standbyThreads.splice( m_standbyThreads.end(), m_activeThreads, itPT );
};
invoke_on_destruct<decltype(pushThreadBack)> autoPushBackThread( pushThreadBack );
value_type *pMoveTo = pSortBuf;
for( InputIt2 itMoveFrom = itBegin, itEnd = itMoveFrom + n; itMoveFrom != itEnd; *pMoveTo++ = move( *itMoveFrom ), ++itMoveFrom );
value_type *pLeft = pSortBuf,
*pRight = pLeft + left;
unique_lock<mutex> threadLock( pPT->m_mtx );
pPT->m_cmd = pool_thread::CMD_SORT;
pPT->m_rsp = pool_thread::RSP_NONE;
pPT->m_pBegin = pRight;
pPT->m_n = right;
pPT->m_sigInitiate.notify_one();
threadLock.unlock();
auto waitForThread = [&threadLock, pPT]()
{
threadLock.lock();
while( pPT->m_rsp == pool_thread::RSP_NONE )
pPT->m_sigResponse.wait( threadLock );
assert(pPT->m_rsp == pool_thread::RSP_SUCCESS || pPT->m_rsp == pool_thread::RSP_ERR);
};
invoke_on_destruct<decltype(waitForThread)> autoWaitForThread( waitForThread );
threaded_sort( pLeft, left, pSortBuf + n );
autoWaitForThread.invoke_and_disable();
if( pPT->m_rsp == pool_thread::RSP_ERR )
throw sort_exception();
threadLock.unlock();
merge_back( itBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename InputIt, typename P>
template<typename InputIt2>
void parallel_merge_sort<InputIt, P>::unthreaded_sort( InputIt2 itBegin, std::size_t n, value_type *pSortBuf )
{
assert(n >= 2);
using namespace std;
if( n == 2 )
{
if( m_p( itBegin[1], itBegin[0] ) )
{
value_type temp( move( itBegin[0] ) );
itBegin[0] = move( itBegin[1] );
itBegin[1] = move( temp );
}
return;
}
value_type *pMoveTo = pSortBuf;
for( InputIt2 itMoveFrom = itBegin, itEnd = itMoveFrom + n; itMoveFrom != itEnd; *pMoveTo++ = move( *itMoveFrom ), ++itMoveFrom );
size_t left = n / 2,
right = n - left;
value_type *pLeft = pSortBuf,
*pRight = pSortBuf + left;
if( left >= 2 )
unthreaded_sort( pLeft, left, pSortBuf + n );
if( right >= 2 )
unthreaded_sort( pRight, right, pSortBuf + n );
merge_back( itBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename InputIt, typename P>
template<typename OutputIt>
inline
void parallel_merge_sort<InputIt, P>::merge_back( OutputIt itUp, value_type *pLeft, value_type *pLeftEnd, value_type *pRight, value_type *pRightEnd )
{
assert(pLeft != pLeftEnd && pRight != pRightEnd);
using namespace std;
for( ; ; )
if( m_p( *pLeft, *pRight ) )
{
*itUp = move( *pLeft++ );
++itUp;
if( pLeft == pLeftEnd )
{
for( ; pRight != pRightEnd; *itUp = move( *pRight++ ), ++itUp );
break;
}
}
else
{
*itUp = move( *pRight++ );
++itUp;
if( pRight == pRightEnd )
{
for( ; pLeft != pLeftEnd; *itUp = move( *pLeft++ ), ++itUp );
break;
}
}
}
template<typename InputIt, typename P>
std::size_t parallel_merge_sort<InputIt, P>::get_buffer_size()
{
std::size_t s = 0;
for( pool_thread &pt : m_standbyThreads )
s += pt.m_sortBuf.capacity();
return s + m_callerSortBuf.capacity();
}
template<typename InputIt, typename P>
void parallel_merge_sort<InputIt, P>::empty_buffers()
{
for( pool_thread &pt : m_standbyThreads )
pt.m_sortBuf.clear(),
pt.m_sortBuf.shrink_to_fit();
m_callerSortBuf.clear();
m_callerSortBuf.shrink_to_fit();
}
template<typename InputIt, typename P>
parallel_merge_sort<InputIt, P>::pool_thread::pool_thread( parallel_merge_sort *pPMS ) :
m_mtx(),
m_sigInitiate(),
m_cmd( pool_thread::CMD_NONE ),
m_thrd( std::thread( []( pool_thread *pPT, parallel_merge_sort *pPMS ) -> void { pPT->sort_thread( pPMS ); }, this, pPMS ) )
{
}
template<typename InputIt, typename P>
void parallel_merge_sort<InputIt, P>::pool_thread::sort_thread( parallel_merge_sort *pPMS )
{
using namespace std;
for( ; ; )
{
unique_lock<mutex> threadLock( m_mtx );
while( m_cmd == CMD_NONE )
m_sigInitiate.wait( threadLock );
if( m_cmd == CMD_STOP )
return;
assert(m_cmd == pool_thread::CMD_SORT);
m_cmd = CMD_NONE;
threadLock.unlock();
bool success;
try
{
size_t size = calc_buffer_size( m_n );
if( m_sortBuf.size() < size )
m_sortBuf.resize( size );
pPMS->threaded_sort( m_pBegin, m_n, &m_sortBuf[0] );
success = true;
}
catch( ... )
{
success = false;
}
threadLock.lock();
m_rsp = success ? RSP_SUCCESS : RSP_ERR,
m_sigResponse.notify_one();
}
}
template<typename InputIt, typename P = std::less<typename std::iterator_traits<InputIt>::value_type>>
class ref_parallel_merge_sort
{
private:
struct ref
{
InputIt it;
};
struct ref_predicate
{
ref_predicate( P p );
bool operator ()( ref const &left, ref const &right );
P m_p;
};
public:
ref_parallel_merge_sort( P const &p = P() );
void sort( InputIt itBegin, size_t n, std::size_t maxUnthreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
parallel_merge_sort<ref, ref_predicate> m_sorter;
};
template<typename InputIt, typename P>
inline
ref_parallel_merge_sort<InputIt, P>::ref_predicate::ref_predicate( P p ) :
m_p ( p )
{
}
template<typename InputIt, typename P>
inline
bool ref_parallel_merge_sort<InputIt, P>::ref_predicate::operator ()( ref const &left, ref const &right )
{
return m_p( *left.it, *right.it );
}
template<typename InputIt, typename P>
inline
ref_parallel_merge_sort<InputIt, P>::ref_parallel_merge_sort( P const &p ) :
m_sorter( ref_predicate( p ) )
{
}
template<typename InputIt, typename P>
void ref_parallel_merge_sort<InputIt, P>::sort( InputIt itBegin, size_t n, std::size_t maxUnthreaded )
{
using namespace std;
try
{
typedef typename iterator_traits<InputIt>::value_type value_type;
vector<ref> refBuf;
InputIt it;
int i;
refBuf.resize( n );
;
for( i = 0, it = itBegin; i != n; refBuf[i].it = it, ++i, ++it );
m_sorter.sort( &refBuf[0], n, maxUnthreaded );
vector<value_type> reorderBuf;
reorderBuf.resize( n );
for( i = 0, it = itBegin; i != n; reorderBuf[i] = move( *it ), ++i, ++it );
for( i = 0, it = itBegin; i != n; *it = move( reorderBuf[i] ), ++i, ++it );
}
catch( ... )
{
throw sort_exception();
}
}
template<typename InputIt, typename P>
inline
std::size_t ref_parallel_merge_sort<InputIt, P>::get_buffer_size()
{
return m_sorter.get_buffer_size();
}
template<typename InputIt, typename P>
inline
void ref_parallel_merge_sort<InputIt, P>::empty_buffers()
{
m_sorter.empty_buffers();
}
#include <iostream>
#include <cstdlib>
#include <functional>
#include <random>
#include <cstdint>
#include <iterator>
#if defined(_MSC_VER)
#include <Windows.h>
double get_usecs()
{
LONGLONG liTime;
GetSystemTimeAsFileTime( &(FILETIME &)liTime );
return (double)liTime / 10.0;
}
#elif defined(__unix__)
#include <sys/time.h>
double get_usecs()
{
timeval tv;
gettimeofday( &tv, nullptr );
return (double)tv.tv_sec * 1'000'000.0 + tv.tv_usec;
}
#elif
#error no OS-support for get_usecs()
#endif
using namespace std;
void fill_with_random( double *p, size_t n, unsigned seed = 0 )
{
default_random_engine re( seed );
uniform_real_distribution<double> distrib;
for( double *pEnd = p + n; p != pEnd; *p++ = distrib( re ) );
}
template<typename T>
string decimal_unsigned( T t );
int main()
{
typedef typename vector<double>::iterator it_type;
size_t const SIZE = (size_t)1024 * 1024 * 1024 / sizeof(double);
unsigned hc = thread::hardware_concurrency();
vector<double> v;
double t;
v.resize( SIZE );
parallel_merge_sort<it_type> sd;
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( v.begin(), SIZE, SIZE / hc );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds parallel" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( v.begin(), SIZE, SIZE );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds sequential" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
}
#include <sstream>
string decify_string( string const &s );
template<typename T>
string decimal_unsigned( T t )
{
using namespace std;
ostringstream oss;
return move( decify_string( (oss << t, oss.str()) ) );
}
string decify_string( string const &s )
{
using namespace std;
ostringstream oss;
size_t length = s.length(),
head = length % 3,
segments = length / 3;
if( head == 0 && segments >= 1 )
head = 3,
--segments;
oss << s.substr( 0, head );
for( size_t i = head; i != length; i += 3 )
oss << "." << s.substr( i, 3 );
return move( oss.str() );
}
#include <vector>
#include <vector>
#include <list>
#include <thread>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <algorithm>
#include <utility>
#include <exception>
#include <cassert>
#include <cstring>
template<typename T>
struct invoke_on_destruct
{
private:
T &m_t;
bool m_enabled;
public:
invoke_on_destruct( T &t ) :
m_t( t ), m_enabled( true )
{
}
~invoke_on_destruct()
{
if( m_enabled )
m_t();
}
void invoke_and_disable()
{
m_t();
m_enabled = false;
}
};
struct sort_exception : public std::exception
{
};
template<typename T, typename P = std::less<T>>
class parallel_merge_sort
{
public:
parallel_merge_sort( P const &p = P() );
~parallel_merge_sort();
void sort( T *pBegin, size_t n, std::size_t maxUnthreadedminThreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
struct pool_thread
{
enum CMD : int { CMD_STOP = -1, CMD_NONE = 0, CMD_SORT = 1 };
enum RSP : int { RSP_ERR = -1, RSP_NONE = 0, RSP_SUCCESS = 1 };
std::unique_ptr<std::thread> m_thread;
std::mutex m_mtx;
std::condition_variable m_sigInitiate;
CMD m_cmd;
T *m_pBegin;
std::size_t m_n;
std::condition_variable m_sigResponse;
RSP m_rsp;
std::vector<T> m_sortBuf;
pool_thread( parallel_merge_sort *pPMS );
void sort_thread( parallel_merge_sort *pPMS );
static std::size_t get_buffer_size( size_t n );
};
P m_p;
std::size_t m_maxUnthreaded;m_minThreaded;
unsigned m_hardwareConcurrency;
std::vector<T> m_callerSortBuf;
std::mutex m_mtxPool;
std::list<pool_thread> m_standbyThreads;
std::list<pool_thread> m_activeThreads;
void threaded_sort( T *pBegin, std::size_t n, T *pSortBuf );
void unthreaded_sort( T *pBegin, std::size_t n, T *pSortBuf );
void merge_back( T *pUp, T *pLeft, T *pLeftEnd, T *pRight, T *pRightEnd );
};
template<typename T, typename P>
parallel_merge_sort<T, P>::parallel_merge_sort( P const &p ) :
m_p( p ),
m_hardwareConcurrency( std::thread::hardware_concurrency() )
{
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::sort( T *pBegin, size_t n, std::size_t maxUnthreadedminThreaded )
{
if( n <= 1 )
return;
if( (m_maxUnthreadedm_minThreaded = maxUnthreadedminThreaded) < 24 )
m_maxUnthreadedm_minThreaded = 2;4;
try
{
std::size_t s = pool_thread::get_buffer_size( n );
if( m_callerSortBuf.size() < s )
m_callerSortBuf.resize( s );
}
catch( ... )
{
throw sort_exception();
}
threaded_sort( pBegin, n, &m_callerSortBuf[0] );
}
template<typename T, typename P>
parallel_merge_sort<T, P>::~parallel_merge_sort()
{
using namespace std;
lock_guard<mutex> poolLock( m_mtxPool );
for( pool_thread &pt : m_standbyThreads )
{
unique_lock<mutex> threadLock( pt.m_mtx );
pt.m_cmd = pool_thread::CMD_STOP;
pt.m_sigInitiate.notify_one();
threadLock.unlock();
pt.m_thread->join();
}
assert(m_activeThreads.begin() == m_activeThreads.end());
}
template<typename T, typename P>
inline
std::size_t parallel_merge_sort<T, P>::pool_thread::get_buffer_size( std::size_t n )
{
for( std::size_t rest = n, right; rest > 2; rest = right )
right = rest - (rest / 2),
n += right;
return n;
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::threaded_sort( T *pBegin, std::size_t n, T *pSortBuf )
{
using namespace std;
unique_lock<mutex> poolLock( m_mtxPool );
bool standbyEmpty;
if( n <=< m_maxUnthreadedm_minThreaded || ((standbyEmpty = m_standbyThreads.begin() == m_standbyThreads.end()) && m_activeThreads.size() >= (m_hardwareConcurrency - 1)) )
{
poolLock.unlock();
unthreaded_sort( pBegin, n, pSortBuf );
return;
}
typename list<pool_thread>::iterator itPT;
pool_thread *pPT;
size_t left = n / 2,
right = n - left;
if( !standbyEmpty )
{
typename list<pool_thread>::iterator itPTSearch;
ptrdiff_t s = (ptrdiff_t)pool_thread::get_buffer_size( right ),
bestFit = (ptrdiff_t)((size_t)(ptrdiff_t)-1 >> 1),
diff;
for( itPT = m_standbyThreads.end(), itPTSearch = m_standbyThreads.begin();
itPTSearch != m_standbyThreads.end(); ++itPTSearch )
if( (diff = (ptrdiff_t)itPTSearch->m_sortBuf.size() - s) >= 0 && diff < bestFit )
itPT = itPTSearch,
bestFit = diff;
if( itPT == m_standbyThreads.end() )
itPT = --m_standbyThreads.end();
m_activeThreads.splice( m_activeThreads.end(), m_standbyThreads, itPT );
poolLock.unlock();
pPT = &*itPT;
}
else
m_activeThreads.emplace_back( this ),
itPT = --m_activeThreads.end(),
pPT = &*itPT,
poolLock.unlock();
for( T *pMoveFrom = pBegin, *pMoveTo = pSortBuf; pMoveFrom != (pBegin + n); *pMoveTo++ = move( *pMoveFrom++ ) );
T *pLeft = pSortBuf,
*pRight = pSortBuf + left;
auto pushThreadBack = [&poolLock, &itPT, this]()
{
poolLock.lock();
m_standbyThreads.splice( m_standbyThreads.end(), m_activeThreads, itPT );
};
invoke_on_destruct<decltype(pushThreadBack)> autoPushBackThread( pushThreadBack );
unique_lock<mutex> threadLock( pPT->m_mtx );
pPT->m_cmd = pool_thread::CMD_SORT;
pPT->m_rsp = pool_thread::RSP_NONE;
pPT->m_pBegin = pRight;
pPT->m_n = right;
pPT->m_sigInitiate.notify_one();
threadLock.unlock();
auto pushThreadBack = [&poolLock, &itPT, this]()
{
poolLock.lock();
m_standbyThreads.splice( m_standbyThreads.end(), m_activeThreads, itPT );
};
auto waitForThread = [&threadLock, pPT]()
{
threadLock.lock();
while( pPT->m_rsp == pool_thread::RSP_NONE )
pPT->m_sigResponse.wait( threadLock );
assert(pPT->m_rsp == pool_thread::RSP_SUCCESS || pPT->m_rsp == pool_thread::RSP_ERR);
threadLock.unlock();
};
invoke_on_destruct<decltype(pushThreadBack)> autoPushBackThread( pushThreadBack );
invoke_on_destruct<decltype(waitForThread)> autoWaitForThread( waitForThread );
threaded_sort( pLeft, left, pSortBuf + n );
autoWaitForThread.invoke_and_disable();
if( pPT->m_rsp == pool_thread::RSP_ERR )
throw sort_exception();
merge_back( pBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::unthreaded_sort( T *pBegin, std::size_t n, T *pSortBuf )
{
assert(n >= 2);
using namespace std;
if( n == 2 )
{
if( m_p( pBegin[1], pBegin[0] ) )
{
T temp( move( pBegin[0] ) );
pBegin[0] = move( pBegin[1] );
pBegin[1] = move( temp );
}
return;
}
for( T *pMoveFrom = pBegin, *pMoveTo = pSortBuf; pMoveFrom != (pBegin + n); *pMoveTo++ = move( *pMoveFrom++ ) );
size_t left = n / 2,
right = n - left;
T *pLeft = pSortBuf,
*pRight = pSortBuf + left;
if( left >= 2 )
unthreaded_sort( pLeft, left, pSortBuf + n );
if( right >= 2 )
unthreaded_sort( pRight, right, pSortBuf + n );
merge_back( pBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename T, typename P>
inline
void parallel_merge_sort<T, P>::merge_back( T *pUp, T *pLeft, T *pLeftEnd, T *pRight, T *pRightEnd )
{
using namespace std;
for( ; ; )
if( m_p( *pLeft, *pRight ) )
{
*pUp++ = move( *pLeft++ );
if( pLeft == pLeftEnd )
{
for( ; pRight != pRightEnd; *pUp++ = move( *pRight++ ) );
break;
}
}
else
{
*pUp++ = move( *pRight++ );
if( pRight == pRightEnd )
{
for( ; pLeft != pLeftEnd; *pUp++ = move( *pLeft++ ) );
break;
}
}
}
template<typename T, typename P>
std::size_t parallel_merge_sort<T, P>::get_buffer_size()
{
std::size_t s = 0;
for( pool_thread &pt : m_standbyThreads )
s += pt.m_sortBuf.capacity();
return s + m_callerSortBuf.capacity();
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::empty_buffers()
{
for( pool_thread &pt : m_standbyThreads )
pt.m_sortBuf.resizeclear( 0 ),
pt.m_sortBuf.shrink_to_fit();
m_callerSortBuf.resizeclear( 0 );
m_callerSortBuf.shrink_to_fit();
}
template<typename T, typename P>
parallel_merge_sort<T, P>::pool_thread::pool_thread( parallel_merge_sort *pPMS )
{
using namespace std;
m_cmd = pool_thread::CMD_NONE;
m_thread = move( unique_ptr<thread>( new thread( []( pool_thread *pPT, parallel_merge_sort *pPMS ) -> void { pPT->sort_thread( pPMS ); }, this, pPMS ) ) );
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::pool_thread::sort_thread( parallel_merge_sort *pPMS )
{
using namespace std;
for( ; ; )
{
{
unique_lock<mutex> threadLock( m_mtx );
while( m_cmd == CMD_NONE )
m_sigInitiate.wait( threadLock );
if( m_cmd == CMD_STOP )
return;
assert(m_cmd == pool_thread::CMD_SORT);
m_cmd = CMD_NONE;
}
bool success;
try
{
size_t s = get_buffer_size( m_n );
if( m_sortBuf.size() < s )
m_sortBuf.resize( s );
pPMS->threaded_sort( m_pBegin, m_n, &m_sortBuf[0] );
success = true;
}
catch( ... )
{
success = false;
}
lock_guard<mutex> threadLock( m_mtx );
m_rsp = success ? RSP_SUCCESS : RSP_ERR,
m_sigResponse.notify_one();
}
}
template<typename T, typename P = std::less<T>>
class ref_parallel_merge_sort
{
private:
struct ref
{
T *p;
};
struct ref_predicate
{
ref_predicate( P p );
bool operator ()( ref const &left, ref const &right );
P m_p;
};
public:
ref_parallel_merge_sort( P const &p = P() );
void sort( T *pBegin, size_t n, std::size_t maxUnthreaded );
private:
ref_predicate m_p;
parallel_merge_sort<ref, ref_predicate> m_sorter;
};
template<typename T, typename P>
inline
ref_parallel_merge_sort<T, P>::ref_predicate::ref_predicate( P p ) :
m_p ( p )
{
}
template<typename T, typename P>
inline
bool ref_parallel_merge_sort<T, P>::ref_predicate::operator ()( ref const &left, ref const &right )
{
return m_p( *left.p, *right.p );
}
template<typename T, typename P>
inline
ref_parallel_merge_sort<T, P>::ref_parallel_merge_sort( P const &p ) :
m_p( p ),
m_sorter( m_p )
{
}
template<typename T, typename P>
void ref_parallel_merge_sort<T, P>::sort( T *pBegin, size_t n, std::size_t maxUnthreaded )
{
using namespace std;
try
{
vector<ref> refBuf;
refBuf.resize( n );
for( size_t i = 0; i != n; refBuf[i].p = &pBegin[i], ++i );
m_sorter.sort( &refBuf[0], n, maxUnthreaded );
vector<T> reorderBuf;
reorderBuf.resize( n );
for( size_t i = 0; i != n; reorderBuf[i] = move( *refBuf[i].p ), ++i );
for( size_t i = 0; i != n; pBegin[i] = move( reorderBuf[i] ), ++i );
}
catch( ... )
{
throw sort_exception();
}
}
#include <iostream>
#include <cstdlib>
#include <functional>
#include <random>
#include <cstdint>
#include <iterator>
#if defined(_MSC_VER)
#include <Windows.h>
double get_usecs()
{
LONGLONG liTime;
GetSystemTimeAsFileTime( &(FILETIME &)liTime );
return (double)liTime / 10.0;
}
#elif defined(__unix__)
#include <sys/time.h>
double get_usecs()
{
timeval tv;
gettimeofday( &tv, nullptr );
return (double)tv.tv_sec * 1'000'000.0 + tv.tv_usec;
}
#elif
#error no OS-support for get_usecs()
#endif
using namespace std;
void fill_with_random( double *p, size_t n, unsigned seed = 0 )
{
default_random_engine re( seed );
uniform_real_distribution<double> distrib;
for( double *pEnd = p + n; p != pEnd; *p++ = distrib( re ) );
}
template<typename T>
string decimal_unsigned( T t );
int main()
{
size_t const SIZE = (size_t)1024 * 1024 * 1024 / sizeof(double);
unsigned hc = thread::hardware_concurrency();
vector<double> v;
double t;
v.resize( SIZE );
parallel_merge_sort<double> sd;
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( &v[0], SIZE, SIZE / hc );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds parallel" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( &v[0], SIZE, SIZE );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds sequential" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
}
#include <sstream>
string decify_string( string const &s );
template<typename T>
string decimal_unsigned( T t )
{
using namespace std;
ostringstream oss;
return move( decify_string( (oss << t, oss.str()) ) );
}
string decify_string( string const &s )
{
using namespace std;
ostringstream oss;
size_t length = s.length(),
head = length % 3,
segments = length / 3;
if( head == 0 && segments >= 1 )
head = 3,
--segments;
oss << s.substr( 0, head );
for( size_t i = head; i != length; i += 3 )
oss << "." << s.substr( i, 3 );
return move( oss.str() );
}
#include <vector>
#include <vector>
#include <list>
#include <thread>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <algorithm>
#include <utility>
#include <exception>
#include <cassert>
#include <cstring>
template<typename T>
struct invoke_on_destruct
{
private:
T &m_t;
bool m_enabled;
public:
invoke_on_destruct( T &t ) :
m_t( t ), m_enabled( true )
{
}
~invoke_on_destruct()
{
if( m_enabled )
m_t();
}
void invoke_and_disable()
{
m_t();
m_enabled = false;
}
};
struct sort_exception : public std::exception
{
};
template<typename T, typename P = std::less<T>>
class parallel_merge_sort
{
public:
parallel_merge_sort( P const &p = P() );
~parallel_merge_sort();
void sort( T *pBegin, size_t n, std::size_t maxUnthreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
struct pool_thread
{
enum CMD : int { CMD_STOP = -1, CMD_NONE = 0, CMD_SORT = 1 };
enum RSP : int { RSP_ERR = -1, RSP_NONE = 0, RSP_SUCCESS = 1 };
std::unique_ptr<std::thread> m_thread;
std::mutex m_mtx;
std::condition_variable m_sigInitiate;
CMD m_cmd;
T *m_pBegin;
std::size_t m_n;
std::condition_variable m_sigResponse;
RSP m_rsp;
std::vector<T> m_sortBuf;
pool_thread( parallel_merge_sort *pPMS );
void sort_thread( parallel_merge_sort *pPMS );
static std::size_t get_buffer_size( size_t n );
};
P m_p;
std::size_t m_maxUnthreaded;
unsigned m_hardwareConcurrency;
std::vector<T> m_callerSortBuf;
std::mutex m_mtxPool;
std::list<pool_thread> m_standbyThreads;
std::list<pool_thread> m_activeThreads;
void threaded_sort( T *pBegin, std::size_t n, T *pSortBuf );
void unthreaded_sort( T *pBegin, std::size_t n, T *pSortBuf );
void merge_back( T *pUp, T *pLeft, T *pLeftEnd, T *pRight, T *pRightEnd );
};
template<typename T, typename P>
parallel_merge_sort<T, P>::parallel_merge_sort( P const &p ) :
m_p( p ),
m_hardwareConcurrency( std::thread::hardware_concurrency() )
{
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::sort( T *pBegin, size_t n, std::size_t maxUnthreaded )
{
if( n <= 1 )
return;
if( (m_maxUnthreaded = maxUnthreaded) < 2 )
m_maxUnthreaded = 2;
try
{
std::size_t s = pool_thread::get_buffer_size( n );
if( m_callerSortBuf.size() < s )
m_callerSortBuf.resize( s );
}
catch( ... )
{
throw sort_exception();
}
threaded_sort( pBegin, n, &m_callerSortBuf[0] );
}
template<typename T, typename P>
parallel_merge_sort<T, P>::~parallel_merge_sort()
{
using namespace std;
lock_guard<mutex> poolLock( m_mtxPool );
for( pool_thread &pt : m_standbyThreads )
{
unique_lock<mutex> threadLock( pt.m_mtx );
pt.m_cmd = pool_thread::CMD_STOP;
pt.m_sigInitiate.notify_one();
threadLock.unlock();
pt.m_thread->join();
}
assert(m_activeThreads.begin() == m_activeThreads.end());
}
template<typename T, typename P>
inline
std::size_t parallel_merge_sort<T, P>::pool_thread::get_buffer_size( std::size_t n )
{
for( std::size_t rest = n, right; rest > 2; rest = right )
right = rest - (rest / 2),
n += right;
return n;
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::threaded_sort( T *pBegin, std::size_t n, T *pSortBuf )
{
using namespace std;
unique_lock<mutex> poolLock( m_mtxPool );
bool standbyEmpty;
if( n <= m_maxUnthreaded || ((standbyEmpty = m_standbyThreads.begin() == m_standbyThreads.end()) && m_activeThreads.size() >= (m_hardwareConcurrency - 1)) )
{
poolLock.unlock();
unthreaded_sort( pBegin, n, pSortBuf );
return;
}
typename list<pool_thread>::iterator itPT;
pool_thread *pPT;
size_t left = n / 2,
right = n - left;
if( !standbyEmpty )
{
typename list<pool_thread>::iterator itPTSearch;
ptrdiff_t s = (ptrdiff_t)pool_thread::get_buffer_size( right ),
bestFit = (ptrdiff_t)((size_t)(ptrdiff_t)-1 >> 1),
diff;
for( itPT = m_standbyThreads.end(), itPTSearch = m_standbyThreads.begin();
itPTSearch != m_standbyThreads.end(); ++itPTSearch )
if( (diff = (ptrdiff_t)itPTSearch->m_sortBuf.size() - s) >= 0 && diff < bestFit )
itPT = itPTSearch,
bestFit = diff;
if( itPT == m_standbyThreads.end() )
itPT = --m_standbyThreads.end();
m_activeThreads.splice( m_activeThreads.end(), m_standbyThreads, itPT );
poolLock.unlock();
pPT = &*itPT;
}
else
m_activeThreads.emplace_back( this ),
itPT = --m_activeThreads.end(),
pPT = &*itPT,
poolLock.unlock();
for( T *pMoveFrom = pBegin, *pMoveTo = pSortBuf; pMoveFrom != (pBegin + n); *pMoveTo++ = move( *pMoveFrom++ ) );
T *pLeft = pSortBuf,
*pRight = pSortBuf + left;
unique_lock<mutex> threadLock( pPT->m_mtx );
pPT->m_cmd = pool_thread::CMD_SORT;
pPT->m_rsp = pool_thread::RSP_NONE;
pPT->m_pBegin = pRight;
pPT->m_n = right;
pPT->m_sigInitiate.notify_one();
threadLock.unlock();
auto pushThreadBack = [&poolLock, &itPT, this]()
{
poolLock.lock();
m_standbyThreads.splice( m_standbyThreads.end(), m_activeThreads, itPT );
};
auto waitForThread = [&threadLock, pPT]()
{
threadLock.lock();
while( pPT->m_rsp == pool_thread::RSP_NONE )
pPT->m_sigResponse.wait( threadLock );
assert(pPT->m_rsp == pool_thread::RSP_SUCCESS || pPT->m_rsp == pool_thread::RSP_ERR);
threadLock.unlock();
};
invoke_on_destruct<decltype(pushThreadBack)> autoPushBackThread( pushThreadBack );
invoke_on_destruct<decltype(waitForThread)> autoWaitForThread( waitForThread );
threaded_sort( pLeft, left, pSortBuf + n );
autoWaitForThread.invoke_and_disable();
if( pPT->m_rsp == pool_thread::RSP_ERR )
throw sort_exception();
merge_back( pBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::unthreaded_sort( T *pBegin, std::size_t n, T *pSortBuf )
{
assert(n >= 2);
using namespace std;
if( n == 2 )
{
if( m_p( pBegin[1], pBegin[0] ) )
{
T temp( move( pBegin[0] ) );
pBegin[0] = move( pBegin[1] );
pBegin[1] = move( temp );
}
return;
}
for( T *pMoveFrom = pBegin, *pMoveTo = pSortBuf; pMoveFrom != (pBegin + n); *pMoveTo++ = move( *pMoveFrom++ ) );
size_t left = n / 2,
right = n - left;
T *pLeft = pSortBuf,
*pRight = pSortBuf + left;
if( left >= 2 )
unthreaded_sort( pLeft, left, pSortBuf + n );
if( right >= 2 )
unthreaded_sort( pRight, right, pSortBuf + n );
merge_back( pBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename T, typename P>
inline
void parallel_merge_sort<T, P>::merge_back( T *pUp, T *pLeft, T *pLeftEnd, T *pRight, T *pRightEnd )
{
using namespace std;
for( ; ; )
if( m_p( *pLeft, *pRight ) )
{
*pUp++ = move( *pLeft++ );
if( pLeft == pLeftEnd )
{
for( ; pRight != pRightEnd; *pUp++ = move( *pRight++ ) );
break;
}
}
else
{
*pUp++ = move( *pRight++ );
if( pRight == pRightEnd )
{
for( ; pLeft != pLeftEnd; *pUp++ = move( *pLeft++ ) );
break;
}
}
}
template<typename T, typename P>
std::size_t parallel_merge_sort<T, P>::get_buffer_size()
{
std::size_t s = 0;
for( pool_thread &pt : m_standbyThreads )
s += pt.m_sortBuf.capacity();
return s + m_callerSortBuf.capacity();
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::empty_buffers()
{
for( pool_thread &pt : m_standbyThreads )
pt.m_sortBuf.resize( 0 ),
pt.m_sortBuf.shrink_to_fit();
m_callerSortBuf.resize( 0 );
m_callerSortBuf.shrink_to_fit();
}
template<typename T, typename P>
parallel_merge_sort<T, P>::pool_thread::pool_thread( parallel_merge_sort *pPMS )
{
using namespace std;
m_cmd = pool_thread::CMD_NONE;
m_thread = move( unique_ptr<thread>( new thread( []( pool_thread *pPT, parallel_merge_sort *pPMS ) -> void { pPT->sort_thread( pPMS ); }, this, pPMS ) ) );
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::pool_thread::sort_thread( parallel_merge_sort *pPMS )
{
using namespace std;
for( ; ; )
{
{
unique_lock<mutex> threadLock( m_mtx );
while( m_cmd == CMD_NONE )
m_sigInitiate.wait( threadLock );
if( m_cmd == CMD_STOP )
return;
assert(m_cmd == pool_thread::CMD_SORT);
m_cmd = CMD_NONE;
}
bool success;
try
{
size_t s = get_buffer_size( m_n );
if( m_sortBuf.size() < s )
m_sortBuf.resize( s );
pPMS->threaded_sort( m_pBegin, m_n, &m_sortBuf[0] );
success = true;
}
catch( ... )
{
success = false;
}
lock_guard<mutex> threadLock( m_mtx );
m_rsp = success ? RSP_SUCCESS : RSP_ERR,
m_sigResponse.notify_one();
}
}
template<typename T, typename P = std::less<T>>
class ref_parallel_merge_sort
{
private:
struct ref
{
T *p;
};
struct ref_predicate
{
ref_predicate( P p );
bool operator ()( ref const &left, ref const &right );
P m_p;
};
public:
ref_parallel_merge_sort( P const &p = P() );
void sort( T *pBegin, size_t n, std::size_t maxUnthreaded );
private:
ref_predicate m_p;
parallel_merge_sort<ref, ref_predicate> m_sorter;
};
template<typename T, typename P>
inline
ref_parallel_merge_sort<T, P>::ref_predicate::ref_predicate( P p ) :
m_p ( p )
{
}
template<typename T, typename P>
inline
bool ref_parallel_merge_sort<T, P>::ref_predicate::operator ()( ref const &left, ref const &right )
{
return m_p( *left.p, *right.p );
}
template<typename T, typename P>
inline
ref_parallel_merge_sort<T, P>::ref_parallel_merge_sort( P const &p ) :
m_p( p ),
m_sorter( m_p )
{
}
template<typename T, typename P>
void ref_parallel_merge_sort<T, P>::sort( T *pBegin, size_t n, std::size_t maxUnthreaded )
{
using namespace std;
try
{
vector<ref> refBuf;
refBuf.resize( n );
for( size_t i = 0; i != n; refBuf[i].p = &pBegin[i], ++i );
m_sorter.sort( &refBuf[0], n, maxUnthreaded );
vector<T> reorderBuf;
reorderBuf.resize( n );
for( size_t i = 0; i != n; reorderBuf[i] = move( *refBuf[i].p ), ++i );
for( size_t i = 0; i != n; pBegin[i] = move( reorderBuf[i] ), ++i );
}
catch( ... )
{
throw sort_exception();
}
}
#include <iostream>
#include <cstdlib>
#include <functional>
#include <random>
#include <cstdint>
#if defined(_MSC_VER)
#include <Windows.h>
double get_usecs()
{
LONGLONG liTime;
GetSystemTimeAsFileTime( &(FILETIME &)liTime );
return (double)liTime / 10.0;
}
#elif defined(__unix__)
#include <sys/time.h>
double get_usecs()
{
timeval tv;
gettimeofday( &tv, nullptr );
return (double)tv.tv_sec * 1'000'000.0 + tv.tv_usec;
}
#elif
#error no OS-support for get_usecs()
#endif
using namespace std;
void fill_with_random( double *p, size_t n, unsigned seed = 0 )
{
default_random_engine re( seed );
uniform_real_distribution<double> distrib;
for( double *pEnd = p + n; p != pEnd; *p++ = distrib( re ) );
}
template<typename T>
string decimal_unsigned( T t );
int main()
{
size_t const SIZE = (size_t)1024 * 1024 * 1024 / sizeof(double);
unsigned hc = thread::hardware_concurrency();
vector<double> v;
double t;
v.resize( SIZE );
parallel_merge_sort<double> sd;
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( &v[0], SIZE, SIZE / hc );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds parallel" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( &v[0], SIZE, SIZE );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds sequential" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
}
#include <sstream>
string decify_string( string const &s );
template<typename T>
string decimal_unsigned( T t )
{
using namespace std;
ostringstream oss;
return move( decify_string( (oss << t, oss.str()) ) );
}
string decify_string( string const &s )
{
using namespace std;
ostringstream oss;
size_t length = s.length(),
head = length % 3,
segments = length / 3;
if( head == 0 && segments >= 1 )
head = 3,
--segments;
oss << s.substr( 0, head );
for( size_t i = head; i != length; i += 3 )
oss << "." << s.substr( i, 3 );
return move( oss.str() );
}
#include <vector>
#include <vector>
#include <list>
#include <thread>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <algorithm>
#include <utility>
#include <exception>
#include <cassert>
#include <cstring>
template<typename T>
struct invoke_on_destruct
{
private:
T &m_t;
bool m_enabled;
public:
invoke_on_destruct( T &t ) :
m_t( t ), m_enabled( true )
{
}
~invoke_on_destruct()
{
if( m_enabled )
m_t();
}
void invoke_and_disable()
{
m_t();
m_enabled = false;
}
};
struct sort_exception : public std::exception
{
};
template<typename T, typename P = std::less<T>>
class parallel_merge_sort
{
public:
parallel_merge_sort( P const &p = P() );
~parallel_merge_sort();
void sort( T *pBegin, size_t n, std::size_t minThreaded );
std::size_t get_buffer_size();
void empty_buffers();
private:
struct pool_thread
{
enum CMD : int { CMD_STOP = -1, CMD_NONE = 0, CMD_SORT = 1 };
enum RSP : int { RSP_ERR = -1, RSP_NONE = 0, RSP_SUCCESS = 1 };
std::unique_ptr<std::thread> m_thread;
std::mutex m_mtx;
std::condition_variable m_sigInitiate;
CMD m_cmd;
T *m_pBegin;
std::size_t m_n;
std::condition_variable m_sigResponse;
RSP m_rsp;
std::vector<T> m_sortBuf;
pool_thread( parallel_merge_sort *pPMS );
void sort_thread( parallel_merge_sort *pPMS );
static std::size_t get_buffer_size( size_t n );
};
P m_p;
std::size_t m_minThreaded;
unsigned m_hardwareConcurrency;
std::vector<T> m_callerSortBuf;
std::mutex m_mtxPool;
std::list<pool_thread> m_standbyThreads;
std::list<pool_thread> m_activeThreads;
void threaded_sort( T *pBegin, std::size_t n, T *pSortBuf );
void unthreaded_sort( T *pBegin, std::size_t n, T *pSortBuf );
void merge_back( T *pUp, T *pLeft, T *pLeftEnd, T *pRight, T *pRightEnd );
};
template<typename T, typename P>
parallel_merge_sort<T, P>::parallel_merge_sort( P const &p ) :
m_p( p ),
m_hardwareConcurrency( std::thread::hardware_concurrency() )
{
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::sort( T *pBegin, size_t n, std::size_t minThreaded )
{
if( n <= 1 )
return;
if( (m_minThreaded = minThreaded) < 4 )
m_minThreaded = 4;
try
{
std::size_t s = pool_thread::get_buffer_size( n );
if( m_callerSortBuf.size() < s )
m_callerSortBuf.resize( s );
}
catch( ... )
{
throw sort_exception();
}
threaded_sort( pBegin, n, &m_callerSortBuf[0] );
}
template<typename T, typename P>
parallel_merge_sort<T, P>::~parallel_merge_sort()
{
using namespace std;
lock_guard<mutex> poolLock( m_mtxPool );
for( pool_thread &pt : m_standbyThreads )
{
unique_lock<mutex> threadLock( pt.m_mtx );
pt.m_cmd = pool_thread::CMD_STOP;
pt.m_sigInitiate.notify_one();
threadLock.unlock();
pt.m_thread->join();
}
assert(m_activeThreads.begin() == m_activeThreads.end());
}
template<typename T, typename P>
inline
std::size_t parallel_merge_sort<T, P>::pool_thread::get_buffer_size( std::size_t n )
{
for( std::size_t rest = n, right; rest > 2; rest = right )
right = rest - (rest / 2),
n += right;
return n;
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::threaded_sort( T *pBegin, std::size_t n, T *pSortBuf )
{
using namespace std;
unique_lock<mutex> poolLock( m_mtxPool );
bool standbyEmpty;
if( n < m_minThreaded || ((standbyEmpty = m_standbyThreads.begin() == m_standbyThreads.end()) && m_activeThreads.size() >= (m_hardwareConcurrency - 1)) )
{
poolLock.unlock();
unthreaded_sort( pBegin, n, pSortBuf );
return;
}
typename list<pool_thread>::iterator itPT;
pool_thread *pPT;
size_t left = n / 2,
right = n - left;
if( !standbyEmpty )
{
typename list<pool_thread>::iterator itPTSearch;
ptrdiff_t s = (ptrdiff_t)pool_thread::get_buffer_size( right ),
bestFit = (ptrdiff_t)((size_t)(ptrdiff_t)-1 >> 1),
diff;
for( itPT = m_standbyThreads.end(), itPTSearch = m_standbyThreads.begin();
itPTSearch != m_standbyThreads.end(); ++itPTSearch )
if( (diff = (ptrdiff_t)itPTSearch->m_sortBuf.size() - s) >= 0 && diff < bestFit )
itPT = itPTSearch,
bestFit = diff;
if( itPT == m_standbyThreads.end() )
itPT = --m_standbyThreads.end();
m_activeThreads.splice( m_activeThreads.end(), m_standbyThreads, itPT );
poolLock.unlock();
pPT = &*itPT;
}
else
m_activeThreads.emplace_back( this ),
itPT = --m_activeThreads.end(),
pPT = &*itPT,
poolLock.unlock();
for( T *pMoveFrom = pBegin, *pMoveTo = pSortBuf; pMoveFrom != (pBegin + n); *pMoveTo++ = move( *pMoveFrom++ ) );
T *pLeft = pSortBuf,
*pRight = pSortBuf + left;
auto pushThreadBack = [&poolLock, &itPT, this]()
{
poolLock.lock();
m_standbyThreads.splice( m_standbyThreads.end(), m_activeThreads, itPT );
};
invoke_on_destruct<decltype(pushThreadBack)> autoPushBackThread( pushThreadBack );
unique_lock<mutex> threadLock( pPT->m_mtx );
pPT->m_cmd = pool_thread::CMD_SORT;
pPT->m_rsp = pool_thread::RSP_NONE;
pPT->m_pBegin = pRight;
pPT->m_n = right;
pPT->m_sigInitiate.notify_one();
threadLock.unlock();
auto waitForThread = [&threadLock, pPT]()
{
threadLock.lock();
while( pPT->m_rsp == pool_thread::RSP_NONE )
pPT->m_sigResponse.wait( threadLock );
assert(pPT->m_rsp == pool_thread::RSP_SUCCESS || pPT->m_rsp == pool_thread::RSP_ERR);
threadLock.unlock();
};
invoke_on_destruct<decltype(waitForThread)> autoWaitForThread( waitForThread );
threaded_sort( pLeft, left, pSortBuf + n );
autoWaitForThread.invoke_and_disable();
if( pPT->m_rsp == pool_thread::RSP_ERR )
throw sort_exception();
merge_back( pBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::unthreaded_sort( T *pBegin, std::size_t n, T *pSortBuf )
{
assert(n >= 2);
using namespace std;
if( n == 2 )
{
if( m_p( pBegin[1], pBegin[0] ) )
{
T temp( move( pBegin[0] ) );
pBegin[0] = move( pBegin[1] );
pBegin[1] = move( temp );
}
return;
}
for( T *pMoveFrom = pBegin, *pMoveTo = pSortBuf; pMoveFrom != (pBegin + n); *pMoveTo++ = move( *pMoveFrom++ ) );
size_t left = n / 2,
right = n - left;
T *pLeft = pSortBuf,
*pRight = pSortBuf + left;
if( left >= 2 )
unthreaded_sort( pLeft, left, pSortBuf + n );
if( right >= 2 )
unthreaded_sort( pRight, right, pSortBuf + n );
merge_back( pBegin, pLeft, pLeft + left, pRight, pRight + right );
}
template<typename T, typename P>
inline
void parallel_merge_sort<T, P>::merge_back( T *pUp, T *pLeft, T *pLeftEnd, T *pRight, T *pRightEnd )
{
using namespace std;
for( ; ; )
if( m_p( *pLeft, *pRight ) )
{
*pUp++ = move( *pLeft++ );
if( pLeft == pLeftEnd )
{
for( ; pRight != pRightEnd; *pUp++ = move( *pRight++ ) );
break;
}
}
else
{
*pUp++ = move( *pRight++ );
if( pRight == pRightEnd )
{
for( ; pLeft != pLeftEnd; *pUp++ = move( *pLeft++ ) );
break;
}
}
}
template<typename T, typename P>
std::size_t parallel_merge_sort<T, P>::get_buffer_size()
{
std::size_t s = 0;
for( pool_thread &pt : m_standbyThreads )
s += pt.m_sortBuf.capacity();
return s + m_callerSortBuf.capacity();
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::empty_buffers()
{
for( pool_thread &pt : m_standbyThreads )
pt.m_sortBuf.clear(),
pt.m_sortBuf.shrink_to_fit();
m_callerSortBuf.clear();
m_callerSortBuf.shrink_to_fit();
}
template<typename T, typename P>
parallel_merge_sort<T, P>::pool_thread::pool_thread( parallel_merge_sort *pPMS )
{
using namespace std;
m_cmd = pool_thread::CMD_NONE;
m_thread = move( unique_ptr<thread>( new thread( []( pool_thread *pPT, parallel_merge_sort *pPMS ) -> void { pPT->sort_thread( pPMS ); }, this, pPMS ) ) );
}
template<typename T, typename P>
void parallel_merge_sort<T, P>::pool_thread::sort_thread( parallel_merge_sort *pPMS )
{
using namespace std;
for( ; ; )
{
{
unique_lock<mutex> threadLock( m_mtx );
while( m_cmd == CMD_NONE )
m_sigInitiate.wait( threadLock );
if( m_cmd == CMD_STOP )
return;
assert(m_cmd == pool_thread::CMD_SORT);
m_cmd = CMD_NONE;
}
bool success;
try
{
size_t s = get_buffer_size( m_n );
if( m_sortBuf.size() < s )
m_sortBuf.resize( s );
pPMS->threaded_sort( m_pBegin, m_n, &m_sortBuf[0] );
success = true;
}
catch( ... )
{
success = false;
}
lock_guard<mutex> threadLock( m_mtx );
m_rsp = success ? RSP_SUCCESS : RSP_ERR,
m_sigResponse.notify_one();
}
}
template<typename T, typename P = std::less<T>>
class ref_parallel_merge_sort
{
private:
struct ref
{
T *p;
};
struct ref_predicate
{
ref_predicate( P p );
bool operator ()( ref const &left, ref const &right );
P m_p;
};
public:
ref_parallel_merge_sort( P const &p = P() );
void sort( T *pBegin, size_t n, std::size_t maxUnthreaded );
private:
ref_predicate m_p;
parallel_merge_sort<ref, ref_predicate> m_sorter;
};
template<typename T, typename P>
inline
ref_parallel_merge_sort<T, P>::ref_predicate::ref_predicate( P p ) :
m_p ( p )
{
}
template<typename T, typename P>
inline
bool ref_parallel_merge_sort<T, P>::ref_predicate::operator ()( ref const &left, ref const &right )
{
return m_p( *left.p, *right.p );
}
template<typename T, typename P>
inline
ref_parallel_merge_sort<T, P>::ref_parallel_merge_sort( P const &p ) :
m_p( p ),
m_sorter( m_p )
{
}
template<typename T, typename P>
void ref_parallel_merge_sort<T, P>::sort( T *pBegin, size_t n, std::size_t maxUnthreaded )
{
using namespace std;
try
{
vector<ref> refBuf;
refBuf.resize( n );
for( size_t i = 0; i != n; refBuf[i].p = &pBegin[i], ++i );
m_sorter.sort( &refBuf[0], n, maxUnthreaded );
vector<T> reorderBuf;
reorderBuf.resize( n );
for( size_t i = 0; i != n; reorderBuf[i] = move( *refBuf[i].p ), ++i );
for( size_t i = 0; i != n; pBegin[i] = move( reorderBuf[i] ), ++i );
}
catch( ... )
{
throw sort_exception();
}
}
#include <iostream>
#include <cstdlib>
#include <functional>
#include <random>
#include <cstdint>
#include <iterator>
#if defined(_MSC_VER)
#include <Windows.h>
double get_usecs()
{
LONGLONG liTime;
GetSystemTimeAsFileTime( &(FILETIME &)liTime );
return (double)liTime / 10.0;
}
#elif defined(__unix__)
#include <sys/time.h>
double get_usecs()
{
timeval tv;
gettimeofday( &tv, nullptr );
return (double)tv.tv_sec * 1'000'000.0 + tv.tv_usec;
}
#elif
#error no OS-support for get_usecs()
#endif
using namespace std;
void fill_with_random( double *p, size_t n, unsigned seed = 0 )
{
default_random_engine re( seed );
uniform_real_distribution<double> distrib;
for( double *pEnd = p + n; p != pEnd; *p++ = distrib( re ) );
}
template<typename T>
string decimal_unsigned( T t );
int main()
{
size_t const SIZE = (size_t)1024 * 1024 * 1024 / sizeof(double);
unsigned hc = thread::hardware_concurrency();
vector<double> v;
double t;
v.resize( SIZE );
parallel_merge_sort<double> sd;
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( &v[0], SIZE, SIZE / hc );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds parallel" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
fill_with_random( &v[0], SIZE );
t = get_usecs();
sd.sort( &v[0], SIZE, SIZE );
t = get_usecs() - t;
cout << (t / 1'000'000.0) << " seconds sequential" << endl;
cout << decimal_unsigned( sd.get_buffer_size() * sizeof(double) ) << endl;
sd.empty_buffers();
}
#include <sstream>
string decify_string( string const &s );
template<typename T>
string decimal_unsigned( T t )
{
using namespace std;
ostringstream oss;
return move( decify_string( (oss << t, oss.str()) ) );
}
string decify_string( string const &s )
{
using namespace std;
ostringstream oss;
size_t length = s.length(),
head = length % 3,
segments = length / 3;
if( head == 0 && segments >= 1 )
head = 3,
--segments;
oss << s.substr( 0, head );
for( size_t i = head; i != length; i += 3 )
oss << "." << s.substr( i, 3 );
return move( oss.str() );
}
Loading
Loading
Loading
Loading
Loading
Loading
Loading
Loading
Loading
Loading
lang-cpp