Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
_concurrent_queue_impl.h
/*
    Copyright (c) 2005-2019 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB__concurrent_queue_impl_H
#define __TBB__concurrent_queue_impl_H

#ifndef __TBB_concurrent_queue_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

#include "../tbb_stddef.h"
#include "../tbb_machine.h"
#include "../atomic.h"
#include "../spin_mutex.h"
#include "../cache_aligned_allocator.h"
#include "../tbb_exception.h"
#include "../tbb_profiling.h"
#include <new>
#include __TBB_STD_SWAP_HEADER
#include <iterator>

namespace tbb {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN

// forward declaration
namespace strict_ppl {
template<typename T, typename A> class concurrent_queue;
}

template<typename T, typename A> class concurrent_bounded_queue;

#endif

//! For internal use only.
namespace strict_ppl {

//! @cond INTERNAL
namespace internal {

using namespace tbb::internal;

typedef size_t ticket;

template<typename T> class micro_queue ;
template<typename T> class micro_queue_pop_finalizer ;
template<typename T> class concurrent_queue_base_v3;
template<typename T> struct concurrent_queue_rep;

//! parts of concurrent_queue_rep that do not have references to micro_queue
class concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
    //! Approximately n_queue/golden ratio
    static const size_t phi = 3;

public:
    // must be power of 2
    static const size_t n_queue = 8;

    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    atomic<ticket> head_counter;
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    //! number of invalid entries in the queue
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
} ;
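// Layout note: pad1, pad2, and pad3 keep head_counter, tail_counter, and the
// fields after them on separate cache lines (NFS_MaxLineSize is a compile-time
// upper bound on cache line size), so producers spinning on tail_counter do
// not false-share with consumers spinning on head_counter.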

inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
    return uintptr_t(p)>1;
}
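// A page pointer may be NULL (0) or the sentinel address 1 that
// invalidate_page_and_rethrow() installs after a failed push; both must read
// as "not a valid page", hence the comparison against 1 rather than 0.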

//! Abstract class to define interface for page allocation/deallocation
class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue ;
    template<typename T> friend class micro_queue_pop_finalizer ;
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
} ;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif

//! A queue using simple locking.
template<typename T>
class micro_queue : no_copy {
public:
    typedef void (*item_constructor_t)(T* location, const void* src);
private:
    typedef concurrent_queue_rep_base::page page;

    //! Class used to ensure exception-safety of method "pop"
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };

    void copy_item( page& dst, size_t dindex, const void* src, item_constructor_t construct_item ) {
        construct_item( &get_ref(dst, dindex), src );
    }

    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex,
        item_constructor_t construct_item )
    {
        T& src_item = get_ref( const_cast<page&>(src), sindex );
        construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
    }

    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = tbb::internal::move( from );
    }

    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;

public:
    friend class micro_queue_pop_finalizer<T>;

    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }

    atomic<page*> head_page;
    atomic<page*> tail_page;

    atomic<ticket> head_counter;
    atomic<ticket> tail_counter;

    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
               item_constructor_t construct_item ) ;

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base,
                         item_constructor_t construct_item ) ;

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page,
                     size_t end_in_page, ticket& g_index, item_constructor_t construct_item ) ;

    void invalidate_page_and_rethrow( ticket k ) ;
};

template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    for( atomic_backoff b(true);;b.pause() ) {
        ticket c = counter;
        if( c==k ) return;
        else if( c&1 ) {
            ++rb.n_invalid_entries;
            throw_exception( eid_bad_last_alloc );
        }
    }
}
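// Producers take turns per micro_queue: each spins until tail_counter reaches
// its own ticket k. Counters normally advance in steps of n_queue, so they
// stay even; an odd value is the poisoned state left by
// invalidate_page_and_rethrow(), on which waiters record an invalid entry and
// give up instead of spinning forever.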

template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
                           item_constructor_t construct_item )
{
    concurrent_queue_page_allocator& pa = base;
    page* p = NULL;
    size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page);
    if( !index ) {
        __TBB_TRY {
            p = pa.allocate_page();
        } __TBB_CATCH (...) {
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k );
        }
        p->mask = 0;
        p->next = NULL;
    }

    if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
    call_itt_notify(acquired, &tail_counter);

    if( p ) {
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }

    __TBB_TRY {
        copy_item( *p, index, item, construct_item );
        // If no exception was thrown, mark item as present.
        itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index);
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
    } __TBB_CATCH (...) {
        ++base.my_rep->n_invalid_entries;
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
        __TBB_RETHROW();
    }
}
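// Successive pushes to the same micro_queue receive tickets n_queue (8) apart,
// so k/n_queue is this micro_queue's item sequence number and its remainder
// modulo items_per_page is the slot within the current page; slot 0 means the
// pusher must allocate a fresh page before waiting for its turn.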

template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    call_itt_notify(acquired, &head_counter);
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    call_itt_notify(acquired, &tail_counter);
    page *p = head_page;
    __TBB_ASSERT( p, NULL );
    size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
    bool success = false;
    {
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? p : NULL );
        if( p->mask & uintptr_t(1)<<index ) {
            success = true;
            assign_and_destroy_item( dst, *p, index );
        } else {
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}
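// A clear mask bit marks a slot whose push failed with an exception; such
// entries are skipped and n_invalid_entries is decremented to keep the size
// arithmetic consistent. The finalizer publishes the advanced head_counter
// (and frees the page when the last slot was consumed) even if the item
// assignment above throws.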

template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base,
                                        item_constructor_t construct_item )
{
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;

    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index, construct_item );
                    cur_page = cur_page->next;
                }

                __TBB_ASSERT( srcp==src.tail_page, NULL );
                size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        head_page = tail_page = NULL;
    }
    return *this;
}

template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at address 1 so that no more pushes are allowed.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    __TBB_RETHROW();
}

template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base,
        const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page,
        ticket& g_index, item_constructor_t construct_item )
{
    concurrent_queue_page_allocator& pa = base;
    page* new_page = pa.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask;
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
    return new_page;
}

template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;
    micro_queue<T>& my_queue;
    page* my_page;
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};

template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( is_valid_page(p) ) {
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            my_queue.tail_page = NULL;
        }
    }
    itt_store_word_with_release(my_queue.head_counter, my_ticket);
    if( is_valid_page(p) ) {
        allocator.deallocate_page( p );
    }
}
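// Ordering in the destructor matters: the exhausted page is unlinked under
// page_mutex, the new head_counter is published with a release store, and only
// then is the page handed back to the allocator.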

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back

template<typename T> class concurrent_queue_iterator_rep ;
template<typename T> class concurrent_queue_iterator_base_v3;

//! representation of concurrent_queue_base
template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    //! Map ticket to an array index
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    micro_queue<T>& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};
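// Since phi==3 is coprime with n_queue==8, index() above is a permutation of
// 0..7: tickets 0,1,2,...,7 map to slots 0,3,6,1,4,7,2,5, spreading
// consecutive tickets across micro_queues instead of funneling them into one.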

//! base class of concurrent_queue
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
private:
    //! Internal representation
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    typedef typename micro_queue<T>::padded_page padded_page;
    typedef typename micro_queue<T>::item_constructor_t item_constructor_t;

    virtual page *allocate_page() __TBB_override {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block ( n ));
    }

    virtual void deallocate_page( concurrent_queue_rep_base::page *p ) __TBB_override {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }

    //! custom allocator
    virtual void *allocate_block( size_t n ) = 0;

    //! custom de-allocator
    virtual void deallocate_block( void *p, size_t n ) = 0;

protected:
    concurrent_queue_base_v3();

    virtual ~concurrent_queue_base_v3() {
#if TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif /* TBB_USE_ASSERT */
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }

    //! Enqueue item at tail of queue
    void internal_push( const void* src, item_constructor_t construct_item ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this, construct_item );
    }

    //! Attempt to dequeue item from queue.
    bool internal_try_pop( void* dst ) ;

    //! Get size of queue; result may be invalid if queue is modified concurrently
    size_t internal_size() const ;

    //! check if the queue is empty; thread safe
    bool internal_empty() const ;

    //! free any remaining pages
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void internal_finish_clear() ;

    //! throw an exception
    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! copy or move internal representation
    void assign( const concurrent_queue_base_v3& src, item_constructor_t construct_item ) ;

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! swap internal representation
    void internal_swap( concurrent_queue_base_v3& src ) {
        std::swap( my_rep, src.my_rep );
    }
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
};

template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    my_rep->items_per_page = item_size<= 8 ? 32 :
                             item_size<= 16 ? 16 :
                             item_size<= 32 ?  8 :
                             item_size<= 64 ?  4 :
                             item_size<=128 ?  2 :
                             1;
}
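// Each branch keeps a page's payload near 256 bytes (32*8 == 16*16 == 8*32 ==
// 4*64 == 2*128 == 256) while keeping items_per_page a power of two, as
// modulo_power_of_two() requires.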

template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked. Attempt to get that item.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (push)
    #pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another thread snatched the item, retry.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}
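// A consumer first claims ticket k by advancing head_counter with a CAS, then
// pops from the micro_queue that owns k. pop() returns false only when the
// claimed slot is an invalid entry (its push threw), in which case the
// consumer simply claims the next ticket and retries.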

template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}
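// The three counters are read without any lock, so tail - head - invalid can
// be transiently negative while concurrent operations are in flight; the
// result is clamped to 0 rather than reported as a huge unsigned value.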

template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}

template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        } else
            __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
    }
}

template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src,
                                          item_constructor_t construct_item )
{
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // copy concurrent_queue_rep data
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // copy or move micro_queues
    for( size_t i = 0; i < r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this, construct_item);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
            "the source concurrent queue should not be concurrently modified." );
}

template<typename Container, typename Value> class concurrent_queue_iterator;

template<typename T>
class concurrent_queue_iterator_rep: no_assign {
public:
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];

    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
    bool get_item( T*& item, size_t k ) ;
};

template<typename T>
bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
    if( k==my_queue.my_rep->tail_counter ) {
        item = NULL;
        return true;
    } else {
        typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
        __TBB_ASSERT(p,NULL);
        size_t i = modulo_power_of_two( k/concurrent_queue_rep<T>::n_queue, my_queue.my_rep->items_per_page );
        item = &micro_queue<T>::get_ref(*p,i);
        return (p->mask & uintptr_t(1)<<i)!=0;
    }
}

//! Constness-independent portion of concurrent_queue_iterator.
template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    //! Represents concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item
    Value* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
        __TBB_compiler_fence();
#endif
    }

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : no_assign(), my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

    //! Assignment
    void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;

    //! Advance iterator one step towards tail of queue.
    void advance() ;

    //! Destructor
    ~concurrent_queue_iterator_base_v3() {
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};

template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    if( !my_rep->get_item(my_item, k) ) advance();
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
        }
    }
    my_item = other.my_item;
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif /* TBB_USE_ASSERT */
    size_t i = modulo_power_of_two( k/concurrent_queue_rep<Value>::n_queue, queue.my_rep->items_per_page );
    if( i==queue.my_rep->items_per_page-1 ) {
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // advance k
    my_rep->head_counter = ++k;
    if( !my_rep->get_item(my_item, k) ) advance();
}
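// The iterator caches one page pointer per micro_queue in my_rep->array; after
// consuming the last slot of a page it hops to page->next before bumping its
// private head_counter copy. If get_item() lands on an invalid entry it
// returns false and advance() recurses to skip over it.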

//! Similar to C++0x std::remove_cv
template<typename T> struct tbb_remove_cv {typedef T type;};
template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};

//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public:
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
    {
    }

public:
    concurrent_queue_iterator() {}

    //! Copy constructor
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        this->assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator

template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

//! @endcond

} // namespace strict_ppl

//! @cond INTERNAL
namespace internal {

class concurrent_queue_rep;
class concurrent_queue_iterator_rep;
class concurrent_queue_iterator_base_v3;
template<typename Container, typename Value> class concurrent_queue_iterator;

//! For internal use only.
/** Type-independent portion of concurrent_queue. */
class concurrent_queue_base_v3: no_copy {
private:
    //! Internal representation
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;
protected:
    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    //! Capacity of the queue
    ptrdiff_t my_capacity;

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    enum copy_specifics { copy, move };

#if __TBB_PROTECTED_NESTED_CLASS_BROKEN
public:
#endif
    template<typename T>
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

private:
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    //! Enqueue item at tail of queue
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Abort all pending queue operations
    void __TBB_EXPORTED_METHOD internal_abort();

    //! Attempt to enqueue item onto queue.
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item from queue.
    /** NULL if there was no item to dequeue. */
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Get size of queue
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! Check if the queue is empty
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the queue capacity
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );

    //! custom allocator
    virtual page *allocate_page() = 0;

    //! custom de-allocator
    virtual void deallocate_page( page *p ) = 0;

    //! free any remaining pages
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void __TBB_EXPORTED_METHOD internal_finish_clear() ;

    //! Obsolete
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! copy internal representation
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! swap queues
    void internal_swap( concurrent_queue_base_v3& src ) {
        std::swap( my_capacity, src.my_capacity );
        std::swap( items_per_page, src.items_per_page );
        std::swap( item_size, src.item_size );
        std::swap( my_rep, src.my_rep );
    }
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */

    //! Enqueues item at tail of queue using specified operation (copy or move)
    void internal_insert_item( const void* src, copy_specifics op_type );

    //! Attempts to enqueue item at tail of queue using specified operation (copy or move)
    bool internal_insert_if_not_full( const void* src, copy_specifics op_type );

    //! Assigns one queue to another using specified operation (copy or move)
    void internal_assign( const concurrent_queue_base_v3& src, copy_specifics op_type );
private:
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};

//! For internal use only.
class concurrent_queue_base_v8: public concurrent_queue_base_v3 {
protected:
    concurrent_queue_base_v8( size_t item_sz ) : concurrent_queue_base_v3( item_sz ) {}

    //! move items
    void __TBB_EXPORTED_METHOD move_content( concurrent_queue_base_v8& src ) ;

    //! Attempt to enqueue item onto queue using move operation
    bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full( const void* src );

    //! Enqueue item at tail of queue using move operation
    void __TBB_EXPORTED_METHOD internal_push_move( const void* src );
private:
    friend struct micro_queue;
    virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
    virtual void move_item( page& dst, size_t index, const void* src ) = 0;
};

//! Type-independent portion of concurrent_queue_iterator.
class concurrent_queue_iterator_base_v3 {
    //! concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
protected:
    //! Pointer to current item
    void* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Obsolete entry point for constructing iterator pointing to head of queue.
    /** Does not work correctly for SSE types. */
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Construct iterator pointing to head of queue.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );

    //! Assignment
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance iterator one step towards tail of queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};

typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;

//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;
#else
public:
#endif

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_iterator,my_item))
    {
    }

public:
    concurrent_queue_iterator() {}

    //! Copy constructor
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator

template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

//! @endcond

} // namespace tbb

#endif /* __TBB__concurrent_queue_impl_H */
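
A minimal usage sketch (not part of this header): the public queues declared in
tbb/concurrent_queue.h are thin wrappers over the machinery above.
tbb::concurrent_queue sits on strict_ppl::internal::concurrent_queue_base_v3<T>,
and tbb::concurrent_bounded_queue on internal::concurrent_queue_base_v3/_v8.

    #include "tbb/concurrent_queue.h"
    #include <cassert>

    int main() {
        tbb::concurrent_queue<int> q;          // unbounded, non-blocking interface
        q.push(1);                             // forwards to internal_push
        q.push(2);
        int x = 0;
        bool ok = q.try_pop(x);                // forwards to internal_try_pop
        assert(ok && x == 1);

        tbb::concurrent_bounded_queue<int> bq; // bounded, blocking interface
        bq.set_capacity(1);                    // forwards to internal_set_capacity
        bq.push(10);                           // would block while the queue is full
        assert(!bq.try_push(20));              // full: capacity is 1
        int y = 0;
        bq.pop(y);                             // blocks while the queue is empty
        assert(y == 10);
        return 0;
    }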