Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
tbb::internal::input_buffer Class Reference

A buffer of input items for a filter.

Public Member Functions

 input_buffer (bool is_ordered_, bool is_bound_)
 Construct empty buffer.
 
 ~input_buffer ()
 Destroy the buffer.
 
bool put_token (task_info &info_, bool force_put=false)
 Put a token into the buffer.
 
template<typename StageTask >
void note_done (Token token, StageTask &spawner)
 Note that processing of a token is finished.
 
bool return_item (task_info &info, bool advance)
 Return an item and invalidate the queued item, but advance low_token only if advance is true.
 
bool has_item ()
 True if the item at the current low_token is valid.
 
void create_my_tls ()
 
void destroy_my_tls ()
 
bool my_tls_end_of_input ()
 
void set_my_tls_end_of_input ()
 

Private Types

typedef Token size_type
 
typedef basic_tls< intptr_t > end_of_input_tls_t
 For parallel filters that accept NULLs, a thread-local flag for reaching end_of_input.
 

Private Member Functions

void grow (size_type minimum_size)
 Resize "array".
 
void create_sema (size_t initial_tokens)
 
void free_sema ()
 
void sema_P ()
 
void sema_V ()
 
Private Member Functions inherited from tbb::internal::no_copy
 no_copy (const no_copy &)=delete
 
 no_copy ()=default
 

Private Attributes

task_info * array
 Array of deferred tasks that cannot yet start executing.
 
semaphore * my_sem
 For a thread-bound filter, a semaphore for waiting; NULL otherwise.
 
size_type array_size
 Size of array.
 
Token low_token
 Lowest token that can start executing.
 
spin_mutex array_mutex
 Serializes updates.
 
Token high_token
 Used for out-of-order buffers, and for assigning my_token if is_ordered and my_token is not already assigned.
 
bool is_ordered
 True for ordered filter, false otherwise.
 
bool is_bound
 True for thread-bound filter, false otherwise.
 
end_of_input_tls_t end_of_input_tls
 
bool end_of_input_tls_allocated
 

Static Private Attributes

static const size_type initial_buffer_size = 4
 Initial size for "array".
 

Friends

class tbb::internal::pipeline_root_task
 
class tbb::filter
 
class tbb::thread_bound_filter
 
class tbb::internal::stage_task
 
class tbb::pipeline
 

Detailed Description

A buffer of input items for a filter.

Each item is a task_info, inserted into a position in the buffer corresponding to a Token.
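
The position that corresponds to a Token can be sketched as follows. This is a minimal illustration, assuming the power-of-two masking (token & (array_size-1)) that the member listings on this page use; the names is_power_of_two and slot_for are hypothetical and are not part of TBB.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for tbb::internal::Token.
typedef unsigned long Token;

// Returns true if n is a nonzero power of two.
inline bool is_power_of_two(std::size_t n) {
    return n != 0 && (n & (n - 1)) == 0;
}

// Maps a token to its slot in a circular buffer of `size` entries.
// Valid only when `size` is a power of two, in which case the mask
// gives the same result as token % size.
inline std::size_t slot_for(Token token, std::size_t size) {
    assert(is_power_of_two(size));
    return static_cast<std::size_t>(token & (size - 1));
}
```

With a buffer of 4 entries, tokens 1 and 5 share slot 1, which is why the buffer has to grow before a token that is array_size or more ahead of low_token can be stored.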

Definition at line 48 of file pipeline.cpp.

Member Typedef Documentation

◆ end_of_input_tls_t

For parallel filters that accept NULLs, a thread-local flag for reaching end_of_input.

Definition at line 92 of file pipeline.cpp.

◆ size_type

Definition at line 55 of file pipeline.cpp.

Constructor & Destructor Documentation

◆ input_buffer()

tbb::internal::input_buffer::input_buffer ( bool  is_ordered_,
bool  is_bound_ 
)
inline

Construct empty buffer.

Definition at line 103 of file pipeline.cpp.

    input_buffer( bool is_ordered_, bool is_bound_ ) :
            array(NULL), my_sem(NULL), array_size(0),
            low_token(0), high_token(0),
            is_ordered(is_ordered_), is_bound(is_bound_),
            end_of_input_tls_allocated(false) {
        grow(initial_buffer_size);
        __TBB_ASSERT( array, NULL );
        if(is_bound) create_sema(0);
    }

References __TBB_ASSERT, array, create_sema(), grow(), initial_buffer_size, and is_bound.

◆ ~input_buffer()

tbb::internal::input_buffer::~input_buffer ( )
inline

Destroy the buffer.

Definition at line 114 of file pipeline.cpp.

    ~input_buffer() {
        __TBB_ASSERT( array, NULL );
        cache_aligned_allocator<task_info>().deallocate(array,array_size);
        poison_pointer( array );
        if(my_sem) {
            free_sema();
        }
        if(end_of_input_tls_allocated) {
            destroy_my_tls();
        }
    }

References __TBB_ASSERT, array, array_size, tbb::cache_aligned_allocator< T >::deallocate(), destroy_my_tls(), end_of_input_tls_allocated, free_sema(), my_sem, and tbb::internal::poison_pointer().

Member Function Documentation

◆ create_my_tls()

void tbb::internal::input_buffer::create_my_tls ( )
inline

Definition at line 225 of file pipeline.cpp.

    void create_my_tls() {
        int status = end_of_input_tls.create();
        if(status) handle_perror(status, "TLS not allocated for filter");
        end_of_input_tls_allocated = true;
    }

References tbb::internal::basic_tls< T >::create(), end_of_input_tls, end_of_input_tls_allocated, and tbb::internal::handle_perror().

◆ create_sema()

void tbb::internal::input_buffer::create_sema ( size_t  initial_tokens)
inline private

Definition at line 96 of file pipeline.cpp.

    void create_sema(size_t initial_tokens) { __TBB_ASSERT(!my_sem,NULL); my_sem = new internal::semaphore(initial_tokens); }

References __TBB_ASSERT, and my_sem.

Referenced by input_buffer().

◆ destroy_my_tls()

void tbb::internal::input_buffer::destroy_my_tls ( )
inline

Definition at line 226 of file pipeline.cpp.

    void destroy_my_tls() {
        int status = end_of_input_tls.destroy();
        if(status) handle_perror(status, "Failed to destroy filter TLS");
    }

References tbb::internal::basic_tls< T >::destroy(), end_of_input_tls, and tbb::internal::handle_perror().

Referenced by ~input_buffer().

◆ free_sema()

void tbb::internal::input_buffer::free_sema ( )
inline private

Definition at line 97 of file pipeline.cpp.

    void free_sema() { __TBB_ASSERT(my_sem,NULL); delete my_sem; }

References __TBB_ASSERT, and my_sem.

Referenced by ~input_buffer().

◆ grow()

void tbb::internal::input_buffer::grow ( size_type  minimum_size)
private

Resize "array".

The caller is responsible for acquiring a lock on "array_mutex".

Definition at line 231 of file pipeline.cpp.

    void input_buffer::grow( size_type minimum_size ) {
        size_type old_size = array_size;
        size_type new_size = old_size ? 2*old_size : initial_buffer_size;
        while( new_size<minimum_size )
            new_size*=2;
        task_info* new_array = cache_aligned_allocator<task_info>().allocate(new_size);
        task_info* old_array = array;
        for( size_type i=0; i<new_size; ++i )
            new_array[i].is_valid = false;
        // Copy the live window, starting at low_token, to its new positions.
        long t=low_token;
        for( size_type i=0; i<old_size; ++i, ++t )
            new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
        array = new_array;
        array_size = new_size;
        if( old_array )
            cache_aligned_allocator<task_info>().deallocate(old_array,old_size);
    }

References tbb::cache_aligned_allocator< T >::allocate(), array, array_size, tbb::cache_aligned_allocator< T >::deallocate(), initial_buffer_size, low_token, and new_size.

Referenced by input_buffer(), and put_token().
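
The re-indexing step in grow() can be tried in isolation with the sketch below. It is a simplified model and not the TBB implementation: Slot and Ring are hypothetical names, std::vector replaces cache_aligned_allocator, and the initial size of 4 mirrors initial_buffer_size.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical simplified slot; the real task_info carries more fields.
struct Slot {
    bool is_valid;
    int payload;
    Slot() : is_valid(false), payload(0) {}
};

// Hypothetical ring buffer indexed by token & (size-1).
struct Ring {
    std::vector<Slot> slots;   // size is 0 or a power of two
    unsigned long low_token;   // oldest token still tracked

    Ring() : low_token(0) {}

    // Doubles the capacity until it is at least minimum_size, then moves
    // every slot of the old window to the index its token maps to under
    // the new mask.
    void grow(std::size_t minimum_size) {
        std::size_t old_size = slots.size();
        std::size_t new_size = old_size ? 2 * old_size : 4;
        while (new_size < minimum_size)
            new_size *= 2;
        std::vector<Slot> fresh(new_size);   // every slot starts invalid
        unsigned long t = low_token;
        for (std::size_t i = 0; i < old_size; ++i, ++t)
            fresh[t & (new_size - 1)] = slots[t & (old_size - 1)];
        slots.swap(fresh);
    }
};
```

Copying by token rather than by raw index matters because the same token generally maps to a different slot once the mask widens; for example, token 6 sits in slot 2 of a 4-entry buffer and in slot 6 of an 8-entry buffer.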

◆ has_item()

bool tbb::internal::input_buffer::has_item ( )
inline

true if the current low_token is valid.

Definition at line 222 of file pipeline.cpp.

References array, array_mutex, array_size, lock, and low_token.

◆ my_tls_end_of_input()

bool tbb::internal::input_buffer::my_tls_end_of_input ( )
inline

Definition at line 227 of file pipeline.cpp.

    bool my_tls_end_of_input() { return end_of_input_tls.get() != 0; }

References end_of_input_tls, and tbb::internal::basic_tls< T >::get().

Referenced by tbb::internal::stage_task::execute().

◆ note_done()

template<typename StageTask >
void tbb::internal::input_buffer::note_done ( Token  token,
StageTask &  spawner 
)
inline

Note that processing of a token is finished.

Fires up processing of the next token, if processing was deferred.

Definition at line 174 of file pipeline.cpp.

    template<typename StageTask>
    void note_done( Token token, StageTask& spawner ) {
        task_info wakee;
        wakee.reset();
        {
            spin_mutex::scoped_lock lock( array_mutex );
            if( !is_ordered || token==low_token ) {
                // Wake the next task
                task_info& item = array[++low_token & (array_size-1)];
                ITT_NOTIFY( sync_acquired, this );
                wakee = item;
                item.is_valid = false;
            }
        }
        // Spawn outside the locked scope.
        if( wakee.is_valid )
            spawner.spawn_stage_task(wakee);
    }

References array, array_mutex, array_size, is_ordered, tbb::internal::task_info::is_valid, ITT_NOTIFY, lock, low_token, and tbb::internal::task_info::reset().

Referenced by tbb::internal::stage_task::execute().
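
The wake-up step can be modelled with standard C++ as below. This is only a sketch under stated assumptions: Item and OrderedWindow are hypothetical names, std::mutex stands in for spin_mutex, and the spawner is reduced to a callable that receives the payload. It keeps the pattern visible in the listing above, where the deferred item is copied out while the lock is held and the spawn happens after the lock is released.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

struct Item {                 // hypothetical stand-in for task_info
    bool is_valid = false;
    int payload = 0;
};

class OrderedWindow {         // hypothetical; models only the wake-up step
public:
    explicit OrderedWindow(std::size_t size) : slots_(size), low_(0) {
        assert(size != 0 && (size & (size - 1)) == 0);   // power of two
    }

    // Parks an item whose token cannot run yet.
    void park(unsigned long token, int payload) {
        std::lock_guard<std::mutex> lock(mutex_);
        Item& s = slots_[token & (slots_.size() - 1)];
        s.is_valid = true;
        s.payload = payload;
    }

    // Reports that `token` finished. If the following token was parked,
    // it is copied out under the lock and handed to `spawn` afterwards.
    template <typename Spawner>
    void note_done(unsigned long token, bool is_ordered, Spawner spawn) {
        Item wakee;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (!is_ordered || token == low_) {
                Item& next = slots_[++low_ & (slots_.size() - 1)];
                wakee = next;
                next.is_valid = false;
            }
        }
        if (wakee.is_valid)
            spawn(wakee.payload);
    }

    // Unsynchronized read, intended for single-threaded inspection only.
    unsigned long low() const { return low_; }

private:
    std::vector<Item> slots_;
    unsigned long low_;
    std::mutex mutex_;
};
```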

◆ put_token()

bool tbb::internal::input_buffer::put_token ( task_info info_,
bool  force_put = false 
)
inline

Put a token into the buffer.

Returns true if the task information was placed into the buffer; otherwise returns false, informing the caller to create and spawn a task. If the input buffer is owned by a thread-bound filter and the item at low_token was not valid, a V() is issued on the semaphore. If the input_buffer is owned by a successor to a thread-bound filter, the force_put parameter should be true to ensure the token is inserted in the buffer.

Definition at line 135 of file pipeline.cpp.

    bool put_token( task_info& info_, bool force_put = false ) {
        {
            info_.is_valid = true;
            spin_mutex::scoped_lock lock( array_mutex );
            Token token;
            bool was_empty = !array[low_token&(array_size-1)].is_valid;
            if( is_ordered ) {
                if( !info_.my_token_ready ) {
                    info_.my_token = high_token++;
                    info_.my_token_ready = true;
                }
                token = info_.my_token;
            } else
                token = high_token++;
            __TBB_ASSERT( (tokendiff_t)(token-low_token)>=0, NULL );
            if( token!=low_token || is_bound || force_put ) {
                // Trying to put token that is beyond low_token.
                // Need to wait until low_token catches up before dispatching.
                if( token-low_token>=array_size )
                    grow( token-low_token+1 );
                ITT_NOTIFY( sync_releasing, this );
                array[token&(array_size-1)] = info_;
                if(was_empty && is_bound) {
                    sema_V();
                }
                return true;
            }
        }
        return false;
    }

References __TBB_ASSERT, array, array_mutex, array_size, grow(), high_token, is_bound, is_ordered, tbb::internal::task_info::is_valid, ITT_NOTIFY, lock, low_token, tbb::internal::task_info::my_token, tbb::internal::task_info::my_token_ready, sema_V(), and sync_releasing.

Referenced by tbb::internal::stage_task::execute().
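
The decision made by put_token() for an ordered filter that is not thread-bound can be sketched as follows. Info and OrderedBuffer are hypothetical names, and locking, the semaphore, and ITT notifications are left out, so this is an illustration of the control flow rather than the TBB code.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct Info {                 // hypothetical stand-in for task_info
    bool is_valid = false;
    bool token_ready = false;
    unsigned long token = 0;
};

struct OrderedBuffer {        // hypothetical; ordered, not thread-bound, no locking
    std::vector<Info> slots = std::vector<Info>(4);
    unsigned long low_token = 0;
    unsigned long high_token = 0;

    // Returns true if `info` was stored for later, false if the caller may
    // process it right away because its token equals low_token.
    bool put(Info& info, bool force_put = false) {
        info.is_valid = true;
        if (!info.token_ready) {          // first ordered stage assigns the token
            info.token = high_token++;
            info.token_ready = true;
        }
        if (info.token != low_token || force_put) {
            while (info.token - low_token >= slots.size())
                grow();
            slots[info.token & (slots.size() - 1)] = info;
            return true;
        }
        return false;
    }

    // Doubles the buffer and re-homes the window that starts at low_token.
    void grow() {
        std::vector<Info> bigger(slots.size() * 2);
        unsigned long t = low_token;
        for (std::size_t i = 0; i < slots.size(); ++i, ++t)
            bigger[t & (bigger.size() - 1)] = slots[t & (slots.size() - 1)];
        slots.swap(bigger);
    }
};
```

An item whose token equals low_token is not buffered unless force_put is set, which matches the return-value contract described above: false tells the caller to run the item itself.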

◆ return_item()

bool tbb::internal::input_buffer::return_item ( task_info info,
bool  advance 
)
inline

Return an item and invalidate the queued item, but advance low_token only if advance is true.

Definition at line 208 of file pipeline.cpp.

    bool return_item( task_info& info, bool advance ) {
        spin_mutex::scoped_lock lock( array_mutex );
        task_info& item = array[low_token&(array_size-1)];
        ITT_NOTIFY( sync_acquired, this );
        if( item.is_valid ) {
            info = item;
            item.is_valid = false;
            if (advance) low_token++;
            return true;
        }
        return false;
    }

References array, array_mutex, array_size, tbb::internal::task_info::is_valid, ITT_NOTIFY, lock, and low_token.

Referenced by tbb::internal::stage_task::execute(), and tbb::internal::pipeline_root_task::execute().
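
The effect of the advance parameter can be seen in a small single-threaded model. Entry and Window are hypothetical names and the lock is omitted, so treat this as a sketch of the behaviour shown in the listing, not as the TBB code.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct Entry {                // hypothetical stand-in for task_info
    bool is_valid = false;
    int payload = 0;
};

struct Window {               // hypothetical, single-threaded
    std::vector<Entry> slots = std::vector<Entry>(4);
    unsigned long low_token = 0;

    // Copies the item at low_token into `out` and invalidates the slot.
    // low_token moves forward only when `advance` is true.
    bool return_item(Entry& out, bool advance) {
        Entry& item = slots[low_token & (slots.size() - 1)];
        if (!item.is_valid)
            return false;
        out = item;
        item.is_valid = false;
        if (advance)
            ++low_token;
        return true;
    }
};
```

When advance is false the slot is emptied but low_token stays put, so a later call such as note_done() remains responsible for moving the window forward.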

◆ sema_P()

void tbb::internal::input_buffer::sema_P ( )
inline private

Definition at line 98 of file pipeline.cpp.

    void sema_P() { __TBB_ASSERT(my_sem,NULL); my_sem->P(); }

References __TBB_ASSERT, my_sem, and tbb::internal::semaphore::P().

◆ sema_V()

void tbb::internal::input_buffer::sema_V ( )
inline private

Definition at line 99 of file pipeline.cpp.

    void sema_V() { __TBB_ASSERT(my_sem,NULL); my_sem->V(); }

References __TBB_ASSERT, my_sem, and tbb::internal::semaphore::V().

Referenced by put_token().
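
sema_P() and sema_V() forward to the P() (wait/acquire) and V() (post/release) operations of tbb::internal::semaphore. For readers unfamiliar with that naming, the sketch below shows the same two operations on a counting semaphore built from standard C++ primitives. CountingSemaphore and try_P are hypothetical and say nothing about how the TBB semaphore is implemented on any platform.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical portable counting semaphore, for illustration only.
class CountingSemaphore {
public:
    explicit CountingSemaphore(unsigned initial = 0) : count_(initial) {}

    // P: block until a unit is available, then take it.
    void P() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    // V: release one unit and wake one waiter, if any.
    void V() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++count_;
        }
        cv_.notify_one();
    }

    // Non-blocking variant of P, added here to make the sketch easy to test.
    bool try_P() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == 0)
            return false;
        --count_;
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    unsigned count_;
};
```

In the class documented here the semaphore is created with zero initial tokens, and put_token() issues a V() only when the slot at low_token was empty, so a waiting thread-bound filter is signalled once per transition from empty to non-empty.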

◆ set_my_tls_end_of_input()

void tbb::internal::input_buffer::set_my_tls_end_of_input ( )
inline

Definition at line 228 of file pipeline.cpp.

    void set_my_tls_end_of_input() { end_of_input_tls.set(1); }

References end_of_input_tls, and tbb::internal::basic_tls< T >::set().
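
The idea of a per-thread end-of-input flag can be illustrated with the C++11 thread_local keyword. This is a loose analogy under stated assumptions: the names in namespace sketch are hypothetical, and the flag below is one per thread for the whole program, whereas the class documented here keeps a basic_tls<intptr_t> member per buffer.

```cpp
// Hypothetical per-thread flag; not the basic_tls mechanism used by the class.
namespace sketch {

// Returns a reference to the calling thread's own flag.
inline bool& end_of_input_flag() {
    thread_local bool flag = false;   // one instance per thread
    return flag;
}

// True once the calling thread has recorded end of input.
inline bool my_tls_end_of_input() { return end_of_input_flag(); }

// Records end of input for the calling thread only.
inline void set_my_tls_end_of_input() { end_of_input_flag() = true; }

}  // namespace sketch
```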

Friends And Related Function Documentation

◆ tbb::filter

friend class tbb::filter
friend

Definition at line 50 of file pipeline.cpp.

◆ tbb::internal::pipeline_root_task

friend class tbb::internal::pipeline_root_task
friend

Definition at line 49 of file pipeline.cpp.

◆ tbb::internal::stage_task

friend class tbb::internal::stage_task
friend

Definition at line 52 of file pipeline.cpp.

◆ tbb::pipeline

friend class tbb::pipeline
friend

Definition at line 53 of file pipeline.cpp.

◆ tbb::thread_bound_filter

friend class tbb::thread_bound_filter
friend

Definition at line 51 of file pipeline.cpp.

Member Data Documentation

◆ array

task_info* tbb::internal::input_buffer::array
private

Array of deferred tasks that cannot yet start executing.

Definition at line 58 of file pipeline.cpp.

Referenced by grow(), has_item(), input_buffer(), note_done(), put_token(), return_item(), and ~input_buffer().

◆ array_mutex

spin_mutex tbb::internal::input_buffer::array_mutex
private

Serializes updates.

Definition at line 72 of file pipeline.cpp.

Referenced by has_item(), note_done(), put_token(), and return_item().

◆ array_size

size_type tbb::internal::input_buffer::array_size
private

Size of array.

Always 0 or a power of 2.

Definition at line 65 of file pipeline.cpp.

Referenced by grow(), has_item(), note_done(), put_token(), return_item(), and ~input_buffer().

◆ end_of_input_tls

end_of_input_tls_t tbb::internal::input_buffer::end_of_input_tls
private

Definition at line 93 of file pipeline.cpp.

◆ end_of_input_tls_allocated

bool tbb::internal::input_buffer::end_of_input_tls_allocated
private

Definition at line 94 of file pipeline.cpp.

Referenced by create_my_tls(), and ~input_buffer().

◆ high_token

Token tbb::internal::input_buffer::high_token
private

Used for out-of-order buffers, and for assigning my_token if is_ordered and my_token is not already assigned.

Definition at line 83 of file pipeline.cpp.

Referenced by put_token().

◆ initial_buffer_size

const size_type tbb::internal::input_buffer::initial_buffer_size = 4
static private

Initial size for "array".

Must be a power of 2.

Definition at line 80 of file pipeline.cpp.

Referenced by grow(), and input_buffer().

◆ is_bound

bool tbb::internal::input_buffer::is_bound
private

True for thread-bound filter, false otherwise.

Definition at line 89 of file pipeline.cpp.

Referenced by input_buffer(), and put_token().

◆ is_ordered

bool tbb::internal::input_buffer::is_ordered
private

True for ordered filter, false otherwise.

Definition at line 86 of file pipeline.cpp.

Referenced by note_done(), and put_token().

◆ low_token

Token tbb::internal::input_buffer::low_token
private

Lowest token that can start executing.

All prior Tokens have already been seen.

Definition at line 69 of file pipeline.cpp.

Referenced by grow(), has_item(), note_done(), put_token(), and return_item().

◆ my_sem

semaphore* tbb::internal::input_buffer::my_sem
private

For a thread-bound filter, a semaphore for waiting; NULL otherwise.

Definition at line 61 of file pipeline.cpp.

Referenced by create_sema(), free_sema(), sema_P(), sema_V(), and ~input_buffer().


The documentation for this class was generated from the following file:
pipeline.cpp

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.