space_multi_thread_balance_quantity.cpp
Go to the documentation of this file.
1 
7 #include <unistd.h>
8 #include <cstring>
9 #include <argos3/core/simulator/simulator.h>
10 #include <argos3/core/utility/profiler/profiler.h>
12 
13 namespace argos {
14 
15  /****************************************/
16  /****************************************/
17 
20  pthread_mutex_t* ActConditionalMutex;
21  pthread_mutex_t* PhysicsConditionalMutex;
22  pthread_mutex_t* MediaConditionalMutex;
23  pthread_mutex_t* EntityIterConditionalMutex;
24  };
25 
26  static void CleanupUpdateThread(void* p_data) {
27  CSimulator& cSimulator = CSimulator::GetInstance();
28  if(cSimulator.IsProfiling()) {
30  }
31  SCleanupUpdateThreadData& sData =
32  *reinterpret_cast<SCleanupUpdateThreadData*>(p_data);
33  pthread_mutex_unlock(sData.SenseControlStepConditionalMutex);
34  pthread_mutex_unlock(sData.ActConditionalMutex);
35  pthread_mutex_unlock(sData.PhysicsConditionalMutex);
36  pthread_mutex_unlock(sData.MediaConditionalMutex);
37  pthread_mutex_unlock(sData.EntityIterConditionalMutex);
38  }
39 
/**
 * Runs the update loop of one worker thread.
 *
 * Calls AddThreadSafeBuffer() on LOG and LOGERR, then invokes UpdateThread()
 * on the space referenced by the thread data, passing the stored thread id.
 * The void*(void*) signature matches a pthread start routine; presumably this
 * is the routine handed to pthread_create() in StartThreads() -- confirm.
 *
 * @param p_data Reinterpreted as a
 *               CSpaceMultiThreadBalanceQuantity::SUpdateThreadData*; its
 *               Space and ThreadId members are read.
 * @return nullptr, once UpdateThread() has returned.
 */
40  void* LaunchUpdateThreadBalanceQuantity(void* p_data) {
41  LOG.AddThreadSafeBuffer();
42  LOGERR.AddThreadSafeBuffer();
43  auto* psData = reinterpret_cast<CSpaceMultiThreadBalanceQuantity::SUpdateThreadData*>(p_data);
44  psData->Space->UpdateThread(psData->ThreadId);
45  return nullptr;
46  }
47 
48  /****************************************/
49  /****************************************/
50 
52  m_psUpdateThreadData(nullptr),
53  m_ptUpdateThreads(nullptr),
54  m_bIsControllableEntityAssignmentRecalculationNeeded(true) {}
55 
56  /****************************************/
57  /****************************************/
58 
60  /* Initialize the space */
61  CSpace::Init(t_tree);
62  /* Initialize thread related structures */
63  int nErrors;
64  /* First the counters */
65  m_unSenseControlStepPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
66  m_unActPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
67  m_unPhysicsPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
68  m_unMediaPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
69  m_unEntityIterPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
70 
71  /* Then the mutexes */
72  if((nErrors = pthread_mutex_init(&m_tSenseControlStepConditionalMutex, nullptr)) ||
73  (nErrors = pthread_mutex_init(&m_tActConditionalMutex, nullptr)) ||
74  (nErrors = pthread_mutex_init(&m_tPhysicsConditionalMutex, nullptr)) ||
75  (nErrors = pthread_mutex_init(&m_tMediaConditionalMutex, nullptr)) ||
76  (nErrors = pthread_mutex_init(&m_tEntityIterConditionalMutex, nullptr))) {
77  THROW_ARGOSEXCEPTION("Error creating thread mutexes " << ::strerror(nErrors));
78  }
79  /* Finally the conditionals */
80  if((nErrors = pthread_cond_init(&m_tSenseControlStepConditional, nullptr)) ||
81  (nErrors = pthread_cond_init(&m_tActConditional, nullptr)) ||
82  (nErrors = pthread_cond_init(&m_tPhysicsConditional, nullptr)) ||
83  (nErrors = pthread_cond_init(&m_tMediaConditional, nullptr)) ||
84  (nErrors = pthread_cond_init(&m_tEntityIterConditional, nullptr))) {
85  THROW_ARGOSEXCEPTION("Error creating thread conditionals " << ::strerror(nErrors));
86  }
87  /* Start threads */
88  StartThreads();
89  }
90 
91  /****************************************/
92  /****************************************/
93 
94  void CSpaceMultiThreadBalanceQuantity::StartThreads() {
95  int nErrors;
96  /* Create the threads to update the controllable entities */
97  m_ptUpdateThreads = new pthread_t[CSimulator::GetInstance().GetNumThreads()];
98  m_psUpdateThreadData = new SUpdateThreadData*[CSimulator::GetInstance().GetNumThreads()];
99  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
100  /* Create the struct with the info to launch the thread */
101  m_psUpdateThreadData[i] = new SUpdateThreadData(i, this);
102  /* Create the thread */
103  if((nErrors = pthread_create(m_ptUpdateThreads + i,
104  nullptr,
106  reinterpret_cast<void*>(m_psUpdateThreadData[i])))) {
107  THROW_ARGOSEXCEPTION("Error creating thread: " << ::strerror(nErrors));
108  }
109  }
110  }
111 
112  /****************************************/
113  /****************************************/
114 
116  /* Destroy the threads to update the controllable entities */
117  int nErrors;
118  if(m_ptUpdateThreads != nullptr) {
119  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
120  if((nErrors = pthread_cancel(m_ptUpdateThreads[i]))) {
121  THROW_ARGOSEXCEPTION("Error canceling controllable entities update threads " << ::strerror(nErrors));
122  }
123  }
124  auto** ppJoinResult = new void*[CSimulator::GetInstance().GetNumThreads()];
125  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
126  if((nErrors = pthread_join(m_ptUpdateThreads[i], ppJoinResult + i))) {
127  THROW_ARGOSEXCEPTION("Error joining controllable entities update threads " << ::strerror(nErrors));
128  }
129  if(ppJoinResult[i] != PTHREAD_CANCELED) {
130  LOGERR << "[WARNING] Controllable entities update thread #" << i<< " not canceled" << std::endl;
131  }
132  }
133  delete[] ppJoinResult;
134  }
135  delete[] m_ptUpdateThreads;
136  /* Destroy the thread launch info */
137  if(m_psUpdateThreadData != nullptr) {
138  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
139  delete m_psUpdateThreadData[i];
140  }
141  }
142  delete[] m_psUpdateThreadData;
143  pthread_mutex_destroy(&m_tSenseControlStepConditionalMutex);
144  pthread_mutex_destroy(&m_tActConditionalMutex);
145  pthread_mutex_destroy(&m_tPhysicsConditionalMutex);
146  pthread_mutex_destroy(&m_tMediaConditionalMutex);
147  pthread_mutex_destroy(&m_tEntityIterConditionalMutex);
148 
149  pthread_cond_destroy(&m_tSenseControlStepConditional);
150  pthread_cond_destroy(&m_tActConditional);
151  pthread_cond_destroy(&m_tPhysicsConditional);
152  pthread_cond_destroy(&m_tMediaConditional);
153  pthread_cond_destroy(&m_tEntityIterConditional);
154 
155  /* Destroy the base space */
156  CSpace::Destroy();
157  }
158 
159  /****************************************/
160  /****************************************/
161 
163  m_bIsControllableEntityAssignmentRecalculationNeeded = true;
165  }
166 
167  /****************************************/
168  /****************************************/
169 
171  m_bIsControllableEntityAssignmentRecalculationNeeded = true;
173  }
174 
175  /****************************************/
176  /****************************************/
177 
/*
 * Gives the go signal for phase PHASE.
 * Flushes LOG and LOGERR, then, while holding m_t<PHASE>ConditionalMutex,
 * resets m_un<PHASE>PhaseDoneCounter to 0 and broadcasts on
 * m_t<PHASE>Conditional.
 * NOTE: expands to several statements without a do { } while(0) wrapper, so
 * it must not be used as the unbraced body of an if/else/loop.
 */
178 #define MAIN_SEND_GO_FOR_PHASE(PHASE) \
179  LOG.Flush(); \
180  LOGERR.Flush(); \
181  pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \
182  m_un ## PHASE ## PhaseDoneCounter = 0; \
183  pthread_cond_broadcast(&m_t ## PHASE ## Conditional); \
184  pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex);
185 
/*
 * Blocks the caller until phase PHASE is over.
 * While holding m_t<PHASE>ConditionalMutex, waits on m_t<PHASE>Conditional
 * for as long as m_un<PHASE>PhaseDoneCounter is lower than
 * CSimulator::GetInstance().GetNumThreads(). The counter is re-tested after
 * every wake-up.
 * NOTE: expands to several statements without a do { } while(0) wrapper.
 */
186 #define MAIN_WAIT_FOR_PHASE_END(PHASE) \
187  pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \
188  while(m_un ## PHASE ## PhaseDoneCounter < CSimulator::GetInstance().GetNumThreads()) { \
189  pthread_cond_wait(&m_t ## PHASE ## Conditional, &m_t ## PHASE ## ConditionalMutex); \
190  } \
191  pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex);
192 
196  /* Avoid recalculation at the next time step */
197  m_bIsControllableEntityAssignmentRecalculationNeeded = false;
198  }
199 
200  /****************************************/
201  /****************************************/
202 
204  /* Update the physics engines */
205  MAIN_SEND_GO_FOR_PHASE(Physics);
206  MAIN_WAIT_FOR_PHASE_END(Physics);
207  /* Perform entity transfer from engine to engine, if needed */
208  for(size_t i = 0; i < m_ptPhysicsEngines->size(); ++i) {
209  if((*m_ptPhysicsEngines)[i]->IsEntityTransferNeeded()) {
210  (*m_ptPhysicsEngines)[i]->TransferEntities();
211  }
212  }
213  }
214 
215  /****************************************/
216  /****************************************/
217 
219  /* Update the media */
220  MAIN_SEND_GO_FOR_PHASE(Media);
222  }
223 
224  /****************************************/
225  /****************************************/
226 
228  const TControllableEntityIterCBType &c_cb) {
230 
231  /* Iterate over all robots in the swarm */
232  MAIN_SEND_GO_FOR_PHASE(EntityIter);
233  MAIN_WAIT_FOR_PHASE_END(EntityIter);
234  } /* IterateOverControllableEntities() */
235 
236  /****************************************/
237  /****************************************/
238 
239  void CSpaceMultiThreadBalanceQuantity::ControllableEntityIterationWaitAbort(void) {
241  } /* ControllableEntitiesIterationWaitAbort() */
242 
243 
244  /****************************************/
245  /****************************************/
246 
248  MAIN_SEND_GO_FOR_PHASE(SenseControlStep);
249  MAIN_WAIT_FOR_PHASE_END(SenseControlStep);
250  /* Avoid recalculation at the next time step */
251  m_bIsControllableEntityAssignmentRecalculationNeeded = false;
252  }
253 
254  /****************************************/
255  /****************************************/
256 
/*
 * Blocks a worker thread until phase PHASE has been started.
 * While holding m_t<PHASE>ConditionalMutex, waits on m_t<PHASE>Conditional
 * for as long as m_un<PHASE>PhaseDoneCounter equals
 * CSimulator::GetInstance().GetNumThreads(); the wait ends once the counter
 * holds any other value (MAIN_SEND_GO_FOR_PHASE sets it to 0).
 * After the mutex is released, pthread_testcancel() is called.
 * NOTE: expands to several statements without a do { } while(0) wrapper.
 */
257 #define THREAD_WAIT_FOR_GO_SIGNAL(PHASE) \
258  pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \
259  while(m_un ## PHASE ## PhaseDoneCounter == CSimulator::GetInstance().GetNumThreads()) { \
260  pthread_cond_wait(&m_t ## PHASE ## Conditional, &m_t ## PHASE ## ConditionalMutex); \
261  } \
262  pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex); \
263  pthread_testcancel();
264 
/*
 * Reports that the calling worker thread has finished phase PHASE.
 * While holding m_t<PHASE>ConditionalMutex, increments
 * m_un<PHASE>PhaseDoneCounter and broadcasts on m_t<PHASE>Conditional.
 * After the mutex is released, pthread_testcancel() is called.
 * NOTE: expands to several statements without a do { } while(0) wrapper.
 */
265 #define THREAD_SIGNAL_PHASE_DONE(PHASE) \
266  pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \
267  ++m_un ## PHASE ## PhaseDoneCounter; \
268  pthread_cond_broadcast(&m_t ## PHASE ## Conditional); \
269  pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex); \
270  pthread_testcancel();
271 
273  size_t un_tot_plugins) {
274  /* This is the minimum number of plugins assigned to a thread */
275  size_t unMinPortion = un_tot_plugins / CSimulator::GetInstance().GetNumThreads();
276  /* If the division has a remainder, the extra plugins must be assigned too */
277  size_t unExtraPortion = un_tot_plugins % CSimulator::GetInstance().GetNumThreads();
278  /* Calculate the range */
279  if(unMinPortion == 0) {
280  /* Not all threads get a plugin */
281  if(un_id < unExtraPortion) {
282  /* This thread does */
283  return CRange<size_t>(un_id, un_id+1);
284  }
285  else {
286  /* This thread does not */
287  return CRange<size_t>();
288  }
289  }
290  else {
291  /* For sure this thread will get unMinPortion plugins, does it get an extra too? */
292  if(un_id < unExtraPortion) {
293  /* Yes, it gets an extra */
294  return CRange<size_t>( un_id * (unMinPortion+1),
295  (un_id+1) * (unMinPortion+1));
296  }
297  else {
298  /* No, it doesn't get an extra */
299  return CRange<size_t>(unExtraPortion * (unMinPortion+1) + (un_id-unExtraPortion) * unMinPortion,
300  unExtraPortion * (unMinPortion+1) + (un_id-unExtraPortion+1) * unMinPortion);
301  }
302  }
303  }
304 
/**
 * Body of a worker thread.
 *
 * Registers CleanupUpdateThread as a cleanup handler, computes the ranges of
 * physics engines and media assigned to this thread, and then repeats the
 * per-step phases forever in this order: act, physics, media, entity
 * iteration, sense+control, entity iteration.
 *
 * @param un_id The id of this thread; used to compute the plugin ranges and
 *              forwarded to the per-phase methods that take an id.
 */
305  void CSpaceMultiThreadBalanceQuantity::UpdateThread(UInt32 un_id) {
306  /* Copy the id */
307  UInt32 unId = un_id;
308  /* Create cancellation data */
309  SCleanupUpdateThreadData sCancelData;
310  sCancelData.SenseControlStepConditionalMutex = &m_tSenseControlStepConditionalMutex;
311  sCancelData.ActConditionalMutex = &m_tActConditionalMutex;
312  sCancelData.PhysicsConditionalMutex = &m_tPhysicsConditionalMutex;
313  sCancelData.MediaConditionalMutex = &m_tMediaConditionalMutex;
314  sCancelData.EntityIterConditionalMutex = &m_tEntityIterConditionalMutex;
315 
/* Paired with the pthread_cleanup_pop() at the end of this method */
316  pthread_cleanup_push(CleanupUpdateThread, &sCancelData);
317 
318  /* Id range for the physics engines assigned to this thread */
319  CRange<size_t> cPhysicsRange = CalculatePluginRangeForThread(unId,
320  m_ptPhysicsEngines->size());
321  /* Id range for the media assigned to this thread */
322  CRange<size_t> cMediaRange = CalculatePluginRangeForThread(unId,
323  m_ptMedia->size());
324 
325  /*
326  * Id range for the entities to update assigned to this thread. Can change
327  * as the simulation progresses, so periodic re-calculation may be necessary
328  * before *ANY* phase which iterates over the entities, as the
329  * CLoopFunctions::PreStep()/CLoopFunctions::PostStep() may have
330  * added/removed entities.
331  */
332  CRange<size_t> cEntityRange;
/*
 * The loop contains no break or return, so the thread does not leave it on
 * its own; pthread_cancel() is used elsewhere in this file to end the
 * update threads.
 */
333  while (1) {
334  /* Actuate entities assigned to this thread */
335  UpdateThreadEntityAct(un_id, cEntityRange);
336 
337  /* Update physics engines assigned to this thread */
338  UpdateThreadPhysics(cPhysicsRange);
339 
340  /* Update media assigned to this thread */
341  UpdateThreadMedia(cMediaRange);
342 
343  /* loop functions PreStep() iteration (maybe) */
344  UpdateThreadIterateOverEntities(un_id, cEntityRange);
345 
346  /* Update sensor readings/execute control step for entities */
347  UpdateThreadEntitySenseControl(un_id, cEntityRange);
348 
349  /* loop functions PostStep() iteration (maybe) */
350  UpdateThreadIterateOverEntities(un_id, cEntityRange);
351  } /* while(1) */
352 
/*
 * Follows a loop without exit; it closes the scope opened by
 * pthread_cleanup_push() above.
 */
353  pthread_cleanup_pop(1);
354  } /* UpdateThread */
355 
356  /****************************************/
357  /****************************************/
358 
359  void CSpaceMultiThreadBalanceQuantity::UpdateThreadEntityAct(UInt32 un_id,
360  CRange<size_t>& c_range) {
362  /* Calculate the portion of entities to update, if needed */
363  if (m_bIsControllableEntityAssignmentRecalculationNeeded) {
364  c_range = CalculatePluginRangeForThread(un_id,
366  }
367  /* Cope with the fact that there may be less entities than threads */
368  if (c_range.GetSpan() > 0) {
369  /* This thread has entities */
370  /* Actuate control choices */
371  for(size_t i = c_range.GetMin(); i < c_range.GetMax(); ++i) {
372  if(m_vecControllableEntities[i]->IsEnabled())
373  m_vecControllableEntities[i]->Act();
374  }
375  pthread_testcancel();
377  }
378  else {
379  /* This thread has no entities -> dummy computation */
381  }
382  } /* UpdateThreadEntityAct() */
383 
384  /****************************************/
385  /****************************************/
386 
/**
 * Physics phase of a worker thread.
 *
 * Waits for the go signal of the Physics phase, calls Update() on every
 * physics engine whose index lies in [c_range.GetMin(), c_range.GetMax()),
 * and then signals that this thread is done with the phase.
 *
 * @param c_range Index range, into *m_ptPhysicsEngines, of the engines
 *                assigned to this thread; a span of 0 means no engine.
 */
387  void CSpaceMultiThreadBalanceQuantity::UpdateThreadPhysics(
388  const CRange<size_t>& c_range) {
389  /* Update physics engines, if this thread has been assigned to them */
390  THREAD_WAIT_FOR_GO_SIGNAL(Physics);
391  if (c_range.GetSpan() > 0) {
392  /* This thread has engines, update them */
393  for (size_t i = c_range.GetMin(); i < c_range.GetMax(); ++i) {
394  (*m_ptPhysicsEngines)[i]->Update();
395  }
396  pthread_testcancel();
397  THREAD_SIGNAL_PHASE_DONE(Physics);
398  }
399  else {
400  /* This thread has no engines -> dummy computation */
/*
 * The done signal is still sent: MAIN_WAIT_FOR_PHASE_END waits until the
 * counter reaches the number of threads, so every thread must increment it.
 */
401  THREAD_SIGNAL_PHASE_DONE(Physics);
402  }
403  } /* UpdateThreadPhysics() */
404 
405  /****************************************/
406  /****************************************/
407 
408  void CSpaceMultiThreadBalanceQuantity::UpdateThreadMedia(
409  const CRange<size_t>& c_range) {
410  /* Update media, if this thread has been assigned to them */
412  if(c_range.GetSpan() > 0) {
413  /* This thread has media, update them */
414  for(size_t i = c_range.GetMin(); i < c_range.GetMax(); ++i) {
415  (*m_ptMedia)[i]->Update();
416  }
417  pthread_testcancel();
419  }
420  else {
421  /* This thread has no media -> dummy computation */
423  }
424  } /* UpdateThreadMedia() */
425 
426  /****************************************/
427  /****************************************/
428 
429  void CSpaceMultiThreadBalanceQuantity::UpdateThreadIterateOverEntities(UInt32 un_id,
430  CRange<size_t>& c_range) {
431  THREAD_WAIT_FOR_GO_SIGNAL(EntityIter);
432  /* Calculate the portion of entities to update, if needed */
433  if (m_bIsControllableEntityAssignmentRecalculationNeeded) {
434  c_range = CalculatePluginRangeForThread(un_id,
436  }
437  /* Cope with the fact that there may be less entities than threads */
438  if (c_range.GetSpan() > 0 && ControllableEntityIterationEnabled()) {
439  /* This thread has entities */
440  for (size_t i = c_range.GetMin(); i < c_range.GetMax(); ++i) {
442  } /* for(i...) */
443  pthread_testcancel();
444  THREAD_SIGNAL_PHASE_DONE(EntityIter);
445  }
446  else {
447  THREAD_SIGNAL_PHASE_DONE(EntityIter);
448  }
449  } /* UpdateThreadIterateOverEntities() */
450 
451  /****************************************/
452  /****************************************/
453 
454  void CSpaceMultiThreadBalanceQuantity::UpdateThreadEntitySenseControl(UInt32 un_id,
455  CRange<size_t>& c_range) {
456  /* Update sensor readings and call controllers */
457  THREAD_WAIT_FOR_GO_SIGNAL(SenseControlStep);
458 
459  /* Calculate the portion of entities to update, if needed */
460  if (m_bIsControllableEntityAssignmentRecalculationNeeded) {
461  c_range = CalculatePluginRangeForThread(un_id,
463  }
464  /* Cope with the fact that there may be less entities than threads */
465  if (c_range.GetSpan() > 0) {
466  /* This thread has entities */
467  for (size_t i = c_range.GetMin(); i < c_range.GetMax(); ++i) {
468  if (m_vecControllableEntities[i]->IsEnabled()) {
469  m_vecControllableEntities[i]->Sense();
470  m_vecControllableEntities[i]->ControlStep();
471  }
472  }
473  pthread_testcancel();
474  THREAD_SIGNAL_PHASE_DONE(SenseControlStep);
475  }
476  else {
477  /* This thread has no entities -> dummy computation */
478  THREAD_SIGNAL_PHASE_DONE(SenseControlStep);
479  }
480  } /* UpdateThreadEntitySenseControl() */
481 
482  /****************************************/
483  /****************************************/
484 }
#define THREAD_WAIT_FOR_GO_SIGNAL(PHASE)
#define MAIN_SEND_GO_FOR_PHASE(PHASE)
#define MAIN_WAIT_FOR_PHASE_END(PHASE)
#define THREAD_SIGNAL_PHASE_DONE(PHASE)
#define THROW_ARGOSEXCEPTION(message)
This macro throws an ARGoS exception with the passed message.
unsigned int UInt32
32-bit unsigned integer.
Definition: datatypes.h:97
The namespace containing all the ARGoS related code.
Definition: ci_actuator.h:12
CRange< size_t > CalculatePluginRangeForThread(size_t un_id, size_t un_tot_plugins)
CARGoSLog LOGERR(std::cerr, SLogColor(ARGOS_LOG_ATTRIBUTE_BRIGHT, ARGOS_LOG_COLOR_RED))
Definition: argos_log.h:180
ticpp::Element TConfigurationNode
The ARGoS configuration XML node.
CARGoSLog LOG(std::cout, SLogColor(ARGOS_LOG_ATTRIBUTE_BRIGHT, ARGOS_LOG_COLOR_GREEN))
Definition: argos_log.h:179
void * LaunchUpdateThreadBalanceQuantity(void *p_data)
An entity that contains a pointer to the user-defined controller.
The core class of ARGoS.
Definition: simulator.h:62
CProfiler & GetProfiler()
Returns a reference to the profiler.
Definition: simulator.h:174
static CSimulator & GetInstance()
Returns the instance to the CSimulator class.
Definition: simulator.cpp:78
UInt32 GetNumThreads() const
Returns the number of threads used during the experiment.
Definition: simulator.h:260
bool IsProfiling() const
Returns true if ARGoS is being profiled.
Definition: simulator.h:182
CControllableEntity::TVector m_vecControllableEntities
A vector of controllable entities.
Definition: space.h:491
virtual void Init(TConfigurationNode &t_tree)
Initializes the space using the <arena> section of the XML configuration file.
Definition: space.cpp:37
virtual void Destroy()
Destroys the space and all its entities.
Definition: space.cpp:85
bool ControllableEntityIterationEnabled() const
Definition: space.h:449
CPhysicsEngine::TVector * m_ptPhysicsEngines
A pointer to the list of physics engines.
Definition: space.h:497
TControllableEntityIterCBType m_cbControllableEntityIter
Callback for iterating over entities from within the loop functions.
Definition: space.h:503
CMedium::TVector * m_ptMedia
A pointer to the list of media.
Definition: space.h:500
virtual void AddControllableEntity(CControllableEntity &c_entity)
Definition: space.cpp:167
std::function< void(CControllableEntity *)> TControllableEntityIterCBType
The callback type for iteration over controllable entities within the PreStep() and/or PostStep() par...
Definition: space.h:90
virtual void RemoveControllableEntity(CControllableEntity &c_entity)
Definition: space.cpp:174
virtual void AddControllableEntity(CControllableEntity &c_entity)
virtual void Destroy()
Destroys the space and all its entities.
virtual void Init(TConfigurationNode &t_tree)
Initializes the space using the <arena> section of the XML configuration file.
virtual void IterateOverControllableEntities(const TControllableEntityIterCBType &c_cb)
Given a callback specified in the loop functions, iterate over all controllable entities currently pr...
virtual void RemoveControllableEntity(CControllableEntity &c_entity)
void CollectThreadResourceUsage()
Definition: profiler.cpp:172