Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
task_stream_extended.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef _TBB_task_stream_extended_H
18 #define _TBB_task_stream_extended_H
19 
26 
27 
28 #if _TBB_task_stream_H
29 #error Either task_stream.h or this file can be included at the same time.
30 #endif
31 
32 #if !__TBB_CPF_BUILD
33 #error This code bears a preview status until it proves its usefulness/performance suitability.
34 #endif
35 
36 #include "tbb/tbb_stddef.h"
37 #include <deque>
38 #include <climits>
39 #include "tbb/atomic.h" // for __TBB_Atomic*
40 #include "tbb/spin_mutex.h"
41 #include "tbb/tbb_allocator.h"
42 #include "scheduler_common.h"
43 #include "tbb_misc.h" // for FastRandom
44 
45 namespace tbb {
46 namespace internal {
47 
49 
51 template< typename T, typename mutex_t >
52 struct queue_and_mutex {
53  typedef std::deque< T, tbb_allocator<T> > queue_base_t;
54 
57 
60 };
61 
62 typedef uintptr_t population_t;
63 const population_t one = 1;
64 
65 inline void set_one_bit( population_t& dest, int pos ) {
66  __TBB_ASSERT( pos>=0, NULL );
67  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
68  __TBB_AtomicOR( &dest, one<<pos );
69 }
70 
71 inline void clear_one_bit( population_t& dest, int pos ) {
72  __TBB_ASSERT( pos>=0, NULL );
73  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
74  __TBB_AtomicAND( &dest, ~(one<<pos) );
75 }
76 
77 inline bool is_bit_set( population_t val, int pos ) {
78  __TBB_ASSERT( pos>=0, NULL );
79  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
80  return (val & (one<<pos)) != 0;
81 }
82 
84 #if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
85  no_assign
86 #else
87  no_copy
88 #endif
89 {
90  random_lane_selector( FastRandom& random ) : my_random( random ) {}
91  unsigned operator()( unsigned out_of ) const {
92  __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
93  return my_random.get() & (out_of-1);
94  }
95 private:
97 };
98 
100 #if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
101  no_assign
102 #else
103  no_copy
104 #endif
105 {
106  unsigned& my_previous;
107  lane_selector_base( unsigned& previous ) : my_previous( previous ) {}
108 };
109 
111  subsequent_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
112  unsigned operator()( unsigned out_of ) const {
113  __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
114  return (++my_previous &= out_of-1);
115  }
116 };
117 
119  preceding_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
120  unsigned operator()( unsigned out_of ) const {
121  __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
122  return (--my_previous &= (out_of-1));
123  }
124 };
125 
127 protected:
129 };
130 
132 
135 template<task_stream_accessor_type accessor>
137 protected:
140  task* result = queue.front();
141  queue.pop_front();
142  return result;
143  }
144 };
145 
146 template<>
148 protected:
150  task* result = NULL;
151  do {
152  result = queue.back();
153  queue.pop_back();
154  } while( !result && !queue.empty() );
155  return result;
156  }
157 };
158 
160 template<int Levels, task_stream_accessor_type accessor>
161 class task_stream : public task_stream_accessor< accessor > {
163  population_t population[Levels];
164  padded<lane_t>* lanes[Levels];
165  unsigned N;
166 
167 public:
168  task_stream() : N() {
169  for(int level = 0; level < Levels; level++) {
170  population[level] = 0;
171  lanes[level] = NULL;
172  }
173  }
174 
175  void initialize( unsigned n_lanes ) {
176  const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;
177 
178  N = n_lanes>=max_lanes ? max_lanes : n_lanes>2 ? 1<<(__TBB_Log2(n_lanes-1)+1) : 2;
179  __TBB_ASSERT( N==max_lanes || N>=n_lanes && ((N-1)&N)==0, "number of lanes miscalculated");
180  __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, NULL );
181  for(int level = 0; level < Levels; level++) {
182  lanes[level] = new padded<lane_t>[N];
183  __TBB_ASSERT( !population[level], NULL );
184  }
185  }
186 
188  for(int level = 0; level < Levels; level++)
189  if (lanes[level]) delete[] lanes[level];
190  }
191 
193  bool try_push( task* source, int level, unsigned lane_idx ) {
194  __TBB_ASSERT( 0 <= level && level < Levels, "Incorrect lane level specified." );
196  if( lock.try_acquire( lanes[level][lane_idx].my_mutex ) ) {
197  lanes[level][lane_idx].my_queue.push_back( source );
198  set_one_bit( population[level], lane_idx ); // TODO: avoid atomic op if the bit is already set
199  return true;
200  }
201  return false;
202  }
203 
205  template<typename lane_selector_t>
206  void push( task* source, int level, const lane_selector_t& next_lane ) {
207  bool succeed = false;
208  unsigned lane = 0;
209  do {
210  lane = next_lane( /*out_of=*/N );
211  __TBB_ASSERT( lane < N, "Incorrect lane index." );
212  } while( ! (succeed = try_push( source, level, lane )) );
213  }
214 
216  task* try_pop( int level, unsigned lane_idx ) {
217  __TBB_ASSERT( 0 <= level && level < Levels, "Incorrect lane level specified." );
218  if( !is_bit_set( population[level], lane_idx ) )
219  return NULL;
220  task* result = NULL;
221  lane_t& lane = lanes[level][lane_idx];
223  if( lock.try_acquire( lane.my_mutex ) && !lane.my_queue.empty() ) {
224  result = this->get_item( lane.my_queue );
225  if( lane.my_queue.empty() )
226  clear_one_bit( population[level], lane_idx );
227  }
228  return result;
229  }
230 
233  template<typename lane_selector_t>
234  task* pop( int level, const lane_selector_t& next_lane ) {
235  task* popped = NULL;
236  unsigned lane = 0;
237  do {
238  lane = next_lane( /*out_of=*/N );
239  __TBB_ASSERT( lane < N, "Incorrect lane index." );
240  } while( !empty( level ) && !(popped = try_pop( level, lane )) );
241  return popped;
242  }
243 
244  // TODO: unify '*_specific' logic with 'pop' methods above
246  __TBB_ASSERT( !queue.empty(), NULL );
247  // TODO: add a worst-case performance test and consider an alternative container with better
248  // performance for isolation search.
249  typename lane_t::queue_base_t::iterator curr = queue.end();
250  do {
251  // TODO: consider logic from get_task to simplify the code.
252  task* result = *--curr;
253  if( result __TBB_ISOLATION_EXPR( && result->prefix().isolation == isolation ) ) {
254  if( queue.end() - curr == 1 )
255  queue.pop_back(); // a little of housekeeping along the way
256  else
257  *curr = 0; // grabbing task with the same isolation
258  // TODO: move one of the container's ends instead if the task has been found there
259  return result;
260  }
261  } while( curr != queue.begin() );
262  return NULL;
263  }
264 
266  task* pop_specific( int level, __TBB_ISOLATION_ARG(unsigned& last_used_lane, isolation_tag isolation) ) {
267  task* result = NULL;
268  // Lane selection is round-robin in backward direction.
269  unsigned idx = last_used_lane & (N-1);
270  do {
271  if( is_bit_set( population[level], idx ) ) {
272  lane_t& lane = lanes[level][idx];
274  if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
275  result = look_specific( __TBB_ISOLATION_ARG(lane.my_queue, isolation) );
276  if( lane.my_queue.empty() )
277  clear_one_bit( population[level], idx );
278  if( result )
279  break;
280  }
281  }
282  idx=(idx-1)&(N-1);
283  } while( !empty(level) && idx != last_used_lane );
284  last_used_lane = idx;
285  return result;
286  }
287 
289  bool empty(int level) {
290  return !population[level];
291  }
292 
294 
296  intptr_t drain() {
297  intptr_t result = 0;
298  for(int level = 0; level < Levels; level++)
299  for(unsigned i=0; i<N; ++i) {
300  lane_t& lane = lanes[level][i];
302  for(typename lane_t::queue_base_t::iterator it=lane.my_queue.begin();
303  it!=lane.my_queue.end(); ++it, ++result)
304  {
305  __TBB_ASSERT( is_bit_set( population[level], i ), NULL );
306  task* t = *it;
307  tbb::task::destroy(*t);
308  }
309  lane.my_queue.clear();
310  clear_one_bit( population[level], i );
311  }
312  return result;
313  }
314 }; // task_stream
315 
316 } // namespace internal
317 } // namespace tbb
318 
319 #endif /* _TBB_task_stream_extended_H */
tbb::internal::population_t
uintptr_t population_t
Definition: task_stream.h:46
tbb::internal::task_stream::push
void push(task *source, int level, const lane_selector_t &next_lane)
Push a task into a lane. Lane selection is performed by passed functor.
Definition: task_stream_extended.h:206
scheduler_common.h
spin_mutex.h
internal
Definition: _flow_graph_async_msg_impl.h:24
tbb::internal::task_prefix::isolation
isolation_tag isolation
The tag used for task isolation.
Definition: task.h:220
tbb::internal::queue_and_mutex::queue_and_mutex
queue_and_mutex()
Definition: task_stream_extended.h:58
tbb::internal::task_stream::population
population_t population[Levels]
Definition: task_stream.h:71
tbb::internal::FastRandom::get
unsigned short get()
Get a random number.
Definition: tbb_misc.h:146
__TBB_ISOLATION_EXPR
#define __TBB_ISOLATION_EXPR(isolation)
Definition: scheduler_common.h:67
__TBB_ASSERT
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
tbb::internal::no_assign
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
tbb::internal::task_stream_accessor
Definition: task_stream_extended.h:136
tbb::internal::clear_one_bit
void clear_one_bit(population_t &dest, int pos)
Definition: task_stream.h:55
tbb::internal::task_stream::N
unsigned N
Definition: task_stream.h:73
tbb::internal::task_stream::look_specific
task * look_specific(__TBB_ISOLATION_ARG(task_stream_base::lane_t::queue_base_t &queue, isolation_tag isolation))
Definition: task_stream_extended.h:245
tbb
The graph class.
Definition: serial/tbb/parallel_for.h:46
tbb::internal::task_stream::empty
bool empty(int level)
Checks existence of a task.
Definition: task_stream.h:138
tbb::task
Base class for user-defined tasks.
Definition: task.h:615
tbb::internal::task_stream::task_stream
task_stream()
Definition: task_stream_extended.h:168
tbb::internal::task_stream::drain
intptr_t drain()
Destroys all remaining tasks in every lane. Returns the number of destroyed tasks.
Definition: task_stream_extended.h:296
tbb::internal::lane_selector_base::lane_selector_base
lane_selector_base(unsigned &previous)
Definition: task_stream_extended.h:107
tbb::internal::FastRandom
A fast random number generator.
Definition: tbb_misc.h:135
tbb::internal::preceding_lane_selector
Definition: task_stream_extended.h:118
tbb::internal::random_lane_selector
Definition: task_stream_extended.h:83
tbb::internal::random_lane_selector::my_random
FastRandom & my_random
Definition: task_stream_extended.h:96
tbb::internal::task_stream::pop
task * pop(int level, const lane_selector_t &next_lane)
Definition: task_stream_extended.h:234
tbb::spin_mutex::scoped_lock
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
tbb::task::prefix
internal::task_prefix & prefix(internal::version_tag *=NULL) const
Get reference to corresponding task_prefix.
Definition: task.h:1002
tbb::internal::queue_and_mutex
Essentially, this is just a pair of a queue and a mutex to protect the queue.
Definition: task_stream.h:36
tbb::internal::subsequent_lane_selector::operator()
unsigned operator()(unsigned out_of) const
Definition: task_stream_extended.h:112
mutex_t
CRITICAL_SECTION mutex_t
Definition: ittnotify_config.h:209
tbb::internal::random_lane_selector::operator()
unsigned operator()(unsigned out_of) const
Definition: task_stream_extended.h:91
tbb::internal::is_bit_set
bool is_bit_set(population_t val, int pos)
Definition: task_stream.h:61
tbb::internal::task_stream::try_push
bool try_push(task *source, int level, unsigned lane_idx)
Returns true on successful push, otherwise - false.
Definition: task_stream_extended.h:193
tbb::internal::lane_selector_base::my_previous
unsigned & my_previous
Definition: task_stream_extended.h:106
tbb::internal::subsequent_lane_selector
Definition: task_stream_extended.h:110
__TBB_ISOLATION_ARG
#define __TBB_ISOLATION_ARG(arg1, isolation)
Definition: scheduler_common.h:68
__TBB_AtomicAND
void __TBB_AtomicAND(volatile void *operand, uintptr_t addend)
Definition: tbb_machine.h:888
lock
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
Definition: ittnotify_static.h:121
atomic.h
tbb::internal::task_stream::lanes
padded< lane_t > * lanes[Levels]
Definition: task_stream.h:72
tbb::internal::queue_and_mutex::queue_base_t
std::deque< T, tbb_allocator< T > > queue_base_t
Definition: task_stream_extended.h:53
tbb::internal::no_copy
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
tbb::internal::isolation_tag
intptr_t isolation_tag
A tag for task isolation.
Definition: task.h:143
tbb::internal::task_stream_base
Definition: task_stream_extended.h:126
tbb_misc.h
tbb::internal::task_stream_accessor_type
task_stream_accessor_type
Definition: task_stream_extended.h:131
tbb::internal::lane_selector_base
Definition: task_stream_extended.h:99
tbb::internal::one
const population_t one
Definition: task_stream.h:47
__TBB_AtomicOR
void __TBB_AtomicOR(volatile void *operand, uintptr_t addend)
Definition: tbb_machine.h:878
tbb::internal::task_stream::try_pop
task * try_pop(int level, unsigned lane_idx)
Returns pointer to task on successful pop, otherwise - NULL.
Definition: task_stream_extended.h:216
__TBB_Log2
intptr_t __TBB_Log2(uintptr_t x)
Definition: tbb_machine.h:860
tbb::internal::task_stream::initialize
void initialize(unsigned n_lanes)
Definition: task_stream_extended.h:175
tbb::internal::task_stream::~task_stream
~task_stream()
Definition: task_stream_extended.h:187
tbb::internal::padded
Pads type T to fill out to a multiple of cache line size.
Definition: tbb_stddef.h:261
tbb::internal::front_accessor
@ front_accessor
Definition: task_stream_extended.h:131
tbb::internal::task_stream
The container for "fairness-oriented" aka "enqueued" tasks.
Definition: task_stream.h:69
tbb::internal::back_nonnull_accessor
@ back_nonnull_accessor
Definition: task_stream_extended.h:131
tbb::internal::task_stream::lane_t
task_stream_accessor< accessor >::lane_t lane_t
Definition: task_stream_extended.h:162
tbb::internal::queue_and_mutex::my_queue
queue_base_t my_queue
Definition: task_stream.h:39
tbb::internal::subsequent_lane_selector::subsequent_lane_selector
subsequent_lane_selector(unsigned &previous)
Definition: task_stream_extended.h:111
tbb::internal::task_stream_base::lane_t
queue_and_mutex< task *, spin_mutex > lane_t
Definition: task_stream_extended.h:128
tbb::internal::task_stream_accessor< back_nonnull_accessor >::get_item
task * get_item(lane_t::queue_base_t &queue)
Definition: task_stream_extended.h:149
tbb_stddef.h
tbb::internal::task_stream::pop_specific
task * pop_specific(int level, __TBB_ISOLATION_ARG(unsigned &last_used_lane, isolation_tag isolation))
Try finding and popping a related task.
Definition: task_stream_extended.h:266
tbb::internal::preceding_lane_selector::operator()
unsigned operator()(unsigned out_of) const
Definition: task_stream_extended.h:120
tbb::internal::task_stream_accessor::get_item
task * get_item(lane_t::queue_base_t &queue)
Definition: task_stream_extended.h:139
tbb_allocator.h
tbb::internal::queue_and_mutex::my_mutex
mutex_t my_mutex
Definition: task_stream.h:40
tbb::internal::preceding_lane_selector::preceding_lane_selector
preceding_lane_selector(unsigned &previous)
Definition: task_stream_extended.h:119
tbb::internal::queue_and_mutex::~queue_and_mutex
~queue_and_mutex()
Definition: task_stream_extended.h:59
tbb::internal::set_one_bit
void set_one_bit(population_t &dest, int pos)
Definition: task_stream.h:49
tbb::internal::random_lane_selector::random_lane_selector
random_lane_selector(FastRandom &random)
Definition: task_stream_extended.h:90

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.