Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_streaming_node.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_streaming_H
18 #define __TBB_flow_graph_streaming_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 #if __TBB_PREVIEW_STREAMING_NODE
25 
26 // Included in namespace tbb::flow::interfaceX (in flow_graph.h)
27 
28 namespace internal {
29 
// Compile-time descriptor of the closed range of port indices [N1, N2].
// `size` is the number of indices in that range.
30 template <int N1, int N2>
31 struct port_ref_impl {
32  // "+1" since the port_ref range is a closed interval (includes its endpoints).
33  static const int size = N2 - N1 + 1;
34 };
35 
36 } // internal
37 
38 // The purpose of port_ref_impl is a concise syntax: the compile-time port range is deduced from the helper's return type,
39 // so the helper can be used without parentheses, e.g. "port_ref<0>".
40 template <int N1, int N2 = N1>
43 };
44 
45 namespace internal {
46 
// Number of arguments that a value of type T stands for.
// Primary template: any type without a specialization counts as one argument.
47 template <typename T>
48 struct num_arguments {
49  static const int value = 1;
50 };
51 
// Specialization for a pointer to a parameterless function returning port_ref_impl<N1,N2>:
// it counts as many arguments as there are indices in the range, i.e. port_ref_impl<N1,N2>::size.
52 template <int N1, int N2>
53 struct num_arguments<port_ref_impl<N1,N2>(*)()> {
54  static const int value = port_ref_impl<N1,N2>::size;
55 };
56 
// Specialization for a port_ref_impl<N1,N2> object itself:
// it counts as many arguments as there are indices in the range, i.e. port_ref_impl<N1,N2>::size.
57 template <int N1, int N2>
58 struct num_arguments<port_ref_impl<N1,N2>> {
59  static const int value = port_ref_impl<N1,N2>::size;
60 };
61 
// Accepts any number of arguments of any type and does nothing with them.
// It lets a parameter pack of calls be expanded as function arguments when their
// results are not needed (streaming_node::make_edges uses it this way).
62 template <typename... Args>
63 void ignore_return_values( Args&&... ) {}
64 
// Combines all arguments with operator| and returns the result as type T
// (the type deduced for the first argument).
// Single-argument overload: ends the recursion by returning the argument itself.
65 template <typename T>
66 T or_return_values( T&& t ) { return t; }
// Recursive overload: ORs the first argument with the combined value of the rest.
// The operands arrive as function arguments, so every one of them has already been
// evaluated before any combining happens (there is no short-circuiting).
67 template <typename T, typename... Rest>
68 T or_return_values( T&& t, Rest&&... rest ) {
69  return t | or_return_values( std::forward<Rest>(rest)... );
70 }
71 
72 template<typename JP>
74  typedef size_t type;
76 };
77 
78 template<typename Key>
79 struct key_from_policy< key_matching<Key> > {
80  typedef Key type;
82 };
83 
84 template<typename Key>
85 struct key_from_policy< key_matching<Key&> > {
86  typedef const Key &type;
88 };
89 
90 template<typename Device, typename Key>
92  Device my_device;
94 public:
95  // TODO: investigate why default constructor is required
97  streaming_device_with_key( const Device& d, Key k ) : my_device( d ), my_key( k ) {}
98  Key key() const { return my_key; }
99  const Device& device() const { return my_device; }
100 };
101 
102 // --------- Kernel argument helpers --------- //
103 template <typename T>
106 };
107 
108 template <int N1, int N2>
109 struct is_port_ref_impl< port_ref_impl<N1, N2> > {
111 };
112 
113 template <int N1, int N2>
114 struct is_port_ref_impl< port_ref_impl<N1, N2>( * )() > {
116 };
117 
118 template <typename T>
119 struct is_port_ref {
121 };
122 
123 template <typename ...Args1>
125 
// Recursive step of the argument conversion used before calling a functor 'f'.
//
// The argument list passed to doit()/doit_impl() has two parts:
//   a1, args1...  - arguments that have not been processed yet (A1 is the current one);
//   args2...      - arguments that have already been processed, in their final order.
// Each step moves the current argument to the end of 'args2'. A port_ref argument is
// not moved as is: it is replaced by the referenced elements of tuple 't'
// (std::get<N + my_delta>(t) for every N in the port_ref range).
// The recursion continues with convert_and_call_impl<Args1...> until no unprocessed
// arguments remain.
126 template <typename A1, typename ...Args1>
127 struct convert_and_call_impl<A1, Args1...> {
    // Offset added to a port index to obtain the matching index in tuple 't'.
128  static const size_t my_delta = 1; // Index 0 contains device
129 
    // Entry point: dispatches on is_port_ref<A1>::type to the matching doit_impl overload.
130  template <typename F, typename Tuple, typename ...Args2>
131  static void doit(F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
132  convert_and_call_impl<A1, Args1...>::doit_impl(typename is_port_ref<A1>::type(), f, t, a1, args1..., args2...);
133  }
    // A1 is not a port_ref: append a1 unchanged to the processed arguments and continue with Args1.
134  template <typename F, typename Tuple, typename ...Args2>
135  static void doit_impl(std::false_type, F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
136  convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., a1);
137  }
    // A1 is a port_ref range [N1, N2]: append the tuple element for N1 to the processed
    // arguments and repeat with the remaining range [N1 + 1, N2].
138  template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
139  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>, Args1&... args1, Args2&... args2) {
140  convert_and_call_impl<port_ref_impl<N1 + 1,N2>, Args1...>::doit_impl(x, f, t, port_ref<N1 + 1, N2>(), args1...,
141  args2..., std::get<N1 + my_delta>(t));
142  }
    // A1 is a port_ref covering the single index N (also the last step of a range):
    // append the tuple element for N and continue with Args1.
143  template <typename F, typename Tuple, int N, typename ...Args2>
144  static void doit_impl(std::true_type, F& f, Tuple& t, port_ref_impl<N, N>, Args1&... args1, Args2&... args2) {
145  convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., std::get<N + my_delta>(t));
146  }
147 
    // A1 is a pointer to a function returning a port_ref range: call the function to get the
    // port_ref_impl object and handle it with the overloads above.
148  template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
149  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>(* fn)(), Args1&... args1, Args2&... args2) {
150  doit_impl(x, f, t, fn(), args1..., args2...);
151  }
    // Same as above for a function returning a single-index port_ref.
152  template <typename F, typename Tuple, int N, typename ...Args2>
153  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N, N>(* fn)(), Args1&... args1, Args2&... args2) {
154  doit_impl(x, f, t, fn(), args1..., args2...);
155  }
156 };
157 
158 template <>
160  template <typename F, typename Tuple, typename ...Args2>
161  static void doit(F& f, Tuple&, Args2&... args2) {
162  f(args2...);
163  }
164 };
165 // ------------------------------------------- //
166 
167 template<typename JP, typename StreamFactory, typename... Ports>
169  // Do not use 'using' instead of 'struct' because Microsoft Visual C++ 12.0 fails to compile.
170  template <typename T>
171  struct async_msg_type {
172  typedef typename StreamFactory::template async_msg_type<T> type;
173  };
174 
179 
180  // indexer_node parameters pack expansion workaround for VS2013 for streaming_node
182 };
183 
184 // Default empty implementation
185 template<typename StreamFactory, typename KernelInputTuple, typename = void>
187  typedef typename StreamFactory::device_type device_type;
188  typedef typename StreamFactory::kernel_type kernel_type;
189  typedef KernelInputTuple kernel_input_tuple;
190 protected:
// Sends the kernel to the factory: forwards 'device', 'kernel' and 'args' to
// factory.send_kernel(). The kernel_input_tuple parameter is unnamed and unused here.
191  template <typename ...Args>
192  void enqueue_kernel_impl( kernel_input_tuple&, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
193  factory.send_kernel( device, kernel, args... );
194  }
195 };
196 
197 // Implementation for StreamFactory supporting range
198 template<typename StreamFactory, typename KernelInputTuple>
199 class kernel_executor_helper<StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type > {
200  typedef typename StreamFactory::device_type device_type;
201  typedef typename StreamFactory::kernel_type kernel_type;
202  typedef KernelInputTuple kernel_input_tuple;
203 
204  typedef typename StreamFactory::range_type range_type;
205 
206  // Abstract holder for the kernel range. Implementations either store a range value
     // or obtain the range from an element of the kernel input tuple.
207  struct range_wrapper {
     // Returns the range to use for the given kernel input tuple.
208  virtual range_type get_range( const kernel_input_tuple &ip ) const = 0;
     // Returns a newly allocated copy of this wrapper.
209  virtual range_wrapper *clone() const = 0;
     // Virtual so that wrappers can be deleted through a range_wrapper pointer.
210  virtual ~range_wrapper() {}
211  };
212 
213  struct range_value : public range_wrapper {
214  range_value( const range_type& value ) : my_value(value) {}
215 
216  range_value( range_type&& value ) : my_value(std::move(value)) {}
217 
219  return my_value;
220  }
221 
222  range_wrapper *clone() const __TBB_override {
223  return new range_value(my_value);
224  }
225  private:
227  };
228 
229  template <int N>
230  struct range_mapper : public range_wrapper {
232 
234  // "+1" since get<0>(ip) is StreamFactory::device.
235  return get<N + 1>(ip).data(false);
236  }
237 
238  range_wrapper *clone() const __TBB_override {
239  return new range_mapper<N>;
240  }
241  };
242 
243 protected:
// Sends the kernel to the factory together with its range: asserts that a range wrapper
// has been set, then calls factory.send_kernel() with the range obtained from the
// wrapper for 'ip' inserted between 'kernel' and 'args'.
244  template <typename ...Args>
245  void enqueue_kernel_impl( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
246  __TBB_ASSERT(my_range_wrapper, "Range is not set. Call set_range() before running streaming_node.");
247  factory.send_kernel( device, kernel, my_range_wrapper->get_range(ip), args... );
248  }
249 
250 public:
// Default constructor: starts with no range wrapper (the pointer is NULL).
251  kernel_executor_helper() : my_range_wrapper(NULL) {}
252 
// Copy constructor: clones the source's range wrapper when it has one, otherwise stays NULL,
// so the copy has its own wrapper object.
253  kernel_executor_helper(const kernel_executor_helper& executor) : my_range_wrapper(executor.my_range_wrapper ? executor.my_range_wrapper->clone() : NULL) {}
254 
// Move constructor: takes over the source's range wrapper pointer.
255  kernel_executor_helper(kernel_executor_helper&& executor) : my_range_wrapper(executor.my_range_wrapper) {
256  // Detach the wrapper from the moved-from helper so that it is not deleted twice.
257  executor.my_range_wrapper = NULL;
258  }
259 
261  if (my_range_wrapper) delete my_range_wrapper;
262  }
263 
264  void set_range(const range_type& work_size) {
265  my_range_wrapper = new range_value(work_size);
266  }
267 
268  void set_range(range_type&& work_size) {
269  my_range_wrapper = new range_value(std::move(work_size));
270  }
271 
272  template <int N>
274  my_range_wrapper = new range_mapper<N>;
275  }
276 
277  template <int N>
279  my_range_wrapper = new range_mapper<N>;
280  }
281 
282 private:
283  range_wrapper* my_range_wrapper;
284 };
285 
286 } // internal
287 
288 /*
289 /---------------------------------------- streaming_node ------------------------------------\
290 | |
291 | /--------------\ /----------------------\ /-----------\ /----------------------\ |
292 | | | | (device_with_key) O---O | | | |
293 | | | | | | | | | |
294 O---O indexer_node O---O device_selector_node O---O join_node O---O kernel_node O---O
295 | | | | (multifunction_node) | | | | (multifunction_node) | |
296 O---O | | O---O | | O---O
297 | \--------------/ \----------------------/ \-----------/ \----------------------/ |
298 | |
299 \--------------------------------------------------------------------------------------------/
300 */
301 template<typename... Args>
303 
304 template<typename... Ports, typename JP, typename StreamFactory>
306 streaming_node< tuple<Ports...>, JP, StreamFactory >
307  : public composite_node < typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple,
308  typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple >
309  , public internal::kernel_executor_helper< StreamFactory, typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple >
310 {
311  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple input_tuple;
312  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple output_tuple;
314 protected:
315  typedef typename StreamFactory::device_type device_type;
316  typedef typename StreamFactory::kernel_type kernel_type;
317 private:
319  typedef composite_node<input_tuple, output_tuple> base_type;
320  static const size_t NUM_INPUTS = tuple_size<input_tuple>::value;
321  static const size_t NUM_OUTPUTS = tuple_size<output_tuple>::value;
322 
325 
326  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::indexer_node_type indexer_node_type;
327  typedef typename indexer_node_type::output_type indexer_node_output_type;
328  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple kernel_input_tuple;
329  typedef multifunction_node<indexer_node_output_type, kernel_input_tuple> device_selector_node;
330  typedef multifunction_node<kernel_input_tuple, output_tuple> kernel_multifunction_node;
331 
// Builds the composite node's input port tuple: ties together references to the
// input ports S... of my_indexer_node.
332  template <int... S>
333  typename base_type::input_ports_type get_input_ports( internal::sequence<S...> ) {
334  return std::tie( internal::input_port<S>( my_indexer_node )... );
335  }
336 
// Builds the composite node's output port tuple: ties together references to the
// output ports S... of my_kernel_node.
337  template <int... S>
338  typename base_type::output_ports_type get_output_ports( internal::sequence<S...> ) {
339  return std::tie( internal::output_port<S>( my_kernel_node )... );
340  }
341 
// Convenience overload: calls the index-sequence overload with input_sequence().
// NOTE(review): input_sequence is not declared in the visible part of this listing;
// presumably it enumerates the input port indices - confirm against the full header.
342  typename base_type::input_ports_type get_input_ports() {
343  return get_input_ports( input_sequence() );
344  }
345 
// Convenience overload: calls the index-sequence overload with output_sequence().
// NOTE(review): output_sequence is not declared in the visible part of this listing;
// presumably it enumerates the output port indices - confirm against the full header.
346  typename base_type::output_ports_type get_output_ports() {
347  return get_output_ports( output_sequence() );
348  }
349 
350  template <int N>
352  make_edge( internal::output_port<N>( my_device_selector_node ), internal::input_port<N>( my_join_node ) );
353  return 0;
354  }
355 
356  template <int... S>
358  make_edge( my_indexer_node, my_device_selector_node );
359  make_edge( my_device_selector_node, my_join_node );
360  internal::ignore_return_values( make_Nth_edge<S + 1>()... );
361  make_edge( my_join_node, my_kernel_node );
362  }
363 
// Connects the internal nodes by calling the index-sequence overload of make_edges
// with input_sequence().
364  void make_edges() {
365  make_edges( input_sequence() );
366  }
367 
368  class device_selector_base {
369  public:
370  virtual void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) = 0;
371  virtual device_selector_base *clone( streaming_node &n ) const = 0;
373  };
374 
375  template <typename UserFunctor>
376  class device_selector : public device_selector_base, tbb::internal::no_assign {
377  public:
378  device_selector( UserFunctor uf, streaming_node &n, StreamFactory &f )
379  : my_dispatch_funcs( create_dispatch_funcs( input_sequence() ) )
380  , my_user_functor( uf ), my_node(n), my_factory( f )
381  {
382  my_port_epoches.fill( 0 );
383  }
384 
385  void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) __TBB_override {
386  (this->*my_dispatch_funcs[ v.tag() ])( my_port_epoches[ v.tag() ], v, op );
388  || my_port_epoches[v.tag()] == 0, "Epoch is changed when key matching is requested" );
389  }
390 
391  device_selector_base *clone( streaming_node &n ) const __TBB_override {
392  return new device_selector( my_user_functor, n, my_factory );
393  }
394  private:
395  typedef void(device_selector<UserFunctor>::*send_and_put_fn_type)(size_t &, const indexer_node_output_type &, typename device_selector_node::output_ports_type &);
396  typedef std::array < send_and_put_fn_type, NUM_INPUTS > dispatch_funcs_type;
397 
398  template <int... S>
400  dispatch_funcs_type dispatch = { { &device_selector<UserFunctor>::send_and_put_impl<S>... } };
401  return dispatch;
402  }
403 
404  template <typename T>
405  key_type get_key( std::false_type, const T &, size_t &epoch ) {
407  return epoch++;
408  }
409 
410  template <typename T>
411  key_type get_key( std::true_type, const T &t, size_t &/*epoch*/ ) {
413  return key_from_message<key_type>( t );
414  }
415 
416  template <int N>
417  void send_and_put_impl( size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
418  typedef typename tuple_element<N + 1, typename device_selector_node::output_ports_type>::type::output_type elem_type;
419  elem_type e = internal::cast_to<elem_type>( v );
420  device_type device = get_device( get_key( typename internal::key_from_policy<JP>::is_key_matching(), e, epoch ), get<0>( op ) );
421  my_factory.send_data( device, e );
422  get<N + 1>( op ).try_put( e );
423  }
424 
425  template< typename DevicePort >
426  device_type get_device( key_type key, DevicePort& dp ) {
427  typename std::unordered_map<typename std::decay<key_type>::type, epoch_desc>::iterator it = my_devices.find( key );
428  if ( it == my_devices.end() ) {
429  device_type d = my_user_functor( my_factory );
430  std::tie( it, std::ignore ) = my_devices.insert( std::make_pair( key, d ) );
431  bool res = dp.try_put( device_with_key_type( d, key ) );
432  __TBB_ASSERT_EX( res, NULL );
433  my_node.notify_new_device( d );
434  }
435  epoch_desc &e = it->second;
436  device_type d = e.my_device;
437  if ( ++e.my_request_number == NUM_INPUTS ) my_devices.erase( it );
438  return d;
439  }
440 
441  struct epoch_desc {
442  epoch_desc(device_type d ) : my_device( d ), my_request_number( 0 ) {}
445  };
446 
448  std::array<size_t, NUM_INPUTS> my_port_epoches;
450  UserFunctor my_user_functor;
452  StreamFactory &my_factory;
453  };
454 
// Body object passed to my_device_selector_node. It forwards every call to a
// device_selector_base through a stored pointer.
// The body only stores the pointer: it declares no destructor and never deletes the selector.
455  class device_selector_body {
456  public:
     // d: the selector that will receive the forwarded calls.
457  device_selector_body( device_selector_base *d ) : my_device_selector( d ) {}
458 
     // Forwards the indexer output 'v' and the output ports 'op' to the stored selector.
459  void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
460  (*my_device_selector)(v, op);
461  }
462  private:
463  device_selector_base *my_device_selector;
464  };
465 
466  // TODO: investigate why copy-construction is disallowed
467  class args_storage_base : tbb::internal::no_copy {
468  public:
469  typedef typename kernel_multifunction_node::output_ports_type output_ports_type;
470 
471  virtual void enqueue( kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n ) = 0;
472  virtual void send( device_type d ) = 0;
473  virtual args_storage_base *clone() const = 0;
474  virtual ~args_storage_base () {}
475 
476  protected:
477  args_storage_base( const kernel_type& kernel, StreamFactory &f )
478  : my_kernel( kernel ), my_factory( f )
479  {}
480 
481  args_storage_base( const args_storage_base &k )
482  : tbb::internal::no_copy(), my_kernel( k.my_kernel ), my_factory( k.my_factory )
483  {}
484 
486  StreamFactory &my_factory;
487  };
488 
489  template <typename... Args>
490  class args_storage : public args_storage_base {
492 
493  // ---------- Update events helpers ---------- //
494  template <int N>
495  bool do_try_put( const kernel_input_tuple& ip, output_ports_type &op ) const {
496  const auto& t = get<N + 1>( ip );
497  auto &port = get<N>( op );
498  return port.try_put( t );
499  }
500 
501  template <int... S>
503  return internal::or_return_values( do_try_put<S>( ip, op )... );
504  }
505 
506  // ------------------------------------------- //
507  class run_kernel_func : tbb::internal::no_assign {
508  public:
509  run_kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage )
510  : my_kernel_func( ip, node, storage, get<0>(ip).device() ) {}
511 
512  // It is impossible to use Args... because a function pointer cannot be implicitly converted to a function reference.
513  // Allow the compiler to deduce types for function pointers automatically.
514  template <typename... FnArgs>
515  void operator()( FnArgs&... args ) {
516  internal::convert_and_call_impl<FnArgs...>::doit( my_kernel_func, my_kernel_func.my_ip, args... );
517  }
518  private:
519  struct kernel_func : tbb::internal::no_copy {
522  const args_storage& my_storage;
524 
525  kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage, device_type device )
526  : my_ip( ip ), my_node( node ), my_storage( storage ), my_device( device )
527  {}
528 
529  template <typename... FnArgs>
530  void operator()( FnArgs&... args ) {
531  my_node.enqueue_kernel( my_ip, my_storage.my_factory, my_device, my_storage.my_kernel, args... );
532  }
533  } my_kernel_func;
534  };
535 
536  template<typename FinalizeFn>
537  class run_finalize_func : tbb::internal::no_assign {
538  public:
539  run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn )
540  : my_ip( ip ), my_finalize_func( factory, get<0>(ip).device(), fn ) {}
541 
542  // It is impossible to use Args... because a function pointer cannot be implicitly converted to a function reference.
543  // Allow the compiler to deduce types for function pointers automatically.
544  template <typename... FnArgs>
545  void operator()( FnArgs&... args ) {
546  internal::convert_and_call_impl<FnArgs...>::doit( my_finalize_func, my_ip, args... );
547  }
548  private:
550 
551  struct finalize_func : tbb::internal::no_assign {
552  StreamFactory &my_factory;
554  FinalizeFn my_fn;
555 
556  finalize_func( StreamFactory &factory, device_type device, FinalizeFn fn )
557  : my_factory(factory), my_device(device), my_fn(fn) {}
558 
559  template <typename... FnArgs>
560  void operator()( FnArgs&... args ) {
561  my_factory.finalize( my_device, my_fn, args... );
562  }
563  } my_finalize_func;
564  };
565 
566  template<typename FinalizeFn>
567  static run_finalize_func<FinalizeFn> make_run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn ) {
568  return run_finalize_func<FinalizeFn>( ip, factory, fn );
569  }
570 
571  class send_func : tbb::internal::no_assign {
572  public:
573  send_func( StreamFactory &factory, device_type d )
574  : my_factory(factory), my_device( d ) {}
575 
576  template <typename... FnArgs>
577  void operator()( FnArgs&... args ) {
578  my_factory.send_data( my_device, args... );
579  }
580  private:
581  StreamFactory &my_factory;
583  };
584 
585  public:
586  args_storage( const kernel_type& kernel, StreamFactory &f, Args&&... args )
587  : args_storage_base( kernel, f )
588  , my_args_pack( std::forward<Args>(args)... )
589  {}
590 
591  args_storage( const args_storage &k ) : args_storage_base( k ), my_args_pack( k.my_args_pack ) {}
592 
593  args_storage( const args_storage_base &k, Args&&... args ) : args_storage_base( k ), my_args_pack( std::forward<Args>(args)... ) {}
594 
596  // Make const qualified args_pack (from non-const)
597  const args_pack_type& const_args_pack = my_args_pack;
598  // enqueue_kernel() (which forwards to factory.send_kernel()) gets
599  // - 'ip' tuple elements by reference and updates it (and 'ip') with dependencies
600  // - arguments (from my_args_pack) by const-reference via const_args_pack
601  tbb::internal::call( run_kernel_func( ip, n, *this ), const_args_pack );
602 
603  if (! do_try_put( ip, op, input_sequence() ) ) {
604  graph& g = n.my_graph;
605  // No message was accepted by any successor, so set a callback that extends the graph lifetime until the kernel completes.
606  g.increment_wait_count();
607 
608  // factory.finalize() gets
609  // - 'ip' tuple elements by reference, so 'ip' might be changed
610  // - arguments (from my_args_pack) by const-reference via const_args_pack
611  tbb::internal::call( make_run_finalize_func(ip, this->my_factory, [&g] {
612  g.decrement_wait_count();
613  }), const_args_pack );
614  }
615  }
616 
618  // factory.send() gets arguments by reference and updates these arguments with dependencies
619  // (it gets but usually ignores port_ref-s)
620  tbb::internal::call( send_func( this->my_factory, d ), my_args_pack );
621  }
622 
623  args_storage_base *clone() const __TBB_override {
624  // Create a new args_storage using the copy constructor.
625  return new args_storage<Args...>( *this );
626  }
627 
628  private:
629  typedef tbb::internal::stored_pack<Args...> args_pack_type;
631  };
632 
633  // Body for kernel_multifunction_node.
634  class kernel_body : tbb::internal::no_assign {
635  public:
636  kernel_body( const streaming_node &node ) : my_node( node ) {}
637 
639  __TBB_ASSERT( (my_node.my_args_storage != NULL), "No arguments storage" );
640  // 'ip' is passed by value to create local copy for updating inside enqueue_kernel()
641  my_node.my_args_storage->enqueue( ip, op, my_node );
642  }
643  private:
645  };
646 
648  struct wrap_to_async {
649  typedef T type; // Keep port_ref as it is
650  };
651 
652  template <typename T>
653  struct wrap_to_async<T, std::false_type> {
654  typedef typename StreamFactory::template async_msg_type< typename tbb::internal::strip<T>::type > type;
655  };
656 
// Allocates a new args_storage<Args...> built from the base part of 'storage'
// and the forwarded 'args', and returns it as a pointer to the base class.
// The object is created with 'new'; the caller is responsible for deleting it.
657  template <typename... Args>
658  args_storage_base *make_args_storage(const args_storage_base& storage, Args&&... args) const {
659  // 'Args' are stored as given; wrapping plain types 'T' into 'async_msg_type<T>' is done by the caller (see set_args).
660  return new args_storage<Args...>(storage, std::forward<Args>(args)...);
661  }
662 
664  my_args_storage->send( d );
665  }
666 
// Forwards all arguments unchanged to enqueue_kernel_impl(), which is provided by the
// kernel_executor_helper base class.
667  template <typename ...Args>
668  void enqueue_kernel( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
669  this->enqueue_kernel_impl( ip, factory, device, kernel, args... );
670  }
671 
672 public:
// Constructs a streaming_node in graph 'g'.
//   g      - graph that owns the composite node and all of its internal nodes.
//   kernel - kernel stored in the initial arguments storage.
//   d      - user device selector; the device_selector created here invokes it with
//            the stream factory to obtain a device.
//   f      - stream factory; it is passed by reference to the device selector and to
//            the arguments storage.
// The device selector and the kernel body receive '*this' while the node is still
// being constructed.
// After member initialization the external ports are published and the internal
// edges are created.
673  template <typename DeviceSelector>
674  streaming_node( graph &g, const kernel_type& kernel, DeviceSelector d, StreamFactory &f )
675  : base_type( g )
676  , my_indexer_node( g )
677  , my_device_selector( new device_selector<DeviceSelector>( d, *this, f ) )
678  , my_device_selector_node( g, serial, device_selector_body( my_device_selector ) )
679  , my_join_node( g )
680  , my_kernel_node( g, serial, kernel_body( *this ) )
681  // By default, streaming_node maps all its ports to the kernel arguments on a one-to-one basis.
682  , my_args_storage( make_args_storage( args_storage<>(kernel, f), port_ref<0, NUM_INPUTS - 1>() ) )
683  {
684  base_type::set_external_ports( get_input_ports(), get_output_ports() );
685  make_edges();
686  }
687 
689  : base_type( node.my_graph )
690  , my_indexer_node( node.my_indexer_node )
691  , my_device_selector( node.my_device_selector->clone( *this ) )
692  , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
693  , my_join_node( node.my_join_node )
694  , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
695  , my_args_storage( node.my_args_storage->clone() )
696  {
697  base_type::set_external_ports( get_input_ports(), get_output_ports() );
698  make_edges();
699  }
700 
702  : base_type( node.my_graph )
703  , my_indexer_node( std::move( node.my_indexer_node ) )
704  , my_device_selector( node.my_device_selector->clone(*this) )
705  , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
706  , my_join_node( std::move( node.my_join_node ) )
707  , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
708  , my_args_storage( node.my_args_storage )
709  {
710  base_type::set_external_ports( get_input_ports(), get_output_ports() );
711  make_edges();
712  // Null the moved-from node's args storage pointer to prevent double deallocation.
713  node.my_args_storage = NULL;
714  }
715 
717  if ( my_args_storage ) delete my_args_storage;
718  if ( my_device_selector ) delete my_device_selector;
719  }
720 
// Replaces the kernel arguments of the node.
// Each argument is converted to wrap_to_async<Args>::type before it is stored. The visible
// wrap_to_async specialization wraps a type into StreamFactory::async_msg_type, and the
// other variant keeps port_ref arguments as they are.
// The new storage is created before the old one is deleted.
// NOTE(review): my_args_storage is dereferenced unconditionally, while the move constructor
// sets the moved-from node's my_args_storage to NULL - calling set_args() on a moved-from
// node looks unsafe; confirm whether that use is expected to be supported.
721  template <typename... Args>
722  void set_args( Args&&... args ) {
723  // Copy the base class of args_storage and create new storage for "Args...".
724  args_storage_base * const new_args_storage = make_args_storage( *my_args_storage, typename wrap_to_async<Args>::type(std::forward<Args>(args))...);
725  delete my_args_storage;
726  my_args_storage = new_args_storage;
727  }
728 
729 protected:
// Reset is not implemented for streaming_node: the body consists solely of an assertion
// with a false condition ("Not implemented yet"), and the flags argument is ignored.
730  void reset_node( reset_flags = rf_reset_protocol ) __TBB_override { __TBB_ASSERT( false, "Not implemented yet" ); }
731 
732 private:
734  device_selector_base *my_device_selector;
736  join_node<kernel_input_tuple, JP> my_join_node;
738 
739  args_storage_base *my_args_storage;
740 };
741 
742 #endif // __TBB_PREVIEW_STREAMING_NODE
743 #endif // __TBB_flow_graph_streaming_H
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::set_range
void set_range(port_ref_impl< N, N >)
Definition: _flow_graph_streaming_node.h:273
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::my_factory
StreamFactory & my_factory
Definition: _flow_graph_streaming_node.h:452
internal::streaming_device_with_key::key
Key key() const
Definition: _flow_graph_streaming_node.h:98
__TBB_DEPRECATED
#define __TBB_DEPRECATED
Definition: tbb_config.h:636
streaming_node< tuple< Ports... >, JP, StreamFactory >::wrap_to_async< T, std::false_type >::type
StreamFactory::template async_msg_type< typename tbb::internal::strip< T >::type > type
Definition: _flow_graph_streaming_node.h:654
internal::ignore_return_values
void ignore_return_values(Args &&...)
Definition: _flow_graph_streaming_node.h:63
streaming_node< tuple< Ports... >, JP, StreamFactory >::enqueue_kernel
void enqueue_kernel(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
Definition: _flow_graph_streaming_node.h:668
internal::key_from_policy< key_matching< Key & > >::type
const typedef Key & type
Definition: _flow_graph_streaming_node.h:86
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_finalize_func::finalize_func::operator()
void operator()(FnArgs &... args)
Definition: _flow_graph_streaming_node.h:560
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_kernel_func::run_kernel_func
run_kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage)
Definition: _flow_graph_streaming_node.h:509
streaming_node
class __TBB_DEPRECATED streaming_node
Definition: _flow_graph_streaming_node.h:302
internal
Definition: _flow_graph_async_msg_impl.h:24
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::make_run_finalize_func
static run_finalize_func< FinalizeFn > make_run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
Definition: _flow_graph_streaming_node.h:567
void
void
Definition: ittnotify_static.h:91
internal::key_from_policy::is_key_matching
std::false_type is_key_matching
Definition: _flow_graph_streaming_node.h:75
internal::num_arguments::value
static const int value
Definition: _flow_graph_streaming_node.h:49
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::args_pack_type
tbb::internal::stored_pack< Args... > args_pack_type
Definition: _flow_graph_streaming_node.h:629
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::send_and_put_impl
void send_and_put_impl(size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
Definition: _flow_graph_streaming_node.h:417
fn
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark d __itt_event ITT_FORMAT __itt_group_mark d void const wchar_t const wchar_t int ITT_FORMAT __itt_group_sync __itt_group_fsync x void const wchar_t int const wchar_t int int ITT_FORMAT __itt_group_sync __itt_group_fsync x void ITT_FORMAT __itt_group_sync __itt_group_fsync p void ITT_FORMAT __itt_group_sync __itt_group_fsync p void size_t ITT_FORMAT lu no args __itt_obj_prop_t 
__itt_obj_state_t ITT_FORMAT d const char ITT_FORMAT s const char ITT_FORMAT s __itt_frame ITT_FORMAT p __itt_counter ITT_FORMAT p __itt_counter unsigned long long ITT_FORMAT lu __itt_counter unsigned long long ITT_FORMAT lu __itt_counter __itt_clock_domain unsigned long long void ITT_FORMAT p const wchar_t ITT_FORMAT S __itt_mark_type const wchar_t ITT_FORMAT S __itt_mark_type const char ITT_FORMAT s __itt_mark_type ITT_FORMAT d __itt_caller ITT_FORMAT p __itt_caller ITT_FORMAT p no args const __itt_domain __itt_clock_domain unsigned long long __itt_id ITT_FORMAT lu const __itt_domain __itt_clock_domain unsigned long long __itt_id __itt_id void * fn
Definition: ittnotify_static.h:311
streaming_node< tuple< Ports... >, JP, StreamFactory >::kernel_body::my_node
const streaming_node & my_node
Definition: _flow_graph_streaming_node.h:644
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::enqueue_kernel_impl
void enqueue_kernel_impl(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
Definition: _flow_graph_streaming_node.h:245
__TBB_ASSERT
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector_body::operator()
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
Definition: _flow_graph_streaming_node.h:459
internal::graph_policy_namespace::key_matching
field of type K being used for matching.
Definition: _flow_graph_body_impl.h:77
tbb::flow::interface11::reset_flags
reset_flags
Definition: _flow_graph_impl.h:158
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::range_mapper::range_mapper
range_mapper()
Definition: _flow_graph_streaming_node.h:231
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::clone
device_selector_base * clone(streaming_node &n) const __TBB_override
Definition: _flow_graph_streaming_node.h:391
tbb::internal::no_assign
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
streaming_node< tuple< Ports... >, JP, StreamFactory >::input_sequence
internal::make_sequence< NUM_INPUTS >::type input_sequence
Definition: _flow_graph_streaming_node.h:323
streaming_node< tuple< Ports... >, JP, StreamFactory >::make_edges
void make_edges(internal::sequence< S... >)
Definition: _flow_graph_streaming_node.h:357
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::set_range
void set_range(range_type &&work_size)
Definition: _flow_graph_streaming_node.h:268
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_finalize_func::run_finalize_func
run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
Definition: _flow_graph_streaming_node.h:539
streaming_node< tuple< Ports... >, JP, StreamFactory >::kernel_body::operator()
void operator()(kernel_input_tuple ip, typename args_storage_base::output_ports_type &op)
Definition: _flow_graph_streaming_node.h:638
internal::or_return_values
T or_return_values(T &&t)
Definition: _flow_graph_streaming_node.h:66
streaming_node< tuple< Ports... >, JP, StreamFactory >::my_device_selector
device_selector_base * my_device_selector
Definition: _flow_graph_streaming_node.h:734
internal::do_try_put
task * do_try_put(const T &v, void *p)
Definition: _flow_graph_indexer_impl.h:34
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::set_range
void set_range(port_ref_impl< N, N >(*)())
Definition: _flow_graph_streaming_node.h:278
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_kernel_func::kernel_func::my_storage
const args_storage & my_storage
Definition: _flow_graph_streaming_node.h:522
tbb
The graph class.
Definition: serial/tbb/parallel_for.h:46
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_type
StreamFactory::device_type device_type
Definition: _flow_graph_streaming_node.h:315
streaming_node< tuple< Ports... >, JP, StreamFactory >::notify_new_device
void notify_new_device(device_type d)
Definition: _flow_graph_streaming_node.h:663
internal::convert_and_call_impl< A1, Args1... >::doit_impl
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >(*fn)(), Args1 &... args1, Args2 &... args2)
Definition: _flow_graph_streaming_node.h:149
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_finalize_func::finalize_func::my_factory
StreamFactory & my_factory
Definition: _flow_graph_streaming_node.h:552
internal::streaming_device_with_key::my_key
std::decay< Key >::type my_key
Definition: _flow_graph_streaming_node.h:93
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::epoch_desc::epoch_desc
epoch_desc(device_type d)
Definition: _flow_graph_streaming_node.h:442
internal::streaming_device_with_key
Definition: _flow_graph_streaming_node.h:91
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_with_key_type
internal::streaming_device_with_key< device_type, key_type > device_with_key_type
Definition: _flow_graph_streaming_node.h:318
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::send_func::my_factory
StreamFactory & my_factory
Definition: _flow_graph_streaming_node.h:581
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::kernel_executor_helper
kernel_executor_helper(kernel_executor_helper &&executor)
Definition: _flow_graph_streaming_node.h:255
streaming_node< tuple< Ports... >, JP, StreamFactory >::indexer_node_output_type
indexer_node_type::output_type indexer_node_output_type
Definition: _flow_graph_streaming_node.h:327
internal::streaming_node_traits::input_tuple
tuple< typename async_msg_type< Ports >::type... > input_tuple
Definition: _flow_graph_streaming_node.h:175
internal::convert_and_call_impl< A1, Args1... >::doit_impl
static void doit_impl(std::true_type, F &f, Tuple &t, port_ref_impl< N, N >, Args1 &... args1, Args2 &... args2)
Definition: _flow_graph_streaming_node.h:144
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::do_try_put
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op, internal::sequence< S... >) const
Definition: _flow_graph_streaming_node.h:502
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::get_device
device_type get_device(key_type key, DevicePort &dp)
Definition: _flow_graph_streaming_node.h:426
d
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
Definition: ittnotify_static.h:109
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::kernel_type
StreamFactory::kernel_type kernel_type
Definition: _flow_graph_streaming_node.h:201
internal::convert_and_call_impl< A1, Args1... >::doit
static void doit(F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
Definition: _flow_graph_streaming_node.h:131
internal::streaming_node_traits::async_msg_type
Definition: _flow_graph_streaming_node.h:171
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::send
void send(device_type d) __TBB_override
Definition: _flow_graph_streaming_node.h:617
internal::sequence
Definition: _flow_graph_types_impl.h:329
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::kernel_executor_helper
kernel_executor_helper()
Definition: _flow_graph_streaming_node.h:251
internal::port_ref_impl::size
static const int size
Definition: _flow_graph_streaming_node.h:33
streaming_node< tuple< Ports... >, JP, StreamFactory >::reset_node
void reset_node(reset_flags=rf_reset_protocol) __TBB_override
Definition: _flow_graph_streaming_node.h:730
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::args_storage
args_storage(const args_storage_base &k, Args &&... args)
Definition: _flow_graph_streaming_node.h:593
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::~kernel_executor_helper
~kernel_executor_helper()
Definition: _flow_graph_streaming_node.h:260
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector_body::device_selector_body
device_selector_body(device_selector_base *d)
Definition: _flow_graph_streaming_node.h:457
streaming_node< tuple< Ports... >, JP, StreamFactory >::my_indexer_node
indexer_node_type my_indexer_node
Definition: _flow_graph_streaming_node.h:733
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::kernel_executor_helper
kernel_executor_helper(const kernel_executor_helper &executor)
Definition: _flow_graph_streaming_node.h:253
streaming_node< tuple< Ports... >, JP, StreamFactory >::set_args
void set_args(Args &&... args)
Definition: _flow_graph_streaming_node.h:722
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::my_node
streaming_node & my_node
Definition: _flow_graph_streaming_node.h:451
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::range_value::range_value
range_value(const range_type &value)
Definition: _flow_graph_streaming_node.h:214
streaming_node< tuple< Ports... >, JP, StreamFactory >::input_tuple
internal::streaming_node_traits< JP, StreamFactory, Ports... >::input_tuple input_tuple
Definition: _flow_graph_streaming_node.h:311
streaming_node< tuple< Ports... >, JP, StreamFactory >::make_edges
void make_edges()
Definition: _flow_graph_streaming_node.h:364
internal::port_ref_impl
Definition: _flow_graph_streaming_node.h:31
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector_body::my_device_selector
device_selector_base * my_device_selector
Definition: _flow_graph_streaming_node.h:463
internal::kernel_executor_helper::device_type
StreamFactory::device_type device_type
Definition: _flow_graph_streaming_node.h:187
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::my_devices
std::unordered_map< typename std::decay< key_type >::type, epoch_desc > my_devices
Definition: _flow_graph_streaming_node.h:447
internal::streaming_device_with_key::streaming_device_with_key
streaming_device_with_key(const Device &d, Key k)
Definition: _flow_graph_streaming_node.h:97
streaming_node< tuple< Ports... >, JP, StreamFactory >::kernel_type
StreamFactory::kernel_type kernel_type
Definition: _flow_graph_streaming_node.h:316
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::create_dispatch_funcs
static dispatch_funcs_type create_dispatch_funcs(internal::sequence< S... >)
Definition: _flow_graph_streaming_node.h:399
streaming_node< tuple< Ports... >, JP, StreamFactory >::streaming_node
streaming_node(graph &g, const kernel_type &kernel, DeviceSelector d, StreamFactory &f)
Definition: _flow_graph_streaming_node.h:674
tbb::flow::key_from_message
K key_from_message(const T &t)
Definition: flow_graph.h:720
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::my_args_pack
args_pack_type my_args_pack
Definition: _flow_graph_streaming_node.h:630
__TBB_STATIC_ASSERT
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::range_mapper::get_range
range_type get_range(const kernel_input_tuple &ip) const __TBB_override
Definition: _flow_graph_streaming_node.h:233
streaming_node< tuple< Ports... >, JP, StreamFactory >::base_type
composite_node< input_tuple, output_tuple > base_type
Definition: _flow_graph_streaming_node.h:319
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::range_value::range_value
range_value(range_type &&value)
Definition: _flow_graph_streaming_node.h:216
tbb::move
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:319
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::range_value::get_range
range_type get_range(const kernel_input_tuple &) const __TBB_override
Definition: _flow_graph_streaming_node.h:218
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage_base::my_factory
StreamFactory & my_factory
Definition: _flow_graph_streaming_node.h:486
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_finalize_func::operator()
void operator()(FnArgs &... args)
Definition: _flow_graph_streaming_node.h:545
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::my_dispatch_funcs
dispatch_funcs_type my_dispatch_funcs
Definition: _flow_graph_streaming_node.h:449
key
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle * key
Definition: ittnotify_static.h:198
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::send_func::send_func
send_func(StreamFactory &factory, device_type d)
Definition: _flow_graph_streaming_node.h:573
internal::key_from_policy< key_matching< Key & > >::is_key_matching
std::true_type is_key_matching
Definition: _flow_graph_streaming_node.h:87
internal::streaming_device_with_key::my_device
Device my_device
Definition: _flow_graph_streaming_node.h:92
internal::key_from_policy< key_matching< Key > >::is_key_matching
std::true_type is_key_matching
Definition: _flow_graph_streaming_node.h:81
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_finalize_func::finalize_func::my_fn
FinalizeFn my_fn
Definition: _flow_graph_streaming_node.h:554
internal::is_port_ref_impl< port_ref_impl< N1, N2 >(*)() >::type
std::true_type type
Definition: _flow_graph_streaming_node.h:115
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::get_key
key_type get_key(std::true_type, const T &t, size_t &)
Definition: _flow_graph_streaming_node.h:411
internal::streaming_node_traits::output_tuple
input_tuple output_tuple
Definition: _flow_graph_streaming_node.h:176
streaming_node< tuple< Ports... >, JP, StreamFactory >::get_output_ports
base_type::output_ports_type get_output_ports(internal::sequence< S... >)
Definition: _flow_graph_streaming_node.h:338
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage_base::my_kernel
const kernel_type my_kernel
Definition: _flow_graph_streaming_node.h:485
tbb::flow::interface11::rf_reset_protocol
@ rf_reset_protocol
Definition: _flow_graph_impl.h:159
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::range_wrapper::~range_wrapper
virtual ~range_wrapper()
Definition: _flow_graph_streaming_node.h:210
streaming_node< tuple< Ports... >, JP, StreamFactory >::streaming_node
streaming_node(const streaming_node &node)
Definition: _flow_graph_streaming_node.h:688
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_kernel_func::kernel_func::my_device
device_type my_device
Definition: _flow_graph_streaming_node.h:523
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::kernel_input_tuple
KernelInputTuple kernel_input_tuple
Definition: _flow_graph_streaming_node.h:202
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::set_range
void set_range(const range_type &work_size)
Definition: _flow_graph_streaming_node.h:264
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage_base::~args_storage_base
virtual ~args_storage_base()
Definition: _flow_graph_streaming_node.h:474
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_kernel_func::kernel_func::my_ip
kernel_input_tuple & my_ip
Definition: _flow_graph_streaming_node.h:520
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_finalize_func::my_ip
kernel_input_tuple & my_ip
Definition: _flow_graph_streaming_node.h:549
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage_base::args_storage_base
args_storage_base(const args_storage_base &k)
Definition: _flow_graph_streaming_node.h:481
streaming_node< tuple< Ports... >, JP, StreamFactory >::kernel_multifunction_node
multifunction_node< kernel_input_tuple, output_tuple > kernel_multifunction_node
Definition: _flow_graph_streaming_node.h:330
streaming_node< tuple< Ports... >, JP, StreamFactory >::make_args_storage
args_storage_base * make_args_storage(const args_storage_base &storage, Args &&... args) const
Definition: _flow_graph_streaming_node.h:658
streaming_node< tuple< Ports... >, JP, StreamFactory >::get_output_ports
base_type::output_ports_type get_output_ports()
Definition: _flow_graph_streaming_node.h:346
internal::key_from_policy
Definition: _flow_graph_streaming_node.h:73
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector_base::~device_selector_base
virtual ~device_selector_base()
Definition: _flow_graph_streaming_node.h:372
streaming_node< tuple< Ports... >, JP, StreamFactory >::my_kernel_node
kernel_multifunction_node my_kernel_node
Definition: _flow_graph_streaming_node.h:737
tbb::flow::interface11::make_edge
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3830
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage_base::output_ports_type
kernel_multifunction_node::output_ports_type output_ports_type
Definition: _flow_graph_streaming_node.h:469
streaming_node< tuple< Ports... >, JP, StreamFactory >::streaming_node
streaming_node(streaming_node &&node)
Definition: _flow_graph_streaming_node.h:701
internal::streaming_device_with_key::device
const Device & device() const
Definition: _flow_graph_streaming_node.h:99
internal::is_port_ref
Definition: _flow_graph_streaming_node.h:119
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::send_func::operator()
void operator()(FnArgs &... args)
Definition: _flow_graph_streaming_node.h:577
internal::streaming_node_traits::async_msg_type::type
StreamFactory::template async_msg_type< T > type
Definition: _flow_graph_streaming_node.h:172
internal::is_port_ref_impl
Definition: _flow_graph_streaming_node.h:104
tbb::internal::no_copy
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::my_port_epoches
std::array< size_t, NUM_INPUTS > my_port_epoches
Definition: _flow_graph_streaming_node.h:448
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::clone
args_storage_base * clone() const __TBB_override
Definition: _flow_graph_streaming_node.h:623
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage_base::args_storage_base
args_storage_base(const kernel_type &kernel, StreamFactory &f)
Definition: _flow_graph_streaming_node.h:477
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::output_ports_type
args_storage_base::output_ports_type output_ports_type
Definition: _flow_graph_streaming_node.h:491
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_kernel_func::operator()
void operator()(FnArgs &... args)
Definition: _flow_graph_streaming_node.h:515
__TBB_override
#define __TBB_override
Definition: tbb_stddef.h:240
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::get_key
key_type get_key(std::false_type, const T &, size_t &epoch)
Definition: _flow_graph_streaming_node.h:405
streaming_node< tuple< Ports... >, JP, StreamFactory >::get_input_ports
base_type::input_ports_type get_input_ports()
Definition: _flow_graph_streaming_node.h:342
streaming_node< tuple< Ports... >, JP, StreamFactory >::make_Nth_edge
int make_Nth_edge()
Definition: _flow_graph_streaming_node.h:351
tbb::internal::true_type
bool_constant< true > true_type
Definition: tbb_stddef.h:489
internal::convert_and_call_impl<>::doit
static void doit(F &f, Tuple &, Args2 &... args2)
Definition: _flow_graph_streaming_node.h:161
streaming_node< tuple< Ports... >, JP, StreamFactory >::output_tuple
internal::streaming_node_traits< JP, StreamFactory, Ports... >::output_tuple output_tuple
Definition: _flow_graph_streaming_node.h:312
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::range_value::my_value
range_type my_value
Definition: _flow_graph_streaming_node.h:226
internal::make_sequence
Definition: _flow_graph_types_impl.h:332
internal::is_port_ref::type
is_port_ref_impl< typename tbb::internal::strip< T >::type >::type type
Definition: _flow_graph_streaming_node.h:120
streaming_node< tuple< Ports... >, JP, StreamFactory >::my_device_selector_node
device_selector_node my_device_selector_node
Definition: _flow_graph_streaming_node.h:735
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::args_storage
args_storage(const kernel_type &kernel, StreamFactory &f, Args &&... args)
Definition: _flow_graph_streaming_node.h:586
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::my_range_wrapper
range_wrapper * my_range_wrapper
Definition: _flow_graph_streaming_node.h:283
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::my_user_functor
UserFunctor my_user_functor
Definition: _flow_graph_streaming_node.h:450
streaming_node< tuple< Ports... >, JP, StreamFactory >::key_type
internal::key_from_policy< JP >::type key_type
Definition: _flow_graph_streaming_node.h:313
streaming_node< tuple< Ports... >, JP, StreamFactory >::~streaming_node
~streaming_node()
Definition: _flow_graph_streaming_node.h:716
internal::is_port_ref_impl< port_ref_impl< N1, N2 > >::type
std::true_type type
Definition: _flow_graph_streaming_node.h:110
internal::convert_and_call_impl< A1, Args1... >::doit_impl
static void doit_impl(std::false_type, F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
Definition: _flow_graph_streaming_node.h:135
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_kernel_func::kernel_func::my_node
const streaming_node & my_node
Definition: _flow_graph_streaming_node.h:521
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::epoch_desc::my_device
device_type my_device
Definition: _flow_graph_streaming_node.h:443
port_ref
__TBB_DEPRECATED internal::port_ref_impl< N1, N2 > port_ref()
Definition: _flow_graph_streaming_node.h:41
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_finalize_func::finalize_func::finalize_func
finalize_func(StreamFactory &factory, device_type device, FinalizeFn fn)
Definition: _flow_graph_streaming_node.h:556
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector_node
multifunction_node< indexer_node_output_type, kernel_input_tuple > device_selector_node
Definition: _flow_graph_streaming_node.h:329
streaming_node< tuple< Ports... >, JP, StreamFactory >::my_join_node
join_node< kernel_input_tuple, JP > my_join_node
Definition: _flow_graph_streaming_node.h:736
value
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
Definition: ittnotify_static.h:192
tbb::internal::is_same_type
Detects whether two given types are the same.
Definition: _template_helpers.h:61
internal::is_port_ref_impl::type
std::false_type type
Definition: _flow_graph_streaming_node.h:105
internal::kernel_executor_helper
Definition: _flow_graph_streaming_node.h:186
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::device_type
StreamFactory::device_type device_type
Definition: _flow_graph_streaming_node.h:200
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::device_selector
device_selector(UserFunctor uf, streaming_node &n, StreamFactory &f)
Definition: _flow_graph_streaming_node.h:378
internal::num_arguments
Definition: _flow_graph_streaming_node.h:48
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::args_storage
args_storage(const args_storage &k)
Definition: _flow_graph_streaming_node.h:591
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::range_value::clone
range_wrapper * clone() const __TBB_override
Definition: _flow_graph_streaming_node.h:222
internal::key_from_policy< key_matching< Key > >::type
Key type
Definition: _flow_graph_streaming_node.h:80
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::epoch_desc::my_request_number
size_t my_request_number
Definition: _flow_graph_streaming_node.h:444
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::operator()
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op) __TBB_override
Definition: _flow_graph_streaming_node.h:385
tbb::flow::serial
@ serial
Definition: flow_graph.h:105
internal::streaming_node_traits::indexer_node_type
indexer_node< typename async_msg_type< Ports >::type... > indexer_node_type
Definition: _flow_graph_streaming_node.h:181
tbb::internal::false_type
bool_constant< false > false_type
Definition: tbb_stddef.h:490
streaming_node< tuple< Ports... >, JP, StreamFactory >::get_input_ports
base_type::input_ports_type get_input_ports(internal::sequence< S... >)
Definition: _flow_graph_streaming_node.h:333
internal::kernel_executor_helper::kernel_type
StreamFactory::kernel_type kernel_type
Definition: _flow_graph_streaming_node.h:188
streaming_node< tuple< Ports... >, JP, StreamFactory >::my_args_storage
args_storage_base * my_args_storage
Definition: _flow_graph_streaming_node.h:739
streaming_node< tuple< Ports... >, JP, StreamFactory >::indexer_node_type
internal::streaming_node_traits< JP, StreamFactory, Ports... >::indexer_node_type indexer_node_type
Definition: _flow_graph_streaming_node.h:326
internal::streaming_device_with_key::streaming_device_with_key
streaming_device_with_key()
Definition: _flow_graph_streaming_node.h:96
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_kernel_func::kernel_func::operator()
void operator()(FnArgs &... args)
Definition: _flow_graph_streaming_node.h:530
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::range_type
StreamFactory::range_type range_type
Definition: _flow_graph_streaming_node.h:204
internal::streaming_node_traits
Definition: _flow_graph_streaming_node.h:168
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::enqueue
void enqueue(kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n) __TBB_override
Definition: _flow_graph_streaming_node.h:595
streaming_node< tuple< Ports... >, JP, StreamFactory >::device_selector::dispatch_funcs_type
std::array< send_and_put_fn_type, NUM_INPUTS > dispatch_funcs_type
Definition: _flow_graph_streaming_node.h:396
internal::convert_and_call_impl< A1, Args1... >::doit_impl
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N, N >(*fn)(), Args1 &... args1, Args2 &... args2)
Definition: _flow_graph_streaming_node.h:153
streaming_node< tuple< Ports... >, JP, StreamFactory >::output_sequence
internal::make_sequence< NUM_OUTPUTS >::type output_sequence
Definition: _flow_graph_streaming_node.h:324
internal::convert_and_call_impl
Definition: _flow_graph_streaming_node.h:124
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_kernel_func::kernel_func::kernel_func
kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage, device_type device)
Definition: _flow_graph_streaming_node.h:525
S
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
Definition: ittnotify_static.h:212
internal::convert_and_call_impl< A1, Args1... >::doit_impl
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >, Args1 &... args1, Args2 &... args2)
Definition: _flow_graph_streaming_node.h:139
internal::kernel_executor_helper< StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type >::range_mapper::clone
range_wrapper * clone() const __TBB_override
Definition: _flow_graph_streaming_node.h:238
streaming_node< tuple< Ports... >, JP, StreamFactory >::wrap_to_async::type
T type
Definition: _flow_graph_streaming_node.h:649
internal::kernel_executor_helper::enqueue_kernel_impl
void enqueue_kernel_impl(kernel_input_tuple &, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
Definition: _flow_graph_streaming_node.h:192
streaming_node< tuple< Ports... >, JP, StreamFactory >::kernel_input_tuple
internal::streaming_node_traits< JP, StreamFactory, Ports... >::kernel_input_tuple kernel_input_tuple
Definition: _flow_graph_streaming_node.h:328
type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
Definition: ittnotify_static.h:198
internal::key_from_policy::type
size_t type
Definition: _flow_graph_streaming_node.h:74
streaming_node< tuple< Ports... >, JP, StreamFactory >::kernel_body::kernel_body
kernel_body(const streaming_node &node)
Definition: _flow_graph_streaming_node.h:636
__TBB_ASSERT_EX
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert
Definition: tbb_stddef.h:167
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::send_func::my_device
device_type my_device
Definition: _flow_graph_streaming_node.h:582
internal::kernel_executor_helper::kernel_input_tuple
KernelInputTuple kernel_input_tuple
Definition: _flow_graph_streaming_node.h:189
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::do_try_put
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op) const
Definition: _flow_graph_streaming_node.h:495
streaming_node< tuple< Ports... >, JP, StreamFactory >::args_storage::run_finalize_func::finalize_func::my_device
device_type my_device
Definition: _flow_graph_streaming_node.h:553
internal::streaming_node_traits::kernel_input_tuple
tuple< streaming_device_with_key< typename StreamFactory::device_type, typename key_from_policy< JP >::type >, typename async_msg_type< Ports >::type... > kernel_input_tuple
Definition: _flow_graph_streaming_node.h:178

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.