space_multi_thread_balance_length.cpp
Go to the documentation of this file.
1 
8 #include <argos3/core/simulator/simulator.h>
9 #include <argos3/core/utility/profiler/profiler.h>
10 
11 namespace argos {
12 
13  /****************************************/
14  /****************************************/
15 
17  pthread_mutex_t* StartSenseControlPhaseMutex;
18  pthread_mutex_t* StartActPhaseMutex;
19  pthread_mutex_t* StartPhysicsPhaseMutex;
20  pthread_mutex_t* StartMediaPhaseMutex;
21  pthread_mutex_t* StartEntityIterPhaseMutex;
22  pthread_mutex_t* FetchTaskMutex;
23  };
24 
25  static void CleanupThread(void* p_data) {
26  CSimulator& cSimulator = CSimulator::GetInstance();
27  if(cSimulator.IsProfiling()) {
29  }
30  SCleanupThreadData& sData =
31  *reinterpret_cast<SCleanupThreadData*>(p_data);
32  pthread_mutex_unlock(sData.FetchTaskMutex);
33  pthread_mutex_unlock(sData.StartSenseControlPhaseMutex);
34  pthread_mutex_unlock(sData.StartActPhaseMutex);
35  pthread_mutex_unlock(sData.StartPhysicsPhaseMutex);
36  pthread_mutex_unlock(sData.StartMediaPhaseMutex);
37  pthread_mutex_unlock(sData.StartEntityIterPhaseMutex);
38  }
39 
40  void* LaunchThreadBalanceLength(void* p_data) {
41  /* Set up thread-safe buffers for this new thread */
42  LOG.AddThreadSafeBuffer();
43  LOGERR.AddThreadSafeBuffer();
44  /* Make this thread cancellable */
45  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr);
46  pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, nullptr);
47  /* Get a handle to the thread launch data */
48  auto* psData = reinterpret_cast<CSpaceMultiThreadBalanceLength::SThreadLaunchData*>(p_data);
49  /* Create cancellation data */
50  SCleanupThreadData sCancelData;
51  sCancelData.StartSenseControlPhaseMutex = &(psData->Space->m_tStartSenseControlPhaseMutex);
52  sCancelData.StartActPhaseMutex = &(psData->Space->m_tStartActPhaseMutex);
53  sCancelData.StartPhysicsPhaseMutex = &(psData->Space->m_tStartPhysicsPhaseMutex);
54  sCancelData.StartMediaPhaseMutex = &(psData->Space->m_tStartMediaPhaseMutex);
55  sCancelData.StartEntityIterPhaseMutex = &(psData->Space->m_tStartEntityIterPhaseMutex);
56  sCancelData.FetchTaskMutex = &(psData->Space->m_tFetchTaskMutex);
57  pthread_cleanup_push(CleanupThread, &sCancelData);
58  psData->Space->SlaveThread();
59  /* Dispose of cancellation data */
60  pthread_cleanup_pop(1);
61  return nullptr;
62  }
63 
64  /****************************************/
65  /****************************************/
66 
68  /* Initialize the space */
69  CSpace::Init(t_tree);
70  /* Initialize thread related structures */
71  int nErrors;
72  /* Init mutexes */
73  if((nErrors = pthread_mutex_init(&m_tStartSenseControlPhaseMutex, nullptr)) ||
74  (nErrors = pthread_mutex_init(&m_tStartActPhaseMutex, nullptr)) ||
75  (nErrors = pthread_mutex_init(&m_tStartPhysicsPhaseMutex, nullptr)) ||
76  (nErrors = pthread_mutex_init(&m_tStartMediaPhaseMutex, nullptr)) ||
77  (nErrors = pthread_mutex_init(&m_tStartEntityIterPhaseMutex, nullptr)) ||
78  (nErrors = pthread_mutex_init(&m_tFetchTaskMutex, nullptr))) {
79  THROW_ARGOSEXCEPTION("Error creating thread mutexes " << ::strerror(nErrors));
80  }
81  /* Init conditionals */
82  if((nErrors = pthread_cond_init(&m_tStartSenseControlPhaseCond, nullptr)) ||
83  (nErrors = pthread_cond_init(&m_tStartActPhaseCond, nullptr)) ||
84  (nErrors = pthread_cond_init(&m_tStartPhysicsPhaseCond, nullptr)) ||
85  (nErrors = pthread_cond_init(&m_tStartMediaPhaseCond, nullptr)) ||
86  (nErrors = pthread_cond_init(&m_tStartEntityIterPhaseCond, nullptr)) ||
87  (nErrors = pthread_cond_init(&m_tFetchTaskCond, nullptr))) {
88  THROW_ARGOSEXCEPTION("Error creating thread conditionals " << ::strerror(nErrors));
89  }
90  /* Reset the idle thread count */
91  m_unSenseControlPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
92  m_unActPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
93  m_unPhysicsPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
94  m_unMediaPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
95  m_unEntityIterPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
96  /* Start threads */
97  StartThreads();
98  }
99 
100  /****************************************/
101  /****************************************/
102 
104  /* Destroy the threads to update the controllable entities */
105  int nErrors;
106  if(m_ptThreads != nullptr) {
107  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
108  if((nErrors = pthread_cancel(m_ptThreads[i]))) {
109  THROW_ARGOSEXCEPTION("Error canceling threads " << ::strerror(nErrors));
110  }
111  }
112  auto** ppJoinResult = new void*[CSimulator::GetInstance().GetNumThreads()];
113  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
114  if((nErrors = pthread_join(m_ptThreads[i], ppJoinResult + i))) {
115  THROW_ARGOSEXCEPTION("Error joining threads " << ::strerror(nErrors));
116  }
117  if(ppJoinResult[i] != PTHREAD_CANCELED) {
118  LOGERR << "[WARNING] Thread #" << i<< " not canceled" << std::endl;
119  }
120  }
121  delete[] ppJoinResult;
122  }
123  delete[] m_ptThreads;
124  /* Destroy the thread launch info */
125  if(m_psThreadData != nullptr) {
126  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
127  delete m_psThreadData[i];
128  }
129  }
130  delete[] m_psThreadData;
131  pthread_mutex_destroy(&m_tStartSenseControlPhaseMutex);
132  pthread_mutex_destroy(&m_tStartActPhaseMutex);
133  pthread_mutex_destroy(&m_tStartPhysicsPhaseMutex);
134  pthread_mutex_destroy(&m_tStartMediaPhaseMutex);
135  pthread_mutex_destroy(&m_tStartEntityIterPhaseMutex);
136  pthread_mutex_destroy(&m_tFetchTaskMutex);
137 
138  pthread_cond_destroy(&m_tStartSenseControlPhaseCond);
139  pthread_cond_destroy(&m_tStartActPhaseCond);
140  pthread_cond_destroy(&m_tStartPhysicsPhaseCond);
141  pthread_cond_destroy(&m_tStartMediaPhaseCond);
142  pthread_cond_destroy(&m_tStartEntityIterPhaseCond);
143  pthread_cond_destroy(&m_tFetchTaskCond);
144 
145  /* Destroy the base space */
146  CSpace::Destroy();
147  }
148 
149  /****************************************/
150  /****************************************/
151 
153  /* Reset the idle thread count */
154  m_unSenseControlPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
155  m_unActPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
156  m_unPhysicsPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
157  m_unMediaPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
158  m_unEntityIterPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
159  /* Update the space */
160  CSpace::Update();
161  }
162 
163  /****************************************/
164  /****************************************/
165 
166 #define MAIN_START_PHASE(PHASE) \
167  pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
168  m_un ## PHASE ## PhaseIdleCounter = 0; \
169  m_unTaskIndex = 0; \
170  pthread_cond_broadcast(&m_tStart ## PHASE ## PhaseCond); \
171  pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex);
172 
173 #define MAIN_WAIT_FOR_END_OF(PHASE) \
174  pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
175  while(m_un ## PHASE ## PhaseIdleCounter < CSimulator::GetInstance().GetNumThreads()) { \
176  pthread_cond_wait(&m_tStart ## PHASE ## PhaseCond, &m_tStart ## PHASE ## PhaseMutex); \
177  } \
178  pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex);
179 
181  /* Act phase */
182  MAIN_START_PHASE(Act);
184  }
185 
186  /****************************************/
187  /****************************************/
188 
190  /* Physics phase */
191  MAIN_START_PHASE(Physics);
192  MAIN_WAIT_FOR_END_OF(Physics);
193  /* Perform entity transfer from engine to engine, if needed */
194  for(size_t i = 0; i < m_ptPhysicsEngines->size(); ++i) {
195  if((*m_ptPhysicsEngines)[i]->IsEntityTransferNeeded()) {
196  (*m_ptPhysicsEngines)[i]->TransferEntities();
197  }
198  }
199  }
200 
201  /****************************************/
202  /****************************************/
203 
205  /* Media phase */
206  MAIN_START_PHASE(Media);
207  MAIN_WAIT_FOR_END_OF(Media);
208  }
209 
210  /****************************************/
211  /****************************************/
212 
214  const TControllableEntityIterCBType &c_cb) {
216  /* Iterate over all robots in the swarm */
217  MAIN_START_PHASE(EntityIter);
218  MAIN_WAIT_FOR_END_OF(EntityIter);
219  } /* IterateOverControllableEntities() */
220 
221 
222  /****************************************/
223  /****************************************/
224 
226  /* Sense/control phase */
227  MAIN_START_PHASE(SenseControl);
228  MAIN_WAIT_FOR_END_OF(SenseControl);
229  }
230 
231  /****************************************/
232  /****************************************/
233 
234  void CSpaceMultiThreadBalanceLength::StartThreads() {
235  int nErrors;
236  /* Create the threads to update the controllable entities */
237  m_ptThreads = new pthread_t[CSimulator::GetInstance().GetNumThreads()];
238  m_psThreadData = new SThreadLaunchData*[CSimulator::GetInstance().GetNumThreads()];
239  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
240  /* Create the struct with the info to launch the thread */
241  m_psThreadData[i] = new SThreadLaunchData(i, this);
242  /* Create the thread */
243  if((nErrors = pthread_create(m_ptThreads + i,
244  nullptr,
246  reinterpret_cast<void*>(m_psThreadData[i])))) {
247  THROW_ARGOSEXCEPTION("Error creating thread: " << ::strerror(nErrors));
248  }
249  }
250  }
251 
252  /****************************************/
253  /****************************************/
254 
255 #define THREAD_WAIT_FOR_START_OF(PHASE) \
256  pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
257  while(m_un ## PHASE ## PhaseIdleCounter == CSimulator::GetInstance().GetNumThreads()) { \
258  pthread_cond_wait(&m_tStart ## PHASE ## PhaseCond, &m_tStart ## PHASE ## PhaseMutex); \
259  } \
260  pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex); \
261  pthread_testcancel();
262 
263 #define THREAD_PERFORM_TASK(PHASE, TASKVEC, CONDITION, SNIPPET) \
264  while(1) { \
265  pthread_mutex_lock(&m_tFetchTaskMutex); \
266  if((CONDITION) && m_unTaskIndex < (TASKVEC).size()) { \
267  unTaskIndex = m_unTaskIndex; \
268  ++m_unTaskIndex; \
269  pthread_mutex_unlock(&m_tFetchTaskMutex); \
270  pthread_testcancel(); \
271  { \
272  SNIPPET; \
273  } \
274  pthread_testcancel(); \
275  } \
276  else { \
277  pthread_mutex_unlock(&m_tFetchTaskMutex); \
278  pthread_testcancel(); \
279  pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
280  ++m_un ## PHASE ## PhaseIdleCounter; \
281  pthread_cond_broadcast(&m_tStart ## PHASE ## PhaseCond); \
282  pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex); \
283  pthread_testcancel(); \
284  break; \
285  } \
286  } \
287  pthread_testcancel();
288 
289  void CSpaceMultiThreadBalanceLength::SlaveThread() {
290  /* Task index */
291  size_t unTaskIndex;
292  while(1) {
295  Act,
297  true,
298  if(m_vecControllableEntities[unTaskIndex]->IsEnabled()) m_vecControllableEntities[unTaskIndex]->Act();
299  );
300  THREAD_WAIT_FOR_START_OF(Physics);
302  Physics,
304  true,
305  (*m_ptPhysicsEngines)[unTaskIndex]->Update();
306  );
309  Media,
310  *m_ptMedia,
311  true,
312  (*m_ptMedia)[unTaskIndex]->Update();
313  );
314  /* loop functions PreStep() */
315  THREAD_WAIT_FOR_START_OF(EntityIter);
317  EntityIter,
321  THREAD_WAIT_FOR_START_OF(SenseControl);
323  SenseControl,
325  true,
326  if(m_vecControllableEntities[unTaskIndex]->IsEnabled()) {
327  m_vecControllableEntities[unTaskIndex]->Sense();
328  m_vecControllableEntities[unTaskIndex]->ControlStep();
329  }
330  );
331  /* loop functions PostStep() */
332  THREAD_WAIT_FOR_START_OF(EntityIter);
334  EntityIter,
338  } /* while(1) */
339  }
340 
341  /****************************************/
342  /****************************************/
343 
344 }
#define THREAD_WAIT_FOR_START_OF(PHASE)
#define MAIN_WAIT_FOR_END_OF(PHASE)
#define THREAD_PERFORM_TASK(PHASE, TASKVEC, CONDITION, SNIPPET)
#define MAIN_START_PHASE(PHASE)
#define THROW_ARGOSEXCEPTION(message)
This macro throws an ARGoS exception with the passed message.
unsigned int UInt32
32-bit unsigned integer.
Definition: datatypes.h:97
The namespace containing all the ARGoS related code.
Definition: ci_actuator.h:12
CARGoSLog LOGERR(std::cerr, SLogColor(ARGOS_LOG_ATTRIBUTE_BRIGHT, ARGOS_LOG_COLOR_RED))
Definition: argos_log.h:180
ticpp::Element TConfigurationNode
The ARGoS configuration XML node.
CARGoSLog LOG(std::cout, SLogColor(ARGOS_LOG_ATTRIBUTE_BRIGHT, ARGOS_LOG_COLOR_GREEN))
Definition: argos_log.h:179
void * LaunchThreadBalanceLength(void *p_data)
The core class of ARGOS.
Definition: simulator.h:62
CProfiler & GetProfiler()
Returns a reference to the profiler.
Definition: simulator.h:174
static CSimulator & GetInstance()
Returns the instance to the CSimulator class.
Definition: simulator.cpp:78
UInt32 GetNumThreads() const
Returns the number of threads used during the experiment.
Definition: simulator.h:260
bool IsProfiling() const
Returns true if ARGoS is being profiled.
Definition: simulator.h:182
CControllableEntity::TVector m_vecControllableEntities
A vector of controllable entities.
Definition: space.h:491
virtual void Init(TConfigurationNode &t_tree)
Initializes the space using the <arena> section of the XML configuration file.
Definition: space.cpp:37
virtual void Destroy()
Destroys the space and all its entities.
Definition: space.cpp:85
bool ControllableEntityIterationEnabled() const
Definition: space.h:449
virtual void Update()
Updates the space.
Definition: space.cpp:119
CPhysicsEngine::TVector * m_ptPhysicsEngines
A pointer to the list of physics engines.
Definition: space.h:497
TControllableEntityIterCBType m_cbControllableEntityIter
Callback for iterating over entities from within the loop functions.
Definition: space.h:503
CMedium::TVector * m_ptMedia
A pointer to the list of media.
Definition: space.h:500
std::function< void(CControllableEntity *)> TControllableEntityIterCBType
The callback type for iteration over controllable entities within the PreStep() and/or PostStep() par...
Definition: space.h:90
virtual void IterateOverControllableEntities(const TControllableEntityIterCBType &c_cb)
Given a callback specified in the loop functions, iterate over all controllable entities currently pr...
virtual void Destroy()
Destroys the space and all its entities.
virtual void Init(TConfigurationNode &t_tree)
Initializes the space using the <arena> section of the XML configuration file.
void CollectThreadResourceUsage()
Definition: profiler.cpp:172