Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
pipeline.cpp
Go to the documentation of this file.
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "tbb/pipeline.h"
18#include "tbb/spin_mutex.h"
20#include "itt_notify.h"
21#include "semaphore.h"
22#include "tls.h" // for parallel filters that do not use NULL as end_of_input
23
24
25namespace tbb {
26
27namespace internal {
28
30struct task_info {
31 void* my_object;
39 void reset() {
40 my_object = NULL;
41 my_token = 0;
42 my_token_ready = false;
43 is_valid = false;
44 }
45};
47
50 friend class tbb::filter;
53 friend class tbb::pipeline;
54
56
59
62
64
66
68
70
73
75
76 void grow( size_type minimum_size );
77
79
81
84
87
90
94 bool end_of_input_tls_allocated; // no way to test pthread creation of TLS
95
96 void create_sema(size_t initial_tokens) { __TBB_ASSERT(!my_sem,NULL); my_sem = new internal::semaphore(initial_tokens); }
97 void free_sema() { __TBB_ASSERT(my_sem,NULL); delete my_sem; }
98 void sema_P() { __TBB_ASSERT(my_sem,NULL); my_sem->P(); }
99 void sema_V() { __TBB_ASSERT(my_sem,NULL); my_sem->V(); }
100
101public:
103 input_buffer( bool is_ordered_, bool is_bound_ ) :
104 array(NULL), my_sem(NULL), array_size(0),
105 low_token(0), high_token(0),
106 is_ordered(is_ordered_), is_bound(is_bound_),
109 __TBB_ASSERT( array, NULL );
110 if(is_bound) create_sema(0);
111 }
112
115 __TBB_ASSERT( array, NULL );
118 if(my_sem) {
119 free_sema();
120 }
123 }
124 }
125
127
135 bool put_token( task_info& info_, bool force_put = false ) {
136 {
137 info_.is_valid = true;
139 Token token;
140 bool was_empty = !array[low_token&(array_size-1)].is_valid;
141 if( is_ordered ) {
142 if( !info_.my_token_ready ) {
143 info_.my_token = high_token++;
144 info_.my_token_ready = true;
145 }
146 token = info_.my_token;
147 } else
148 token = high_token++;
149 __TBB_ASSERT( (tokendiff_t)(token-low_token)>=0, NULL );
150 if( token!=low_token || is_bound || force_put ) {
151 // Trying to put token that is beyond low_token.
152 // Need to wait until low_token catches up before dispatching.
153 if( token-low_token>=array_size )
154 grow( token-low_token+1 );
155 ITT_NOTIFY( sync_releasing, this );
156 array[token&(array_size-1)] = info_;
157 if(was_empty && is_bound) {
158 sema_V();
159 }
160 return true;
161 }
162 }
163 return false;
164 }
165
167
168 // Uses template to avoid explicit dependency on stage_task.
169 // This is only called for serial filters, and is the reason for the
170 // advance parameter in return_item (we're incrementing low_token here.)
171 // Non-TBF serial stages don't advance the token at the start because the presence
172 // of the current token in the buffer keeps another stage from being spawned.
173 template<typename StageTask>
174 void note_done( Token token, StageTask& spawner ) {
175 task_info wakee;
176 wakee.reset();
177 {
179 if( !is_ordered || token==low_token ) {
180 // Wake the next task
181 task_info& item = array[++low_token & (array_size-1)];
182 ITT_NOTIFY( sync_acquired, this );
183 wakee = item;
184 item.is_valid = false;
185 }
186 }
187 if( wakee.is_valid )
188 spawner.spawn_stage_task(wakee);
189 }
190
191#if __TBB_TASK_GROUP_CONTEXT
193 void clear( filter* my_filter ) {
194 long t=low_token;
195 for( size_type i=0; i<array_size; ++i, ++t ){
196 task_info& temp = array[t&(array_size-1)];
197 if (temp.is_valid ) {
198 my_filter->finalize(temp.my_object);
199 temp.is_valid = false;
200 }
201 }
202 }
203#endif
204
206 // is parallel (as indicated by advance == true). If the filter is serial, leave the
207 // item in the buffer to keep another stage from being spawned.
208 bool return_item(task_info& info, bool advance) {
210 task_info& item = array[low_token&(array_size-1)];
211 ITT_NOTIFY( sync_acquired, this );
212 if( item.is_valid ) {
213 info = item;
214 item.is_valid = false;
215 if (advance) low_token++;
216 return true;
217 }
218 return false;
219 }
220
223
224 // end_of_input signal for parallel_pipeline, parallel input filters with 0 tokens allowed.
225 void create_my_tls() { int status = end_of_input_tls.create(); if(status) handle_perror(status, "TLS not allocated for filter"); end_of_input_tls_allocated = true; }
226 void destroy_my_tls() { int status = end_of_input_tls.destroy(); if(status) handle_perror(status, "Failed to destroy filter TLS"); }
227 bool my_tls_end_of_input() { return end_of_input_tls.get() != 0; }
229};
230
231void input_buffer::grow( size_type minimum_size ) {
232 size_type old_size = array_size;
233 size_type new_size = old_size ? 2*old_size : initial_buffer_size;
234 while( new_size<minimum_size )
235 new_size*=2;
237 task_info* old_array = array;
238 for( size_type i=0; i<new_size; ++i )
239 new_array[i].is_valid = false;
240 long t=low_token;
241 for( size_type i=0; i<old_size; ++i, ++t )
242 new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
243 array = new_array;
245 if( old_array )
247}
248
249class stage_task: public task, public task_info {
250private:
251 friend class tbb::pipeline;
252 pipeline& my_pipeline;
256
257public:
259
260 stage_task( pipeline& pipeline ) :
261 my_pipeline(pipeline),
262 my_filter(pipeline.filter_list),
263 my_at_start(true)
264 {
266 }
268 stage_task( pipeline& pipeline, filter* filter_, const task_info& info ) :
269 task_info(info),
270 my_pipeline(pipeline),
271 my_filter(filter_),
272 my_at_start(false)
273 {}
275 void reset() {
277 my_filter = my_pipeline.filter_list;
278 my_at_start = true;
279 }
282#if __TBB_TASK_GROUP_CONTEXT
284 {
286 __TBB_ASSERT(is_cancelled(), "Trying to finalize the task that wasn't cancelled");
288 my_object = NULL;
289 }
290 }
291#endif // __TBB_TASK_GROUP_CONTEXT
293 void spawn_stage_task(const task_info& info)
294 {
295 stage_task* clone = new (allocate_additional_child_of(*parent()))
297 spawn(*clone);
298 }
299};
300
302 __TBB_ASSERT( !my_at_start || !my_object, NULL );
303 __TBB_ASSERT( !my_filter->is_bound(), NULL );
304 if( my_at_start ) {
305 if( my_filter->is_serial() ) {
306 my_object = (*my_filter)(my_object);
307 if( my_object || ( my_filter->object_may_be_null() && !my_pipeline.end_of_input) )
308 {
309 if( my_filter->is_ordered() ) {
310 my_token = my_pipeline.token_counter++; // ideally, with relaxed semantics
311 my_token_ready = true;
313 if( my_pipeline.has_thread_bound_filters )
314 my_pipeline.token_counter++; // ideally, with relaxed semantics
315 }
316 if( !my_filter->next_filter_in_pipeline ) { // we're only filter in pipeline
317 reset();
318 goto process_another_stage;
319 } else {
320 ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
321 if( --my_pipeline.input_tokens>0 )
322 spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
323 }
324 } else {
325 my_pipeline.end_of_input = true;
326 return NULL;
327 }
328 } else /*not is_serial*/ {
329 if( my_pipeline.end_of_input )
330 return NULL;
332 if( my_pipeline.has_thread_bound_filters )
333 my_pipeline.token_counter++;
334 }
335 ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
336 if( --my_pipeline.input_tokens>0 )
337 spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
338 my_object = (*my_filter)(my_object);
340 {
341 my_pipeline.end_of_input = true;
343 if( my_pipeline.has_thread_bound_filters )
344 my_pipeline.token_counter--; // fix token_counter
345 }
346 return NULL;
347 }
348 }
349 my_at_start = false;
350 } else {
351 my_object = (*my_filter)(my_object);
352 if( my_filter->is_serial() )
354 }
356 if( my_filter ) {
357 // There is another filter to execute.
358 if( my_filter->is_serial() ) {
359 // The next filter must execute tokens in order
360 if( my_filter->my_input_buffer->put_token(*this) ){
361 // Can't proceed with the same item
362 if( my_filter->is_bound() ) {
363 // Find the next non-thread-bound filter
364 do {
366 } while( my_filter && my_filter->is_bound() );
367 // Check if there is an item ready to process
369 goto process_another_stage;
370 }
371 my_filter = NULL; // To prevent deleting my_object twice if exception occurs
372 return NULL;
373 }
374 }
375 } else {
376 // Reached end of the pipe.
377 size_t ntokens_avail = ++my_pipeline.input_tokens;
378 if(my_pipeline.filter_list->is_bound() ) {
379 if(ntokens_avail == 1) {
380 my_pipeline.filter_list->my_input_buffer->sema_V();
381 }
382 return NULL;
383 }
384 if( ntokens_avail>1 // Only recycle if there is one available token
385 || my_pipeline.end_of_input ) {
386 return NULL; // No need to recycle for new input
387 }
388 ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens );
389 // Recycle as an input stage task.
390 reset();
391 }
392process_another_stage:
393 /* A semi-hackish way to reexecute the same task object immediately without spawning.
394 recycle_as_continuation marks the task for future execution,
395 and then 'this' pointer is returned to bypass spawning. */
397 return this;
398}
399
401 pipeline& my_pipeline;
403
405 if( !my_pipeline.end_of_input )
406 if( !my_pipeline.filter_list->is_bound() )
407 if( my_pipeline.input_tokens > 0 ) {
409 set_ref_count(1);
410 return new( allocate_child() ) stage_task( my_pipeline );
411 }
412 if( do_segment_scanning ) {
413 filter* current_filter = my_pipeline.filter_list->next_segment;
414 /* first non-thread-bound filter that follows thread-bound one
415 and may have valid items to process */
416 filter* first_suitable_filter = current_filter;
417 while( current_filter ) {
418 __TBB_ASSERT( !current_filter->is_bound(), "filter is thread-bound?" );
419 __TBB_ASSERT( current_filter->prev_filter_in_pipeline->is_bound(), "previous filter is not thread-bound?" );
420 if( !my_pipeline.end_of_input || current_filter->has_more_work())
421 {
422 task_info info;
423 info.reset();
424 task* bypass = NULL;
425 int refcnt = 0;
426 task_list list;
427 // No new tokens are created; it's OK to process all waiting tokens.
428 // If the filter is serial, the second call to return_item will return false.
429 while( current_filter->my_input_buffer->return_item(info, !current_filter->is_serial()) ) {
430 task* t = new( allocate_child() ) stage_task( my_pipeline, current_filter, info );
431 if( ++refcnt == 1 )
432 bypass = t;
433 else // there's more than one task
434 list.push_back(*t);
435 // TODO: limit the list size (to arena size?) to spawn tasks sooner
436 __TBB_ASSERT( refcnt <= int(my_pipeline.token_counter), "token counting error" );
437 info.reset();
438 }
439 if( refcnt ) {
440 set_ref_count( refcnt );
441 if( refcnt > 1 )
442 spawn(list);
444 return bypass;
445 }
446 current_filter = current_filter->next_segment;
447 if( !current_filter ) {
448 if( !my_pipeline.end_of_input ) {
450 return this;
451 }
452 current_filter = first_suitable_filter;
453 __TBB_Yield();
454 }
455 } else {
456 /* The preceding pipeline segment is empty.
457 Fast-forward to the next post-TBF segment. */
458 first_suitable_filter = first_suitable_filter->next_segment;
459 current_filter = first_suitable_filter;
460 }
461 } /* while( current_filter ) */
462 return NULL;
463 } else {
464 if( !my_pipeline.end_of_input ) {
466 return this;
467 }
468 return NULL;
469 }
470 }
471public:
472 pipeline_root_task( pipeline& pipeline ): my_pipeline(pipeline), do_segment_scanning(false)
473 {
474 __TBB_ASSERT( my_pipeline.filter_list, NULL );
475 filter* first = my_pipeline.filter_list;
476 if( (first->my_filter_mode & first->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
477 // Scanning the pipeline for segments
478 filter* head_of_previous_segment = first;
479 for( filter* subfilter=first->next_filter_in_pipeline;
480 subfilter!=NULL;
481 subfilter=subfilter->next_filter_in_pipeline )
482 {
483 if( subfilter->prev_filter_in_pipeline->is_bound() && !subfilter->is_bound() ) {
484 do_segment_scanning = true;
485 head_of_previous_segment->next_segment = subfilter;
486 head_of_previous_segment = subfilter;
487 }
488 }
489 }
490 }
491};
492
493#if _MSC_VER && !defined(__INTEL_COMPILER)
494 // Workaround for overzealous compiler warnings
495 // Suppress compiler warning about constant conditional expression
496 #pragma warning (disable: 4127)
497#endif
498
499// The class destroys end_counter and clears all input buffers if pipeline was cancelled.
501 pipeline& my_pipeline;
502public:
503 pipeline_cleaner(pipeline& _pipeline) :
504 my_pipeline(_pipeline)
505 {}
507#if __TBB_TASK_GROUP_CONTEXT
508 if (my_pipeline.end_counter->is_cancelled()) // Pipeline was cancelled
509 my_pipeline.clear_filters();
510#endif
511 my_pipeline.end_counter = NULL;
512 }
513};
514
515} // namespace internal
516
517void pipeline::inject_token( task& ) {
518 __TBB_ASSERT(false,"illegal call to inject_token");
519}
520
521#if __TBB_TASK_GROUP_CONTEXT
522void pipeline::clear_filters() {
523 for( filter* f = filter_list; f; f = f->next_filter_in_pipeline ) {
524 if ((f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4))
525 if( internal::input_buffer* b = f->my_input_buffer )
526 b->clear(f);
527 }
528}
529#endif
530
531pipeline::pipeline() :
532 filter_list(NULL),
533 filter_end(NULL),
534 end_counter(NULL),
535 end_of_input(false),
536 has_thread_bound_filters(false)
537{
538 token_counter = 0;
539 input_tokens = 0;
540}
541
542pipeline::~pipeline() {
543 clear();
544}
545
546void pipeline::clear() {
547 filter* next;
548 for( filter* f = filter_list; f; f=next ) {
549 if( internal::input_buffer* b = f->my_input_buffer ) {
550 delete b;
551 f->my_input_buffer = NULL;
552 }
553 next=f->next_filter_in_pipeline;
554 f->next_filter_in_pipeline = filter::not_in_pipeline();
555 if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
556 f->prev_filter_in_pipeline = filter::not_in_pipeline();
557 f->my_pipeline = NULL;
558 }
559 if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
560 f->next_segment = NULL;
561 }
562 filter_list = filter_end = NULL;
563}
564
565void pipeline::add_filter( filter& filter_ ) {
566#if TBB_USE_ASSERT
567 if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) )
568 __TBB_ASSERT( filter_.prev_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
569 __TBB_ASSERT( filter_.next_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
570 __TBB_ASSERT( !end_counter, "invocation of add_filter on running pipeline" );
571#endif
572 if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
573 filter_.my_pipeline = this;
574 filter_.prev_filter_in_pipeline = filter_end;
575 if ( filter_list == NULL)
576 filter_list = &filter_;
577 else
578 filter_end->next_filter_in_pipeline = &filter_;
579 filter_.next_filter_in_pipeline = NULL;
580 filter_end = &filter_;
581 } else {
582 if( !filter_end )
583 filter_end = reinterpret_cast<filter*>(&filter_list);
584
585 *reinterpret_cast<filter**>(filter_end) = &filter_;
586 filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline);
587 *reinterpret_cast<filter**>(filter_end) = NULL;
588 }
589 if( (filter_.my_filter_mode & filter_.version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
590 if( filter_.is_serial() ) {
591 if( filter_.is_bound() )
592 has_thread_bound_filters = true;
593 filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
594 } else {
595 if(filter_.prev_filter_in_pipeline) {
596 if(filter_.prev_filter_in_pipeline->is_bound()) {
597 // successors to bound filters must have an input_buffer
598 filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
599 }
600 } else { // input filter
601 if(filter_.object_may_be_null() ) {
602 //TODO: buffer only needed to hold TLS; could improve
603 filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
604 filter_.my_input_buffer->create_my_tls();
605 }
606 }
607 }
608 } else {
609 if( filter_.is_serial() ) {
610 filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), false );
611 }
612 }
613
614}
615
616void pipeline::remove_filter( filter& filter_ ) {
617 __TBB_ASSERT( filter_.prev_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
618 __TBB_ASSERT( filter_.next_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
619 __TBB_ASSERT( !end_counter, "invocation of remove_filter on running pipeline" );
620 if (&filter_ == filter_list)
621 filter_list = filter_.next_filter_in_pipeline;
622 else {
623 __TBB_ASSERT( filter_.prev_filter_in_pipeline, "filter list broken?" );
624 filter_.prev_filter_in_pipeline->next_filter_in_pipeline = filter_.next_filter_in_pipeline;
625 }
626 if (&filter_ == filter_end)
627 filter_end = filter_.prev_filter_in_pipeline;
628 else {
629 __TBB_ASSERT( filter_.next_filter_in_pipeline, "filter list broken?" );
630 filter_.next_filter_in_pipeline->prev_filter_in_pipeline = filter_.prev_filter_in_pipeline;
631 }
632 if( internal::input_buffer* b = filter_.my_input_buffer ) {
633 delete b;
634 filter_.my_input_buffer = NULL;
635 }
636 filter_.next_filter_in_pipeline = filter_.prev_filter_in_pipeline = filter::not_in_pipeline();
637 if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
638 filter_.next_segment = NULL;
639 filter_.my_pipeline = NULL;
640}
641
642void pipeline::run( size_t max_number_of_live_tokens
644 , tbb::task_group_context& context
645#endif
646 ) {
647 __TBB_ASSERT( max_number_of_live_tokens>0, "pipeline::run must have at least one token" );
648 __TBB_ASSERT( !end_counter, "pipeline already running?" );
649 if( filter_list ) {
650 internal::pipeline_cleaner my_pipeline_cleaner(*this);
651 end_of_input = false;
652 input_tokens = internal::Token(max_number_of_live_tokens);
653 if(has_thread_bound_filters) {
654 // release input filter if thread-bound
655 if(filter_list->is_bound()) {
656 filter_list->my_input_buffer->sema_V();
657 }
658 }
659#if __TBB_TASK_GROUP_CONTEXT
660 end_counter = new( task::allocate_root(context) ) internal::pipeline_root_task( *this );
661#else
662 end_counter = new( task::allocate_root() ) internal::pipeline_root_task( *this );
663#endif
664 // Start execution of tasks
665 task::spawn_root_and_wait( *end_counter );
666
667 if(has_thread_bound_filters) {
668 for(filter* f = filter_list->next_filter_in_pipeline; f; f=f->next_filter_in_pipeline) {
669 if(f->is_bound()) {
670 f->my_input_buffer->sema_V(); // wake to end
671 }
672 }
673 }
674 }
675}
676
677#if __TBB_TASK_GROUP_CONTEXT
678void pipeline::run( size_t max_number_of_live_tokens ) {
679 if( filter_list ) {
680 // Construct task group context with the exception propagation mode expected
681 // by the pipeline caller.
682 uintptr_t ctx_traits = filter_list->my_filter_mode & filter::exact_exception_propagation ?
684 task_group_context::default_traits & ~task_group_context::exact_exception;
685 task_group_context context(task_group_context::bound, ctx_traits);
686 run(max_number_of_live_tokens, context);
687 }
688}
689#endif // __TBB_TASK_GROUP_CONTEXT
690
692 __TBB_ASSERT(my_pipeline, NULL);
693 __TBB_ASSERT(my_input_buffer, "has_more_work() called for filter with no input buffer");
694 return (internal::tokendiff_t)(my_pipeline->token_counter - my_input_buffer->low_token) != 0;
695}
696
698 if ( (my_filter_mode & version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
699 if ( next_filter_in_pipeline != filter::not_in_pipeline() )
700 my_pipeline->remove_filter(*this);
701 else
702 __TBB_ASSERT( prev_filter_in_pipeline == filter::not_in_pipeline(), "probably filter list is broken" );
703 } else {
704 __TBB_ASSERT( next_filter_in_pipeline==filter::not_in_pipeline(), "cannot destroy filter that is part of pipeline" );
705 }
706}
707
709 __TBB_ASSERT(my_input_buffer, NULL);
710 __TBB_ASSERT(object_may_be_null(), NULL);
711 if(is_serial()) {
712 my_pipeline->end_of_input = true;
713 } else {
714 __TBB_ASSERT(my_input_buffer->end_of_input_tls_allocated, NULL);
715 my_input_buffer->set_my_tls_end_of_input();
716 }
717}
718
720 return internal_process_item(true);
721}
722
724 return internal_process_item(false);
725}
726
728 __TBB_ASSERT(my_pipeline != NULL,"It's not supposed that process_item is called for a filter that is not in a pipeline.");
730 info.reset();
731
732 if( my_pipeline->end_of_input && !has_more_work() )
733 return end_of_stream;
734
735 if( !prev_filter_in_pipeline ) {
736 if( my_pipeline->end_of_input )
737 return end_of_stream;
738 while( my_pipeline->input_tokens == 0 ) {
739 if( !is_blocking )
740 return item_not_available;
741 my_input_buffer->sema_P();
742 }
743 info.my_object = (*this)(info.my_object);
744 if( info.my_object ) {
745 __TBB_ASSERT(my_pipeline->input_tokens > 0, "Token failed in thread-bound filter");
746 my_pipeline->input_tokens--;
747 if( is_ordered() ) {
748 info.my_token = my_pipeline->token_counter;
749 info.my_token_ready = true;
750 }
751 my_pipeline->token_counter++; // ideally, with relaxed semantics
752 } else {
753 my_pipeline->end_of_input = true;
754 return end_of_stream;
755 }
756 } else { /* this is not an input filter */
757 while( !my_input_buffer->has_item() ) {
758 if( !is_blocking ) {
759 return item_not_available;
760 }
761 my_input_buffer->sema_P();
762 if( my_pipeline->end_of_input && !has_more_work() ) {
763 return end_of_stream;
764 }
765 }
766 if( !my_input_buffer->return_item(info, /*advance*/true) ) {
767 __TBB_ASSERT(false,"return_item failed");
768 }
769 info.my_object = (*this)(info.my_object);
770 }
771 if( next_filter_in_pipeline ) {
772 if ( !next_filter_in_pipeline->my_input_buffer->put_token(info,/*force_put=*/true) ) {
773 __TBB_ASSERT(false, "Couldn't put token after thread-bound buffer");
774 }
775 } else {
776 size_t ntokens_avail = ++(my_pipeline->input_tokens);
777 if( my_pipeline->filter_list->is_bound() ) {
778 if( ntokens_avail == 1 ) {
779 my_pipeline->filter_list->my_input_buffer->sema_V();
780 }
781 }
782 }
783
784 return success;
785}
786
787} // tbb
788
#define __TBB_Yield()
Definition: ibm_aix51.h:44
#define __TBB_PIPELINE_VERSION(x)
Definition: pipeline.h:41
#define __TBB_TASK_GROUP_CONTEXT
Definition: tbb_config.h:541
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_override
Definition: tbb_stddef.h:240
#define ITT_NOTIFY(name, obj)
Definition: itt_notify.h:112
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t new_size
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p sync_releasing
The graph class.
long tokendiff_t
Definition: pipeline.h:44
void poison_pointer(T *__TBB_atomic &)
Definition: tbb_stddef.h:305
void __TBB_EXPORTED_FUNC handle_perror(int error_code, const char *aux_info)
Throws std::runtime_error with what() returning error_code description prefixed with aux_info.
Definition: tbb_misc.cpp:87
unsigned long Token
Definition: pipeline.h:43
auto first(Container &c) -> decltype(begin(c))
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
void deallocate(pointer p, size_type)
Free block of memory that starts on a cache line.
pointer allocate(size_type n, const void *hint=0)
Allocate space for n objects, starting on a cache/sector line.
A stage in a pipeline.
Definition: pipeline.h:64
bool is_ordered() const
True if filter must receive stream in order.
Definition: pipeline.h:133
internal::input_buffer * my_input_buffer
Buffer for incoming tokens, or NULL if not required.
Definition: pipeline.h:173
bool has_more_work()
has the filter not yet processed all the tokens it will ever see?
Definition: pipeline.cpp:691
virtual void finalize(void *)
Destroys item if pipeline was cancelled.
Definition: pipeline.h:159
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
Definition: pipeline.h:67
static const unsigned char version_mask
Definition: pipeline.h:92
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
Definition: pipeline.h:164
bool is_bound() const
True if filter is thread-bound.
Definition: pipeline.h:138
static const unsigned char exact_exception_propagation
7th bit defines exception propagation mode expected by the application.
Definition: pipeline.h:84
filter * next_segment
Pointer to the next "segment" of filters, or NULL if not required.
Definition: pipeline.h:191
const unsigned char my_filter_mode
Storage for filter mode and dynamically checked implementation version.
Definition: pipeline.h:181
bool object_may_be_null()
true if an input filter can emit null
Definition: pipeline.h:143
filter * prev_filter_in_pipeline
Pointer to previous filter in the pipeline.
Definition: pipeline.h:184
void __TBB_EXPORTED_METHOD set_end_of_input()
Definition: pipeline.cpp:708
bool is_serial() const
True if filter is serial.
Definition: pipeline.h:128
virtual __TBB_EXPORTED_METHOD ~filter()
Destroy filter.
Definition: pipeline.cpp:697
A stage in a pipeline served by a user thread.
Definition: pipeline.h:196
result_type __TBB_EXPORTED_METHOD try_process_item()
If a data item is available, invoke operator() on that item.
Definition: pipeline.cpp:723
result_type internal_process_item(bool is_blocking)
Internal routine for item processing.
Definition: pipeline.cpp:727
result_type __TBB_EXPORTED_METHOD process_item()
Wait until a data item becomes available, and invoke operator() on that item.
Definition: pipeline.cpp:719
A lock that occupies a single byte.
Definition: spin_mutex.h:39
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
Used to form groups of tasks.
Definition: task.h:358
Base class for user-defined tasks.
Definition: task.h:615
internal::allocate_child_proxy & allocate_child()
Returns proxy for overloaded new that allocates a child task of *this.
Definition: task.h:681
task * parent() const
task on whose behalf this task is working, or NULL if this is a root.
Definition: task.h:865
bool is_cancelled() const
Returns true if the context has received cancellation request.
Definition: task.h:974
void recycle_as_continuation()
Change this to be a continuation of its former self.
Definition: task.h:711
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:663
void set_ref_count(int count)
Set reference count.
Definition: task.h:761
static void spawn_root_and_wait(task &root)
Spawn task allocated by allocate_root, wait for it to complete, and deallocate it.
Definition: task.h:808
A list of children.
Definition: task.h:1074
void push_back(task &task)
Push task onto back of list.
Definition: task.h:1091
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
This structure is used to store task information in a input buffer.
Definition: pipeline.cpp:30
Token my_token
Invalid unless a task went through an ordered stage.
Definition: pipeline.cpp:33
bool is_valid
True if my_object is valid.
Definition: pipeline.cpp:37
bool my_token_ready
False until my_token is set.
Definition: pipeline.cpp:35
void reset()
Set to initial state (no object, no token)
Definition: pipeline.cpp:39
A buffer of input items for a filter.
Definition: pipeline.cpp:48
Token high_token
Used for out of order buffer, and for assigning my_token if is_ordered and my_token not already assig...
Definition: pipeline.cpp:83
static const size_type initial_buffer_size
Initial size for "array".
Definition: pipeline.cpp:80
friend class tbb::pipeline
Definition: pipeline.cpp:53
bool put_token(task_info &info_, bool force_put=false)
Put a token into the buffer.
Definition: pipeline.cpp:135
end_of_input_tls_t end_of_input_tls
Definition: pipeline.cpp:93
bool has_item()
true if the current low_token is valid.
Definition: pipeline.cpp:222
void grow(size_type minimum_size)
Resize "array".
Definition: pipeline.cpp:231
basic_tls< intptr_t > end_of_input_tls_t
for parallel filters that accepts NULLs, thread-local flag for reaching end_of_input
Definition: pipeline.cpp:92
void note_done(Token token, StageTask &spawner)
Note that processing of a token is finished.
Definition: pipeline.cpp:174
bool return_item(task_info &info, bool advance)
return an item, invalidate the queued item, but only advance if the filter
Definition: pipeline.cpp:208
void create_sema(size_t initial_tokens)
Definition: pipeline.cpp:96
spin_mutex array_mutex
Serializes updates.
Definition: pipeline.cpp:72
~input_buffer()
Destroy the buffer.
Definition: pipeline.cpp:114
bool is_bound
True for thread-bound filter, false otherwise.
Definition: pipeline.cpp:89
Token low_token
Lowest token that can start executing.
Definition: pipeline.cpp:69
task_info * array
Array of deferred tasks that cannot yet start executing.
Definition: pipeline.cpp:58
bool is_ordered
True for ordered filter, false otherwise.
Definition: pipeline.cpp:86
input_buffer(bool is_ordered_, bool is_bound_)
Construct empty buffer.
Definition: pipeline.cpp:103
semaphore * my_sem
for thread-bound filter, semaphore for waiting, NULL otherwise.
Definition: pipeline.cpp:61
size_type array_size
Size of array.
Definition: pipeline.cpp:65
friend class tbb::pipeline
Definition: pipeline.cpp:251
stage_task(pipeline &pipeline, filter *filter_, const task_info &info)
Construct stage_task for a subsequent stage in a pipeline.
Definition: pipeline.cpp:268
stage_task(pipeline &pipeline)
Construct stage_task for first stage in a pipeline.
Definition: pipeline.cpp:260
bool my_at_start
True if this task has not yet read the input.
Definition: pipeline.cpp:255
void reset()
Roughly equivalent to the constructor of input stage task.
Definition: pipeline.cpp:275
task * execute() __TBB_override
The virtual task execution method.
Definition: pipeline.cpp:301
pipeline_root_task(pipeline &pipeline)
Definition: pipeline.cpp:472
task * execute() __TBB_override
Should be overridden by derived classes.
Definition: pipeline.cpp:404
pipeline_cleaner(pipeline &_pipeline)
Definition: pipeline.cpp:503
Edsger Dijkstra's counting semaphore.
Definition: semaphore.h:94
void P()
wait/acquire
Definition: semaphore.h:105
void V()
post/release
Definition: semaphore.h:110
void set(T value)
Definition: tls.h:56

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.