17 #ifndef __TBB_flow_graph_H 18 #define __TBB_flow_graph_H 34 #if __TBB_PREVIEW_ASYNC_MSG 39 #if __TBB_PREVIEW_STREAMING_NODE 42 #include <unordered_map> 43 #include <type_traits> 44 #endif // __TBB_PREVIEW_STREAMING_NODE 46 #if TBB_DEPRECATED_FLOW_ENQUEUE 47 #define FLOW_SPAWN(a) tbb::task::enqueue((a)) 49 #define FLOW_SPAWN(a) tbb::task::spawn((a)) 53 #if __TBB_CPP11_TUPLE_PRESENT 58 using std::tuple_size;
59 using std::tuple_element;
64 #include "compat/tuple" 86 namespace interface10 {
99 namespace interface11 {
102 namespace interface10 {
113 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 117 class edge_container {
120 typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
122 void add_edge(C &
s) {
123 built_edges.push_back(&
s);
126 void delete_edge(C &
s) {
127 for (
typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
135 void copy_edges(edge_list_type &v) {
139 size_t edge_count() {
140 return (
size_t)(built_edges.size());
149 template<
typename S >
void sender_extract(
S &
s);
150 template<
typename R >
void receiver_extract(R &r);
153 edge_list_type built_edges;
168 namespace interface10 {
173 if (right == NULL)
return left;
175 if (left == NULL)
return right;
186 #if __TBB_PREVIEW_ASYNC_MSG 194 template<
typename T,
typename =
void >
202 return static_cast<const void*>(&t);
206 return static_cast<void*>(&t);
210 return *static_cast<const T*>(
p);
214 return *static_cast<T*>(
p);
235 template<
typename T >
303 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 304 typedef internal::edge_container<successor_type> built_successors_type;
306 typedef built_successors_type::edge_list_type successor_list_type;
307 virtual built_successors_type &built_successors() = 0;
308 virtual void internal_add_built_successor(
successor_type & ) = 0;
309 virtual void internal_delete_built_successor(
successor_type & ) = 0;
310 virtual void copy_successors( successor_list_type &) = 0;
311 virtual size_t successor_count() = 0;
315 template<
typename X >
321 template<
typename X >
337 #if __TBB_PREVIEW_OPENCL_NODE 351 if (!res)
return false;
366 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 367 typedef internal::edge_container<predecessor_type> built_predecessors_type;
368 typedef built_predecessors_type::edge_list_type predecessor_list_type;
369 virtual built_predecessors_type &built_predecessors() = 0;
372 virtual void copy_predecessors( predecessor_list_type & ) = 0;
373 virtual size_t predecessor_count() = 0;
396 template<
typename T >
417 __TBB_ASSERT(
false,
"async_msg interface does not support 'pull' protocol in try_get()");
427 __TBB_ASSERT(
false,
"async_msg interface does not support 'pull' protocol in try_reserve()");
433 template<
typename T >
462 #else // __TBB_PREVIEW_ASYNC_MSG 465 template<
typename T >
496 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 497 typedef typename internal::edge_container<successor_type> built_successors_type;
499 typedef typename built_successors_type::edge_list_type successor_list_type;
500 virtual built_successors_type &built_successors() = 0;
502 virtual void internal_delete_built_successor(
successor_type & ) = 0;
503 virtual void copy_successors( successor_list_type &) = 0;
504 virtual size_t successor_count() = 0;
509 template<
typename T >
524 if (!res)
return false;
545 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 546 typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
547 typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
548 virtual built_predecessors_type &built_predecessors() = 0;
551 virtual void copy_predecessors( predecessor_list_type & ) = 0;
552 virtual size_t predecessor_count() = 0;
562 #if __TBB_PREVIEW_OPENCL_NODE 567 #endif // __TBB_PREVIEW_ASYNC_MSG 612 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 613 typedef internal::edge_container<predecessor_type> built_predecessors_type;
614 typedef built_predecessors_type::edge_list_type predecessor_list_type;
615 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
619 my_built_predecessors.add_edge(
s );
624 my_built_predecessors.delete_edge(
s);
627 void copy_predecessors( predecessor_list_type &v)
__TBB_override {
629 my_built_predecessors.copy_edges(v);
634 return my_built_predecessors.edge_count();
656 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 659 built_predecessors_type my_built_predecessors;
673 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 674 my_built_predecessors.clear();
691 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 692 template <
typename K,
typename T>
709 namespace interface10 {
714 #if __TBB_PREVIEW_ASYNC_MSG 719 template <
typename C,
typename N>
722 if (
begin) current_node = my_graph->my_nodes;
726 template <
typename C,
typename N>
729 return *operator->();
732 template <
typename C,
typename N>
737 template <
typename C,
typename N>
739 if (current_node) current_node = current_node->next;
743 inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
756 my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
845 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 846 inline void graph::set_name(
const char *
name) {
852 my_graph.register_node(
this);
856 my_graph.remove_node(
this);
862 template <
typename Output >
874 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 880 template<
typename Body >
882 :
graph_node(g), my_active(is_active), init_my_active(is_active),
885 my_reserved(false), my_has_cached_item(false)
887 my_successors.set_owner(
this);
895 my_active(src.init_my_active),
896 init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
897 my_reserved(false), my_has_cached_item(false)
899 my_successors.set_owner(
this);
907 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 916 my_successors.register_successor(r);
925 my_successors.remove_successor(r);
929 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 931 built_successors_type &built_successors()
__TBB_override {
return my_successors.built_successors(); }
933 void internal_add_built_successor( successor_type &r)
__TBB_override {
935 my_successors.internal_add_built_successor(r);
938 void internal_delete_built_successor( successor_type &r)
__TBB_override {
940 my_successors.internal_delete_built_successor(r);
945 return my_successors.successor_count();
950 my_successors.copy_successors(v);
960 if ( my_has_cached_item ) {
962 my_has_cached_item =
false;
978 if ( my_has_cached_item ) {
991 __TBB_ASSERT( my_reserved && my_has_cached_item,
"releasing non-existent reservation" );
993 if(!my_successors.empty())
1001 __TBB_ASSERT( my_reserved && my_has_cached_item,
"consuming non-existent reservation" );
1002 my_reserved =
false;
1003 my_has_cached_item =
false;
1004 if ( !my_successors.empty() ) {
1014 if (!my_successors.empty())
1018 template<
typename Body>
1019 Body copy_function_object() {
1024 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1026 my_successors.built_successors().sender_extract(*
this);
1027 my_active = init_my_active;
1028 my_reserved =
false;
1029 if(my_has_cached_item) my_has_cached_item =
false;
1037 my_active = init_my_active;
1039 if(my_has_cached_item) {
1040 my_has_cached_item =
false;
1066 if ( my_reserved ) {
1069 if ( !my_has_cached_item ) {
1071 bool r = (*my_body)(my_cached_item);
1074 my_has_cached_item =
true;
1077 if ( my_has_cached_item ) {
1091 return (
new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1106 if ( !try_reserve_apply_body(v) )
1119 template <
typename Input,
typename Output = continue_msg,
typename Policy = queueing,
typename Allocator=cache_aligned_allocator<Input> >
1129 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1130 typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1131 typedef typename fOutput_type::successor_list_type successor_list_type;
1133 using input_impl_type::my_predecessors;
1139 template<
typename Body >
1157 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1163 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1165 my_predecessors.built_predecessors().receiver_extract(*
this);
1166 successors().built_successors().sender_extract(*
this);
1174 using input_impl_type::try_put_task;
1179 input_impl_type::reset_function_input(f);
1182 successors().clear();
1183 my_predecessors.clear();
1186 __TBB_ASSERT(this->my_predecessors.empty(),
"function_node predecessors not empty");
1193 template <
typename Input,
typename Output,
typename Policy = queueing,
typename Allocator=cache_aligned_allocator<Input> >
1199 typename internal::wrap_tuple_elements<
1200 tbb::flow::tuple_size<Output>::value,
1201 internal::multifunction_output,
1217 using input_impl_type::my_predecessors;
1219 template<
typename Body>
1224 tbb::internal::fgt_multioutput_node_with_body<N>(
1225 tbb::internal::FLOW_MULTIFUNCTION_NODE,
1227 this->output_ports(), this->my_body
1233 tbb::internal::fgt_multioutput_node_with_body<N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1235 this->output_ports(), this->my_body );
1238 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1244 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1246 my_predecessors.built_predecessors().receiver_extract(*
this);
1247 base_type::extract();
1257 template<
typename TupleType,
typename Allocator=cache_aligned_allocator<TupleType> >
1264 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1265 typedef typename base_type::predecessor_type predecessor_type;
1266 typedef typename base_type::predecessor_list_type predecessor_list_type;
1268 typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1279 tbb::internal::fgt_multioutput_node<N>(tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1284 tbb::internal::fgt_multioutput_node<N>(tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1288 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1312 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1317 void internal_add_built_predecessor(predecessor_type&)
__TBB_override {}
1320 void internal_delete_built_predecessor(predecessor_type&)
__TBB_override {}
1326 built_predecessors_type &built_predecessors()
__TBB_override {
return my_predessors; }
1329 built_predecessors_type my_predessors;
1337 template <
typename Output,
typename Policy =
internal::Policy<
void> >
1349 template <
typename Body >
1360 template <
typename Body >
1362 graph &g,
int number_of_predecessors,
1374 internal::function_output<Output>() {
1380 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1386 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1388 input_impl_type::my_built_predecessors.receiver_extract(*
this);
1389 successors().built_successors().sender_extract(*
this);
1397 using input_impl_type::try_put_task;
1401 input_impl_type::reset_receiver(f);
1408 template <
typename T>
1415 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1421 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1422 internal::edge_container<predecessor_type> my_built_predecessors;
1442 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1460 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1463 built_successors_type &built_successors()
__TBB_override {
return my_successors.built_successors(); }
1465 void internal_add_built_successor(successor_type &r)
__TBB_override {
1466 my_successors.internal_add_built_successor(r);
1470 my_successors.internal_delete_built_successor(r);
1474 return my_successors.successor_count();
1478 my_successors.copy_successors(v);
1483 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
1487 my_built_predecessors.add_edge(
p);
1490 void internal_delete_built_predecessor( predecessor_type &
p)
__TBB_override {
1492 my_built_predecessors.delete_edge(
p);
1497 return my_built_predecessors.edge_count();
1502 my_built_predecessors.copy_edges(v);
1506 my_built_predecessors.receiver_extract(*
this);
1507 my_successors.built_successors().sender_extract(*
this);
1531 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1532 my_built_predecessors.clear();
1540 template <
typename T,
typename A=cache_aligned_allocator<T> >
1548 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1556 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1557 internal::edge_container<predecessor_type> my_built_predecessors;
1562 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1563 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1564 , add_blt_succ, del_blt_succ,
1565 add_blt_pred, del_blt_pred,
1566 blt_succ_cnt, blt_pred_cnt,
1567 blt_succ_cpy, blt_pred_cpy
1572 class buffer_operation :
public internal::aggregated_operation< buffer_operation > {
1575 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1580 predecessor_type *
p;
1582 successor_list_type *svec;
1583 predecessor_list_type *pvec;
1592 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1593 , ltask(NULL), elem(const_cast<T*>(&e))
1595 , elem(const_cast<T*>(&e)) , ltask(NULL)
1602 typedef internal::aggregating_functor<class_type, buffer_operation>
handler_type;
1604 internal::aggregator< handler_type, buffer_operation> my_aggregator;
1606 virtual void handle_operations(buffer_operation *op_list) {
1607 handle_operations_impl(op_list,
this);
1610 template<
typename derived_type>
1612 __TBB_ASSERT(static_cast<class_type*>(derived) ==
this,
"'this' is not a base class for derived");
1614 buffer_operation *tmp = NULL;
1615 bool try_forwarding =
false;
1618 op_list = op_list->next;
1619 switch (tmp->type) {
1620 case reg_succ: internal_reg_succ(tmp); try_forwarding =
true;
break;
1621 case rem_succ: internal_rem_succ(tmp);
break;
1622 case req_item: internal_pop(tmp);
break;
1623 case res_item: internal_reserve(tmp);
break;
1624 case rel_res: internal_release(tmp); try_forwarding =
true;
break;
1625 case con_res: internal_consume(tmp); try_forwarding =
true;
break;
1626 case put_item: try_forwarding = internal_push(tmp);
break;
1627 case try_fwd_task: internal_forward_task(tmp);
break;
1628 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1630 case add_blt_succ: internal_add_built_succ(tmp);
break;
1631 case del_blt_succ: internal_del_built_succ(tmp);
break;
1632 case add_blt_pred: internal_add_built_pred(tmp);
break;
1633 case del_blt_pred: internal_del_built_pred(tmp);
break;
1634 case blt_succ_cnt: internal_succ_cnt(tmp);
break;
1635 case blt_pred_cnt: internal_pred_cnt(tmp);
break;
1636 case blt_succ_cpy: internal_copy_succs(tmp);
break;
1637 case blt_pred_cpy: internal_copy_preds(tmp);
break;
1644 if (try_forwarding && !forwarder_busy) {
1646 forwarder_busy =
true;
1647 task *new_task =
new(task::allocate_additional_child_of(*(this->my_graph.root_task())))
internal::
1661 return op_data.ltask;
1665 task *ft = grab_forwarding_task(op_data);
1675 buffer_operation op_data(try_fwd_task);
1676 task *last_task = NULL;
1679 op_data.ltask = NULL;
1680 my_aggregator.execute(&op_data);
1691 virtual void internal_reg_succ(buffer_operation *op) {
1702 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1705 built_successors_type &built_successors()
__TBB_override {
return my_successors.built_successors(); }
1707 virtual void internal_add_built_succ(buffer_operation *op) {
1708 my_successors.internal_add_built_successor(*(op->r));
1712 virtual void internal_del_built_succ(buffer_operation *op) {
1713 my_successors.internal_delete_built_successor(*(op->r));
1719 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
1721 virtual void internal_add_built_pred(buffer_operation *op) {
1722 my_built_predecessors.add_edge(*(op->p));
1726 virtual void internal_del_built_pred(buffer_operation *op) {
1727 my_built_predecessors.delete_edge(*(op->p));
1731 virtual void internal_succ_cnt(buffer_operation *op) {
1732 op->cnt_val = my_successors.successor_count();
1736 virtual void internal_pred_cnt(buffer_operation *op) {
1737 op->cnt_val = my_built_predecessors.edge_count();
1741 virtual void internal_copy_succs(buffer_operation *op) {
1742 my_successors.copy_successors(*(op->svec));
1746 virtual void internal_copy_preds(buffer_operation *op) {
1747 my_built_predecessors.copy_edges(*(op->pvec));
1757 return this->my_item_valid(this->my_tail - 1);
1766 this->destroy_back();
1773 internal_forward_task_impl(op,
this);
1776 template<
typename derived_type>
1778 __TBB_ASSERT(static_cast<class_type*>(derived) ==
this,
"'this' is not a base class for derived");
1780 if (this->my_reserved || !derived->is_item_valid()) {
1782 this->forwarder_busy =
false;
1786 task * last_task = NULL;
1787 size_type counter = my_successors.
size();
1788 for (; counter > 0 && derived->is_item_valid(); --counter)
1789 derived->try_put_and_add_task(last_task);
1791 op->ltask = last_task;
1792 if (last_task && !counter) {
1797 forwarder_busy =
false;
1801 virtual bool internal_push(buffer_operation *op) {
1802 this->push_back(*(op->elem));
1808 if(this->pop_back(*(op->elem))) {
1817 if(this->reserve_front(*(op->elem))) {
1826 this->consume_front();
1831 this->release_front();
1838 forwarder_busy(false) {
1847 internal::reservable_item_buffer<T>(),
receiver<T>(),
sender<T>() {
1848 forwarder_busy =
false;
1855 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1868 buffer_operation op_data(reg_succ);
1870 my_aggregator.execute(&op_data);
1871 (
void)enqueue_forwarding_task(op_data);
1875 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1877 buffer_operation op_data(add_blt_succ);
1879 my_aggregator.execute(&op_data);
1883 buffer_operation op_data(del_blt_succ);
1885 my_aggregator.execute(&op_data);
1888 void internal_add_built_predecessor( predecessor_type &
p)
__TBB_override {
1889 buffer_operation op_data(add_blt_pred);
1891 my_aggregator.execute(&op_data);
1894 void internal_delete_built_predecessor( predecessor_type &
p)
__TBB_override {
1895 buffer_operation op_data(del_blt_pred);
1897 my_aggregator.execute(&op_data);
1901 buffer_operation op_data(blt_pred_cnt);
1902 my_aggregator.execute(&op_data);
1903 return op_data.cnt_val;
1907 buffer_operation op_data(blt_succ_cnt);
1908 my_aggregator.execute(&op_data);
1909 return op_data.cnt_val;
1912 void copy_predecessors( predecessor_list_type &v )
__TBB_override {
1913 buffer_operation op_data(blt_pred_cpy);
1915 my_aggregator.execute(&op_data);
1919 buffer_operation op_data(blt_succ_cpy);
1921 my_aggregator.execute(&op_data);
1930 r.remove_predecessor(*
this);
1931 buffer_operation op_data(rem_succ);
1933 my_aggregator.execute(&op_data);
1937 (
void)enqueue_forwarding_task(op_data);
1945 buffer_operation op_data(req_item);
1947 my_aggregator.execute(&op_data);
1948 (
void)enqueue_forwarding_task(op_data);
1958 my_aggregator.execute(&op_data);
1959 (
void)enqueue_forwarding_task(op_data);
1967 my_aggregator.execute(&op_data);
1968 (
void)enqueue_forwarding_task(op_data);
1976 my_aggregator.execute(&op_data);
1977 (
void)enqueue_forwarding_task(op_data);
1988 buffer_operation op_data(t, put_item);
1989 my_aggregator.execute(&op_data);
1990 task *ft = grab_forwarding_task(op_data);
2012 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 2015 my_built_predecessors.receiver_extract(*
this);
2026 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 2027 my_built_predecessors.clear();
2030 forwarder_busy =
false;
2035 template <
typename T,
typename A=cache_aligned_allocator<T> >
2047 return this->my_item_valid(this->my_head);
2054 graph& graph_ref = this->graph_reference();
2056 this->destroy_front();
2062 this->internal_forward_task_impl(op,
this);
2066 if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2070 this->pop_front(*(op->elem));
2075 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2079 this->reserve_front(*(op->elem));
2084 this->consume_front();
2108 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2116 base_type::reset_node(f);
2121 template<
typename T,
typename A=cache_aligned_allocator<T> >
2133 template<
typename Sequencer >
2135 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(
s) ) {
2143 my_sequencer( src.my_sequencer->clone() ) {
2152 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2164 size_type tag = (*my_sequencer)(*(op->elem));
2165 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES 2166 if (tag < this->my_head) {
2173 size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2175 if (this->
size(new_tail) > this->capacity()) {
2176 this->grow_my_array(this->
size(new_tail));
2178 this->my_tail = new_tail;
2187 template<
typename T,
typename Compare = std::less<T>,
typename A=cache_aligned_allocator<T> >
2211 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2221 base_type::reset_node(f);
2230 this->internal_forward_task_impl(op,
this);
2234 this->handle_operations_impl(op_list,
this);
2238 prio_push(*(op->elem));
2245 if ( this->my_reserved ==
true || this->my_tail == 0 ) {
2250 *(op->elem) = prio();
2258 if (this->my_reserved ==
true || this->my_tail == 0) {
2262 this->my_reserved =
true;
2263 *(op->elem) = prio();
2264 reserved_item = *(op->elem);
2271 this->my_reserved =
false;
2277 prio_push(reserved_item);
2278 this->my_reserved =
false;
2286 if (mark < this->my_tail) heapify();
2287 __TBB_ASSERT(mark == this->my_tail,
"mark unequal after heapify");
2291 return this->my_tail > 0;
2298 graph& graph_ref = this->graph_reference();
2312 __TBB_ASSERT(mark <= this->my_tail,
"mark outside bounds before test");
2313 return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2318 if ( this->my_tail >= this->my_array_size )
2319 this->grow_my_array( this->my_tail + 1 );
2320 (
void) this->place_item(this->my_tail, src);
2322 __TBB_ASSERT(mark < this->my_tail,
"mark outside bounds after push");
2329 if (prio_use_tail()) {
2332 this->destroy_item(this->my_tail-1);
2334 __TBB_ASSERT(mark <= this->my_tail,
"mark outside bounds after pop");
2337 this->destroy_item(0);
2338 if(this->my_tail > 1) {
2340 __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2341 this->move_item(0,this->my_tail - 1);
2344 if(mark > this->my_tail) --mark;
2345 if (this->my_tail > 1)
2347 __TBB_ASSERT(mark <= this->my_tail,
"mark outside bounds after pop");
2351 return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2356 if(this->my_tail == 0) {
2360 if (!mark) mark = 1;
2361 for (; mark<this->my_tail; ++mark) {
2364 this->fetch_item(mark,to_place);
2367 if (!compare(this->get_my_item(
parent), to_place))
2369 this->move_item(cur_pos,
parent);
2372 (
void) this->place_item(cur_pos, to_place);
2379 while (child < mark) {
2382 compare(this->get_my_item(child),
2383 this->get_my_item(child+1)))
2386 if (compare(this->get_my_item(target),
2387 this->get_my_item(cur_pos)))
2390 this->swap_items(cur_pos, target);
2392 child = (cur_pos<<1)+1;
2399 namespace interface11 {
2401 using namespace interface10;
2402 namespace internal = interface10::internal;
2408 template<
typename T,
typename DecrementType=continue_msg >
2409 class limiter_node :
public graph_node,
public receiver< T >,
public sender< T > {
2415 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 2438 return ( my_count + my_tries < my_threshold && !my_predecessors.
empty() && !my_successors.
empty() );
2445 bool reserved =
false;
2448 if ( check_conditions() )
2465 if ( check_conditions() ) {
2467 task *rtask =
new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2483 if ( check_conditions() ) {
2485 task *rtask =
new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2503 if( delta > 0 &&
size_t(delta) > my_count )
2505 else if( delta < 0 &&
size_t(delta) > my_threshold - my_count )
2506 my_count = my_threshold;
2508 my_count -= size_t(delta);
2510 return forward_task();
2516 decrement.set_owner(
this);
2518 tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2527 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR 2529 "Deprecated interface of the limiter node can be used only in conjunction " 2530 "with continue_msg as the type of DecrementType template parameter." );
2531 #endif // Check for incompatible interface 2536 :
graph_node(g), my_threshold(threshold), my_count(0),
2538 my_tries(0), decrement(),
2539 init_decrement_predecessors(num_decrement_predecessors),
2540 decrement(num_decrement_predecessors)) {
2547 my_threshold(src.my_threshold), my_count(0),
2549 my_tries(0), decrement(),
2550 init_decrement_predecessors(src.init_decrement_predecessors),
2551 decrement(src.init_decrement_predecessors)) {
2555 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2564 bool was_empty = my_successors.
empty();
2567 if ( was_empty && !my_predecessors.
empty() && my_count + my_tries < my_threshold ) {
2569 task*
task =
new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2580 r.remove_predecessor(*
this);
2585 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 2586 built_successors_type &built_successors()
__TBB_override {
return my_successors.built_successors(); }
2587 built_predecessors_type &built_predecessors()
__TBB_override {
return my_predecessors.built_predecessors(); }
2589 void internal_add_built_successor(successor_type &src)
__TBB_override {
2590 my_successors.internal_add_built_successor(src);
2593 void internal_delete_built_successor(successor_type &src)
__TBB_override {
2594 my_successors.internal_delete_built_successor(src);
2597 size_t successor_count()
__TBB_override {
return my_successors.successor_count(); }
2600 my_successors.copy_successors(v);
2603 void internal_add_built_predecessor(predecessor_type &src)
__TBB_override {
2604 my_predecessors.internal_add_built_predecessor(src);
2607 void internal_delete_built_predecessor(predecessor_type &src)
__TBB_override {
2608 my_predecessors.internal_delete_built_predecessor(src);
2611 size_t predecessor_count()
__TBB_override {
return my_predecessors.predecessor_count(); }
2613 void copy_predecessors(predecessor_list_type &v)
__TBB_override {
2614 my_predecessors.copy_predecessors(v);
2619 my_successors.built_successors().sender_extract(*
this);
2620 my_predecessors.built_predecessors().receiver_extract(*
this);
2621 decrement.built_predecessors().receiver_extract(decrement);
2628 my_predecessors.
add( src );
2630 task*
task =
new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2639 my_predecessors.
remove( src );
2652 if ( my_count + my_tries >= my_threshold )
2664 rtask =
new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2685 my_predecessors.
clear();
2690 my_predecessors.
reset( );
2692 decrement.reset_receiver(f);
2697 namespace interface10 {
2707 template<
typename OutputTuple,
typename JP=queueing>
class join_node;
2709 template<
typename OutputTuple>
2718 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2722 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2726 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2734 template<
typename OutputTuple>
2743 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2747 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2751 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2761 template<
typename OutputTuple,
typename K,
typename KHash>
2763 key_matching_port, OutputTuple, key_matching<K,KHash> > {
2771 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 2775 template<
typename __TBB_B0,
typename __TBB_B1>
2777 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2780 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2>
2782 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2785 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3>
2787 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2790 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4>
2791 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2793 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2796 #if __TBB_VARIADIC_MAX >= 6 2797 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
2799 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2801 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2805 #if __TBB_VARIADIC_MAX >= 7 2806 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
2807 typename __TBB_B5,
typename __TBB_B6>
2808 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
2810 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2814 #if __TBB_VARIADIC_MAX >= 8 2815 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
2816 typename __TBB_B5,
typename __TBB_B6,
typename __TBB_B7>
2817 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2818 __TBB_B7 b7) :
unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
2819 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2823 #if __TBB_VARIADIC_MAX >= 9 2824 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
2825 typename __TBB_B5,
typename __TBB_B6,
typename __TBB_B7,
typename __TBB_B8>
2826 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2827 __TBB_B7 b7, __TBB_B8 b8) :
unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
2828 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2832 #if __TBB_VARIADIC_MAX >= 10 2833 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
2834 typename __TBB_B5,
typename __TBB_B6,
typename __TBB_B7,
typename __TBB_B8,
typename __TBB_B9>
2835 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2836 __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) :
unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
2837 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2842 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2846 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2858 template<
typename T0,
typename T1=null_type,
typename T2=null_type,
typename T3=null_type,
2859 typename T4=null_type,
typename T5=null_type,
typename T6=null_type,
2863 template<
typename T0>
2866 static const int N = 1;
2872 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2877 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2881 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2888 template<
typename T0,
typename T1>
2891 static const int N = 2;
2897 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2902 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2906 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2913 template<
typename T0,
typename T1,
typename T2>
2916 static const int N = 3;
2922 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2927 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2931 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2938 template<
typename T0,
typename T1,
typename T2,
typename T3>
2941 static const int N = 4;
2947 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2952 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2956 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2963 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4>
2966 static const int N = 5;
2972 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2977 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2981 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2988 #if __TBB_VARIADIC_MAX >= 6 2989 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5>
2990 class indexer_node<T0, T1, T2, T3, T4, T5> :
public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
2992 static const int N = 6;
2998 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3003 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3007 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3013 #endif //variadic max 6 3015 #if __TBB_VARIADIC_MAX >= 7 3016 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5,
3018 class indexer_node<T0, T1, T2, T3, T4, T5, T6> :
public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3020 static const int N = 7;
3026 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3031 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3035 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3041 #endif //variadic max 7 3043 #if __TBB_VARIADIC_MAX >= 8 3044 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5,
3045 typename T6,
typename T7>
3046 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> :
public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3048 static const int N = 8;
3054 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3059 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3063 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3069 #endif //variadic max 8 3071 #if __TBB_VARIADIC_MAX >= 9 3072 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5,
3073 typename T6,
typename T7,
typename T8>
3074 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> :
public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3076 static const int N = 9;
3082 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3087 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3091 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3097 #endif //variadic max 9 3099 #if __TBB_VARIADIC_MAX >= 10 3100 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5,
3101 typename T6,
typename T7,
typename T8,
typename T9>
3104 static const int N = 10;
3106 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>
InputTuple;
3107 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
3110 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3115 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3119 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3125 #endif //variadic max 10 3127 #if __TBB_PREVIEW_ASYNC_MSG 3130 template<
typename T >
3133 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3134 s.internal_add_built_predecessor(
p);
3135 p.internal_add_built_successor(
s);
3137 p.register_successor(
s );
3142 template<
typename T >
3147 #if __TBB_PREVIEW_ASYNC_MSG 3148 template<
typename TS,
typename TR,
3155 template<
typename T >
3160 template<
typename T >
3165 #endif // __TBB_PREVIEW_ASYNC_MSG 3167 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 3169 template<
typename T,
typename V,
3170 typename =
typename T::output_ports_type,
typename =
typename V::input_ports_type >
3172 make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3176 template<
typename T,
typename R,
3177 typename =
typename T::output_ports_type >
3179 make_edge(get<0>(output.output_ports()), input);
3183 template<
typename S,
typename V,
3184 typename =
typename V::input_ports_type >
3186 make_edge(output, get<0>(input.input_ports()));
3190 #if __TBB_PREVIEW_ASYNC_MSG 3193 template<
typename T >
3196 p.remove_successor(
s );
3197 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3199 p.internal_delete_built_successor(
s);
3200 s.internal_delete_built_predecessor(
p);
3206 template<
typename T >
3211 #if __TBB_PREVIEW_ASYNC_MSG 3212 template<
typename TS,
typename TR,
3219 template<
typename T >
3224 template<
typename T >
3228 #endif // __TBB_PREVIEW_ASYNC_MSG 3230 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 3232 template<
typename T,
typename V,
3233 typename =
typename T::output_ports_type,
typename =
typename V::input_ports_type >
3235 remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3239 template<
typename T,
typename R,
3240 typename =
typename T::output_ports_type >
3242 remove_edge(get<0>(output.output_ports()), input);
3245 template<
typename S,
typename V,
3246 typename =
typename V::input_ports_type >
3252 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3253 template<
typename C >
3254 template<
typename S >
3255 void internal::edge_container<C>::sender_extract(
S &
s ) {
3256 edge_list_type e = built_edges;
3257 for (
typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3262 template<
typename C >
3263 template<
typename R >
3264 void internal::edge_container<C>::receiver_extract( R &r ) {
3265 edge_list_type e = built_edges;
3266 for (
typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3273 template<
typename Body,
typename Node >
3275 return n.template copy_function_object<Body>();
3278 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 3283 template<
typename... InputTypes,
typename... OutputTypes>
3294 static const size_t NUM_INPUTS =
sizeof...(InputTypes);
3295 static const size_t NUM_OUTPUTS =
sizeof...(OutputTypes);
3301 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3312 template<
typename T1,
typename T2>
3316 my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
3317 my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
3323 template<
typename... NodeTypes >
3326 template<
typename... NodeTypes >
3329 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3336 __TBB_ASSERT(my_input_ports,
"input ports not set, call set_external_ports to set input ports");
3337 return *my_input_ports;
3341 __TBB_ASSERT(my_output_ports,
"output ports not set, call set_external_ports to set output ports");
3342 return *my_output_ports;
3345 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3347 __TBB_ASSERT(
false,
"Current composite_node implementation does not support extract");
3353 template<
typename... InputTypes>
3360 static const size_t NUM_INPUTS =
sizeof...(InputTypes);
3366 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3377 template<
typename T>
3381 my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
3386 template<
typename... NodeTypes >
3389 template<
typename... NodeTypes >
3392 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3399 __TBB_ASSERT(my_input_ports,
"input ports not set, call set_external_ports to set input ports");
3400 return *my_input_ports;
3403 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3405 __TBB_ASSERT(
false,
"Current composite_node implementation does not support extract");
3412 template<
typename... OutputTypes>
3419 static const size_t NUM_OUTPUTS =
sizeof...(OutputTypes);
3425 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3436 template<
typename T>
3440 my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
3445 template<
typename... NodeTypes >
3448 template<
typename... NodeTypes >
3451 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3458 __TBB_ASSERT(my_output_ports,
"output ports not set, call set_external_ports to set output ports");
3459 return *my_output_ports;
3462 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3464 __TBB_ASSERT(
false,
"Current composite_node implementation does not support extract");
3470 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES 3472 namespace internal {
3474 template<
typename Gateway>
3481 my_gateway = gateway;
3488 template<
typename Input,
typename Ports,
typename Gateway,
typename Body>
3498 my_body(v, *this->my_gateway);
3510 template <
typename Input,
typename Output,
3545 my_node->my_graph.reserve_wait();
3549 my_node->my_graph.release_wait();
3555 return my_node->try_put_impl(i);
3573 "Return status is inconsistent with the method operation." );
3575 while( !tasks.
empty() ) {
3579 return is_at_least_one_put_successful;
3583 template<
typename Body>
3591 tbb::internal::fgt_multioutput_node_with_body<1>(
3592 tbb::internal::FLOW_ASYNC_NODE,
3594 this->output_ports(), this->my_body
3599 static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
3600 static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
3602 tbb::internal::fgt_multioutput_node_with_body<1>( tbb::internal::FLOW_ASYNC_NODE,
3604 this->output_ports(), this->my_body );
3611 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3621 return internal::output_port<0>(*this).register_successor(r);
3626 return internal::output_port<0>(*this).remove_successor(r);
3629 template<
typename Body>
3633 mfn_body_type &body_ref = *this->my_body;
3635 return ab.get_body();
3638 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3639 typedef typename internal::edge_container<successor_type> built_successors_type;
3641 typedef typename built_successors_type::edge_list_type successor_list_type;
3643 return internal::output_port<0>(*this).built_successors();
3646 void internal_add_built_successor( successor_type &r )
__TBB_override {
3647 internal::output_port<0>(*this).internal_add_built_successor(r);
3650 void internal_delete_built_successor( successor_type &r )
__TBB_override {
3651 internal::output_port<0>(*this).internal_delete_built_successor(r);
3655 internal::output_port<0>(*this).copy_successors(l);
3659 return internal::output_port<0>(*this).successor_count();
3666 base_type::reset_node(f);
3670 #if __TBB_PREVIEW_STREAMING_NODE 3672 #endif // __TBB_PREVIEW_STREAMING_NODE 3677 namespace interface10a {
3679 using namespace interface10;
3680 namespace internal = interface10::internal;
3682 template<
typename T >
3689 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3713 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3723 bool ret =
s.try_put( my_buffer );
3732 task *rtask =
new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
3749 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3750 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
3751 built_successors_type &built_successors()
__TBB_override {
return my_successors.built_successors(); }
3753 void internal_add_built_successor( successor_type &
s)
__TBB_override {
3755 my_successors.internal_add_built_successor(
s);
3758 void internal_delete_built_successor( successor_type &
s)
__TBB_override {
3760 my_successors.internal_delete_built_successor(
s);
3765 return my_successors.successor_count();
3770 my_successors.copy_successors(v);
3773 void internal_add_built_predecessor( predecessor_type &
p)
__TBB_override {
3775 my_built_predecessors.add_edge(
p);
3778 void internal_delete_built_predecessor( predecessor_type &
p)
__TBB_override {
3780 my_built_predecessors.delete_edge(
p);
3785 return my_built_predecessors.edge_count();
3788 void copy_predecessors( predecessor_list_type &v )
__TBB_override {
3790 my_built_predecessors.copy_edges(v);
3794 my_buffer_is_valid =
false;
3795 built_successors().sender_extract(*
this);
3796 built_predecessors().receiver_extract(*
this);
3803 if ( my_buffer_is_valid ) {
3823 return my_buffer_is_valid;
3828 my_buffer_is_valid =
false;
3838 return try_put_task_impl(v);
3843 my_buffer_is_valid =
true;
3857 o(owner),
s(succ) {};
3860 if (!
s.register_predecessor(o)) {
3861 o.register_successor(
s);
3872 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3873 internal::edge_container<predecessor_type> my_built_predecessors;
3880 my_buffer_is_valid =
false;
3882 my_successors.
clear();
3887 template<
typename T >
3910 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3922 return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
3932 using interface10::graph;
3933 using interface10::graph_node;
3934 using interface10::continue_msg;
3936 using interface10::source_node;
3937 using interface10::function_node;
3938 using interface10::multifunction_node;
3939 using interface10::split_node;
3941 using interface10::indexer_node;
3942 using interface10::internal::tagged_msg;
3945 using interface10::continue_node;
3946 using interface10a::overwrite_node;
3947 using interface10a::write_once_node;
3948 using interface10::broadcast_node;
3949 using interface10::buffer_node;
3950 using interface10::queue_node;
3951 using interface10::sequencer_node;
3952 using interface10::priority_queue_node;
3953 using interface11::limiter_node;
3954 using namespace interface10::internal::graph_policy_namespace;
3955 using interface10::join_node;
3961 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 3962 using interface10::composite_node;
3964 using interface10::async_node;
3965 #if __TBB_PREVIEW_ASYNC_MSG 3966 using interface10::async_msg;
3968 #if __TBB_PREVIEW_STREAMING_NODE 3970 using interface10::streaming_node;
3971 #endif // __TBB_PREVIEW_STREAMING_NODE 3972 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES 3981 #undef __TBB_PFG_RESET_ARG 3984 #endif // __TBB_flow_graph_H internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7 > output_type
virtual void internal_release(buffer_operation *op)
internal::unfolded_indexer_node< InputTuple > unfolded_type
void add_visible_nodes(const NodeTypes &... n)
virtual ~untyped_receiver()
Destructor.
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
tuple< T0, T1, T2 > InputTuple
void internal_make_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
split_node(const split_node &other)
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
int my_initial_predecessor_count
~graph()
Destroys the graph.
receiver_gateway< output_type > gateway_type
output_ports_type & output_ports()
receiver< input_type > receiver_type
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > input_impl_type
virtual bool try_consume()
Consumes the reserved item.
void add(untyped_sender &n)
Class for determining type of std::allocator<T>::value_type.
internal::unfolded_indexer_node< InputTuple > unfolded_type
tbb::flow::tuple< receiver< InputTypes > &... > input_ports_type
virtual bool try_release()
Releases the reserved item.
fOutput_type::successor_type successor_type
void reset_node(reset_flags) __TBB_override
void try_put_and_add_task(task *&last_task)
tbb::flow::tuple< sender< OutputTypes > &... > output_ports_type
async_body_base< Gateway > base_type
void reset_node(reset_flags f) __TBB_override
internal::unfolded_indexer_node< InputTuple > unfolded_type
void prepare_task_arena(bool reinit=false)
field of type K being used for matching.
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8 > output_type
Forward declaration section.
void reset_node(reset_flags) __TBB_override
broadcast_cache_type & successors()
Implements a function node that supports Input -> Output.
receiver< TupleType > base_type
write_once_node(const write_once_node &src)
Copy constructor: call base class copy constructor.
void operator()(const Input &v, Ports &)
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
tbb::flow::tuple< receiver< InputTypes > &... > input_ports_type
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6)
const_iterator cbegin() const
start const iterator
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
internal::continue_input< Output, Policy > input_impl_type
bool try_put(const X &t)
Put an item to the receiver.
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
virtual bool remove_successor(successor_type &r)=0
Removes a successor from this node.
void remove(untyped_sender &n)
friend class proxy_dependency_receiver
std::unique_ptr< input_ports_type > my_input_ports
unsigned int node_priority_t
static void alias_port(void *, PortsTuple &)
void set_external_ports(T &&output_ports_tuple)
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
#define __TBB_DEPRECATED_LIMITER_ARG4(arg1, arg2, arg3, arg4)
internal::function_output< output_type > fOutput_type
bool register_successor(successor_type &r) __TBB_override
Replace the current successor with this new successor.
static void fgt_end_body(void *)
internal::broadcast_cache< output_type > & successors() __TBB_override
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > output_type
try_put_functor(output_port_type &p, const Output &v)
void reset_node(reset_flags f) __TBB_override
write_once_node(graph &g)
Constructor.
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4)
bool register_successor(successor_type &r) __TBB_override
Adds a new successor.
virtual void internal_rem_succ(buffer_operation *op)
Remove successor.
internal::function_input< input_type, output_type, Policy, Allocator > input_impl_type
gateway_type * my_gateway
limiter_node(const limiter_node &src)
Copy constructor.
continue_node(const continue_node &src)
Copy constructor.
#define __TBB_DEPRECATED_LIMITER_EXPR(expr)
input_ports_type & input_ports()
virtual bool try_get_wrapper(void *p, bool is_async) __TBB_override
indexer_node(const indexer_node &other)
virtual bool try_get(T &)
Request an item from the sender.
sender< output_type >::successor_type successor_type
internal::function_body< T, size_t > * my_sequencer
A cache of successors that are broadcast to.
static void fgt_node_with_body(string_index, void *, void *, void *)
void deactivate_graph(graph &g)
Forwards messages in arbitrary order.
internal::aggregating_functor< class_type, buffer_operation > handler_type
void try_put_and_add_task(task *&last_task)
Body copy_body(Node &n)
Returns a copy of the body from a function or continue node.
Breaks an infinite loop between the node reservation and register_successor call.
async_body_base(gateway_type *gateway)
The leaf for source_body.
internal::unfolded_indexer_node< InputTuple > unfolded_type
Forwards messages only if the threshold has not been reached.
source_node(const source_node &src)
Copy constructor.
base_type::size_type size_type
buffer_node< T, A > base_type
internal::aggregating_functor< class_type, join_node_base_operation > handler_type
bool enqueue_forwarding_task(buffer_operation &op_data)
split_node: accepts a tuple as input, forwards each element of the tuple to its
priority_queue_node class_type
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
bool try_reserve(output_type &v)
task * try_put_task(const X &t)
indexer_node(const indexer_node &other)
bool remove_successor(successor_type &s) __TBB_override
Removes a successor from this node.
static void fgt_make_edge(void *, void *)
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
void const char const char int ITT_FORMAT __itt_group_sync s
static void fgt_graph_desc(void *, const char *)
graph_iterator< const graph, const graph_node > const_iterator
The base of all graph nodes.
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
broadcast_node(const broadcast_node &src)
internal::broadcast_cache< output_type > & successors() __TBB_override
graph & graph_reference() __TBB_override
void reset_node(reset_flags f) __TBB_override
T input_type
The input type of this receiver.
graph_iterator< graph, graph_node > iterator
static void * to_void_ptr(T &t)
virtual task * execute()=0
Does whatever should happen when the threshold is reached.
void set_external_ports(T1 &&input_ports_tuple, T2 &&output_ports_tuple)
task * try_put_task(const T &v) __TBB_override
Put item to successor; return task to run the successor if possible.
void add_nodes(const NodeTypes &... n)
virtual void reset_receiver(reset_flags f=rf_reset_protocol)=0
put receiver back in initial state
buffer_node< T, A > class_type
continue_node(graph &g, int number_of_predecessors,)
Constructor for executable node with continue_msg -> Output.
tuple< T0, T1, T2, T3 > InputTuple
void internal_consume(prio_operation *op) __TBB_override
bool try_reserve(X &t)
Reserves an item in the sender.
item_buffer with reservable front-end. NOTE: if reserving, do not
bool register_successor(successor_type &s) __TBB_override
Add a new successor to this node.
task_list_type my_reset_task_list
#define __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority)
bool register_predecessor(predecessor_type &) __TBB_override
Increments the trigger threshold.
indexer_node(const indexer_node &other)
virtual bool remove_predecessor(predecessor_type &)
Remove a predecessor from the node.
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
multifunction_node(graph &g, size_t concurrency,)
A cache of predecessors that only supports try_get.
bool try_get(T &v) __TBB_override
Request an item from the buffer_node.
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Forwards messages in FIFO order.
receiver_gateway_impl(async_node *node)
fOutput_type::successor_type successor_type
tbb::flow::tuple< sender< OutputTypes > &... > output_ports_type
internal::async_body_base< gateway_type > async_body_base_type
async_storage_ptr my_storage
virtual bool try_reserve(T &)
Reserves an item in the sender.
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
K key_from_message(const T &t)
function_node(graph &g, size_t concurrency,)
Constructor.
void reset_node(reset_flags f) __TBB_override
bool try_get(output_type &v) __TBB_override
Request an item from the node.
priority_queue_node(const priority_queue_node &src)
Copy constructor.
task that does nothing. Useful for synchronization.
static void clear_this(P &p)
untyped_receiver successor_type
The successor type for this node.
internal::multifunction_output< Output > output_port_type
reference operator *() const
Dereference.
buffer_node< T, A >::buffer_operation prio_operation
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
sender< output_type >::successor_type successor_type
task * try_put_task(const input_type &) __TBB_override
Put item to successor; return task to run the successor if possible.
static void fgt_async_commit(void *, void *)
task * try_put_task(const T &t) __TBB_override
Puts an item to this receiver.
void set_owner(owner_type *owner)
static task * try_put_task_wrapper_impl(receiver< T > *const this_recv, const void *p, bool is_async)
static void fgt_multioutput_node_desc(const NodeType *, const char *)
Used to form groups of tasks.
#define __TBB_STATIC_ASSERT(condition, msg)
sender< output_type >::successor_type successor_type
void reset_node(reset_flags f) __TBB_override
Implements methods for a function node that takes a type Input as input and sends.
indexer_node(const indexer_node &other)
Base class for receivers of completion messages.
static void fgt_async_try_put_end(void *, void *)
buffer_node< T, A >::size_type size_type
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
void internal_reserve(prio_operation *op) __TBB_override
continue_node(graph &g,)
Constructor for executable node with continue_msg -> Output.
internal::tagged_msg< size_t, T0, T1 > output_type
indexer_node(const indexer_node &other)
void spawn_put()
Spawns a task that applies the body.
tuple< T0, T1, T2, T3, T4, T5, T6 > InputTuple
Input and scheduling for a function node that takes a type Input as input.
iterator begin()
start iterator
bool try_get(X &t)
Request an item from the sender.
buffer_node< T, A > base_type
receiver< input_type >::predecessor_type predecessor_type
static tbb::task *const SUCCESSFULLY_ENQUEUED
task * try_put_task(const T &t) __TBB_override
build a task to run the successor if possible. Default is old behavior.
tuple< T0, T1, T2, T3, T4 > InputTuple
receiver_type::predecessor_type predecessor_type
bool gather_successful_try_puts(const X &t, task_list &tasks)
overwrite_node< T > base_type
void add_nodes(const NodeTypes &... n)
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
buffer_node< T, A >::size_type size_type
sequencer_node(graph &g, const Sequencer &s)
Constructor.
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
indexer_node(const indexer_node &other)
static void fgt_reserve_wait(void *)
void add_task_to_graph_reset_list(graph &g, tbb::task *tp)
bool internal_push(sequencer_operation *op) __TBB_override
static const void * to_void_ptr(const T &t)
indexer_node(const indexer_node &other)
internal::wrap_tuple_elements< N, internal::multifunction_output, Output >::type output_ports_type
buffer_node< T, A >::buffer_operation sequencer_operation
static const bool is_async_type
receiver< input_type >::predecessor_type predecessor_type
void fgt_multiinput_multioutput_node_desc(const NodeType *, const char *)
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > base_type
void enqueue_in_graph_arena(graph &g, tbb::task &arena_task)
Enqueues a task inside graph arena.
internal::tagged_msg< size_t, T0 > output_type
tuple< T0, T1, T2, T3, T4, T5 > InputTuple
bool try_get(input_type &v) __TBB_override
Request an item from the sender.
tbb::internal::uint64_t tag_value
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
indexer_node(const indexer_node &other)
void set_owner(successor_type *owner)
bool is_graph_active(graph &g)
internal::unfolded_indexer_node< InputTuple > unfolded_type
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
receiver< input_type >::predecessor_type predecessor_type
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5)
task * try_put_task_impl(const input_type &v)
static void fgt_node(string_index, void *, void *)
virtual ~untyped_sender()
receiver< input_type >::predecessor_type predecessor_type
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3)
An abstract cache of successors.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
limiter_node(graph &g, __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
Constructor.
Forwards messages of type T to all successors.
virtual task * try_put_task(const T &t)=0
Put item to successor; return task to run the successor if possible.
virtual void internal_reserve(buffer_operation *op)
bool try_put(const Output &i) __TBB_override
Implements gateway_type::try_put for an external activity to submit a message to FG.
internal::unfolded_indexer_node< InputTuple > unfolded_type
void internal_forward_task(prio_operation *op) __TBB_override
Tries to forward valid items to successors.
Pure virtual template class that defines a receiver of messages of type T.
bool empty() const
True if list is empty; false otherwise.
internal::source_body< output_type > * my_body
queue_node(graph &g)
Constructor.
implements a function node that supports Input -> (set of outputs)
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
task * try_put_task(const T &t) __TBB_override
receive an item, return a task *if possible
multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type
receiver< input_type >::predecessor_type predecessor_type
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
virtual bool register_predecessor(predecessor_type &)
Add a predecessor to the node.
void set_external_ports(T &&input_ports_tuple)
static task * emit_this(graph &g, const T &t, P &p)
bool is_continue_receiver() __TBB_override
pointer operator->() const
Dereference.
tbb::task_group_context * my_context
continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority))
Constructor.
internal::broadcast_cache< T > my_successors
untyped_sender predecessor_type
The predecessor type for this node.
void register_successor(successor_type &r)
GraphNodeType & reference
successors_type my_successors
A lock that occupies a single byte.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1)
internal::tagged_msg< size_t, T0, T1, T2, T3, T4 > output_type
static void fgt_async_reserve(void *, void *)
base_type::output_ports_type output_ports_type
unfolded_type::input_ports_type input_ports_type
graph_node * my_nodes_last
bool try_put(const output_type &i)
internal::wrap_tuple_elements< N, internal::multifunction_output, TupleType >::type output_ports_type
const V & cast_to(T const &t)
leaf for multifunction. OutputSet can be a std::tuple or a vector.
std::unique_ptr< input_ports_type > my_input_ports
output_ports_type & output_ports()
internal::unfolded_indexer_node< InputTuple > unfolded_type
void prio_push(const T &src)
void remove_node(graph_node *n)
virtual bool try_reserve_wrapper(void *p, bool is_async)=0
Forwards messages in priority order.
int decrement_ref_count()
Atomically decrement reference count and returns its new value.
virtual void internal_forward_task(buffer_operation *op)
Tries to forward valid items to successors.
task & pop_front()
Pop the front task from the list.
Implements methods for both executable and function nodes that puts Output to its successors.
receiver< input_type >::predecessor_type predecessor_type
void reset_node(reset_flags f) __TBB_override
resets the source_node to its initial state
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9)
internal::tagged_msg< size_t, T0, T1, T2, T3 > output_type
bool try_consume() __TBB_override
Consumes a reserved item.
A cache of successors that are put in a round-robin fashion.
bool remove_predecessor(predecessor_type &) __TBB_override
Decrements the trigger threshold.
bool try_reserve(T &v) __TBB_override
Reserves an item.
const_iterator cend() const
end const iterator
graph & graph_reference() __TBB_override
internal::decrementer< limiter_node< T, DecrementType >, DecrementType > decrement
The internal receiver< DecrementType > that decrements the count.
void set_ref_count(int count)
Set reference count.
~sequencer_node()
Destructor.
bool register_successor(successor_type &r) __TBB_override
Adds a successor.
graph & graph_reference() __TBB_override
async_node(graph &g, size_t concurrency,)
Enables one or the other code branches.
bool try_consume() __TBB_override
Consumes the reserved item.
output_type my_cached_item
Implements an executable node that supports continue_msg -> Output.
void handle_operations_impl(buffer_operation *op_list, derived_type *derived)
void internal_forward_task(queue_operation *op) __TBB_override
Tries to forward valid items to successors.
internal::source_body< output_type > * my_init_body
iterator end()
end iterator
void reset_node(reset_flags f) __TBB_override
continue_receiver(const continue_receiver &src)
Copy constructor.
priority_queue_node(graph &g)
Constructor.
static const node_priority_t no_priority
Body copy_function_object()
Base class for user-defined tasks.
internal::unfolded_indexer_node< InputTuple > unfolded_type
sender< output_type >::successor_type successor_type
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
interface10::internal::Policy< queueing, lightweight > queueing_lightweight
graph_iterator()
Default constructor.
void __TBB_store_with_release(volatile T &location, V value)
void reset(reset_flags f=rf_reset_protocol)
sender< output_type >::successor_type successor_type
join_node(const join_node &other)
static void fgt_begin_body(void *)
sequencer_node(const sequencer_node &src)
Copy constructor.
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
static void fgt_composite(void *, void *)
Detects whether two given types are the same.
void reset_node(reset_flags f) __TBB_override
void internal_reserve(queue_operation *op) __TBB_override
tuple< T0, T1, T2, T3, T4, T5, T6, T7, T8 > InputTuple
void add_visible_nodes(const NodeTypes &... n)
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
void internal_consume(queue_operation *op) __TBB_override
task * try_put_task(const X &t)
internal::multifunction_input< Input, typename base_type::output_ports_type, Policy, Allocator > mfn_input_type
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
A task that calls a node's forward_task function.
receiver< input_type >::predecessor_type predecessor_type
The predecessor type for this node.
input_impl_type::predecessor_type predecessor_type
sender< output_type >::successor_type successor_type
async_body(const Body &body, gateway_type *gateway)
void internal_forward_task_impl(buffer_operation *op, derived_type *derived)
virtual bool register_successor(successor_type &r)=0
Add a new successor to this node.
bool try_put(const typename internal::async_helpers< T >::filtered_type &t)
Put an item to the receiver.
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
base_type::buffer_operation queue_operation
internal::function_output< output_type > fOutput_type
~source_node()
The destructor.
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
static const T & from_void_ptr(const void *p)
task * grab_forwarding_task(buffer_operation &op_data)
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type
static void fgt_release_wait(void *)
void reset_node(reset_flags f) __TBB_override
virtual source_body * clone()=0
void internal_pop(queue_operation *op) __TBB_override
function_body that takes an Input and a set of output ports
An cache of predecessors that supports requests and reservations.
graph & graph_reference() __TBB_override
overwrite_node(const overwrite_node &src)
Copy constructor; doesn't take anything from src; default won't work.
Forwards messages in sequence order.
output_ports_type & output_ports()
buffer_node< T, A >::item_type item_type
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5 > output_type
task * try_put_task(const TupleType &t) __TBB_override
Put item to successor; return task to run the successor if possible.
join_node(const join_node &other)
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2)
void remove_successor(successor_type &r)
Base class for types that should not be assigned.
sender< output_type >::successor_type successor_type
void internal_remove_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
internal::broadcast_cache< output_type > my_successors
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
bool try_consume() __TBB_override
Consumes a reserved item.
internal::async_helpers< T >::filtered_type filtered_type
virtual void internal_consume(buffer_operation *op)
virtual bool try_get_wrapper(void *p, bool is_async)=0
internal::unfolded_join_node< N, reserving_port, OutputTuple, reserving > unfolded_type
multifunction_node(const multifunction_node &other)
sender< output_type >::successor_type successor_type
static void fgt_graph(void *)
bool try_reserve_apply_body(output_type &v)
function_node(const function_node &src)
Copy constructor.
tbb::task_arena * my_task_arena
void internal_release(prio_operation *op) __TBB_override
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
bool try_put_impl(const Output &i)
Implements gateway_type::try_put for an external activity to submit a message to FG.
sender< output_type >::successor_type successor_type
The type of successors of this node.
register_predecessor_task(predecessor_type &owner, successor_type &succ)
static void fgt_node_desc(const NodeType *, const char *)
async_msg< T > async_type
A task that calls a node's apply_body_bypass function with no input.
void increment_ref_count()
Atomically increment reference count.
virtual bool try_reserve_wrapper(void *p, bool is_async) __TBB_override
internal::async_helpers< T >::filtered_type filtered_type
continue_msg input_type
The input type.
bool try_release() __TBB_override
Releases the reserved item.
static void fgt_multiinput_multioutput_node(string_index, void *, void *)
bool internal_push(prio_operation *op) __TBB_override
sender< output_type >::successor_type successor_type
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
void wait_for_all()
Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls.
#define __TBB_DEPRECATED_LIMITER_ARG2(arg1, arg2)
queue_node(const queue_node &src)
Copy constructor.
void try_put_and_add_task(task *&last_task)
static void * to_void_ptr(T &t)
void add_nodes_impl(CompositeType *, bool)
bool try_reserve(T &v) __TBB_override
Reserves an item.
buffer_node(const buffer_node &src)
Copy constructor.
internal::port_ref_impl< N1, N2 > port_ref()
void add_nodes(const NodeTypes &... n)
void activate_graph(graph &g)
internal::tagged_msg< size_t, T0, T1, T2 > output_type
unfolded_type::input_ports_type input_ports_type
An empty class used for messages that mean "I'm done".
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6 > output_type
virtual task * try_put_task_wrapper(const void *p, bool is_async) __TBB_override
void add_visible_nodes(const NodeTypes &... n)
input_impl_type::predecessor_type predecessor_type
An executable node that acts as a source, i.e. it has no predecessors.
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8)
virtual graph & graph_reference()=0
virtual void internal_pop(buffer_operation *op)
void release_wait() __TBB_override
Inform a graph that a previous call to reserve_wait is no longer in effect.
virtual task * try_put_task_wrapper(const void *p, bool is_async)=0
async_node(const async_node &other)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
void reset_node(reset_flags f) __TBB_override
void const char const char int ITT_FORMAT __itt_group_sync p
tuple< T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > InputTuple
Implements methods for an executable node that takes continue_msg as input.
internal::unfolded_indexer_node< InputTuple > unfolded_type
static void alias_port(void *, PortsTuple &)
input_ports_type & input_ports()
std::unique_ptr< output_ports_type > my_output_ports
void reset_node(reset_flags f) __TBB_override
internal::unfolded_join_node< N, key_matching_port, OutputTuple, key_matching< K, KHash > > unfolded_type
internal::function_input_queue< input_type, Allocator > input_queue_type
Implements methods for a function node that takes a type Input as input.
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
task * decrement_counter(long long delta)
void register_node(graph_node *n)
task * try_put_task(const X &t)
indexer_node(const indexer_node &other)
internal::unfolded_indexer_node< InputTuple > unfolded_type
void internal_pop(prio_operation *op) __TBB_override
internal::function_input_queue< input_type, Allocator > input_queue_type
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7)
bool try_release() __TBB_override
Release a reserved item.
Base class for tasks generated by graph nodes.
receiver< input_type >::predecessor_type predecessor_type
T output_type
The output type of this sender.
static void fgt_async_try_put_begin(void *, void *)
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
graph()
Constructs a graph with isolated task_group_context.
void set_gateway(gateway_type *gateway)
void remove_edge(sender< T > &p, receiver< T > &s)
Removes an edge between a single predecessor and a single successor.
internal::broadcast_cache< input_type, null_rw_mutex > my_successors
concurrency
An enumeration the provides the two most common concurrency levels: unlimited and serial.
Output output_type
The type of the output message, which is complete.
tbb::spin_mutex nodelist_mutex
virtual bool is_continue_receiver()
internal::unfolded_join_node< N, queueing_port, OutputTuple, queueing > unfolded_type
bool remove_successor(successor_type &r) __TBB_override
Removes s as a successor.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t size
virtual void finalize() const
Represents acquisition of a mutex.
void handle_operations(prio_operation *op_list) __TBB_override
tuple< T0, T1 > InputTuple
indexer_node(const indexer_node &other)
void reserve_wait() __TBB_override
Inform a graph that messages may come from outside, to prevent premature graph completion.
output_ports_type my_output_ports
std::unique_ptr< output_ports_type > my_output_ports
static void fgt_remove_edge(void *, void *)
void reset_node(reset_flags f) __TBB_override
receiver< input_type >::predecessor_type predecessor_type
bool try_release() __TBB_override
Release a reserved item.
tuple< T0, T1, T2, T3, T4, T5, T6, T7 > InputTuple
join_node(const join_node &other)
source_node(graph &g, Body body, bool is_active=true)
Constructor for a node with a successor.
unfolded_type::input_ports_type input_ports_type
void reset_node(reset_flags) __TBB_override
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.