Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
aggregator.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB__aggregator_H
18 #define __TBB__aggregator_H
19 
20 #if !TBB_PREVIEW_AGGREGATOR
21 #error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h
22 #endif
23 
24 #include "atomic.h"
25 #include "tbb_profiling.h"
26 
27 namespace tbb {
28 namespace interface6 {
29 
30 using namespace tbb::internal;
31 
33  template<typename handler_type> friend class aggregator_ext;
34  uintptr_t status;
36 public:
// Values stored in `status`: agg_waiting (0) is what the constructor stores,
// agg_finished is what finish() stores.
37  enum aggregator_operation_status { agg_waiting=0, agg_finished };
// Creates an operation whose status is agg_waiting and whose my_next link is NULL.
38  aggregator_operation() : status(agg_waiting), my_next(NULL) {}
//! Call start before handling this operation.
// Only issues an ITT `acquired` notification tagged with &status; no member is modified here.
40  void start() { call_itt_notify(acquired, &status); }
42 
//! Call finish when done handling this operation.
// Stores agg_finished into status through itt_store_word_with_release
// (presumably a release store, going by the helper's name -- confirm in tbb_profiling.h).
43  void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); }
46 };
47 
48 namespace internal {
49 
51  friend class basic_handler;
52  virtual void apply_body() = 0;
53 public:
55  virtual ~basic_operation_base() {}
56 };
57 
58 template<typename Body>
60  const Body& my_body;
// Invokes the stored body with no arguments; overrides the pure virtual
// apply_body() declared in basic_operation_base.
61  void apply_body() __TBB_override { my_body(); }
62 public:
// Binds my_body to `b` by reference (no copy is made), so `b` must remain
// alive for as long as apply_body() may still be called on this operation.
63  basic_operation(const Body& b) : basic_operation_base(), my_body(b) {}
64 };
65 
67 public:
// Handles every operation in the linked list that starts at op_list: for each
// node it calls start(), apply_body() and finish(), in that order.
// Each node is downcast to basic_operation_base with an unchecked static_cast,
// so the list must contain only basic_operation_base-derived operations.
69  void operator()(aggregator_operation* op_list) const {
70  while (op_list) {
71  // ITT note: the &(op_list->status) tag is used to cover accesses to the operation data.
72  // The executing thread "acquires" the tag (see start()) and then performs
73  // the associated operation without triggering race-condition diagnostics.
74  // The thread that created the operation is waiting for its status (see execute_impl()),
75  // so when this thread is done with the operation, it will "release" the tag
76  // and update the status (see finish()) to give control back to the waiting thread.
77  basic_operation_base& request = static_cast<basic_operation_base&>(*op_list);
78  // IMPORTANT: advance op_list to op_list->next() before calling request.finish().
// finish() marks the operation agg_finished; in aggregator::execute() the operation
// is a local object of the submitting thread, so presumably it may be destroyed as
// soon as that thread observes the new status and must not be read afterwards.
79  op_list = op_list->next();
80  request.start();
81  request.apply_body();
82  request.finish();
83  }
84  }
85 };
86 
87 } // namespace internal
88 
90 
92 template <typename handler_type>
94 public:
// Copies `h` into handle_operations, sets handler_busy to 0, and assigns NULL
// to mailbox in the constructor body rather than in the initializer list.
95  aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; }
96 
98 
//! EXPERT INTERFACE: Enter a user-made operation into the aggregator's mailbox.
// `op` is dereferenced without a null check and forwarded to execute_impl(),
// so callers must pass a valid pointer.
99  void process(aggregator_operation *op) { execute_impl(*op); }
100 
101 protected:
106 
107  // ITT note: &(op.status) tag is used to cover accesses to this operation. This
108  // thread has created the operation, and now releases it so that the handler
109  // thread may handle the associated operation w/o triggering a race condition;
110  // thus this tag will be acquired just before the operation is handled in the
111  // handle_operations functor.
113  // insert the operation into the list
114  do {
115  // ITT may flag the following line as a race; it is a false positive:
116  // This is an atomic read; we don't provide itt_hide_load_word for atomics
117  op.my_next = res = mailbox; // NOT A RACE
118  } while (mailbox.compare_and_swap(&op, res) != res);
119  if (!res) { // first in the list; handle the operations
120  // ITT note: &mailbox tag covers access to the handler_busy flag, which this
121  // waiting handler thread will try to set before entering handle_operations.
122  call_itt_notify(acquired, &mailbox);
123  start_handle_operations();
124  __TBB_ASSERT(op.status, NULL);
125  }
126  else { // not first; wait for op to be ready
130  }
131  }
132 
133 
134 private:
137 
139 
140  uintptr_t handler_busy;
141 
142  handler_type handle_operations;
143 
146  aggregator_operation *pending_operations;
147 
148  // ITT note: &handler_busy tag covers access to mailbox as it is passed
149  // between active and waiting handlers. Below, the waiting handler waits until
150  // the active handler releases, and the waiting handler acquires &handler_busy as
151  // it becomes the active_handler. The release point is at the end of this
152  // function, when all operations in mailbox have been handled by the
153  // owner of this aggregator.
154  call_itt_notify(prepare, &handler_busy);
155  // get handler_busy: only one thread can possibly spin here at a time
156  spin_wait_until_eq(handler_busy, uintptr_t(0));
157  call_itt_notify(acquired, &handler_busy);
158  // acquire fence not necessary here due to causality rule and surrounding atomics
159  __TBB_store_with_release(handler_busy, uintptr_t(1));
160 
161  // ITT note: &mailbox tag covers access to the handler_busy flag itself.
162  // Capturing the state of the mailbox signifies that handler_busy has been
163  // set and a new active handler will now process that list's operations.
164  call_itt_notify(releasing, &mailbox);
165  // grab pending_operations
166  pending_operations = mailbox.fetch_and_store(NULL);
167 
168  // handle all the operations
169  handle_operations(pending_operations);
170 
171  // release the handler
172  itt_store_word_with_release(handler_busy, uintptr_t(0));
173  }
174 };
175 
//! Basic aggregator interface.
// Derives privately from aggregator_ext<internal::basic_handler>, so the base
// class's process() entry point is not part of this class's public interface.
177 class aggregator : private aggregator_ext<internal::basic_handler> {
178 public:
// Hands a default-constructed internal::basic_handler to the base class.
179  aggregator() : aggregator_ext<internal::basic_handler>(internal::basic_handler()) {}
181 
//! BASIC INTERFACE: Enter a function for exclusive execution by the aggregator.
// `b` is wrapped by reference in an internal::basic_operation that lives on the
// caller's stack, and that operation is passed to execute_impl().
// The body is invoked with no arguments (see basic_operation::apply_body()).
183  template<typename Body>
184  void execute(const Body& b) {
185  internal::basic_operation<Body> op(b);
186  this->execute_impl(op);
187  }
188 };
189 
190 } // namespace interface6
191 
192 using interface6::aggregator;
193 using interface6::aggregator_ext;
194 using interface6::aggregator_operation;
195 
196 } // namespace tbb
197 
198 #endif // __TBB__aggregator_H
void itt_hide_store_word(T &dst, T src)
void process(aggregator_operation *op)
EXPERT INTERFACE: Enter a user-made operation into the aggregator's mailbox.
Definition: aggregator.h:99
#define __TBB_override
Definition: tbb_stddef.h:240
aggregator_operation * next()
Definition: aggregator.h:44
void set_next(aggregator_operation *n)
Definition: aggregator.h:45
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
Definition: tbb_machine.h:399
void start_handle_operations()
Trigger the handling of operations when the handler is free.
Definition: aggregator.h:145
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
Definition: tbb_machine.h:391
Primary template for atomic.
Definition: atomic.h:403
The graph class.
aggregator_ext(const handler_type &h)
Definition: aggregator.h:95
Aggregator base class and expert interface.
Definition: aggregator.h:93
aggregator_operation * my_next
Definition: aggregator.h:35
uintptr_t handler_busy
Controls thread access to handle_operations.
Definition: aggregator.h:140
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:331
void itt_store_word_with_release(tbb::atomic< T > &dst, U src)
T itt_hide_load_word(const T &src)
void operator()(aggregator_operation *op_list) const
Definition: aggregator.h:69
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
value_type fetch_and_store(value_type value)
Definition: atomic.h:274
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
void start()
Call start before handling this operation.
Definition: aggregator.h:40
void finish()
Call finish when done handling this operation.
Definition: aggregator.h:43
Base class for types that should not be assigned.
Definition: tbb_stddef.h:320
void call_itt_notify(notify_type, void *)
Basic aggregator interface.
Definition: aggregator.h:177
T itt_load_word_with_acquire(const tbb::atomic< T > &src)
Identifiers declared inside namespace internal should never be used directly by client code.
Definition: atomic.h:51
atomic< aggregator_operation * > mailbox
An atomically updated list (aka mailbox) of aggregator_operations.
Definition: aggregator.h:136
void execute(const Body &b)
BASIC INTERFACE: Enter a function for exclusive execution by the aggregator.
Definition: aggregator.h:184
void execute_impl(aggregator_operation &op)
Definition: aggregator.h:104
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function h

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.