Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
concurrent_queue.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #include "tbb/tbb_stddef.h"
18 #include "tbb/tbb_machine.h"
19 #include "tbb/tbb_exception.h"
20 // Define required to satisfy test in internal file.
21 #define __TBB_concurrent_queue_H
23 #include "concurrent_monitor.h"
24 #include "itt_notify.h"
25 #include <new>
26 #include <cstring> // for memset()
27 
28 #if defined(_MSC_VER) && defined(_Wp64)
29  // Workaround for overzealous compiler warnings in /Wp64 mode
30  #pragma warning (disable: 4267)
31 #endif
32 
33 #define RECORD_EVENTS 0
34 
35 
36 namespace tbb {
37 
38 namespace internal {
39 
41 
// Ticket: an unsigned sequence number ordering push/pop operations; unsigned
// so wraparound is defined (differences are cast to ptrdiff_t for comparison).
42 typedef size_t ticket;
43 
45 
// One lane of the striped queue ("a queue using simple locking" per the index
// below). NOTE(review): this Doxygen dump drops several original lines
// (48, 50, 58, 61-62, 67-68), so some members/signatures are incomplete here.
47 struct micro_queue {
49 
51 
// Head/tail page pointers and per-lane head/tail ticket counters.
52  atomic<page*> head_page;
53  atomic<ticket> head_counter;
54 
55  atomic<page*> tail_page;
56  atomic<ticket> tail_counter;
57 
59 
// Append item with ticket k (signature truncated by the extraction; the
// missing tail carries the copy/move op_type parameter used in the body).
60  void push( const void* item, ticket k, concurrent_queue_base& base,
62 
// Mark slot k consumed after a failed push (implemented via push(NULL,...)).
63  void abort_push( ticket k, concurrent_queue_base& base );
64 
// Move item with ticket k into dst; returns false for an invalid slot.
65  bool pop( void* dst, ticket k, concurrent_queue_base& base );
66 
69 
// Deep-copy (or move, per op_type) one source page range into a new page.
70  page* make_copy ( concurrent_queue_base& base, const page* src_page, size_t begin_in_page,
71  size_t end_in_page, ticket& g_index, concurrent_queue_base::copy_specifics op_type ) ;
72 
// Poison the lane with static_invalid_page so no further pushes succeed.
73  void make_invalid( ticket k );
74 };
75 
76 // we need to yank it out of micro_queue because of concurrent_queue_base::deallocate_page being virtual.
// micro_queue_pop_finalizer: RAII helper run at the end of micro_queue::pop.
// NOTE(review): the class header, destructor signature, and the lines that
// advance head_counter and deallocate the page (orig. 77-82, 87, 90, 97, 99)
// are missing from this extraction.
83 public:
85  my_ticket(k), my_queue(queue), my_page(p), base(b)
86  {}
88  page* p = my_page;
89  if( p ) {
// Unlink the drained page; if it was the last page, clear tail_page too.
91  page* q = p->next;
92  my_queue.head_page = q;
93  if( !q ) {
94  my_queue.tail_page = NULL;
95  }
96  }
98  if( p )
100  }
101 };
102 
// predicate_leq(t): functor selecting waiters whose stored ticket <= t
// (used with concurrent_monitor::notify; struct header line 103-104 missing).
105  predicate_leq( ticket t_ ) : t(t_) {}
106  bool operator() ( uintptr_t p ) const {return (ticket)p<=t;}
107 };
108 
110 
// concurrent_queue_rep: internal representation of a concurrent queue —
// n_queue striped micro_queues plus global head/tail counters, padded to
// cache-line boundaries. NOTE(review): the class header and the declarations
// of items_avail, slots_avail, array, and choose() (orig. 111-112, 130, 135,
// 137, 139) are missing from this extraction; see the index below.
113 public:
114 private:
115  friend struct micro_queue;
116 
// Stride for spreading consecutive tickets across lanes
// (index says "approximately n_queue/golden ratio").
118  static const size_t phi = 3;
119 
120 public:
// Number of micro_queues; must be a power of 2 (index comment).
122  static const size_t n_queue = 8;
123 
// Map a ticket to a lane: k*phi mod n_queue.
125  static size_t index( ticket k ) {
126  return k*phi%n_queue;
127  }
128 
129  atomic<ticket> head_counter;
131  atomic<size_t> n_invalid_entries;
// Padding so head_counter group and tail_counter group sit on distinct
// cache lines (avoids false sharing between producers and consumers).
132  char pad1[NFS_MaxLineSize-((sizeof(atomic<ticket>)+sizeof(concurrent_monitor)+sizeof(atomic<size_t>))&(NFS_MaxLineSize-1))];
133 
134  atomic<ticket> tail_counter;
136  char pad2[NFS_MaxLineSize-((sizeof(atomic<ticket>)+sizeof(concurrent_monitor))&(NFS_MaxLineSize-1))];
138 
140  // The formula here approximates LRU in a cache-oblivious way.
141  return array[index(k)];
142  }
143 
// Bumped by internal_abort to wake and cancel all blocked operations.
144  atomic<unsigned> abort_counter;
145 
// Sentinel capacity meaning "unbounded".
147  static const ptrdiff_t infinite_capacity = ptrdiff_t(~size_t(0)/2);
148 };
149 
150 #if _MSC_VER && !defined(__INTEL_COMPILER)
151  // unary minus operator applied to unsigned type, result still unsigned
152  #pragma warning( push )
153  #pragma warning( disable: 4146 )
154 #endif
155 
// Sentinel page pointer installed by micro_queue::make_invalid; compared
// against in internal_finish_clear so it is never deallocated.
156 static void* static_invalid_page;
157 
158 //------------------------------------------------------------------------
159 // micro_queue
160 //------------------------------------------------------------------------
// Append an item (or an abort marker when item==NULL) at ticket k.
// NOTE(review): the extraction drops several lines (orig. 162-163, 166, 182,
// 187, 192, 212, 215, 222) — including the op_type parameter, the index
// computation, the tail re-read inside the backoff loop, the throw on an
// invalid tail, the page-insertion lock, and the tail_counter advance.
161 void micro_queue::push( const void* item, ticket k, concurrent_queue_base& base,
164  page* p = NULL;
165  // find index on page where we would put the data
167  if( !index ) { // make a new page
168  __TBB_TRY {
169  p = base.allocate_page();
170  } __TBB_CATCH(...) {
// Allocation failed: record an invalid slot, poison the lane, rethrow.
171  ++base.my_rep->n_invalid_entries;
172  make_invalid( k );
173  __TBB_RETHROW();
174  }
175  p->mask = 0;
176  p->next = NULL;
177  }
178 
179  // wait for my turn
180  if( tail_counter!=k ) // The developer insisted on keeping first check out of the backoff loop
181  for( atomic_backoff b(true);;b.pause() ) {
183  if( tail==k ) break;
184  else if( tail&0x1 ) {
185  // no memory. throws an exception; assumes concurrent_queue_rep::n_queue>1
186  ++base.my_rep->n_invalid_entries;
188  }
189  }
190 
191  if( p ) { // page is newly allocated; insert in micro_queue
193  if( page* q = tail_page )
194  q->next = p;
195  else
196  head_page = p;
197  tail_page = p;
198  }
199 
200  if (item) {
201  p = tail_page;
202  ITT_NOTIFY( sync_acquired, p );
203  __TBB_TRY {
// Dispatch on op_type: copy for v3 queues, move for v8 queues.
204  if( concurrent_queue_base::copy == op_type ) {
205  base.copy_item( *p, index, item );
206  } else {
207  __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
208  static_cast<concurrent_queue_base_v8&>(base).move_item( *p, index, item );
209  }
210  } __TBB_CATCH(...) {
211  ++base.my_rep->n_invalid_entries;
213  __TBB_RETHROW();
214  }
216  // If no exception was thrown, mark item as present.
217  p->mask |= uintptr_t(1)<<index;
218  }
219  else // no item; this was called from abort_push
220  ++base.my_rep->n_invalid_entries;
221 
223 }
224 
225 
// micro_queue::abort_push (signature line, orig. 226, missing from this
// extraction — see index): consume ticket k without an item by pushing NULL,
// which push() records as an invalid entry.
227  push(NULL, k, base, concurrent_queue_base::copy);
228 }
229 
// Remove the item with ticket k into dst. Returns false if the slot was an
// invalid entry (caller retries with the next ticket). NOTE(review): lines
// 231-233, 236, 245 of the original (including the wait for head_counter/
// tail_counter and the index computation) are missing from this extraction.
230 bool micro_queue::pop( void* dst, ticket k, concurrent_queue_base& base ) {
234  page *p = head_page;
235  __TBB_ASSERT( p, NULL );
237  bool success = false;
238  {
// Finalizer advances head_counter and, when this is the last slot of the
// page (index==items_per_page-1), unlinks and frees the page on scope exit.
239  micro_queue_pop_finalizer finalizer( *this, base, k+concurrent_queue_rep::n_queue, index==base.items_per_page-1 ? p : NULL );
240  if( p->mask & uintptr_t(1)<<index ) {
241  success = true;
242  ITT_NOTIFY( sync_acquired, dst );
243  ITT_NOTIFY( sync_acquired, head_page );
244  base.assign_and_destroy_item( dst, *p, index );
246  } else {
// Slot was poisoned by abort_push/failed push: consume the marker.
247  --base.my_rep->n_invalid_entries;
248  }
249  }
250  return success;
251 }
252 
// micro_queue::assign (signature lines orig. 253-254 missing; per the index:
// copies/moves all pages of src into this lane using op_type). NOTE(review):
// lines 256-258, 263-264, 278 of the original (counter setup, first-page
// index/count computation, last_index computation) are missing here.
255 {
258 
259  const page* srcp = src.head_page;
260  if( srcp ) {
261  ticket g_index = head_counter;
262  __TBB_TRY {
265  size_t end_in_first_page = (index+n_items<base.items_per_page)?(index+n_items):base.items_per_page;
266 
// Copy the (possibly partial) first page, then whole middle pages,
// then the (possibly partial) last page.
267  head_page = make_copy( base, srcp, index, end_in_first_page, g_index, op_type );
268  page* cur_page = head_page;
269 
270  if( srcp != src.tail_page ) {
271  for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
272  cur_page->next = make_copy( base, srcp, 0, base.items_per_page, g_index, op_type );
273  cur_page = cur_page->next;
274  }
275 
276  __TBB_ASSERT( srcp==src.tail_page, NULL );
277 
279  if( last_index==0 ) last_index = base.items_per_page;
280 
281  cur_page->next = make_copy( base, srcp, 0, last_index, g_index, op_type );
282  cur_page = cur_page->next;
283  }
284  tail_page = cur_page;
285  } __TBB_CATCH(...) {
// Copy failed mid-way: poison this lane and propagate.
286  make_invalid( g_index );
287  __TBB_RETHROW();
288  }
289  } else {
// Source lane empty: leave this lane empty too.
290  head_page = tail_page = NULL;
291  }
292  return *this;
293 }
294 
// micro_queue::make_copy (first signature line, orig. 295, missing): allocate
// a new page and copy/move the valid items in [begin_in_page, end_in_page)
// from src_page, honoring the presence-bit mask.
296  const concurrent_queue_base::page* src_page, size_t begin_in_page, size_t end_in_page,
298 {
299  page* new_page = base.allocate_page();
300  new_page->next = NULL;
// Presence mask is copied wholesale; only slots with their bit set hold items.
301  new_page->mask = src_page->mask;
302  for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
303  if( new_page->mask & uintptr_t(1)<<begin_in_page ) {
304  if( concurrent_queue_base::copy == op_type ) {
305  base.copy_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
306  } else {
307  __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
308  static_cast<concurrent_queue_base_v8&>(base).move_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
309  }
310  }
311  return new_page;
312 }
313 
// micro_queue::make_invalid (signature line, orig. 314, missing): terminate
// the lane with a shared sentinel page so later pushes observe the poisoned
// tail. NOTE(review): the lock acquisition at orig. 320-321 is missing here.
315 {
// dummy.next == (page*)1 distinguishes the sentinel from real pages.
316  static concurrent_queue_base::page dummy = {static_cast<page*>((void*)1), 0};
317  // mark it so that no more pushes are allowed.
318  static_invalid_page = &dummy;
319  {
322  if( page* q = tail_page )
323  q->next = static_cast<page*>(static_invalid_page);
324  else
325  head_page = static_cast<page*>(static_invalid_page);
326  tail_page = static_cast<page*>(static_invalid_page);
327  }
328 }
329 
330 #if _MSC_VER && !defined(__INTEL_COMPILER)
331  #pragma warning( pop )
332 #endif // warning 4146 is back
333 
334 //------------------------------------------------------------------------
335 // concurrent_queue_base
336 //------------------------------------------------------------------------
// concurrent_queue_base_v3 constructor (signature line, orig. 337, and the
// my_rep allocation at orig. 345 are missing from this extraction). Chooses
// items_per_page so a page stays near a fixed byte budget, then zero-inits
// the cache-aligned representation.
338  items_per_page = item_sz<= 8 ? 32 :
339  item_sz<= 16 ? 16 :
340  item_sz<= 32 ? 8 :
341  item_sz<= 64 ? 4 :
342  item_sz<=128 ? 2 :
343  1;
// Default capacity: effectively unbounded, scaled down by item size so
// capacity*item_sz cannot overflow size_t.
344  my_capacity = size_t(-1)/(item_sz>1 ? item_sz : 2);
346  __TBB_ASSERT( is_aligned(my_rep, NFS_GetLineSize()), "alignment error" );
347  __TBB_ASSERT( is_aligned(&my_rep->head_counter, NFS_GetLineSize()), "alignment error" );
348  __TBB_ASSERT( is_aligned(&my_rep->tail_counter, NFS_GetLineSize()), "alignment error" );
349  __TBB_ASSERT( is_aligned(&my_rep->array, NFS_GetLineSize()), "alignment error" );
350  std::memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep));
353  this->item_size = item_sz;
354 }
355 
// Destructor (signature line, orig. 356, and the my_rep deallocation at
// orig. 360 are missing from this extraction). Asserts every lane's pages
// were already released by internal_finish_clear.
357  size_t nq = my_rep->n_queue;
358  for( size_t i=0; i<nq; i++ )
359  __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
361 }
362 
// internal_push / internal_push_move (signature lines, orig. 363 and 367,
// missing — see index): thin wrappers selecting the copy or move specifics.
364  internal_insert_item( src, copy );
365 }
366 
368  internal_insert_item( src, move );
369 }
370 
// internal_insert_item (signature line, orig. 371-372, missing): blocking
// enqueue. Claims a tail ticket, waits on slots_avail while the queue is at
// capacity, then pushes into the chosen lane. NOTE(review): lines 387, 393,
// 396, 411 of the original (thread_context declaration, the user_abort throw,
// the catch boundary, and the items_avail notify) are missing here.
373  unsigned old_abort_counter = r.abort_counter;
374  ticket k = r.tail_counter++;
375  ptrdiff_t e = my_capacity;
376 #if DO_ITT_NOTIFY
377  bool sync_prepare_done = false;
378 #endif
379  if( (ptrdiff_t)(k-r.head_counter)>=e ) { // queue is full
380 #if DO_ITT_NOTIFY
381  if( !sync_prepare_done ) {
382  ITT_NOTIFY( sync_prepare, &sync_prepare_done );
383  sync_prepare_done = true;
384  }
385 #endif
386  bool slept = false;
388  r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
// Re-read my_capacity through a volatile ref each iteration so a concurrent
// internal_set_capacity is observed.
389  while( (ptrdiff_t)(k-r.head_counter)>=const_cast<volatile ptrdiff_t&>(e = my_capacity) ) {
390  __TBB_TRY {
// An abort arrived after we claimed the ticket: cancel the wait and
// (in the elided lines) throw user_abort after abort_push below.
391  if( r.abort_counter!=old_abort_counter ) {
392  r.slots_avail.cancel_wait( thr_ctx );
394  }
395  slept = r.slots_avail.commit_wait( thr_ctx );
// On exception: release our claimed slot so consumers are not blocked.
397  r.choose(k).abort_push(k, *this);
398  __TBB_RETHROW();
399  } __TBB_CATCH(...) {
400  __TBB_RETHROW();
401  }
402  if (slept == true) break;
403  r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
404  }
405  if( !slept )
406  r.slots_avail.cancel_wait( thr_ctx );
407  }
408  ITT_NOTIFY( sync_acquired, &sync_prepare_done );
409  __TBB_ASSERT( (ptrdiff_t)(k-r.head_counter)<my_capacity, NULL);
410  r.choose( k ).push( src, k, *this, op_type );
412 }
413 
// internal_pop (signature line, orig. 414-415, missing): blocking dequeue.
// Claims head tickets until a valid item is popped, waiting on items_avail
// while empty. NOTE(review): lines 432, 438, 441, 457 of the original
// (thread_context declaration, the user_abort throw, the catch boundary, and
// the slots_avail notify that wakes a producer) are missing here.
416  ticket k;
417 #if DO_ITT_NOTIFY
418  bool sync_prepare_done = false;
419 #endif
420  unsigned old_abort_counter = r.abort_counter;
421  // This loop is a single pop operation; abort_counter should not be re-read inside
422  do {
423  k=r.head_counter++;
424  if ( (ptrdiff_t)(r.tail_counter-k)<=0 ) { // queue is empty
425 #if DO_ITT_NOTIFY
426  if( !sync_prepare_done ) {
427  ITT_NOTIFY( sync_prepare, dst );
428  sync_prepare_done = true;
429  }
430 #endif
431  bool slept = false;
433  r.items_avail.prepare_wait( thr_ctx, k );
434  while( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
435  __TBB_TRY {
436  if( r.abort_counter!=old_abort_counter ) {
437  r.items_avail.cancel_wait( thr_ctx );
439  }
440  slept = r.items_avail.commit_wait( thr_ctx );
// On exception: return the claimed head ticket before propagating.
442  r.head_counter--;
443  __TBB_RETHROW();
444  } __TBB_CATCH(...) {
445  __TBB_RETHROW();
446  }
447  if (slept == true) break;
448  r.items_avail.prepare_wait( thr_ctx, k );
449  }
450  if( !slept )
451  r.items_avail.cancel_wait( thr_ctx );
452  }
453  __TBB_ASSERT((ptrdiff_t)(r.tail_counter-k)>0, NULL);
// pop() returns false for invalid (aborted) slots; retry with a new ticket.
454  } while( !r.choose(k).pop(dst,k,*this) );
455 
456  // wake up a producer..
458 }
459 
// internal_abort (signature and the abort_all calls at orig. 460-461, 463-464
// missing — per the index it aborts all sleeping threads on both monitors):
// bumping abort_counter makes blocked push/pop loops observe the abort.
462  ++r.abort_counter;
465 }
466 
// internal_pop_if_present (signature, orig. 467-468, missing): non-blocking
// dequeue. CAS-claims a head ticket only when an item is known to exist;
// returns false if the queue is empty. NOTE(review): the slots_avail notify
// at orig. 486 is missing from this extraction.
469  ticket k;
470  do {
471  k = r.head_counter;
472  for(;;) {
473  if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
474  // Queue is empty
475  return false;
476  }
477  // Queue had item with ticket k when we looked. Attempt to get that item.
478  ticket tk=k;
479  k = r.head_counter.compare_and_swap( tk+1, tk );
480  if( k==tk )
481  break;
482  // Another thread snatched the item, retry.
483  }
// Invalid slot: loop again for the next ticket.
484  } while( !r.choose( k ).pop( dst, k, *this ) );
485 
487 
488  return true;
489 }
490 
// internal_push_if_not_full / internal_push_move_if_not_full (signature lines,
// orig. 491 and 495, missing): non-blocking enqueue wrappers selecting copy
// or move specifics.
492  return internal_insert_if_not_full( src, copy );
493 }
494 
496  return internal_insert_if_not_full( src, move );
497 }
498 
// internal_insert_if_not_full (signature, orig. 499-500, missing): CAS-claims
// a tail ticket only while below capacity; returns false when full.
// NOTE(review): the items_avail notify at orig. 515 is missing here.
501  ticket k = r.tail_counter;
502  for(;;) {
503  if( (ptrdiff_t)(k-r.head_counter)>=my_capacity ) {
504  // Queue is full
505  return false;
506  }
507  // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
508  ticket tk=k;
509  k = r.tail_counter.compare_and_swap( tk+1, tk );
510  if( k==tk )
511  break;
512  // Another thread claimed the slot, so retry.
513  }
514  r.choose(k).push(src, k, *this, op_type);
516  return true;
517 }
518 
// internal_size (signature and the returned tail-head-n_invalid expression at
// orig. 519, 521 missing) and internal_empty (signature at orig. 524 missing).
520  __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
522 }
523 
// Snapshot tail then head; a stable tail across both reads proves the
// emptiness test below refers to one consistent moment.
525  ticket tc = my_rep->tail_counter;
526  ticket hc = my_rep->head_counter;
527  // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
528  return ( tc==my_rep->tail_counter && ptrdiff_t(tc-hc-my_rep->n_invalid_entries)<=0 );
529 }
530 
// Set the queue's capacity; any negative value selects the "unbounded"
// sentinel. The item-size parameter is accepted for ABI reasons but unused.
531 void concurrent_queue_base_v3::internal_set_capacity( ptrdiff_t capacity, size_t /*item_sz*/ ) {
532  my_capacity = capacity<0 ? concurrent_queue_rep::infinite_capacity : capacity;
533 }
534 
// internal_finish_clear (signature, orig. 535, missing): free the at-most-one
// page left in each lane after the queue has been drained, skipping the
// shared invalid-page sentinel. The trailing "}" (orig. 549) is the end of
// internal_throw_exception, whose body (orig. 547-548) is missing from this
// extraction.
536  size_t nq = my_rep->n_queue;
537  for( size_t i=0; i<nq; ++i ) {
538  page* tp = my_rep->array[i].tail_page;
539  __TBB_ASSERT( my_rep->array[i].head_page==tp, "at most one page should remain" );
540  if( tp!=NULL) {
// Never deallocate the static sentinel installed by make_invalid.
541  if( tp!=static_invalid_page ) deallocate_page( tp );
542  my_rep->array[i].tail_page = NULL;
543  }
544  }
545 }
546 
549 }
550 
// internal_assign (signature, orig. 551-552, missing): copy/move the whole
// queue from src using op_type. NOTE(review): the representation copy
// (orig. 556-559) and the first half of the closing assertion (orig. 565)
// are missing from this extraction.
553  my_capacity = src.my_capacity;
554 
555  // copy concurrent_queue_rep.
560 
561  // copy micro_queues
562  for( size_t i = 0; i<my_rep->n_queue; ++i )
563  my_rep->array[i].assign( src.my_rep->array[i], *this, op_type );
564 
566  "the source concurrent queue should not be concurrently modified." );
567 }
568 
// assign / move_content (signature lines, orig. 569 and 573, missing):
// wrappers selecting copy semantics (assign) or move semantics (move_content).
570  internal_assign( src, copy );
571 }
572 
574  internal_assign( src, move );
575 }
576 
577 //------------------------------------------------------------------------
578 // concurrent_queue_iterator_rep
579 //------------------------------------------------------------------------
// concurrent_queue_iterator_rep: snapshot of a queue taken at iterator
// construction — the head ticket plus each lane's head page. NOTE(review):
// the class header and the declarations of head_counter, my_queue, array,
// and the page/index computation in get_item (orig. 580, 582-583, 585, 595,
// 601, 603) are missing from this extraction.
581 public:
// Byte offset of the item payload within a page.
584  const size_t offset_of_last;
586  concurrent_queue_iterator_rep( const concurrent_queue_base& queue, size_t offset_of_last_ ) :
587  head_counter(queue.my_rep->head_counter),
588  my_queue(queue),
589  offset_of_last(offset_of_last_)
590  {
591  const concurrent_queue_rep& rep = *queue.my_rep;
592  for( size_t k=0; k<concurrent_queue_rep::n_queue; ++k )
593  array[k] = rep.array[k].head_page;
594  }
// Point item at the k-th element. Returns true at end-of-queue or when the
// slot is marked valid; false for an invalid slot (per the index comment).
596  bool get_item( void*& item, size_t k ) {
597  if( k==my_queue.my_rep->tail_counter ) {
598  item = NULL;
599  return true;
600  } else {
602  __TBB_ASSERT(p,NULL);
604  item = static_cast<unsigned char*>(static_cast<void*>(p)) + offset_of_last + my_queue.item_size*i;
605  return (p->mask & uintptr_t(1)<<i)!=0;
606  }
607  }
608 };
609 
610 //------------------------------------------------------------------------
611 // concurrent_queue_iterator_base
612 //------------------------------------------------------------------------
613 
// initialize: allocate (allocation line, orig. 615, missing — the index shows
// a cache_aligned_allocator is used) and placement-construct the snapshot
// rep, then position my_item on the first valid element, skipping invalid
// slots via advance(). The two constructors below (signature lines orig.
// 621 and 625 missing) delegate here with offset 0 or an explicit offset.
614 void concurrent_queue_iterator_base_v3::initialize( const concurrent_queue_base& queue, size_t offset_of_last ) {
616  new( my_rep ) concurrent_queue_iterator_rep(queue,offset_of_last);
617  size_t k = my_rep->head_counter;
618  if( !my_rep->get_item(my_item, k) ) advance();
619 }
620 
622  initialize(queue,0);
623 }
624 
626  initialize(queue,offset_of_last);
627 }
628 
// Iterator assignment (signature, orig. 629, missing): release this
// iterator's rep and clone the other's. NOTE(review): the deallocation and
// re-allocation/copy lines (orig. 632, 636-637) are missing from this
// extraction.
630  if( my_rep!=other.my_rep ) {
631  if( my_rep ) {
633  my_rep = NULL;
634  }
635  if( other.my_rep ) {
638  }
639  }
640  my_item = other.my_item;
641 }
642 
// advance (signature, orig. 643, missing): step to the next element,
// recursing past invalid slots. NOTE(review): the per-lane index computation
// and the page-root lookup (orig. 652, 654) are missing from this extraction;
// when the current slot is the last on its page, the lane's cached page
// pointer is moved to the next page.
644  __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
645  size_t k = my_rep->head_counter;
646  const concurrent_queue_base& queue = my_rep->my_queue;
647 #if TBB_USE_ASSERT
// Debug-only cross-check that my_item still matches position k.
648  void* tmp;
649  my_rep->get_item(tmp,k);
650  __TBB_ASSERT( my_item==tmp, NULL );
651 #endif /* TBB_USE_ASSERT */
653  if( i==queue.items_per_page-1 ) {
655  root = root->next;
656  }
657  // advance k
658  my_rep->head_counter = ++k;
659  if( !my_rep->get_item(my_item, k) ) advance();
660 }
661 
// Iterator destructor (signature, orig. 662, and the explicit deallocation at
// orig. 664 are missing from this extraction; the commented-out delete shows
// plain delete was deliberately not used — the index indicates a
// cache_aligned_allocator manages the rep's storage).
663  //delete my_rep;
665  my_rep = NULL;
666 }
667 
668 } // namespace internal
669 
670 } // namespace tbb
tbb::internal::concurrent_queue_iterator_rep::offset_of_last
const size_t offset_of_last
Definition: concurrent_queue.cpp:584
tbb::internal::concurrent_queue_base_v3::page::mask
uintptr_t mask
Definition: _concurrent_queue_impl.h:840
tbb::internal::concurrent_queue_base_v3::copy_page_item
virtual void copy_page_item(page &dst, size_t dindex, const page &src, size_t sindex)=0
tbb::internal::concurrent_queue_base_v3::~concurrent_queue_base_v3
virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3()
Definition: concurrent_queue.cpp:356
tbb::internal::concurrent_queue_iterator_base_v3::concurrent_queue_iterator_base_v3
concurrent_queue_iterator_base_v3()
Default constructor.
Definition: _concurrent_queue_impl.h:977
tbb::internal::concurrent_queue_base_v3::internal_push_if_not_full
bool __TBB_EXPORTED_METHOD internal_push_if_not_full(const void *src)
Attempt to enqueue item onto queue using copy operation.
Definition: concurrent_queue.cpp:491
tbb::internal::concurrent_queue_base_v3::my_rep
concurrent_queue_rep * my_rep
Internal representation.
Definition: _concurrent_queue_impl.h:829
tbb::internal::concurrent_queue_base_v3::copy_item
virtual void copy_item(page &dst, size_t index, const void *src)=0
tbb::internal::micro_queue_pop_finalizer::page
concurrent_queue_base::page page
Definition: concurrent_queue.cpp:78
internal
Definition: _flow_graph_async_msg_impl.h:24
tbb::internal::concurrent_queue_rep::abort_counter
atomic< unsigned > abort_counter
Definition: concurrent_queue.cpp:144
concurrent_monitor.h
tbb::internal::concurrent_queue_rep::infinite_capacity
static const ptrdiff_t infinite_capacity
Value for effective_capacity that denotes unbounded queue.
Definition: concurrent_queue.cpp:147
tbb::internal::concurrent_queue_rep::pad1
char pad1[NFS_MaxLineSize-((sizeof(atomic< ticket >)+sizeof(concurrent_monitor)+sizeof(atomic< size_t >))&(NFS_MaxLineSize-1))]
Definition: concurrent_queue.cpp:132
tbb::internal::concurrent_queue_base_v3::deallocate_page
virtual void deallocate_page(page *p)=0
custom de-allocator
tbb::internal::concurrent_queue_base_v3::concurrent_queue_base_v3
__TBB_EXPORTED_METHOD concurrent_queue_base_v3(size_t item_size)
Definition: concurrent_queue.cpp:337
tbb::internal::concurrent_queue_rep::slots_avail
concurrent_monitor slots_avail
Definition: concurrent_queue.cpp:135
tbb::internal::predicate_leq::predicate_leq
predicate_leq(ticket t_)
Definition: concurrent_queue.cpp:105
tbb::internal::concurrent_queue_base_v3::internal_finish_clear
void __TBB_EXPORTED_METHOD internal_finish_clear()
free any remaining pages
Definition: concurrent_queue.cpp:535
__TBB_ASSERT
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
tbb::user_abort
Exception for user-initiated abort.
Definition: tbb_exception.h:46
tbb::internal::no_assign
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
ITT_NOTIFY
#define ITT_NOTIFY(name, obj)
Definition: itt_notify.h:112
tbb::internal::throw_exception
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
Definition: tbb_exception.h:105
tbb::internal::concurrent_queue_iterator_rep::get_item
bool get_item(void *&item, size_t k)
Set item to point to kth element. Return true if at end of queue or item is marked valid; false other...
Definition: concurrent_queue.cpp:596
tbb::internal::concurrent_queue_iterator_rep
Definition: concurrent_queue.cpp:580
tbb::internal::concurrent_queue_base_v3::assign
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_base_v3 &src)
copy internal representation
tbb::internal::static_invalid_page
static void * static_invalid_page
Definition: concurrent_queue.cpp:156
tbb::internal::concurrent_queue_base_v3::internal_push
void __TBB_EXPORTED_METHOD internal_push(const void *src)
Enqueue item at tail of queue using copy operation.
Definition: concurrent_queue.cpp:363
tbb::internal::concurrent_queue_rep::choose
micro_queue & choose(ticket k)
Definition: concurrent_queue.cpp:139
tbb::internal::concurrent_queue_iterator_base_v3
Type-independent portion of concurrent_queue_iterator.
Definition: _concurrent_queue_impl.h:960
tbb
The graph class.
Definition: serial/tbb/parallel_for.h:46
tbb::cache_aligned_allocator
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
Definition: cache_aligned_allocator.h:60
tbb::internal::concurrent_queue_base_v3::page
Prefix on a page.
Definition: _concurrent_queue_impl.h:838
tbb::internal::concurrent_queue_base_v3::internal_pop_if_present
bool __TBB_EXPORTED_METHOD internal_pop_if_present(void *dst)
Attempt to dequeue item from queue.
Definition: concurrent_queue.cpp:467
tbb::internal::concurrent_queue_base_v3::assign_and_destroy_item
virtual void assign_and_destroy_item(void *dst, page &src, size_t index)=0
tbb::cache_aligned_allocator::deallocate
void deallocate(pointer p, size_type)
Free block of memory that starts on a cache line.
Definition: cache_aligned_allocator.h:86
tbb::internal::predicate_leq
Definition: concurrent_queue.cpp:103
tbb::internal::concurrent_monitor::thread_context
Definition: concurrent_monitor.h:96
tbb::internal::concurrent_queue_iterator_rep::array
concurrent_queue_base::page * array[concurrent_queue_rep::n_queue]
Definition: concurrent_queue.cpp:585
tbb::internal::concurrent_monitor::notify
void notify(const P &predicate)
Notify waiting threads of the event that satisfies the given predicate.
Definition: concurrent_monitor.h:169
tbb::internal::spin_wait_while_eq
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
Definition: tbb_machine.h:391
tbb::internal::concurrent_queue_base_v3::internal_empty
bool __TBB_EXPORTED_METHOD internal_empty() const
Check if the queue is empty.
Definition: concurrent_queue.cpp:524
tbb::internal::concurrent_queue_rep::items_avail
concurrent_monitor items_avail
Definition: concurrent_queue.cpp:130
tbb::spin_mutex::scoped_lock
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
tbb::internal::predicate_leq::t
ticket t
Definition: concurrent_queue.cpp:104
tbb::internal::concurrent_queue_base_v3::internal_abort
void __TBB_EXPORTED_METHOD internal_abort()
Abort all pending queue operations.
Definition: concurrent_queue.cpp:460
tbb::internal::concurrent_queue_iterator_base_v3::assign
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_iterator_base_v3 &i)
Assignment.
Definition: concurrent_queue.cpp:629
tbb::internal::concurrent_queue_base_v3::internal_insert_if_not_full
bool internal_insert_if_not_full(const void *src, copy_specifics op_type)
Attempts to enqueue at tail of queue using specified operation (copy or move)
Definition: concurrent_queue.cpp:499
tbb::internal::micro_queue_pop_finalizer::my_page
page * my_page
Definition: concurrent_queue.cpp:81
itt_notify.h
tbb::internal::concurrent_queue_iterator_base_v3::my_rep
concurrent_queue_iterator_rep * my_rep
concurrent_queue over which we are iterating.
Definition: _concurrent_queue_impl.h:963
tbb::internal::concurrent_queue_iterator_base_v3::my_item
void * my_item
Pointer to current item.
Definition: _concurrent_queue_impl.h:974
tbb::internal::concurrent_queue_rep::phi
static const size_t phi
Approximately n_queue/golden ratio.
Definition: concurrent_queue.cpp:118
tbb::internal::concurrent_queue_rep::n_queue
static const size_t n_queue
Must be power of 2.
Definition: concurrent_queue.cpp:122
tbb::internal::concurrent_queue_rep::index
static size_t index(ticket k)
Map ticket to an array index.
Definition: concurrent_queue.cpp:125
tbb::internal::micro_queue::page_mutex
spin_mutex page_mutex
Definition: concurrent_queue.cpp:58
tbb::internal::concurrent_queue_rep::pad2
char pad2[NFS_MaxLineSize-((sizeof(atomic< ticket >)+sizeof(concurrent_monitor))&(NFS_MaxLineSize-1))]
Definition: concurrent_queue.cpp:136
tbb::internal::concurrent_queue_base_v3::internal_assign
void internal_assign(const concurrent_queue_base_v3 &src, copy_specifics op_type)
Assigns one queue to another using specified operation (copy or move)
Definition: concurrent_queue.cpp:551
tbb::internal::eid_bad_last_alloc
@ eid_bad_last_alloc
Definition: tbb_exception.h:69
tbb::internal::micro_queue::tail_page
atomic< page * > tail_page
Definition: concurrent_queue.cpp:55
tbb_exception.h
tbb::internal::micro_queue::pop
bool pop(void *dst, ticket k, concurrent_queue_base &base)
Definition: concurrent_queue.cpp:230
tbb::internal::concurrent_queue_iterator_base_v3::~concurrent_queue_iterator_base_v3
__TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3()
Destructor.
Definition: concurrent_queue.cpp:662
tbb::internal::concurrent_monitor::commit_wait
bool commit_wait(thread_context &thr)
Commit wait if event count has not changed; otherwise, cancel wait.
Definition: concurrent_monitor.h:135
tbb::internal::concurrent_queue_base_v3::internal_pop
void __TBB_EXPORTED_METHOD internal_pop(void *dst)
Dequeue item from head of queue.
Definition: concurrent_queue.cpp:414
tbb::internal::concurrent_queue_base_v3::internal_size
ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const
Get size of queue.
Definition: concurrent_queue.cpp:519
tbb::internal::atomic_backoff::pause
void pause()
Pause for a while.
Definition: tbb_machine.h:360
tbb::internal::concurrent_queue_base_v3::internal_throw_exception
void __TBB_EXPORTED_METHOD internal_throw_exception() const
throw an exception
Definition: concurrent_queue.cpp:547
tbb::internal::micro_queue::abort_push
void abort_push(ticket k, concurrent_queue_base &base)
Definition: concurrent_queue.cpp:226
tbb::internal::micro_queue_pop_finalizer
Definition: concurrent_queue.cpp:77
tbb::internal::predicate_leq::operator()
bool operator()(uintptr_t p) const
Definition: concurrent_queue.cpp:106
tbb::internal::concurrent_queue_base_v8::internal_push_move
void __TBB_EXPORTED_METHOD internal_push_move(const void *src)
Enqueue item at tail of queue using move operation.
Definition: concurrent_queue.cpp:367
tbb::internal::concurrent_queue_iterator_rep::my_queue
const concurrent_queue_base & my_queue
Definition: concurrent_queue.cpp:583
lock
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
Definition: ittnotify_static.h:121
tbb::internal::atomic_backoff
Class that implements exponential backoff.
Definition: tbb_machine.h:345
tbb::internal::micro_queue::make_copy
page * make_copy(concurrent_queue_base &base, const page *src_page, size_t begin_in_page, size_t end_in_page, ticket &g_index, concurrent_queue_base::copy_specifics op_type)
Definition: concurrent_queue.cpp:295
tbb::internal::NFS_MaxLineSize
const size_t NFS_MaxLineSize
Compile-time constant that is upper bound on cache line/sector size.
Definition: tbb_stddef.h:216
tbb::internal::micro_queue::assign
micro_queue & assign(const micro_queue &src, concurrent_queue_base &base, concurrent_queue_base::copy_specifics op_type)
Definition: concurrent_queue.cpp:253
tbb::internal::concurrent_queue_base_v3::copy_specifics
copy_specifics
Definition: _concurrent_queue_impl.h:852
tbb::internal::concurrent_queue_iterator_rep::concurrent_queue_iterator_rep
concurrent_queue_iterator_rep(const concurrent_queue_base &queue, size_t offset_of_last_)
Definition: concurrent_queue.cpp:586
tbb::internal::concurrent_queue_iterator_rep::head_counter
ticket head_counter
Definition: concurrent_queue.cpp:582
tbb::internal::concurrent_queue_rep::head_counter
atomic< ticket > head_counter
Definition: concurrent_queue.cpp:129
tbb::internal::no_copy
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
tbb::internal::concurrent_queue_rep::n_invalid_entries
atomic< size_t > n_invalid_entries
Definition: concurrent_queue.cpp:131
tbb::internal::concurrent_queue_base_v3::internal_insert_item
void internal_insert_item(const void *src, copy_specifics op_type)
Enqueues item at tail of queue using specified operation (copy or move)
Definition: concurrent_queue.cpp:371
tbb::internal::concurrent_queue_base_v3::page::next
page * next
Definition: _concurrent_queue_impl.h:839
tbb::internal::concurrent_queue_rep
Internal representation of a ConcurrentQueue.
Definition: concurrent_queue.cpp:112
tbb::internal::is_aligned
bool is_aligned(T *pointer, uintptr_t alignment)
A function to check if passed in pointer is aligned on a specific border.
Definition: tbb_stddef.h:370
tbb::internal::concurrent_queue_rep::array
micro_queue array[n_queue]
Definition: concurrent_queue.cpp:137
__TBB_TRY
#define __TBB_TRY
Definition: tbb_stddef.h:283
tbb::internal::micro_queue
A queue using simple locking.
Definition: concurrent_queue.cpp:47
tbb::internal::micro_queue_pop_finalizer::base
concurrent_queue_base & base
Definition: concurrent_queue.cpp:82
tbb::internal::concurrent_queue_base_v3::move
@ move
Definition: _concurrent_queue_impl.h:852
tbb_machine.h
tbb::internal::modulo_power_of_two
argument_integer_type modulo_power_of_two(argument_integer_type arg, divisor_integer_type divisor)
A function to compute arg modulo divisor where divisor is a power of 2.
Definition: tbb_stddef.h:382
tbb::internal::eid_bad_alloc
@ eid_bad_alloc
Definition: tbb_exception.h:68
tbb::internal::micro_queue_pop_finalizer::~micro_queue_pop_finalizer
~micro_queue_pop_finalizer()
Definition: concurrent_queue.cpp:87
tbb::internal::ticket
size_t ticket
Definition: concurrent_queue.cpp:42
tbb::internal::concurrent_queue_base_v3::internal_set_capacity
void __TBB_EXPORTED_METHOD internal_set_capacity(ptrdiff_t capacity, size_t element_size)
Set the queue capacity.
Definition: concurrent_queue.cpp:531
tbb::internal::concurrent_queue_base_v3::item_size
size_t item_size
Size of an item.
Definition: _concurrent_queue_impl.h:850
tbb::internal::micro_queue::head_page
atomic< page * > head_page
Definition: concurrent_queue.cpp:52
tbb::internal::concurrent_queue_base_v3::items_per_page
size_t items_per_page
Always a power of 2.
Definition: _concurrent_queue_impl.h:847
tbb::internal::eid_user_abort
@ eid_user_abort
Definition: tbb_exception.h:85
tbb::internal::concurrent_queue_base_v8::move_content
void __TBB_EXPORTED_METHOD move_content(concurrent_queue_base_v8 &src)
move items
Definition: concurrent_queue.cpp:573
tbb::internal::micro_queue_pop_finalizer::my_queue
micro_queue & my_queue
Definition: concurrent_queue.cpp:80
tbb::internal::NFS_GetLineSize
size_t __TBB_EXPORTED_FUNC NFS_GetLineSize()
Cache/sector line size.
Definition: cache_aligned_allocator.cpp:167
tbb::internal::concurrent_queue_base_v3::copy
@ copy
Definition: _concurrent_queue_impl.h:852
tbb::internal::concurrent_monitor
concurrent_monitor
Definition: concurrent_monitor.h:93
tbb::internal::concurrent_monitor::abort_all
void abort_all()
Abort any sleeping threads at the time of the call.
Definition: concurrent_monitor.h:175
tbb::internal::concurrent_monitor::prepare_wait
void prepare_wait(thread_context &thr, uintptr_t ctx=0)
prepare wait by inserting 'thr' into the wait queue
Definition: concurrent_monitor.cpp:32
tbb::internal::micro_queue_pop_finalizer::my_ticket
ticket my_ticket
Definition: concurrent_queue.cpp:79
tbb::internal::concurrent_queue_base_v8
For internal use only.
Definition: _concurrent_queue_impl.h:940
tbb::internal::micro_queue_pop_finalizer::micro_queue_pop_finalizer
micro_queue_pop_finalizer(micro_queue &queue, concurrent_queue_base &b, ticket k, page *p)
Definition: concurrent_queue.cpp:84
tbb::cache_aligned_allocator::allocate
pointer allocate(size_type n, const void *hint=0)
Allocate space for n objects, starting on a cache/sector line.
Definition: cache_aligned_allocator.h:80
tbb::internal::concurrent_queue_rep::tail_counter
atomic< ticket > tail_counter
Definition: concurrent_queue.cpp:134
tbb::internal::concurrent_queue_base_v3
For internal use only.
Definition: _concurrent_queue_impl.h:826
tbb::internal::concurrent_queue_base_v3::allocate_page
virtual page * allocate_page()=0
custom allocator
tbb::internal::micro_queue::page
concurrent_queue_base::page page
Definition: concurrent_queue.cpp:48
tbb::internal::micro_queue::tail_counter
atomic< ticket > tail_counter
Definition: concurrent_queue.cpp:56
tbb::internal::micro_queue::make_invalid
void make_invalid(ticket k)
Definition: concurrent_queue.cpp:314
sync_releasing
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p sync_releasing
Definition: ittnotify_static.h:104
tbb::internal::micro_queue::head_counter
atomic< ticket > head_counter
Definition: concurrent_queue.cpp:53
tbb_stddef.h
tbb::internal::concurrent_queue_base
concurrent_queue_base_v3 concurrent_queue_base
Definition: concurrent_queue.cpp:40
__TBB_CATCH
#define __TBB_CATCH(e)
Definition: tbb_stddef.h:284
p
void const char const char int ITT_FORMAT __itt_group_sync p
Definition: ittnotify_static.h:91
tail
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id tail
Definition: ittnotify_static.h:207
tbb::internal::spin_wait_until_eq
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
Definition: tbb_machine.h:399
tbb::internal::concurrent_queue_base_v8::internal_push_move_if_not_full
bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full(const void *src)
Attempt to enqueue item onto queue using move operation.
Definition: concurrent_queue.cpp:495
tbb::internal::concurrent_queue_base_v3::my_capacity
ptrdiff_t my_capacity
Capacity of the queue.
Definition: _concurrent_queue_impl.h:844
tbb::spin_mutex
A lock that occupies a single byte.
Definition: spin_mutex.h:39
tbb::internal::micro_queue::push
void push(const void *item, ticket k, concurrent_queue_base &base, concurrent_queue_base::copy_specifics op_type)
Definition: concurrent_queue.cpp:161
tbb::internal::concurrent_queue_iterator_base_v3::initialize
void initialize(const concurrent_queue_base_v3 &queue, size_t offset_of_data)
Definition: concurrent_queue.cpp:614
__TBB_RETHROW
#define __TBB_RETHROW()
Definition: tbb_stddef.h:286
tbb::internal::concurrent_monitor::cancel_wait
void cancel_wait(thread_context &thr)
Cancel the wait. Removes the thread from the wait queue if not removed yet.
Definition: concurrent_monitor.cpp:50
_concurrent_queue_impl.h
tbb::internal::concurrent_queue_iterator_base_v3::advance
void __TBB_EXPORTED_METHOD advance()
Advance iterator one step towards tail of queue.
Definition: concurrent_queue.cpp:643

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.