Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
task_stream.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef _TBB_task_stream_H
18 #define _TBB_task_stream_H
19 
20 #include "tbb/tbb_stddef.h"
21 #include <deque>
22 #include <climits>
23 #include "tbb/atomic.h" // for __TBB_Atomic*
24 #include "tbb/spin_mutex.h"
25 #include "tbb/tbb_allocator.h"
26 #include "scheduler_common.h"
27 #include "tbb_misc.h" // for FastRandom
28 
29 namespace tbb {
30 namespace internal {
31 
33 
// queue_and_mutex: essentially a pair of a queue and a mutex to protect the queue.
// NOTE(review): this listing omits parts of the definition. According to the
// cross-reference index, line 36 holds the struct head, lines 39-40 declare
// `queue_base_t my_queue` and `mutex_t my_mutex`, and lines 42-43 hold the
// constructor and destructor. Confirm against the real header before editing.
35 template< typename T, typename mutex_t >
// Container type used for the queue: a std::deque of T allocated through tbb_allocator.
37  typedef std::deque< T, tbb_allocator<T> > queue_base_t;
38 
41 
44 };
45 
// Integer type used as a bit mask; the helpers below assert that bit positions
// stay below sizeof(population_t)*CHAR_BIT.
46 typedef uintptr_t population_t;
// The value 1 typed as population_t, so that `one<<pos` is computed in population_t.
47 const population_t one = 1;
48 
49 inline void set_one_bit( population_t& dest, int pos ) {
50  __TBB_ASSERT( pos>=0, NULL );
51  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
52  __TBB_AtomicOR( &dest, one<<pos );
53 }
54 
55 inline void clear_one_bit( population_t& dest, int pos ) {
56  __TBB_ASSERT( pos>=0, NULL );
57  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
58  __TBB_AtomicAND( &dest, ~(one<<pos) );
59 }
60 
61 inline bool is_bit_set( population_t val, int pos ) {
62  __TBB_ASSERT( pos>=0, NULL );
63  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
64  return (val & (one<<pos)) != 0;
65 }
66 
// task_stream: the container for "fairness-oriented" aka "enqueued" tasks.
// NOTE(review): lines 69-72 of the header are missing from this listing. According
// to the cross-reference index they hold the class head and these declarations:
//   typedef queue_and_mutex< task *, spin_mutex > lane_t;
//   population_t population[Levels];
//   padded< lane_t > * lanes[Levels];
// Confirm against the real header before editing.
68 template<int Levels>
// Number of lanes allocated per level by initialize(); push() and pop() use N-1 as an index mask.
73  unsigned N;
74 
75 public:
76  task_stream() : N() {
77  for(int level = 0; level < Levels; level++) {
78  population[level] = 0;
79  lanes[level] = NULL;
80  }
81  }
82 
83  void initialize( unsigned n_lanes ) {
84  const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;
85 
86  N = n_lanes>=max_lanes ? max_lanes : n_lanes>2 ? 1<<(__TBB_Log2(n_lanes-1)+1) : 2;
87  __TBB_ASSERT( N==max_lanes || N>=n_lanes && ((N-1)&N)==0, "number of lanes miscalculated");
88  __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, NULL );
89  for(int level = 0; level < Levels; level++) {
90  lanes[level] = new padded<lane_t>[N];
91  __TBB_ASSERT( !population[level], NULL );
92  }
93  }
94 
// NOTE(review): this is the body of ~task_stream(); its signature line (95) is
// missing from this listing, though the cross-reference index places it there.
// Frees the lane arrays that initialize() allocated with new[]. Levels that were
// never initialized still hold the NULL set by the constructor and are skipped.
96  for(int level = 0; level < Levels; level++)
97  if (lanes[level]) delete[] lanes[level];
98  }
99 
// Push a task into a lane.
//   source - task pointer appended to the back of the chosen lane's queue
//   level  - index into lanes[] and population[]
//   random - generator used to pick the lane
// Loops until some lane's mutex is acquired by try_acquire.
101  void push( task* source, int level, FastRandom& random ) {
102  // Lane selection is random. Each thread should keep a separate seed value.
103  unsigned idx;
104  for( ; ; ) {
// Mask the random value with N-1 to obtain a lane index below N.
105  idx = random.get() & (N-1);
// NOTE(review): line 106, which declares `lock`, is missing from this listing.
// It is presumably a spin_mutex::scoped_lock local to the loop body - confirm against the real header.
// When try_acquire returns false, the loop retries with a new random lane.
107  if( lock.try_acquire(lanes[level][idx].my_mutex) ) {
108  lanes[level][idx].my_queue.push_back(source);
// Record in this level's population mask that lane idx now holds a task.
109  set_one_bit( population[level], idx ); //TODO: avoid atomic op if the bit is already set
110  break;
111  }
112  }
113  }
114 
// Try finding and popping a task.
//   level          - index into lanes[] and population[]
//   last_used_lane - in: lane index used previously, so the scan starts at the next lane;
//                    out: lane index at which the scan stopped
// Returns the task taken from the front of a lane's queue, or NULL if the
// population mask of the level became zero before any task was taken.
116  task* pop( int level, unsigned& last_used_lane ) {
117  task* result = NULL;
118  // Lane selection is round-robin. Each thread should keep its last used lane.
119  unsigned idx = (last_used_lane+1)&(N-1);
// Keep cycling through the lanes while at least one population bit is set.
120  for( ; population[level]; idx=(idx+1)&(N-1) ) {
// Only lanes whose population bit is set are considered.
121  if( is_bit_set( population[level], idx ) ) {
122  lane_t& lane = lanes[level][idx];
// NOTE(review): line 123, which declares `lock`, is missing from this listing.
// It is presumably a spin_mutex::scoped_lock scoped to this block - confirm against the real header.
// The queue is checked for emptiness again once the mutex has been acquired.
124  if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
125  result = lane.my_queue.front();
126  lane.my_queue.pop_front();
// The last task was taken from the lane, so clear its population bit.
127  if( lane.my_queue.empty() )
128  clear_one_bit( population[level], idx );
129  break;
130  }
131  }
132  }
133  last_used_lane = idx;
134  return result;
135  }
136 
138  bool empty(int level) {
139  return !population[level];
140  }
141 
143 
// Destroys all remaining tasks in every lane. Returns the number of destroyed tasks.
145  intptr_t drain() {
146  intptr_t result = 0;
147  for(int level = 0; level < Levels; level++)
148  for(unsigned i=0; i<N; ++i) {
149  lane_t& lane = lanes[level][i];
// NOTE(review): line 150 is missing from this listing. From here it cannot be
// determined whether the lane mutex is taken before the queue is walked - confirm against the real header.
// Each visited task is destroyed and counted in result.
151  for(lane_t::queue_base_t::iterator it=lane.my_queue.begin();
152  it!=lane.my_queue.end(); ++it, ++result)
153  {
// A lane that still holds tasks must have its population bit set.
154  __TBB_ASSERT( is_bit_set( population[level], i ), NULL );
155  task* t = *it;
156  tbb::task::destroy(*t);
157  }
// Drop the now-dangling pointers and mark the lane as empty.
158  lane.my_queue.clear();
159  clear_one_bit( population[level], i );
160  }
161  return result;
162  }
163 }; // task_stream
164 
165 } // namespace internal
166 } // namespace tbb
167 
168 #endif /* _TBB_task_stream_H */
tbb::internal::population_t
uintptr_t population_t
Definition: task_stream.h:46
scheduler_common.h
spin_mutex.h
internal
Definition: _flow_graph_async_msg_impl.h:24
tbb::internal::queue_and_mutex::queue_and_mutex
queue_and_mutex()
Definition: task_stream.h:42
tbb::internal::task_stream::population
population_t population[Levels]
Definition: task_stream.h:71
tbb::internal::FastRandom::get
unsigned short get()
Get a random number.
Definition: tbb_misc.h:146
__TBB_ASSERT
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
tbb::internal::clear_one_bit
void clear_one_bit(population_t &dest, int pos)
Definition: task_stream.h:55
tbb::internal::task_stream::N
unsigned N
Definition: task_stream.h:73
tbb
The top-level namespace of the Threading Building Blocks library.
Definition: serial/tbb/parallel_for.h:46
tbb::internal::task_stream::empty
bool empty(int level)
Checks existence of a task.
Definition: task_stream.h:138
tbb::task
Base class for user-defined tasks.
Definition: task.h:615
tbb::internal::task_stream::task_stream
task_stream()
Definition: task_stream.h:76
tbb::internal::task_stream::drain
intptr_t drain()
Destroys all remaining tasks in every lane. Returns the number of destroyed tasks.
Definition: task_stream.h:145
tbb::internal::FastRandom
A fast random number generator.
Definition: tbb_misc.h:135
tbb::spin_mutex::scoped_lock
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
tbb::internal::queue_and_mutex
Essentially, this is just a pair of a queue and a mutex to protect the queue.
Definition: task_stream.h:36
mutex_t
CRITICAL_SECTION mutex_t
Definition: ittnotify_config.h:209
tbb::internal::is_bit_set
bool is_bit_set(population_t val, int pos)
Definition: task_stream.h:61
__TBB_AtomicAND
void __TBB_AtomicAND(volatile void *operand, uintptr_t addend)
Definition: tbb_machine.h:888
lock
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
Definition: ittnotify_static.h:121
atomic.h
tbb::internal::task_stream::lanes
padded< lane_t > * lanes[Levels]
Definition: task_stream.h:72
tbb::internal::queue_and_mutex::queue_base_t
std::deque< T, tbb_allocator< T > > queue_base_t
Definition: task_stream.h:37
tbb::internal::no_copy
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
tbb_misc.h
tbb::internal::one
const population_t one
Definition: task_stream.h:47
__TBB_AtomicOR
void __TBB_AtomicOR(volatile void *operand, uintptr_t addend)
Definition: tbb_machine.h:878
__TBB_Log2
intptr_t __TBB_Log2(uintptr_t x)
Definition: tbb_machine.h:860
tbb::internal::task_stream::initialize
void initialize(unsigned n_lanes)
Definition: task_stream.h:83
tbb::internal::task_stream::~task_stream
~task_stream()
Definition: task_stream.h:95
tbb::internal::padded
Pads type T to fill out to a multiple of cache line size.
Definition: tbb_stddef.h:261
tbb::internal::task_stream::lane_t
queue_and_mutex< task *, spin_mutex > lane_t
Definition: task_stream.h:70
tbb::internal::task_stream
The container for "fairness-oriented" aka "enqueued" tasks.
Definition: task_stream.h:69
tbb::internal::queue_and_mutex::my_queue
queue_base_t my_queue
Definition: task_stream.h:39
tbb_stddef.h
tbb::internal::task_stream::pop
task * pop(int level, unsigned &last_used_lane)
Try finding and popping a task.
Definition: task_stream.h:116
tbb::internal::task_stream::push
void push(task *source, int level, FastRandom &random)
Push a task into a lane.
Definition: task_stream.h:101
tbb_allocator.h
tbb::internal::queue_and_mutex::my_mutex
mutex_t my_mutex
Definition: task_stream.h:40
tbb::internal::queue_and_mutex::~queue_and_mutex
~queue_and_mutex()
Definition: task_stream.h:43
tbb::internal::set_one_bit
void set_one_bit(population_t &dest, int pos)
Definition: task_stream.h:49

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.