173 template<
typename StageTask>
188 spawner.spawn_stage_task(wakee);
191 #if __TBB_TASK_GROUP_CONTEXT
192 void clear(
filter* my_filter ) {
239 new_array[i].is_valid =
false;
241 for(
size_type i=0; i<old_size; ++i, ++t )
242 new_array[t&(
new_size-1)] = old_array[t&(old_size-1)];
282 #if __TBB_TASK_GROUP_CONTEXT
291 #endif // __TBB_TASK_GROUP_CONTEXT
318 goto process_another_stage;
369 goto process_another_stage;
379 if(ntokens_avail == 1) {
380 my_pipeline.filter_list->my_input_buffer->sema_V();
392 process_another_stage:
416 filter* first_suitable_filter = current_filter;
417 while( current_filter ) {
447 if( !current_filter ) {
452 current_filter = first_suitable_filter;
458 first_suitable_filter = first_suitable_filter->
next_segment;
459 current_filter = first_suitable_filter;
479 for(
filter* subfilter=
first->next_filter_in_pipeline;
481 subfilter=subfilter->next_filter_in_pipeline )
483 if( subfilter->prev_filter_in_pipeline->is_bound() && !subfilter->is_bound() ) {
486 head_of_previous_segment = subfilter;
493 #if _MSC_VER && !defined(__INTEL_COMPILER)
496 #pragma warning (disable: 4127)
507 #if __TBB_TASK_GROUP_CONTEXT
517 void pipeline::inject_token(
task& ) {
521 #if __TBB_TASK_GROUP_CONTEXT
522 void pipeline::clear_filters() {
523 for( filter* f = filter_list; f; f = f->next_filter_in_pipeline ) {
525 if( internal::input_buffer* b = f->my_input_buffer )
531 pipeline::pipeline() :
536 has_thread_bound_filters(false)
542 pipeline::~pipeline() {
546 void pipeline::clear() {
548 for( filter* f = filter_list; f; f=next ) {
549 if( internal::input_buffer* b = f->my_input_buffer ) {
551 f->my_input_buffer = NULL;
553 next=f->next_filter_in_pipeline;
557 f->my_pipeline = NULL;
560 f->next_segment = NULL;
562 filter_list = filter_end = NULL;
565 void pipeline::add_filter( filter& filter_ ) {
570 __TBB_ASSERT( !end_counter,
"invocation of add_filter on running pipeline" );
573 filter_.my_pipeline =
this;
574 filter_.prev_filter_in_pipeline = filter_end;
575 if ( filter_list == NULL)
576 filter_list = &filter_;
578 filter_end->next_filter_in_pipeline = &filter_;
579 filter_.next_filter_in_pipeline = NULL;
580 filter_end = &filter_;
583 filter_end =
reinterpret_cast<filter*
>(&filter_list);
585 *
reinterpret_cast<filter**
>(filter_end) = &filter_;
586 filter_end =
reinterpret_cast<filter*
>(&filter_.next_filter_in_pipeline);
587 *
reinterpret_cast<filter**
>(filter_end) = NULL;
590 if( filter_.is_serial() ) {
591 if( filter_.is_bound() )
592 has_thread_bound_filters =
true;
593 filter_.my_input_buffer =
new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
595 if(filter_.prev_filter_in_pipeline) {
596 if(filter_.prev_filter_in_pipeline->is_bound()) {
598 filter_.my_input_buffer =
new internal::input_buffer(
false,
false );
601 if(filter_.object_may_be_null() ) {
603 filter_.my_input_buffer =
new internal::input_buffer(
false,
false );
604 filter_.my_input_buffer->create_my_tls();
609 if( filter_.is_serial() ) {
610 filter_.my_input_buffer =
new internal::input_buffer( filter_.is_ordered(),
false );
616 void pipeline::remove_filter( filter& filter_ ) {
619 __TBB_ASSERT( !end_counter,
"invocation of remove_filter on running pipeline" );
620 if (&filter_ == filter_list)
621 filter_list = filter_.next_filter_in_pipeline;
623 __TBB_ASSERT( filter_.prev_filter_in_pipeline,
"filter list broken?" );
624 filter_.prev_filter_in_pipeline->next_filter_in_pipeline = filter_.next_filter_in_pipeline;
626 if (&filter_ == filter_end)
627 filter_end = filter_.prev_filter_in_pipeline;
629 __TBB_ASSERT( filter_.next_filter_in_pipeline,
"filter list broken?" );
630 filter_.next_filter_in_pipeline->prev_filter_in_pipeline = filter_.prev_filter_in_pipeline;
632 if( internal::input_buffer* b = filter_.my_input_buffer ) {
634 filter_.my_input_buffer = NULL;
638 filter_.next_segment = NULL;
639 filter_.my_pipeline = NULL;
642 void pipeline::run(
size_t max_number_of_live_tokens
647 __TBB_ASSERT( max_number_of_live_tokens>0,
"pipeline::run must have at least one token" );
648 __TBB_ASSERT( !end_counter,
"pipeline already running?" );
650 internal::pipeline_cleaner my_pipeline_cleaner(*
this);
651 end_of_input =
false;
653 if(has_thread_bound_filters) {
655 if(filter_list->is_bound()) {
656 filter_list->my_input_buffer->sema_V();
659 #if __TBB_TASK_GROUP_CONTEXT
667 if(has_thread_bound_filters) {
668 for(filter* f = filter_list->next_filter_in_pipeline; f; f=f->next_filter_in_pipeline) {
670 f->my_input_buffer->sema_V();
677 #if __TBB_TASK_GROUP_CONTEXT
678 void pipeline::run(
size_t max_number_of_live_tokens ) {
686 run(max_number_of_live_tokens, context);
689 #endif // __TBB_TASK_GROUP_CONTEXT
693 __TBB_ASSERT(my_input_buffer,
"has_more_work() called for filter with no input buffer");
700 my_pipeline->remove_filter(*
this);
712 my_pipeline->end_of_input =
true;
714 __TBB_ASSERT(my_input_buffer->end_of_input_tls_allocated, NULL);
715 my_input_buffer->set_my_tls_end_of_input();
720 return internal_process_item(
true);
724 return internal_process_item(
false);
728 __TBB_ASSERT(my_pipeline != NULL,
"It's not supposed that process_item is called for a filter that is not in a pipeline.");
732 if( my_pipeline->end_of_input && !has_more_work() )
733 return end_of_stream;
735 if( !prev_filter_in_pipeline ) {
736 if( my_pipeline->end_of_input )
737 return end_of_stream;
738 while( my_pipeline->input_tokens == 0 ) {
740 return item_not_available;
741 my_input_buffer->sema_P();
745 __TBB_ASSERT(my_pipeline->input_tokens > 0,
"Token failed in thread-bound filter");
746 my_pipeline->input_tokens--;
748 info.
my_token = my_pipeline->token_counter;
751 my_pipeline->token_counter++;
753 my_pipeline->end_of_input =
true;
754 return end_of_stream;
757 while( !my_input_buffer->has_item() ) {
759 return item_not_available;
761 my_input_buffer->sema_P();
762 if( my_pipeline->end_of_input && !has_more_work() ) {
763 return end_of_stream;
766 if( !my_input_buffer->return_item(info,
true) ) {
771 if( next_filter_in_pipeline ) {
772 if ( !next_filter_in_pipeline->my_input_buffer->put_token(info,
true) ) {
773 __TBB_ASSERT(
false,
"Couldn't put token after thread-bound buffer");
776 size_t ntokens_avail = ++(my_pipeline->input_tokens);
777 if( my_pipeline->filter_list->is_bound() ) {
778 if( ntokens_avail == 1 ) {
779 my_pipeline->filter_list->my_input_buffer->sema_V();