Intel(R) Threading Building Blocks Doxygen Documentation
version 4.2.3
|
Go to the documentation of this file.
17 #ifndef __TBB__concurrent_queue_impl_H
18 #define __TBB__concurrent_queue_impl_H
20 #ifndef __TBB_concurrent_queue_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
24 #include "../tbb_stddef.h"
25 #include "../tbb_machine.h"
26 #include "../atomic.h"
27 #include "../spin_mutex.h"
28 #include "../cache_aligned_allocator.h"
29 #include "../tbb_exception.h"
30 #include "../tbb_profiling.h"
32 #include __TBB_STD_SWAP_HEADER
37 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
40 namespace strict_ppl {
41 template<
typename T,
typename A>
class concurrent_queue;
44 template<
typename T,
typename A>
class concurrent_bounded_queue;
49 namespace strict_ppl {
73 static const size_t phi = 3;
77 static const size_t n_queue = 8;
103 return uintptr_t(
p)>1;
121 #if _MSC_VER && !defined(__INTEL_COMPILER)
123 #pragma warning( push )
124 #pragma warning( disable: 4146 )
133 typedef void (*item_constructor_t)(T* location,
const void* src);
145 void copy_item(
page& dst,
size_t dindex,
const void* src, item_constructor_t construct_item ) {
146 construct_item( &get_ref(dst, dindex), src );
150 item_constructor_t construct_item )
152 T& src_item = get_ref(
const_cast<page&
>(src), sindex );
153 construct_item( &get_ref(dst, dindex),
static_cast<const void*
>(&src_item) );
157 T& from = get_ref(src,index);
177 return (&
static_cast<padded_page*
>(
static_cast<void*
>(&
p))->
last)[index];
189 item_constructor_t construct_item ) ;
194 item_constructor_t construct_item ) ;
197 size_t end_in_page,
ticket& g_index, item_constructor_t construct_item ) ;
199 void invalidate_page_and_rethrow(
ticket k ) ;
216 item_constructor_t construct_item )
226 ++base.
my_rep->n_invalid_entries;
227 invalidate_page_and_rethrow( k );
233 if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.
my_rep );
249 copy_item( *
p, index, item, construct_item );
255 ++base.
my_rep->n_invalid_entries;
272 bool success =
false;
275 if(
p->mask & uintptr_t(1)<<index ) {
277 assign_and_destroy_item( dst, *
p, index );
279 --base.
my_rep->n_invalid_entries;
287 item_constructor_t construct_item )
294 ticket g_index = head_counter;
298 size_t end_in_first_page = (index+n_items<base.
my_rep->items_per_page)?(index+n_items):base.
my_rep->items_per_page;
300 head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
301 page* cur_page = head_page;
305 cur_page->
next = make_copy( base, srcp, 0, base.
my_rep->items_per_page, g_index, construct_item );
306 cur_page = cur_page->
next;
311 if( last_index==0 ) last_index = base.
my_rep->items_per_page;
313 cur_page->
next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
314 cur_page = cur_page->
next;
316 tail_page = cur_page;
318 invalidate_page_and_rethrow( g_index );
321 head_page = tail_page = NULL;
329 page* invalid_page = (
page*)uintptr_t(1);
335 q->
next = invalid_page;
337 head_page = invalid_page;
338 tail_page = invalid_page;
346 ticket& g_index, item_constructor_t construct_item )
350 new_page->
next = NULL;
352 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
353 if( new_page->
mask & uintptr_t(1)<<begin_in_page )
354 copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
367 my_ticket(k), my_queue(queue), my_page(
p), allocator(b)
378 my_queue.head_page = q;
380 my_queue.tail_page = NULL;
385 allocator.deallocate_page(
p );
389 #if _MSC_VER && !defined(__INTEL_COMPILER)
390 #pragma warning( pop )
391 #endif // warning 4146 is back
406 return k*phi%n_queue;
411 return array[index(k)];
440 size_t n =
sizeof(
padded_page) + (r.items_per_page-1)*
sizeof(T);
441 return reinterpret_cast<page*
>(allocate_block ( n ));
446 size_t n =
sizeof(
padded_page) + (r.items_per_page-1)*
sizeof(T);
447 deallocate_block(
reinterpret_cast<void*
>(
p), n );
451 virtual void *allocate_block(
size_t n ) = 0;
454 virtual void deallocate_block(
void *
p,
size_t n ) = 0;
461 size_t nq = my_rep->n_queue;
462 for(
size_t i=0; i<nq; i++ )
463 __TBB_ASSERT( my_rep->
array[i].tail_page==NULL,
"pages were not freed properly" );
471 ticket k = r.tail_counter++;
472 r.
choose(k).push( src, k, *
this, construct_item );
477 bool internal_try_pop(
void* dst ) ;
480 size_t internal_size()
const ;
483 bool internal_empty()
const ;
487 void internal_finish_clear() ;
497 #if __TBB_CPP11_RVALUE_REF_PRESENT
507 const size_t item_size =
sizeof(T);
514 my_rep->item_size = item_size;
515 my_rep->items_per_page = item_size<= 8 ? 32 :
516 item_size<= 16 ? 16 :
530 if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
536 #if defined(_MSC_VER) && defined(_Wp64)
537 #pragma warning (push)
538 #pragma warning (disable: 4267)
540 k = r.head_counter.compare_and_swap( tk+1, tk );
541 #if defined(_MSC_VER) && defined(_Wp64)
542 #pragma warning (pop)
548 }
while( !r.
choose( k ).pop( dst, k, *
this ) );
555 __TBB_ASSERT(
sizeof(ptrdiff_t)<=
sizeof(
size_t), NULL );
556 ticket hc = r.head_counter;
557 size_t nie = r.n_invalid_entries;
558 ticket tc = r.tail_counter;
560 ptrdiff_t sz = tc-hc-nie;
561 return sz<0 ? 0 : size_t(sz);
567 ticket tc = r.tail_counter;
568 ticket hc = r.head_counter;
570 return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
576 size_t nq = r.n_queue;
577 for(
size_t i=0; i<nq; ++i ) {
581 deallocate_page( tp );
582 r.
array[i].tail_page = NULL;
593 r.items_per_page = src.
my_rep->items_per_page;
596 r.head_counter = src.
my_rep->head_counter;
597 r.tail_counter = src.
my_rep->tail_counter;
598 r.n_invalid_entries = src.
my_rep->n_invalid_entries;
601 for(
size_t i = 0; i < r.n_queue; ++i )
602 r.
array[i].assign( src.
my_rep->array[i], *
this, construct_item);
605 "the source concurrent queue should not be concurrently modified." );
618 head_counter(queue.my_rep->head_counter),
621 for(
size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
622 array[k] = queue.
my_rep->array[k].head_page;
626 bool get_item( T*& item,
size_t k ) ;
631 if( k==my_queue.my_rep->tail_counter ) {
639 return (
p->mask & uintptr_t(1)<<i)!=0;
645 template<
typename Value>
651 template<
typename C,
typename T,
typename U>
654 template<
typename C,
typename T,
typename U>
662 #if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
669 : my_rep(NULL), my_item(NULL) {
694 template<
typename Value>
699 if( !my_rep->get_item(my_item, k) ) advance();
702 template<
typename Value>
704 if( my_rep!=other.
my_rep ) {
717 template<
typename Value>
719 __TBB_ASSERT( my_item,
"attempt to increment iterator past end of queue" );
720 size_t k = my_rep->head_counter;
724 my_rep->get_item(tmp,k);
728 if( i==queue.
my_rep->items_per_page-1 ) {
733 my_rep->head_counter = ++k;
734 if( !my_rep->get_item(my_item, k) ) advance();
747 template<
typename Container,
typename Value>
749 public std::iterator<std::forward_iterator_tag,Value> {
750 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
751 template<
typename T,
class A>
752 friend class ::tbb::strict_ppl::concurrent_queue;
779 return *
static_cast<Value*
>(this->my_item);
792 Value* result = &operator*();
799 template<
typename C,
typename T,
typename U>
801 return i.my_item==j.my_item;
804 template<
typename C,
typename T,
typename U>
806 return i.my_item!=j.my_item;
854 #if __TBB_PROTECTED_NESTED_CLASS_BROKEN
868 virtual void copy_item(
page& dst,
size_t index,
const void* src ) = 0;
869 virtual void assign_and_destroy_item(
void* dst,
page& src,
size_t index ) = 0;
915 #if __TBB_CPP11_RVALUE_REF_PRESENT
926 void internal_insert_item(
const void* src, copy_specifics op_type );
929 bool internal_insert_if_not_full(
const void* src, copy_specifics op_type );
934 virtual void copy_page_item(
page& dst,
size_t dindex,
const page& src,
size_t sindex ) = 0;
955 virtual void move_item(
page& dst,
size_t index,
const void* src ) = 0;
965 template<
typename C,
typename T,
typename U>
968 template<
typename C,
typename T,
typename U>
1011 template<
typename Container,
typename Value>
1013 public std::iterator<std::forward_iterator_tag,Value> {
1015 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
1016 template<
typename T,
class A>
1017 friend class ::tbb::concurrent_bounded_queue;
1045 return *
static_cast<Value*
>(my_item);
1065 template<
typename C,
typename T,
typename U>
1067 return i.my_item==j.my_item;
1070 template<
typename C,
typename T,
typename U>
1072 return i.my_item!=j.my_item;
concurrent_queue_iterator_base_v3()
Default constructor.
concurrent_queue_rep * my_rep
Internal representation.
micro_queue< T > array[n_queue]
concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base
void push(const void *item, ticket k, concurrent_queue_base_v3< T > &base, item_constructor_t construct_item)
static T & get_ref(page &p, size_t index)
atomic< page * > tail_page
size_t item_size
Size of an item.
micro_queue< T >::padded_page padded_page
concurrent_queue_iterator()
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
bool is_valid_page(const concurrent_queue_rep_base::page *p)
atomic< ticket > tail_counter
const concurrent_queue_base_v3< T > & my_queue
bool pop(void *dst, ticket k, concurrent_queue_base_v3< T > &base)
Value * operator->() const
Base class for types that should not be assigned.
void itt_store_word_with_release(tbb::atomic< T > &dst, U src)
void assign(const concurrent_queue_base_v3 &src, item_constructor_t construct_item)
copy or move internal representation
atomic< page * > head_page
Value & operator*() const
Reference to current item.
virtual void move_page_item(page &dst, size_t dindex, const page &src, size_t sindex)=0
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
Type-independent portion of concurrent_queue_iterator.
Constness-independent portion of concurrent_queue_iterator.
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
concurrent_queue_iterator_base_v3()
Default constructor.
Class used to ensure exception-safety of method "pop".
Container::iterator last(Container &c)
padded_page()
Not defined anywhere - exists to quiet warnings.
void assign(const concurrent_queue_iterator_base_v3< Value > &other)
Assignment.
void swap(atomic< T > &lhs, atomic< T > &rhs)
void copy_item(page &dst, size_t dindex, const void *src, item_constructor_t construct_item)
void invalidate_page_and_rethrow(ticket k)
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
Meets requirements of a forward iterator for STL.
atomic< size_t > n_invalid_entries
number of invalid entries in the queue
~concurrent_queue_iterator_base_v3()
Destructor.
concurrent_queue_base_v8(size_t item_sz)
bool get_item(T *&item, size_t k)
Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
micro_queue< T >::item_constructor_t item_constructor_t
base class of concurrent_queue
static size_t index(ticket k)
Map ticket to an array index.
Represents acquisition of a mutex.
virtual concurrent_queue_rep_base::page * allocate_page()=0
concurrent_queue_iterator & operator=(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Iterator assignment.
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
static const size_t n_queue
void internal_finish_clear()
free any remaining pages
void spin_wait_until_my_turn(atomic< ticket > &counter, ticket k, concurrent_queue_rep_base &rb) const
T last
Must be last field.
concurrent_queue_iterator_rep * my_rep
concurrent_queue over which we are iterating.
void * my_item
Pointer to current item.
virtual void deallocate_page(concurrent_queue_rep_base::page *p) __TBB_override
A queue using simple locking.
void operator=(const padded_page &)
Not defined anywhere - exists to quiet warnings.
virtual ~concurrent_queue_base_v3()
concurrent_queue_base_v3()
void internal_throw_exception() const
Obsolete.
bool internal_empty() const
check if the queue is empty; thread safe
concurrent_queue_iterator_base_v3 & operator=(const concurrent_queue_iterator_base_v3 &i)
void move(tbb_thread &t1, tbb_thread &t2)
void pause()
Pause for a while.
void __TBB_EXPORTED_METHOD internal_push_move(const void *src)
Enqueue item at tail of queue using move operation.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
Class that implements exponential backoff.
void assign_and_destroy_item(void *dst, page &src, size_t index)
concurrent_queue_iterator_base_v3 & operator=(const concurrent_queue_iterator_base_v3 &i)
void copy_item(page &dst, size_t dindex, const page &src, size_t sindex, item_constructor_t construct_item)
parts of concurrent_queue_rep that do not have references to micro_queue
atomic< ticket > head_counter
bool internal_try_pop(void *dst)
Attempt to dequeue item from queue.
const size_t NFS_MaxLineSize
Compile-time constant that is upper bound on cache line/sector size.
bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
T last
Must be last field.
concurrent_queue_iterator & operator++()
Advance to next item in queue.
Meets requirements of a forward iterator for STL.
Value & operator*() const
Reference to current item.
size_t internal_size() const
Get size of queue; result may be invalid if queue is modified concurrently.
Base class for types that should not be copied or assigned.
virtual void move_item(page &dst, size_t index, const void *src)=0
concurrent_queue_iterator & operator=(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Iterator assignment.
Internal representation of a ConcurrentQueue.
bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
friend bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
#define __TBB_offsetof(class_name, member_name)
Extended variant of the standard offsetof macro.
A queue using simple locking.
void itt_hide_store_word(T &dst, T src)
Value * my_item
Pointer to current item.
argument_integer_type modulo_power_of_two(argument_integer_type arg, divisor_integer_type divisor)
A function to compute arg modulo divisor where divisor is a power of 2.
Abstract class to define interface for page allocation/deallocation.
atomic< ticket > head_counter
size_t item_size
Size of an item.
representation of concurrent_queue_base
size_t items_per_page
Always a power of 2.
void __TBB_EXPORTED_METHOD move_content(concurrent_queue_base_v8 &src)
move items
size_t __TBB_EXPORTED_FUNC NFS_GetLineSize()
Cache/sector line size.
virtual page * allocate_page() __TBB_override
friend bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
concurrent_queue_rep< T > * my_rep
Internal representation.
concurrent_queue_rep_base::page page
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
micro_queue< T >::padded_page padded_page
size_t items_per_page
Always a power of 2.
concurrent_queue_page_allocator & allocator
micro_queue & assign(const micro_queue &src, concurrent_queue_base_v3< T > &base, item_constructor_t construct_item)
Value * operator++(int)
Post increment.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void advance()
Advance iterator one step towards tail of queue.
atomic< ticket > tail_counter
Identifiers declared inside namespace internal should never be used directly by client code.
micro_queue< T > & my_queue
concurrent_queue_iterator_rep< Value > * my_rep
Represents concurrent_queue over which we are iterating.
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
void const char const char int ITT_FORMAT __itt_group_sync p
virtual ~concurrent_queue_page_allocator()
micro_queue_pop_finalizer(micro_queue< T > &queue, concurrent_queue_base_v3< T > &b, ticket k, page *p)
#define __TBB_EXPORTED_METHOD
Value * operator++(int)
Post increment.
void call_itt_notify(notify_type, void *)
concurrent_queue_iterator & operator++()
Advance to next item in queue.
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
concurrent_queue_iterator_rep(const concurrent_queue_base_v3< T > &queue)
Similar to C++0x std::remove_cv.
bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full(const void *src)
Attempt to enqueue item onto queue using move operation.
ptrdiff_t my_capacity
Capacity of the queue.
micro_queue< T > & choose(ticket k)
Value * operator->() const
A lock that occupies a single byte.
~micro_queue_pop_finalizer()
void internal_push(const void *src, item_constructor_t construct_item)
Enqueue item at tail of queue.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
page * make_copy(concurrent_queue_base_v3< T > &base, const page *src_page, size_t begin_in_page, size_t end_in_page, ticket &g_index, item_constructor_t construct_item)
concurrent_queue_iterator(const concurrent_queue_base_v3 &queue)
Construct iterator pointing to head of queue.
concurrent_queue_rep< T >::page page
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
concurrent_queue_iterator()
#define __TBB_compiler_fence()
concurrent_queue_rep_base::page page
void internal_swap(concurrent_queue_base_v3 &src)
swap internal representation
Copyright © 2005-2020 Intel Corporation. All Rights Reserved.
Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are
registered trademarks or trademarks of Intel Corporation or its
subsidiaries in the United States and other countries.
* Other names and brands may be claimed as the property of others.