Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
tbb::flow::interface11::limiter_node< T, DecrementType > Class Template Reference

Forwards messages only if the threshold has not been reached. More...

#include <flow_graph.h>

Collaboration diagram for tbb::flow::interface11::limiter_node< T, DecrementType >:

Public Types

typedef T input_type
 
typedef T output_type
 
typedef receiver< input_type >::predecessor_type predecessor_type
 
typedef sender< output_type >::successor_type successor_type
 

Public Member Functions

 limiter_node (graph &g, __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
 Constructor. More...
 
 limiter_node (const limiter_node &src)
 Copy constructor. More...
 
bool register_successor (successor_type &r) __TBB_override
 Replace the current successor with this new successor. More...
 
bool remove_successor (successor_type &r) __TBB_override
 Removes a successor from this node. More...
 
bool register_predecessor (predecessor_type &src) __TBB_override
 Adds src to the list of cached predecessors. More...
 
bool remove_predecessor (predecessor_type &src) __TBB_override
 Removes src from the list of cached predecessors. More...
 

Public Attributes

internal::decrementer< limiter_node< T, DecrementType >, DecrementType > decrement
 The internal receiver< DecrementType > that decrements the count. More...
 

Protected Member Functions

task * try_put_task (const T &t) __TBB_override
 Puts an item to this receiver. More...
 
graph & graph_reference () const __TBB_override
 
void reset_receiver (reset_flags) __TBB_override
 
void reset_node (reset_flags f) __TBB_override
 

Private Member Functions

bool check_conditions ()
 
task * forward_task ()
 
void forward ()
 
task * decrement_counter (long long delta)
 
void initialize ()
 

Private Attributes

size_t my_threshold
 
size_t my_count
 
size_t my_tries
 
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
 
spin_mutex my_mutex
 
internal::broadcast_cache< T > my_successors
 

Friends

class internal::forward_task_bypass< limiter_node< T, DecrementType > >
 
class internal::decrementer< limiter_node< T, DecrementType >, DecrementType >
 
template<typename R , typename B >
class run_and_put_task
 
template<typename X , typename Y >
class internal::broadcast_cache
 
template<typename X , typename Y >
class internal::round_robin_cache
 

Detailed Description

template<typename T, typename DecrementType = continue_msg>
class tbb::flow::interface11::limiter_node< T, DecrementType >

Forwards messages only if the threshold has not been reached.

This node forwards items until its threshold is reached. It contains no buffering. If the downstream node rejects, the message is dropped.

Definition at line 120 of file flow_graph.h.

Member Typedef Documentation

◆ input_type

template<typename T , typename DecrementType = continue_msg>
typedef T tbb::flow::interface11::limiter_node< T, DecrementType >::input_type

Definition at line 2979 of file flow_graph.h.

◆ output_type

template<typename T , typename DecrementType = continue_msg>
typedef T tbb::flow::interface11::limiter_node< T, DecrementType >::output_type

Definition at line 2980 of file flow_graph.h.

◆ predecessor_type

template<typename T , typename DecrementType = continue_msg>
typedef receiver<input_type>::predecessor_type tbb::flow::interface11::limiter_node< T, DecrementType >::predecessor_type

Definition at line 2981 of file flow_graph.h.

◆ successor_type

template<typename T , typename DecrementType = continue_msg>
typedef sender<output_type>::successor_type tbb::flow::interface11::limiter_node< T, DecrementType >::successor_type

Definition at line 2982 of file flow_graph.h.

Constructor & Destructor Documentation

◆ limiter_node() [1/2]

template<typename T , typename DecrementType = continue_msg>
tbb::flow::interface11::limiter_node< T, DecrementType >::limiter_node ( graph & g,
__TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0)   
)
inline

Constructor.

Definition at line 3102 of file flow_graph.h.

3104  : graph_node(g), my_threshold(threshold), my_count(0),
3106  my_tries(0), decrement(),
3107  init_decrement_predecessors(num_decrement_predecessors),
3108  decrement(num_decrement_predecessors)) {
3109  initialize();
3110  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::initialize().

Here is the call graph for this function:

◆ limiter_node() [2/2]

template<typename T , typename DecrementType = continue_msg>
tbb::flow::interface11::limiter_node< T, DecrementType >::limiter_node ( const limiter_node< T, DecrementType > &  src)
inline

Copy constructor.

Definition at line 3121 of file flow_graph.h.

3121  :
3122  graph_node(src.my_graph), receiver<T>(), sender<T>(),
3123  my_threshold(src.my_threshold), my_count(0),
3125  my_tries(0), decrement(),
3126  init_decrement_predecessors(src.init_decrement_predecessors),
3127  decrement(src.init_decrement_predecessors)) {
3128  initialize();
3129  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::initialize().

Here is the call graph for this function:

Member Function Documentation

◆ check_conditions()

template<typename T , typename DecrementType = continue_msg>
bool tbb::flow::interface11::limiter_node< T, DecrementType >::check_conditions ( )
inline private

◆ decrement_counter()

template<typename T , typename DecrementType = continue_msg>
task* tbb::flow::interface11::limiter_node< T, DecrementType >::decrement_counter ( long long  delta)
inline private

Definition at line 3068 of file flow_graph.h.

3068  {
3069  {
3071  if( delta > 0 && size_t(delta) > my_count )
3072  my_count = 0;
3073  else if( delta < 0 && size_t(delta) > my_threshold - my_count )
3075  else
3076  my_count -= size_t(delta); // absolute value of delta is sufficiently small
3077  }
3078  return forward_task();
3079  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::forward_task(), lock, tbb::flow::interface11::limiter_node< T, DecrementType >::my_count, tbb::flow::interface11::limiter_node< T, DecrementType >::my_mutex, and tbb::flow::interface11::limiter_node< T, DecrementType >::my_threshold.

Here is the call graph for this function:

◆ forward()

template<typename T , typename DecrementType = continue_msg>
void tbb::flow::interface11::limiter_node< T, DecrementType >::forward ( )
inline private

Definition at line 3063 of file flow_graph.h.

3063  {
3064  __TBB_ASSERT(false, "Should never be called");
3065  return;
3066  }

References __TBB_ASSERT.

◆ forward_task()

template<typename T , typename DecrementType = continue_msg>
task* tbb::flow::interface11::limiter_node< T, DecrementType >::forward_task ( )
inline private

Definition at line 3010 of file flow_graph.h.

3010  {
3011  input_type v;
3012  task *rval = NULL;
3013  bool reserved = false;
3014  {
3016  if ( check_conditions() )
3017  ++my_tries;
3018  else
3019  return NULL;
3020  }
3021 
3022  //SUCCESS
3023  // if we can reserve and can put, we consume the reservation
3024  // we increment the count and decrement the tries
3025  if ( (my_predecessors.try_reserve(v)) == true ){
3026  reserved=true;
3027  if ( (rval = my_successors.try_put_task(v)) != NULL ){
3028  {
3030  ++my_count;
3031  --my_tries;
3032  my_predecessors.try_consume();
3033  if ( check_conditions() ) {
3034  if ( internal::is_graph_active(this->my_graph) ) {
3035  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3036  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
3038  }
3039  }
3040  }
3041  return rval;
3042  }
3043  }
3044  //FAILURE
3045  //if we can't reserve, we decrement the tries
3046  //if we can reserve but can't put, we decrement the tries and release the reservation
3047  {
3049  --my_tries;
3050  if (reserved) my_predecessors.try_release();
3051  if ( check_conditions() ) {
3052  if ( internal::is_graph_active(this->my_graph) ) {
3053  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3054  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
3055  __TBB_ASSERT(!rval, "Have two tasks to handle");
3056  return rtask;
3057  }
3058  }
3059  return rval;
3060  }
3061  }

References __TBB_ASSERT, tbb::flow::interface11::limiter_node< T, DecrementType >::check_conditions(), tbb::flow::interface11::limiter_node< T, DecrementType >::graph_reference(), tbb::flow::interface11::internal::is_graph_active(), lock, tbb::flow::interface11::limiter_node< T, DecrementType >::my_count, tbb::flow::interface11::limiter_node< T, DecrementType >::my_mutex, tbb::flow::interface11::limiter_node< T, DecrementType >::my_predecessors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_successors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_tries, and tbb::flow::interface11::internal::spawn_in_graph_arena().

Referenced by tbb::flow::interface11::limiter_node< T, DecrementType >::decrement_counter().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ graph_reference()

template<typename T , typename DecrementType = continue_msg>
graph& tbb::flow::interface11::limiter_node< T, DecrementType >::graph_reference ( ) const
inline protected

Definition at line 3252 of file flow_graph.h.

3252 { return my_graph; }

Referenced by tbb::flow::interface11::limiter_node< T, DecrementType >::forward_task(), tbb::flow::interface11::limiter_node< T, DecrementType >::register_predecessor(), and tbb::flow::interface11::limiter_node< T, DecrementType >::register_successor().

Here is the caller graph for this function:

◆ initialize()

template<typename T , typename DecrementType = continue_msg>
void tbb::flow::interface11::limiter_node< T, DecrementType >::initialize ( )
inline private

Definition at line 3081 of file flow_graph.h.

3081  {
3082  my_predecessors.set_owner(this);
3083  my_successors.set_owner(this);
3084  decrement.set_owner(this);
3086  CODEPTR(), tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
3087  static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
3088  static_cast<sender<output_type> *>(this)
3089  );
3090  }

References CODEPTR, tbb::flow::interface11::limiter_node< T, DecrementType >::decrement, tbb::internal::fgt_node(), tbb::flow::interface11::limiter_node< T, DecrementType >::my_predecessors, and tbb::flow::interface11::limiter_node< T, DecrementType >::my_successors.

Referenced by tbb::flow::interface11::limiter_node< T, DecrementType >::limiter_node().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ register_predecessor()

template<typename T , typename DecrementType = continue_msg>
bool tbb::flow::interface11::limiter_node< T, DecrementType >::register_predecessor ( predecessor_type & src)
inline

Adds src to the list of cached predecessors.

Definition at line 3202 of file flow_graph.h.

3202  {
3204  my_predecessors.add( src );
3205  if ( my_count + my_tries < my_threshold && !my_successors.empty() && internal::is_graph_active(this->my_graph) ) {
3206  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3207  internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
3209  }
3210  return true;
3211  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::graph_reference(), tbb::flow::interface11::internal::is_graph_active(), lock, tbb::flow::interface11::limiter_node< T, DecrementType >::my_count, tbb::flow::interface11::limiter_node< T, DecrementType >::my_mutex, tbb::flow::interface11::limiter_node< T, DecrementType >::my_predecessors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_successors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_threshold, tbb::flow::interface11::limiter_node< T, DecrementType >::my_tries, and tbb::flow::interface11::internal::spawn_in_graph_arena().

Here is the call graph for this function:

◆ register_successor()

template<typename T , typename DecrementType = continue_msg>
bool tbb::flow::interface11::limiter_node< T, DecrementType >::register_successor ( successor_type & r)
inline

Replace the current successor with this new successor.

Definition at line 3138 of file flow_graph.h.

3138  {
3140  bool was_empty = my_successors.empty();
3141  my_successors.register_successor(r);
3142  //spawn a forward task if this is the only successor
3143  if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
3144  if ( internal::is_graph_active(this->my_graph) ) {
3145  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3146  internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
3148  }
3149  }
3150  return true;
3151  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::graph_reference(), tbb::flow::interface11::internal::is_graph_active(), lock, tbb::flow::interface11::limiter_node< T, DecrementType >::my_count, tbb::flow::interface11::limiter_node< T, DecrementType >::my_mutex, tbb::flow::interface11::limiter_node< T, DecrementType >::my_predecessors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_successors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_threshold, tbb::flow::interface11::limiter_node< T, DecrementType >::my_tries, and tbb::flow::interface11::internal::spawn_in_graph_arena().

Here is the call graph for this function:

◆ remove_predecessor()

template<typename T , typename DecrementType = continue_msg>
bool tbb::flow::interface11::limiter_node< T, DecrementType >::remove_predecessor ( predecessor_type & src)
inline

Removes src from the list of cached predecessors.

Definition at line 3214 of file flow_graph.h.

3214  {
3215  my_predecessors.remove( src );
3216  return true;
3217  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::my_predecessors.

◆ remove_successor()

template<typename T , typename DecrementType = continue_msg>
bool tbb::flow::interface11::limiter_node< T, DecrementType >::remove_successor ( successor_type & r)
inline

Removes a successor from this node.

r.remove_predecessor(*this) is also called.

Definition at line 3155 of file flow_graph.h.

3155  {
3156  r.remove_predecessor(*this);
3157  my_successors.remove_successor(r);
3158  return true;
3159  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::my_successors.

◆ reset_node()

template<typename T , typename DecrementType = continue_msg>
void tbb::flow::interface11::limiter_node< T, DecrementType >::reset_node ( reset_flags  f)
inline protected

◆ reset_receiver()

template<typename T , typename DecrementType = continue_msg>
void tbb::flow::interface11::limiter_node< T, DecrementType >::reset_receiver ( reset_flags  )
inline protected

Definition at line 3254 of file flow_graph.h.

3254  {
3255  __TBB_ASSERT(false,NULL); // should never be called
3256  }

References __TBB_ASSERT.

◆ try_put_task()

template<typename T , typename DecrementType = continue_msg>
task* tbb::flow::interface11::limiter_node< T, DecrementType >::try_put_task ( const T &  t)
inline protected

Puts an item to this receiver.

Definition at line 3225 of file flow_graph.h.

3225  {
3226  {
3228  if ( my_count + my_tries >= my_threshold )
3229  return NULL;
3230  else
3231  ++my_tries;
3232  }
3233 
3234  task * rtask = my_successors.try_put_task(t);
3235 
3236  if ( !rtask ) { // try_put_task failed.
3238  --my_tries;
3239  if (check_conditions() && internal::is_graph_active(this->my_graph)) {
3240  rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3241  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
3242  }
3243  }
3244  else {
3246  ++my_count;
3247  --my_tries;
3248  }
3249  return rtask;
3250  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::check_conditions(), tbb::flow::interface11::internal::is_graph_active(), lock, tbb::flow::interface11::limiter_node< T, DecrementType >::my_count, tbb::flow::interface11::limiter_node< T, DecrementType >::my_mutex, tbb::flow::interface11::limiter_node< T, DecrementType >::my_successors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_threshold, and tbb::flow::interface11::limiter_node< T, DecrementType >::my_tries.

Here is the call graph for this function:

Friends And Related Function Documentation

◆ internal::broadcast_cache

template<typename T , typename DecrementType = continue_msg>
template<typename X , typename Y >
friend class internal::broadcast_cache
friend

Definition at line 3222 of file flow_graph.h.

◆ internal::decrementer< limiter_node< T, DecrementType >, DecrementType >

template<typename T , typename DecrementType = continue_msg>
friend class internal::decrementer< limiter_node< T, DecrementType >, DecrementType >
friend

Definition at line 3003 of file flow_graph.h.

◆ internal::forward_task_bypass< limiter_node< T, DecrementType > >

template<typename T , typename DecrementType = continue_msg>
friend class internal::forward_task_bypass< limiter_node< T, DecrementType > >
friend

Definition at line 3000 of file flow_graph.h.

◆ internal::round_robin_cache

template<typename T , typename DecrementType = continue_msg>
template<typename X , typename Y >
friend class internal::round_robin_cache
friend

Definition at line 3223 of file flow_graph.h.

◆ run_and_put_task

template<typename T , typename DecrementType = continue_msg>
template<typename R , typename B >
friend class run_and_put_task
friend

Definition at line 3221 of file flow_graph.h.

Member Data Documentation

◆ decrement

template<typename T , typename DecrementType = continue_msg>
internal::decrementer< limiter_node<T, DecrementType>, DecrementType > tbb::flow::interface11::limiter_node< T, DecrementType >::decrement

The internal receiver< DecrementType > that decrements the count.

Definition at line 3093 of file flow_graph.h.

Referenced by tbb::flow::interface11::limiter_node< T, DecrementType >::initialize(), and tbb::flow::interface11::limiter_node< T, DecrementType >::reset_node().

◆ my_count

◆ my_mutex

◆ my_predecessors

◆ my_successors

◆ my_threshold

◆ my_tries


The documentation for this class was generated from the following file: flow_graph.h
tbb::flow::interface11::internal::is_graph_active
bool is_graph_active(tbb::flow::interface10::graph &g)
Definition: _flow_graph_impl.h:494
tbb::flow::interface11::limiter_node::input_type
T input_type
Definition: flow_graph.h:2979
tbb::flow::interface11::internal::spawn_in_graph_arena
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
Definition: _flow_graph_impl.h:521
CODEPTR
#define CODEPTR()
Definition: _flow_graph_trace_impl.h:297
__TBB_ASSERT
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
__TBB_DEPRECATED_LIMITER_ARG4
#define __TBB_DEPRECATED_LIMITER_ARG4(arg1, arg2, arg3, arg4)
Definition: _flow_graph_impl.h:54
tbb::flow::interface11::limiter_node::check_conditions
bool check_conditions()
Definition: flow_graph.h:3005
tbb::flow::interface11::limiter_node::my_successors
internal::broadcast_cache< T > my_successors
Definition: flow_graph.h:2997
tbb::spin_mutex::scoped_lock
friend class scoped_lock
Definition: spin_mutex.h:179
tbb::flow::interface11::limiter_node::my_tries
size_t my_tries
Definition: flow_graph.h:2994
lock
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
Definition: ittnotify_static.h:121
tbb::flow::interface11::limiter_node::my_predecessors
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
Definition: flow_graph.h:2995
tbb::flow::interface11::limiter_node::my_count
size_t my_count
Definition: flow_graph.h:2993
tbb::flow::interface11::limiter_node::initialize
void initialize()
Definition: flow_graph.h:3081
task
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
Definition: ittnotify_static.h:119
tbb::flow::interface11::limiter_node::my_mutex
spin_mutex my_mutex
Definition: flow_graph.h:2996
tbb::flow::interface11::rf_clear_edges
@ rf_clear_edges
Definition: _flow_graph_impl.h:161
tbb::internal::fgt_node
static void fgt_node(void *, string_index, void *, void *)
Definition: _flow_graph_trace_impl.h:326
tbb::flow::interface11::limiter_node::my_threshold
size_t my_threshold
Definition: flow_graph.h:2992
internal::forward_task_bypass
A task that calls a node's forward_task function.
Definition: _flow_graph_body_impl.h:270
tbb::flow::interface11::limiter_node::forward_task
task * forward_task()
Definition: flow_graph.h:3010
tbb::flow::interface11::limiter_node::decrement
internal::decrementer< limiter_node< T, DecrementType >, DecrementType > decrement
The internal receiver< DecrementType > that decrements the count.
Definition: flow_graph.h:3093
tbb::flow::interface11::limiter_node::graph_reference
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:3252

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.