17 #ifndef __TBB_flow_graph_streaming_H
18 #define __TBB_flow_graph_streaming_H
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
24 #if __TBB_PREVIEW_STREAMING_NODE
30 template <
int N1,
int N2>
33 static const int size = N2 - N1 + 1;
40 template <
int N1,
int N2 = N1>
52 template <
int N1,
int N2>
57 template <
int N1,
int N2>
62 template <
typename... Args>
67 template <
typename T,
typename... Rest>
78 template<
typename Key>
84 template<
typename Key>
90 template<
typename Device,
typename Key>
103 template <
typename T>
108 template <
int N1,
int N2>
113 template <
int N1,
int N2>
118 template <
typename T>
123 template <
typename ...Args1>
126 template <
typename A1,
typename ...Args1>
128 static const size_t my_delta = 1;
130 template <
typename F,
typename Tuple,
typename ...Args2>
131 static void doit(F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
134 template <
typename F,
typename Tuple,
typename ...Args2>
138 template <
typename F,
typename Tuple,
int N1,
int N2,
typename ...Args2>
141 args2..., std::get<N1 + my_delta>(t));
143 template <
typename F,
typename Tuple,
int N,
typename ...Args2>
148 template <
typename F,
typename Tuple,
int N1,
int N2,
typename ...Args2>
150 doit_impl(x, f, t,
fn(), args1..., args2...);
152 template <
typename F,
typename Tuple,
int N,
typename ...Args2>
154 doit_impl(x, f, t,
fn(), args1..., args2...);
160 template <
typename F,
typename Tuple,
typename ...Args2>
161 static void doit(F& f, Tuple&, Args2&... args2) {
167 template<
typename JP,
typename StreamFactory,
typename... Ports>
170 template <
typename T>
185 template<
typename StreamFactory,
typename KernelInputTuple,
typename =
void>
191 template <
typename ...Args>
193 factory.send_kernel( device, kernel, args... );
198 template<
typename StreamFactory,
typename KernelInputTuple>
207 struct range_wrapper {
209 virtual range_wrapper *clone()
const = 0;
213 struct range_value :
public range_wrapper {
223 return new range_value(my_value);
230 struct range_mapper :
public range_wrapper {
235 return get<N + 1>(ip).data(
false);
239 return new range_mapper<N>;
244 template <
typename ...Args>
246 __TBB_ASSERT(my_range_wrapper,
"Range is not set. Call set_range() before running streaming_node.");
247 factory.send_kernel( device, kernel, my_range_wrapper->get_range(ip), args... );
257 executor.my_range_wrapper = NULL;
261 if (my_range_wrapper)
delete my_range_wrapper;
265 my_range_wrapper =
new range_value(work_size);
269 my_range_wrapper =
new range_value(
std::move(work_size));
274 my_range_wrapper =
new range_mapper<N>;
279 my_range_wrapper =
new range_mapper<N>;
301 template<
typename... Args>
304 template<
typename... Ports,
typename JP,
typename StreamFactory>
307 :
public composite_node < typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple,
308 typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple >
319 typedef composite_node<input_tuple, output_tuple>
base_type;
334 return std::tie( internal::input_port<S>( my_indexer_node )... );
339 return std::tie( internal::output_port<S>( my_kernel_node )... );
352 make_edge( internal::output_port<N>( my_device_selector_node ), internal::input_port<N>( my_join_node ) );
358 make_edge( my_indexer_node, my_device_selector_node );
359 make_edge( my_device_selector_node, my_join_node );
361 make_edge( my_join_node, my_kernel_node );
368 class device_selector_base {
370 virtual void operator()(
const indexer_node_output_type &v,
typename device_selector_node::output_ports_type &op ) = 0;
371 virtual device_selector_base *clone(
streaming_node &n )
const = 0;
375 template <
typename UserFunctor>
380 , my_user_functor( uf ), my_node(n), my_factory( f )
382 my_port_epoches.fill( 0 );
386 (this->*my_dispatch_funcs[ v.tag() ])( my_port_epoches[ v.tag() ], v, op );
388 || my_port_epoches[v.tag()] == 0,
"Epoch is changed when key matching is requested" );
392 return new device_selector( my_user_functor, n, my_factory );
395 typedef void(device_selector<UserFunctor>::*send_and_put_fn_type)(
size_t &,
const indexer_node_output_type &,
typename device_selector_node::output_ports_type &);
404 template <
typename T>
410 template <
typename T>
413 return key_from_message<key_type>( t );
418 typedef typename tuple_element<N + 1, typename device_selector_node::output_ports_type>::type::output_type elem_type;
419 elem_type e = internal::cast_to<elem_type>( v );
421 my_factory.send_data( device, e );
422 get<N + 1>( op ).try_put( e );
425 template<
typename DevicePort >
428 if ( it == my_devices.end() ) {
430 std::tie( it, std::ignore ) = my_devices.insert( std::make_pair(
key,
d ) );
433 my_node.notify_new_device(
d );
435 epoch_desc &e = it->second;
437 if ( ++e.my_request_number == NUM_INPUTS ) my_devices.erase( it );
455 class device_selector_body {
460 (*my_device_selector)(v, op);
473 virtual args_storage_base *clone()
const = 0;
478 : my_kernel( kernel ), my_factory( f )
482 :
tbb::
internal::no_copy(), my_kernel( k.my_kernel ), my_factory( k.my_factory )
489 template <
typename... Args>
490 class args_storage :
public args_storage_base {
496 const auto& t = get<N + 1>( ip );
497 auto &port = get<N>( op );
498 return port.try_put( t );
510 : my_kernel_func( ip, node, storage, get<0>(ip).device() ) {}
514 template <
typename... FnArgs>
526 : my_ip( ip ), my_node( node ), my_storage( storage ), my_device( device )
529 template <
typename... FnArgs>
531 my_node.enqueue_kernel( my_ip, my_storage.my_factory, my_device, my_storage.my_kernel, args... );
536 template<
typename FinalizeFn>
540 : my_ip( ip ), my_finalize_func( factory, get<0>(ip).device(),
fn ) {}
544 template <
typename... FnArgs>
557 : my_factory(factory), my_device(device), my_fn(
fn) {}
559 template <
typename... FnArgs>
561 my_factory.finalize( my_device, my_fn, args... );
566 template<
typename FinalizeFn>
568 return run_finalize_func<FinalizeFn>( ip, factory,
fn );
574 : my_factory(factory), my_device(
d ) {}
576 template <
typename... FnArgs>
578 my_factory.send_data( my_device, args... );
587 : args_storage_base( kernel, f )
588 , my_args_pack( std::forward<Args>(args)... )
591 args_storage(
const args_storage &k ) : args_storage_base( k ), my_args_pack( k.my_args_pack ) {}
593 args_storage(
const args_storage_base &k, Args&&... args ) : args_storage_base( k ), my_args_pack( std::forward<Args>(args)... ) {}
601 tbb::internal::call( run_kernel_func( ip, n, *
this ), const_args_pack );
604 graph& g = n.my_graph;
606 g.increment_wait_count();
611 tbb::internal::call( make_run_finalize_func(ip, this->my_factory, [&g] {
612 g.decrement_wait_count();
613 }), const_args_pack );
620 tbb::internal::call( send_func( this->my_factory,
d ), my_args_pack );
625 return new args_storage<Args...>( *this );
639 __TBB_ASSERT( (my_node.my_args_storage != NULL),
"No arguments storage" );
641 my_node.my_args_storage->enqueue( ip, op, my_node );
648 struct wrap_to_async {
652 template <
typename T>
657 template <
typename... Args>
660 return new args_storage<Args...>(storage, std::forward<Args>(args)...);
664 my_args_storage->send(
d );
667 template <
typename ...Args>
673 template <
typename DeviceSelector>
676 , my_indexer_node( g )
677 , my_device_selector( new device_selector<DeviceSelector>(
d, *this, f ) )
678 , my_device_selector_node( g,
serial, device_selector_body( my_device_selector ) )
680 , my_kernel_node( g,
serial, kernel_body( *this ) )
682 , my_args_storage( make_args_storage( args_storage<>(kernel, f),
port_ref<0, NUM_INPUTS - 1>() ) )
684 base_type::set_external_ports( get_input_ports(), get_output_ports() );
690 , my_indexer_node( node.my_indexer_node )
691 , my_device_selector( node.my_device_selector->clone( *this ) )
692 , my_device_selector_node( node.my_graph,
serial, device_selector_body( my_device_selector ) )
693 , my_join_node( node.my_join_node )
694 , my_kernel_node( node.my_graph,
serial, kernel_body( *this ) )
695 , my_args_storage( node.my_args_storage->clone() )
697 base_type::set_external_ports( get_input_ports(), get_output_ports() );
703 , my_indexer_node( std::
move( node.my_indexer_node ) )
704 , my_device_selector( node.my_device_selector->clone(*this) )
705 , my_device_selector_node( node.my_graph,
serial, device_selector_body( my_device_selector ) )
706 , my_join_node( std::
move( node.my_join_node ) )
707 , my_kernel_node( node.my_graph,
serial, kernel_body( *this ) )
708 , my_args_storage( node.my_args_storage )
710 base_type::set_external_ports( get_input_ports(), get_output_ports() );
713 node.my_args_storage = NULL;
717 if ( my_args_storage )
delete my_args_storage;
718 if ( my_device_selector )
delete my_device_selector;
721 template <
typename... Args>
724 args_storage_base *
const new_args_storage = make_args_storage( *my_args_storage,
typename wrap_to_async<Args>::type(std::forward<Args>(args))...);
725 delete my_args_storage;
726 my_args_storage = new_args_storage;
742 #endif // __TBB_PREVIEW_STREAMING_NODE
743 #endif // __TBB_flow_graph_streaming_H