space_multi_thread_balance_quantity.cpp
Go to the documentation of this file.
1 
7 #include <unistd.h>
8 #include <cstring>
9 #include <argos3/core/simulator/simulator.h>
10 #include <argos3/core/utility/profiler/profiler.h>
12 
13 namespace argos {
14 
15  /****************************************/
16  /****************************************/
17 
20  pthread_mutex_t* ActConditionalMutex;
21  pthread_mutex_t* PhysicsConditionalMutex;
22  pthread_mutex_t* MediaConditionalMutex;
23  };
24 
25  static void CleanupUpdateThread(void* p_data) {
26  CSimulator& cSimulator = CSimulator::GetInstance();
27  if(cSimulator.IsProfiling()) {
29  }
30  SCleanupUpdateThreadData& sData =
31  *reinterpret_cast<SCleanupUpdateThreadData*>(p_data);
32  pthread_mutex_unlock(sData.SenseControlStepConditionalMutex);
33  pthread_mutex_unlock(sData.ActConditionalMutex);
34  pthread_mutex_unlock(sData.PhysicsConditionalMutex);
35  pthread_mutex_unlock(sData.MediaConditionalMutex);
36  }
37 
38  void* LaunchUpdateThreadBalanceQuantity(void* p_data) {
39  LOG.AddThreadSafeBuffer();
40  LOGERR.AddThreadSafeBuffer();
41  CSpaceMultiThreadBalanceQuantity::SUpdateThreadData* psData = reinterpret_cast<CSpaceMultiThreadBalanceQuantity::SUpdateThreadData*>(p_data);
42  psData->Space->UpdateThread(psData->ThreadId);
43  return NULL;
44  }
45 
46  /****************************************/
47  /****************************************/
48 
50  m_psUpdateThreadData(NULL),
51  m_ptUpdateThreads(NULL),
52  m_bIsControllableEntityAssignmentRecalculationNeeded(true) {}
53 
54  /****************************************/
55  /****************************************/
56 
58  /* Initialize the space */
59  CSpace::Init(t_tree);
60  /* Initialize thread related structures */
61  int nErrors;
62  /* First the counters */
63  m_unSenseControlStepPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
64  m_unActPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
65  m_unPhysicsPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
66  m_unMediaPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
67  /* Then the mutexes */
68  if((nErrors = pthread_mutex_init(&m_tSenseControlStepConditionalMutex, NULL)) ||
69  (nErrors = pthread_mutex_init(&m_tActConditionalMutex, NULL)) ||
70  (nErrors = pthread_mutex_init(&m_tPhysicsConditionalMutex, NULL)) ||
71  (nErrors = pthread_mutex_init(&m_tMediaConditionalMutex, NULL))) {
72  THROW_ARGOSEXCEPTION("Error creating thread mutexes " << ::strerror(nErrors));
73  }
74  /* Finally the conditionals */
75  if((nErrors = pthread_cond_init(&m_tSenseControlStepConditional, NULL)) ||
76  (nErrors = pthread_cond_init(&m_tActConditional, NULL)) ||
77  (nErrors = pthread_cond_init(&m_tPhysicsConditional, NULL)) ||
78  (nErrors = pthread_cond_init(&m_tMediaConditional, NULL))) {
79  THROW_ARGOSEXCEPTION("Error creating thread conditionals " << ::strerror(nErrors));
80  }
81  /* Start threads */
82  StartThreads();
83  }
84 
85  /****************************************/
86  /****************************************/
87 
88  void CSpaceMultiThreadBalanceQuantity::StartThreads() {
89  int nErrors;
90  /* Create the threads to update the controllable entities */
91  m_ptUpdateThreads = new pthread_t[CSimulator::GetInstance().GetNumThreads()];
92  m_psUpdateThreadData = new SUpdateThreadData*[CSimulator::GetInstance().GetNumThreads()];
93  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
94  /* Create the struct with the info to launch the thread */
95  m_psUpdateThreadData[i] = new SUpdateThreadData(i, this);
96  /* Create the thread */
97  if((nErrors = pthread_create(m_ptUpdateThreads + i,
98  NULL,
100  reinterpret_cast<void*>(m_psUpdateThreadData[i])))) {
101  THROW_ARGOSEXCEPTION("Error creating thread: " << ::strerror(nErrors));
102  }
103  }
104  }
105 
106  /****************************************/
107  /****************************************/
108 
110  /* Destroy the threads to update the controllable entities */
111  int nErrors;
112  if(m_ptUpdateThreads != NULL) {
113  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
114  if((nErrors = pthread_cancel(m_ptUpdateThreads[i]))) {
115  THROW_ARGOSEXCEPTION("Error canceling controllable entities update threads " << ::strerror(nErrors));
116  }
117  }
118  void** ppJoinResult = new void*[CSimulator::GetInstance().GetNumThreads()];
119  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
120  if((nErrors = pthread_join(m_ptUpdateThreads[i], ppJoinResult + i))) {
121  THROW_ARGOSEXCEPTION("Error joining controllable entities update threads " << ::strerror(nErrors));
122  }
123  if(ppJoinResult[i] != PTHREAD_CANCELED) {
124  LOGERR << "[WARNING] Controllable entities update thread #" << i<< " not canceled" << std::endl;
125  }
126  }
127  delete[] ppJoinResult;
128  }
129  delete[] m_ptUpdateThreads;
130  /* Destroy the thread launch info */
131  if(m_psUpdateThreadData != NULL) {
132  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
133  delete m_psUpdateThreadData[i];
134  }
135  }
136  delete[] m_psUpdateThreadData;
137  pthread_mutex_destroy(&m_tSenseControlStepConditionalMutex);
138  pthread_mutex_destroy(&m_tActConditionalMutex);
139  pthread_mutex_destroy(&m_tPhysicsConditionalMutex);
140  pthread_mutex_destroy(&m_tMediaConditionalMutex);
141  pthread_cond_destroy(&m_tSenseControlStepConditional);
142  pthread_cond_destroy(&m_tActConditional);
143  pthread_cond_destroy(&m_tPhysicsConditional);
144  pthread_cond_destroy(&m_tMediaConditional);
145  /* Destroy the base space */
146  CSpace::Destroy();
147  }
148 
149  /****************************************/
150  /****************************************/
151 
153  m_bIsControllableEntityAssignmentRecalculationNeeded = true;
155  }
156 
157  /****************************************/
158  /****************************************/
159 
161  m_bIsControllableEntityAssignmentRecalculationNeeded = true;
163  }
164 
165  /****************************************/
166  /****************************************/
167 
168 #define MAIN_SEND_GO_FOR_PHASE(PHASE) \
169  LOG.Flush(); \
170  LOGERR.Flush(); \
171  pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \
172  m_un ## PHASE ## PhaseDoneCounter = 0; \
173  pthread_cond_broadcast(&m_t ## PHASE ## Conditional); \
174  pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex);
175 
176 #define MAIN_WAIT_FOR_PHASE_END(PHASE) \
177  pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \
178  while(m_un ## PHASE ## PhaseDoneCounter < CSimulator::GetInstance().GetNumThreads()) { \
179  pthread_cond_wait(&m_t ## PHASE ## Conditional, &m_t ## PHASE ## ConditionalMutex); \
180  } \
181  pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex);
182 
186  /* Avoid recalculation at the next time step */
187  m_bIsControllableEntityAssignmentRecalculationNeeded = false;
188  }
189 
190  /****************************************/
191  /****************************************/
192 
194  /* Update the physics engines */
195  MAIN_SEND_GO_FOR_PHASE(Physics);
196  MAIN_WAIT_FOR_PHASE_END(Physics);
197  /* Perform entity transfer from engine to engine, if needed */
198  for(size_t i = 0; i < m_ptPhysicsEngines->size(); ++i) {
199  if((*m_ptPhysicsEngines)[i]->IsEntityTransferNeeded()) {
200  (*m_ptPhysicsEngines)[i]->TransferEntities();
201  }
202  }
203  }
204 
205  /****************************************/
206  /****************************************/
207 
209  /* Update the media */
210  MAIN_SEND_GO_FOR_PHASE(Media);
212  }
213 
214  /****************************************/
215  /****************************************/
216 
218  MAIN_SEND_GO_FOR_PHASE(SenseControlStep);
219  MAIN_WAIT_FOR_PHASE_END(SenseControlStep);
220  /* Avoid recalculation at the next time step */
221  m_bIsControllableEntityAssignmentRecalculationNeeded = false;
222  }
223 
224  /****************************************/
225  /****************************************/
226 
227 #define THREAD_WAIT_FOR_GO_SIGNAL(PHASE) \
228  pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \
229  while(m_un ## PHASE ## PhaseDoneCounter == CSimulator::GetInstance().GetNumThreads()) { \
230  pthread_cond_wait(&m_t ## PHASE ## Conditional, &m_t ## PHASE ## ConditionalMutex); \
231  } \
232  pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex); \
233  pthread_testcancel();
234 
235 #define THREAD_SIGNAL_PHASE_DONE(PHASE) \
236  pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \
237  ++m_un ## PHASE ## PhaseDoneCounter; \
238  pthread_cond_broadcast(&m_t ## PHASE ## Conditional); \
239  pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex); \
240  pthread_testcancel();
241 
243  size_t un_tot_plugins) {
244  /* This is the minimum number of plugins assigned to a thread */
245  size_t unMinPortion = un_tot_plugins / CSimulator::GetInstance().GetNumThreads();
246  /* If the division has a remainder, the extra plugins must be assigned too */
247  size_t unExtraPortion = un_tot_plugins % CSimulator::GetInstance().GetNumThreads();
248  /* Calculate the range */
249  if(unMinPortion == 0) {
250  /* Not all threads get a plugin */
251  if(un_id < unExtraPortion) {
252  /* This thread does */
253  return CRange<size_t>(un_id, un_id+1);
254  }
255  else {
256  /* This thread does not */
257  return CRange<size_t>();
258  }
259  }
260  else {
261  /* For sure this thread will get unMinPortion plugins, does it get an extra too? */
262  if(un_id < unExtraPortion) {
263  /* Yes, it gets an extra */
264  return CRange<size_t>( un_id * (unMinPortion+1),
265  (un_id+1) * (unMinPortion+1));
266  }
267  else {
268  /* No, it doesn't get an extra */
269  return CRange<size_t>(unExtraPortion * (unMinPortion+1) + (un_id-unExtraPortion) * unMinPortion,
270  unExtraPortion * (unMinPortion+1) + (un_id-unExtraPortion+1) * unMinPortion);
271  }
272  }
273  }
274 
275  void CSpaceMultiThreadBalanceQuantity::UpdateThread(UInt32 un_id) {
276  /* Copy the id */
277  UInt32 unId = un_id;
278  /* Create cancellation data */
279  SCleanupUpdateThreadData sCancelData;
280  sCancelData.SenseControlStepConditionalMutex = &m_tSenseControlStepConditionalMutex;
281  sCancelData.ActConditionalMutex = &m_tActConditionalMutex;
282  sCancelData.PhysicsConditionalMutex = &m_tPhysicsConditionalMutex;
283  sCancelData.MediaConditionalMutex = &m_tMediaConditionalMutex;
284  pthread_cleanup_push(CleanupUpdateThread, &sCancelData);
285  /* Id range for the physics engines assigned to this thread */
286  CRange<size_t> cPhysicsRange = CalculatePluginRangeForThread(unId, m_ptPhysicsEngines->size());
287  /* Id range for the physics engines assigned to this thread */
288  CRange<size_t> cMediaRange = CalculatePluginRangeForThread(unId, m_ptMedia->size());
289  /* Variables storing the portion of entities to update */
290  CRange<size_t> cEntityRange;
291  while(1) {
293  /* Calculate the portion of entities to update, if needed */
294  if(m_bIsControllableEntityAssignmentRecalculationNeeded) {
295  cEntityRange = CalculatePluginRangeForThread(unId, m_vecControllableEntities.size());
296  }
297  /* Cope with the fact that there may be less entities than threads */
298  if(cEntityRange.GetSpan() > 0) {
299  /* This thread has entities */
300  /* Actuate control choices */
301  for(size_t i = cEntityRange.GetMin(); i < cEntityRange.GetMax(); ++i) {
302  if(m_vecControllableEntities[i]->IsEnabled())
303  m_vecControllableEntities[i]->Act();
304  }
305  pthread_testcancel();
307  }
308  else {
309  /* This thread has no entities -> dummy computation */
311  }
312  /* Update physics engines, if this thread has been assigned to them */
313  THREAD_WAIT_FOR_GO_SIGNAL(Physics);
314  if(cPhysicsRange.GetSpan() > 0) {
315  /* This thread has engines, update them */
316  for(size_t i = cPhysicsRange.GetMin(); i < cPhysicsRange.GetMax(); ++i) {
317  (*m_ptPhysicsEngines)[i]->Update();
318  }
319  pthread_testcancel();
320  THREAD_SIGNAL_PHASE_DONE(Physics);
321  }
322  else {
323  /* This thread has no engines -> dummy computation */
324  THREAD_SIGNAL_PHASE_DONE(Physics);
325  }
326  /* Update media, if this thread has been assigned to them */
328  if(cMediaRange.GetSpan() > 0) {
329  /* This thread has media, update them */
330  for(size_t i = cMediaRange.GetMin(); i < cMediaRange.GetMax(); ++i) {
331  (*m_ptMedia)[i]->Update();
332  }
333  pthread_testcancel();
335  }
336  else {
337  /* This thread has no media -> dummy computation */
339  }
340  /* Update sensor readings and call controllers */
341  THREAD_WAIT_FOR_GO_SIGNAL(SenseControlStep);
342  /* Cope with the fact that there may be less entities than threads */
343  if(cEntityRange.GetSpan() > 0) {
344  /* This thread has entities */
345  for(size_t i = cEntityRange.GetMin(); i < cEntityRange.GetMax(); ++i) {
346  if(m_vecControllableEntities[i]->IsEnabled()) {
347  m_vecControllableEntities[i]->Sense();
348  m_vecControllableEntities[i]->ControlStep();
349  }
350  }
351  pthread_testcancel();
352  THREAD_SIGNAL_PHASE_DONE(SenseControlStep);
353  }
354  else {
355  /* This thread has no entities -> dummy computation */
356  THREAD_SIGNAL_PHASE_DONE(SenseControlStep);
357  }
358  }
359  pthread_cleanup_pop(1);
360  }
361 
362  /****************************************/
363  /****************************************/
364 
365 }
virtual void RemoveControllableEntity(CControllableEntity &c_entity)
void * LaunchUpdateThreadBalanceQuantity(void *p_data)
An entity that contains a pointer to the user-defined controller.
virtual void RemoveControllableEntity(CControllableEntity &c_entity)
Definition: space.cpp:151
#define THREAD_SIGNAL_PHASE_DONE(PHASE)
CMedium::TVector * m_ptMedia
A pointer to the list of media.
Definition: space.h:454
virtual void AddControllableEntity(CControllableEntity &c_entity)
CARGoSLog LOG(std::cout, SLogColor(ARGOS_LOG_ATTRIBUTE_BRIGHT, ARGOS_LOG_COLOR_GREEN))
Definition: argos_log.h:179
#define THROW_ARGOSEXCEPTION(message)
This macro throws an ARGoS exception with the passed message.
UInt32 GetNumThreads() const
Returns the number of threads used during the experiment.
Definition: simulator.h:260
virtual void AddControllableEntity(CControllableEntity &c_entity)
Definition: space.cpp:144
ticpp::Element TConfigurationNode
The ARGoS configuration XML node.
virtual void Destroy()
Destroys the space and all its entities.
Definition: space.cpp:85
virtual void Destroy()
Destroys the space and all its entities.
#define MAIN_WAIT_FOR_PHASE_END(PHASE)
void CollectThreadResourceUsage()
Definition: profiler.cpp:172
unsigned int UInt32
32-bit unsigned integer.
Definition: datatypes.h:97
CPhysicsEngine::TVector * m_ptPhysicsEngines
A pointer to the list of physics engines.
Definition: space.h:451
virtual void Init(TConfigurationNode &t_tree)
Initializes the space using the section of the XML configuration file.
Definition: space.cpp:37
CARGoSLog LOGERR(std::cerr, SLogColor(ARGOS_LOG_ATTRIBUTE_BRIGHT, ARGOS_LOG_COLOR_RED))
Definition: argos_log.h:180
virtual void Init(TConfigurationNode &t_tree)
Initializes the space using the section of the XML configuration file.
CProfiler & GetProfiler()
Returns a reference to the profiler.
Definition: simulator.h:174
CRange< size_t > CalculatePluginRangeForThread(size_t un_id, size_t un_tot_plugins)
bool IsProfiling() const
Returns true if ARGoS is being profiled.
Definition: simulator.h:182
CControllableEntity::TVector m_vecControllableEntities
A vector of controllable entities.
Definition: space.h:445
The namespace containing all the ARGoS related code.
Definition: ci_actuator.h:12
#define MAIN_SEND_GO_FOR_PHASE(PHASE)
The core class of ARGOS.
Definition: simulator.h:62
static CSimulator & GetInstance()
Returns the instance to the CSimulator class.
Definition: simulator.cpp:78
#define THREAD_WAIT_FOR_GO_SIGNAL(PHASE)