Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
concurrent_monitor.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_concurrent_monitor_H
18 #define __TBB_concurrent_monitor_H
19 
20 #include "tbb/tbb_stddef.h"
21 #include "tbb/atomic.h"
22 #include "tbb/spin_mutex.h"
23 #include "tbb/tbb_exception.h"
24 #include "tbb/aligned_space.h"
25 
26 #include "semaphore.h"
27 
28 namespace tbb {
29 namespace internal {
30 
32 
34 public:
35  struct node_t {
38  explicit node_t() : next((node_t*)(uintptr_t)0xcdcdcdcd), prev((node_t*)(uintptr_t)0xcdcdcdcd) {}
39  };
40 
41  // ctor
43  // dtor
45 
// Read-only accessors of the circular list. `head` is the sentinel node that
// lives inside the list object; it is not one of the stored elements.
// Current value of `count`.
46  inline size_t size() const {return count;}
// True when size() is zero.
47  inline bool empty() const {return size()==0;}
// Node linked right after the sentinel (head.next).
48  inline node_t* front() const {return head.next;}
// Node linked right before the sentinel (head.prev).
49  inline node_t* last() const {return head.prev;}
// Same node as front(); starting point for a forward walk.
50  inline node_t* begin() const {return front();}
// Address of the sentinel; traversal loops compare against it to know when to stop.
51  inline const node_t* end() const {return &head;}
52 
54  inline void add( node_t* n ) {
56  n->prev = head.prev;
57  n->next = &head;
58  head.prev->next = n;
59  head.prev = n;
60  }
61 
63  inline void remove( node_t& n ) {
64  __TBB_ASSERT( count > 0, "attempt to remove an item from an empty list" );
66  n.prev->next = n.next;
67  n.next->prev = n.prev;
68  }
69 
72  if( const size_t l_count = __TBB_load_relaxed(count) ) {
73  __TBB_store_relaxed(lst.count, l_count);
74  lst.head.next = head.next;
75  lst.head.prev = head.prev;
76  head.next->prev = &lst.head;
77  head.prev->next = &lst.head;
78  clear();
79  }
80  }
81 
83 private:
86 };
87 
90 
92 
94 public:
97  friend class concurrent_monitor;
98  public:
// Default constructor: clears the three plain flags and the user context word
// in the initializer list, then zeroes `epoch` and clears `in_waitset` in the body.
// The semaphore storage `sema` is not touched here.
// NOTE(review): `ready` presumably becomes true once init() has built the
// semaphore lazily - confirm against concurrent_monitor.cpp.
99  thread_context() : skipped_wakeup(false), aborted(false), ready(false), context(0) {
100  epoch = 0;
101  in_waitset = false;
102  }
104  if (ready) {
105  if( skipped_wakeup ) semaphore().P();
107  }
108  }
// Returns a reference to the binary_semaphore held in the raw `sema` storage
// (dereferences sema.begin()).
// NOTE(review): presumably only valid after the semaphore has been constructed
// in that storage - confirm where init() is called.
109  binary_semaphore& semaphore() { return *sema.begin(); }
110  private:
112  // Inlining of the method is undesirable, due to extra instructions for
113  // exception support added at caller side.
114  __TBB_NOINLINE( void init() );
115  tbb::aligned_space<binary_semaphore> sema;
116  __TBB_atomic unsigned epoch;
117  tbb::atomic<bool> in_waitset;
119  bool aborted;
120  bool ready;
121  uintptr_t context;
122  };
123 
126 
129 
131  void prepare_wait( thread_context& thr, uintptr_t ctx = 0 );
132 
134 
135  inline bool commit_wait( thread_context& thr ) {
136  const bool do_it = thr.epoch == __TBB_load_relaxed(epoch);
137  // this check is just an optimization
138  if( do_it ) {
139  __TBB_ASSERT( thr.ready, "use of commit_wait() without prior prepare_wait()");
140  thr.semaphore().P();
141  __TBB_ASSERT( !thr.in_waitset, "still in the queue?" );
142  if( thr.aborted )
144  } else {
145  cancel_wait( thr );
146  }
147  return do_it;
148  }
150  void cancel_wait( thread_context& thr );
151 
153  template<typename WaitUntil, typename Context>
154  void wait( WaitUntil until, Context on );
155 
158 
160  void notify_one_relaxed();
161 
164 
166  void notify_all_relaxed();
167 
// Notify waiting threads whose context satisfies `predicate`.
// Issues atomic_fence() first and then forwards to notify_relaxed(), so this
// is the fenced counterpart of the relaxed variant.
169  template<typename P> void notify( const P& predicate ) {atomic_fence(); notify_relaxed( predicate );}
170 
172  template<typename P> void notify_relaxed( const P& predicate );
173 
176 
178  void abort_all_relaxed();
179 
180 private:
183  __TBB_atomic unsigned epoch;
// Converts a wait-set node pointer to the thread_context it belongs to,
// using a static_cast (no runtime check is performed).
184  thread_context* to_thread_context( waitset_node_t* n ) { return static_cast<thread_context*>(n); }
185 };
186 
// Wait for a condition to be satisfied with waiting-on context.
//   until - callable with no arguments; waiting ends once until() is true.
//   on    - callable with no arguments; its result is passed to prepare_wait()
//           as the context value each time the thread registers itself.
// The function loops over prepare_wait() / commit_wait() and returns only after
// until() has been observed to be true.
187 template<typename WaitUntil, typename Context>
188 void concurrent_monitor::wait( WaitUntil until, Context on )
189 {
// `slept` records whether the most recent commit_wait() reported a real wait.
190  bool slept = false;
191  thread_context thr_ctx;
192  prepare_wait( thr_ctx, on() );
193  while( !until() ) {
// commit_wait() returns true after waiting on the semaphore; when it returns
// false it has already called cancel_wait() on thr_ctx itself.
194  if( (slept = commit_wait( thr_ctx ) )==true )
// Woken up: leave immediately if the condition now holds (slept stays true).
195  if( until() ) break;
// Not part of the if above: reached whenever the break was not taken, so the
// flag is reset before registering for another round.
196  slept = false;
197  prepare_wait( thr_ctx, on() );
198  }
// Leaving through the loop condition means the last prepare_wait() was never
// committed, so that registration is withdrawn here.
199  if( !slept )
200  cancel_wait( thr_ctx );
201 }
202 
// Notify waiting threads of the event that satisfies the given predicate; Relaxed version.
//   predicate - callable invoked with each waiter's `context` value; waiters
//               for which it returns true are woken.
// Unlike notify(), this function does not call atomic_fence() itself.
203 template<typename P>
204 void concurrent_monitor::notify_relaxed( const P& predicate ) {
// Nothing to do when no thread is registered in the wait set.
205  if( waitset_ec.empty() )
206  return;
// Selected waiters are first collected in a local list and signalled afterwards.
207  waitset_t temp;
208  waitset_node_t* nxt;
209  const waitset_node_t* end = waitset_ec.end();
210  {
// NOTE(review): listing lines 211-212 are not visible in this view; presumably
// they set up what this inner scope protects - confirm against the full source.
// Walk from the last node back towards the sentinel.
213  for( waitset_node_t* n=waitset_ec.last(); n!=end; n=nxt ) {
// Save the predecessor now: remove()/add() below relink this node.
214  nxt = n->prev;
215  thread_context* thr = to_thread_context( n );
216  if( predicate( thr->context ) ) {
217  waitset_ec.remove( *n );
218  thr->in_waitset = false;
// add() appends at the back of temp.
219  temp.add( n );
220  }
221  }
222  }
223 
// Second pass: post each collected waiter's semaphore, front to back.
224  end = temp.end();
225  for( waitset_node_t* n=temp.front(); n!=end; n=nxt ) {
// Read the successor before V().
// NOTE(review): presumably the released thread may discard its thread_context
// right away, making `n` unsafe to touch afterwards - confirm.
226  nxt = n->next;
227  to_thread_context(n)->semaphore().V();
228  }
// temp is reset only in assert-enabled builds.
// NOTE(review): presumably to satisfy a debug check when temp is destroyed - confirm.
229 #if TBB_USE_ASSERT
230  temp.clear();
231 #endif
232 }
233 
234 } // namespace internal
235 } // namespace tbb
236 
237 #endif /* __TBB_concurrent_monitor_H */
tbb::internal::binary_semaphore::V
void V()
post/release
Definition: semaphore.h:240
spin_mutex.h
internal
Definition: _flow_graph_async_msg_impl.h:24
tbb::internal::concurrent_monitor::thread_context::context
uintptr_t context
Definition: concurrent_monitor.h:121
tbb::internal::concurrent_monitor::epoch
__TBB_atomic unsigned epoch
Definition: concurrent_monitor.h:183
tbb::internal::circular_doubly_linked_list_with_sentinel::end
const node_t * end() const
Definition: concurrent_monitor.h:51
end
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp end
Definition: ittnotify_static.h:182
__TBB_ASSERT
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
tbb::internal::concurrent_monitor::notify_one_relaxed
void notify_one_relaxed()
Notify one thread about the event. Relaxed version.
Definition: concurrent_monitor.cpp:66
tbb::internal::concurrent_monitor::thread_context::__TBB_NOINLINE
__TBB_NOINLINE(void init())
The method for lazy initialization of the thread_context's semaphore.
tbb::internal::concurrent_monitor::thread_context::skipped_wakeup
bool skipped_wakeup
Definition: concurrent_monitor.h:118
tbb::internal::concurrent_monitor::thread_context::ready
bool ready
Definition: concurrent_monitor.h:120
tbb::internal::concurrent_monitor::mutex_ec
tbb::spin_mutex mutex_ec
Definition: concurrent_monitor.h:181
tbb::internal::throw_exception
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
Definition: tbb_exception.h:105
tbb::internal::concurrent_monitor::wait
void wait(WaitUntil until, Context on)
Wait for a condition to be satisfied with waiting-on context.
Definition: concurrent_monitor.h:188
tbb::internal::circular_doubly_linked_list_with_sentinel::empty
bool empty() const
Definition: concurrent_monitor.h:47
tbb
The graph class.
Definition: serial/tbb/parallel_for.h:46
tbb::internal::circular_doubly_linked_list_with_sentinel::clear
void clear()
Definition: concurrent_monitor.h:82
tbb::internal::concurrent_monitor::thread_context::semaphore
binary_semaphore & semaphore()
Definition: concurrent_monitor.h:109
tbb::internal::circular_doubly_linked_list_with_sentinel::node_t::prev
node_t * prev
Definition: concurrent_monitor.h:37
tbb::internal::circular_doubly_linked_list_with_sentinel::flush_to
void flush_to(circular_doubly_linked_list_with_sentinel &lst)
move all elements to 'lst' and initialize the 'this' list
Definition: concurrent_monitor.h:71
tbb::internal::concurrent_monitor::thread_context
Definition: concurrent_monitor.h:96
tbb::internal::circular_doubly_linked_list_with_sentinel::circular_doubly_linked_list_with_sentinel
circular_doubly_linked_list_with_sentinel()
Definition: concurrent_monitor.h:42
tbb::internal::concurrent_monitor::notify
void notify(const P &predicate)
Notify waiting threads of the event that satisfies the given predicate.
Definition: concurrent_monitor.h:169
tbb::internal::concurrent_monitor::thread_context::aborted
bool aborted
Definition: concurrent_monitor.h:119
tbb::spin_mutex::scoped_lock
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
semaphore.h
tbb::internal::circular_doubly_linked_list_with_sentinel::node_t
Definition: concurrent_monitor.h:35
tbb::internal::circular_doubly_linked_list_with_sentinel::add
void add(node_t *n)
add to the back of the list
Definition: concurrent_monitor.h:54
aligned_space.h
tbb::internal::concurrent_monitor::notify_all_relaxed
void notify_all_relaxed()
Notify all waiting threads of the event; Relaxed version.
Definition: concurrent_monitor.cpp:84
__TBB_atomic
#define __TBB_atomic
Definition: tbb_stddef.h:237
tbb::internal::circular_doubly_linked_list_with_sentinel::remove
void remove(node_t &n)
remove node 'n'
Definition: concurrent_monitor.h:63
tbb::internal::circular_doubly_linked_list_with_sentinel::begin
node_t * begin() const
Definition: concurrent_monitor.h:50
tbb::internal::concurrent_monitor::thread_context::~thread_context
~thread_context()
Definition: concurrent_monitor.h:103
tbb::internal::__TBB_store_relaxed
void __TBB_store_relaxed(volatile T &location, V value)
Definition: tbb_machine.h:739
tbb::internal::concurrent_monitor::notify_one
void notify_one()
Notify one thread about the event.
Definition: concurrent_monitor.h:157
tbb::internal::circular_doubly_linked_list_with_sentinel::node_t::next
node_t * next
Definition: concurrent_monitor.h:36
tbb_exception.h
tbb::internal::binary_semaphore::~binary_semaphore
~binary_semaphore()
dtor
Definition: semaphore.h:230
tbb::internal::concurrent_monitor::commit_wait
bool commit_wait(thread_context &thr)
Commit wait if event count has not changed; otherwise, cancel wait.
Definition: concurrent_monitor.h:135
tbb::internal::concurrent_monitor::thread_context::in_waitset
tbb::atomic< bool > in_waitset
Definition: concurrent_monitor.h:117
tbb::internal::binary_semaphore::P
void P()
wait/acquire
Definition: semaphore.h:235
tbb::internal::concurrent_monitor::to_thread_context
thread_context * to_thread_context(waitset_node_t *n)
Definition: concurrent_monitor.h:184
tbb::internal::circular_doubly_linked_list_with_sentinel::head
node_t head
Definition: concurrent_monitor.h:85
atomic.h
tbb::internal::concurrent_monitor::waitset_ec
waitset_t waitset_ec
Definition: concurrent_monitor.h:182
tbb::internal::circular_doubly_linked_list_with_sentinel::count
__TBB_atomic size_t count
Definition: concurrent_monitor.h:84
tbb::internal::circular_doubly_linked_list_with_sentinel::node_t::node_t
node_t()
Definition: concurrent_monitor.h:38
tbb::internal::circular_doubly_linked_list_with_sentinel::front
node_t * front() const
Definition: concurrent_monitor.h:48
tbb::internal::no_copy
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
tbb::internal::waitset_t
circular_doubly_linked_list_with_sentinel waitset_t
Definition: concurrent_monitor.h:88
tbb::internal::concurrent_monitor::thread_context::sema
tbb::aligned_space< binary_semaphore > sema
Definition: concurrent_monitor.h:115
tbb::internal::concurrent_monitor::~concurrent_monitor
~concurrent_monitor()
dtor
Definition: concurrent_monitor.cpp:27
tbb::internal::concurrent_monitor::concurrent_monitor
concurrent_monitor()
ctor
Definition: concurrent_monitor.h:125
tbb::internal::circular_doubly_linked_list_with_sentinel::last
node_t * last() const
Definition: concurrent_monitor.h:49
tbb::internal::eid_user_abort
@ eid_user_abort
Definition: tbb_exception.h:85
tbb::internal::concurrent_monitor
concurrent_monitor
Definition: concurrent_monitor.h:93
tbb::internal::concurrent_monitor::abort_all
void abort_all()
Abort any sleeping threads at the time of the call.
Definition: concurrent_monitor.h:175
tbb::internal::concurrent_monitor::prepare_wait
void prepare_wait(thread_context &thr, uintptr_t ctx=0)
prepare wait by inserting 'thr' into the wait queue
Definition: concurrent_monitor.cpp:32
tbb::internal::concurrent_monitor::thread_context::thread_context
thread_context()
Definition: concurrent_monitor.h:99
tbb::internal::circular_doubly_linked_list_with_sentinel::size
size_t size() const
Definition: concurrent_monitor.h:46
tbb_stddef.h
tbb::internal::waitset_node_t
circular_doubly_linked_list_with_sentinel::node_t waitset_node_t
Definition: concurrent_monitor.h:89
tbb::internal::circular_doubly_linked_list_with_sentinel::~circular_doubly_linked_list_with_sentinel
~circular_doubly_linked_list_with_sentinel()
Definition: concurrent_monitor.h:44
tbb::atomic_fence
void atomic_fence()
Sequentially consistent full memory fence.
Definition: tbb_machine.h:339
tbb::internal::concurrent_monitor::abort_all_relaxed
void abort_all_relaxed()
Abort any sleeping threads at the time of the call; Relaxed version.
Definition: concurrent_monitor.cpp:107
tbb::internal::concurrent_monitor::notify_relaxed
void notify_relaxed(const P &predicate)
Notify waiting threads of the event that satisfies the given predicate; Relaxed version.
Definition: concurrent_monitor.h:204
tbb::internal::concurrent_monitor::thread_context::epoch
__TBB_atomic unsigned epoch
Definition: concurrent_monitor.h:116
tbb::spin_mutex
A lock that occupies a single byte.
Definition: spin_mutex.h:39
tbb::internal::circular_doubly_linked_list_with_sentinel
Circular doubly-linked list with sentinel.
Definition: concurrent_monitor.h:33
tbb::internal::concurrent_monitor::notify_all
void notify_all()
Notify all waiting threads of the event.
Definition: concurrent_monitor.h:163
tbb::internal::binary_semaphore
binary_semaphore for concurrent monitor
Definition: semaphore.h:222
tbb::internal::concurrent_monitor::cancel_wait
void cancel_wait(thread_context &thr)
Cancel the wait. Removes the thread from the wait queue if not removed yet.
Definition: concurrent_monitor.cpp:50
tbb::internal::__TBB_load_relaxed
T __TBB_load_relaxed(const volatile T &location)
Definition: tbb_machine.h:735

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.