space_multi_thread_balance_length.cpp
Go to the documentation of this file.
1 
8 #include <argos3/core/simulator/simulator.h>
9 #include <argos3/core/utility/profiler/profiler.h>
10 
11 namespace argos {
12 
13  /****************************************/
14  /****************************************/
15 
/*
 * Bundle of pointers to the five synchronization mutexes used by the worker
 * threads: one per phase (SenseControl, Act, Physics, Media) plus the
 * task-fetch mutex. LaunchThreadBalanceLength() fills it in and
 * CleanupThread() unlocks every mutex it points to.
 * NOTE(review): the line that opens this struct (listing line 16) is missing
 * from this extract; the uses below name the type SCleanupThreadData.
 */
17  pthread_mutex_t* StartSenseControlPhaseMutex;
18  pthread_mutex_t* StartActPhaseMutex;
19  pthread_mutex_t* StartPhysicsPhaseMutex;
20  pthread_mutex_t* StartMediaPhaseMutex;
21  pthread_mutex_t* FetchTaskMutex;
22  };
23 
/*
 * Thread cleanup handler, registered by LaunchThreadBalanceLength() through
 * pthread_cleanup_push().
 *
 * p_data: pointer to the SCleanupThreadData holding the mutex pointers.
 *
 * NOTE(review): listing line 27 (the body of the IsProfiling() check) is
 * missing from this extract; presumably it collects per-thread profiling
 * data -- confirm against the original file.
 */
24  static void CleanupThread(void* p_data) {
25  CSimulator& cSimulator = CSimulator::GetInstance();
26  if(cSimulator.IsProfiling()) {
28  }
29  SCleanupThreadData& sData =
30  *reinterpret_cast<SCleanupThreadData*>(p_data);
/* Unlock every mutex in the bundle, without checking which ones this thread holds */
31  pthread_mutex_unlock(sData.FetchTaskMutex);
32  pthread_mutex_unlock(sData.StartSenseControlPhaseMutex);
33  pthread_mutex_unlock(sData.StartActPhaseMutex);
34  pthread_mutex_unlock(sData.StartPhysicsPhaseMutex);
35  pthread_mutex_unlock(sData.StartMediaPhaseMutex);
36  }
37 
/*
 * Start routine for a worker thread (pthread-style void*(void*) signature).
 *
 * Adds thread-safe LOG/LOGERR buffers for the new thread, enables deferred
 * cancellation, registers CleanupThread() as cleanup handler and then runs
 * the space's SlaveThread() loop.
 *
 * p_data: pointer to a CSpaceMultiThreadBalanceLength::SThreadLaunchData;
 *         its Space member is the space whose SlaveThread() is executed.
 * Returns NULL.
 */
38  void* LaunchThreadBalanceLength(void* p_data) {
39  /* Set up thread-safe buffers for this new thread */
40  LOG.AddThreadSafeBuffer();
41  LOGERR.AddThreadSafeBuffer();
42  /* Make this thread cancellable */
43  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
44  pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
45  /* Get a handle to the thread launch data */
46  CSpaceMultiThreadBalanceLength::SThreadLaunchData* psData = reinterpret_cast<CSpaceMultiThreadBalanceLength::SThreadLaunchData*>(p_data);
47  /* Create cancellation data */
/* sCancelData lives on this thread's stack; the handler receives its address */
48  SCleanupThreadData sCancelData;
49  sCancelData.StartSenseControlPhaseMutex = &(psData->Space->m_tStartSenseControlPhaseMutex);
50  sCancelData.StartActPhaseMutex = &(psData->Space->m_tStartActPhaseMutex);
51  sCancelData.StartPhysicsPhaseMutex = &(psData->Space->m_tStartPhysicsPhaseMutex);
52  sCancelData.StartMediaPhaseMutex = &(psData->Space->m_tStartMediaPhaseMutex);
53  sCancelData.FetchTaskMutex = &(psData->Space->m_tFetchTaskMutex);
/* pthread_cleanup_push/pop must stay paired in this same scope */
54  pthread_cleanup_push(CleanupThread, &sCancelData);
55  psData->Space->SlaveThread();
56  /* Dispose of cancellation data */
/* Non-zero argument: the handler is also executed when popped here */
57  pthread_cleanup_pop(1);
58  return NULL;
59  }
60 
61  /****************************************/
62  /****************************************/
63 
/*
 * Body of the space initialization method.
 * NOTE(review): the signature line (listing line 64) is missing from this
 * extract; the body forwards t_tree to CSpace::Init(), so this is presumably
 * the Init(TConfigurationNode&) override -- confirm.
 *
 * Steps: initialize the base space, create the five mutexes and the five
 * condition variables, mark all worker threads as idle for every phase, and
 * start the worker threads. Raises an ARGoS exception (THROW_ARGOSEXCEPTION)
 * if any mutex or condition variable cannot be initialized.
 */
65  /* Initialize the space */
66  CSpace::Init(t_tree);
67  /* Initialize thread related structures */
68  int nErrors;
69  /* Init mutexes */
/* The || chain short-circuits, so nErrors keeps the first non-zero error code */
70  if((nErrors = pthread_mutex_init(&m_tStartSenseControlPhaseMutex, NULL)) ||
71  (nErrors = pthread_mutex_init(&m_tStartActPhaseMutex, NULL)) ||
72  (nErrors = pthread_mutex_init(&m_tStartPhysicsPhaseMutex, NULL)) ||
73  (nErrors = pthread_mutex_init(&m_tStartMediaPhaseMutex, NULL)) ||
74  (nErrors = pthread_mutex_init(&m_tFetchTaskMutex, NULL))) {
75  THROW_ARGOSEXCEPTION("Error creating thread mutexes " << ::strerror(nErrors));
76  }
77  /* Init conditionals */
78  if((nErrors = pthread_cond_init(&m_tStartSenseControlPhaseCond, NULL)) ||
79  (nErrors = pthread_cond_init(&m_tStartActPhaseCond, NULL)) ||
80  (nErrors = pthread_cond_init(&m_tStartPhysicsPhaseCond, NULL)) ||
81  (nErrors = pthread_cond_init(&m_tStartMediaPhaseCond, NULL)) ||
82  (nErrors = pthread_cond_init(&m_tFetchTaskCond, NULL))) {
83  THROW_ARGOSEXCEPTION("Error creating thread conditionals " << ::strerror(nErrors));
84  }
85  /* Reset the idle thread count */
/* Counter == number of threads is the state in which THREAD_WAIT_FOR_START_OF keeps workers waiting */
86  m_unSenseControlPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
87  m_unActPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
88  m_unPhysicsPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
89  m_unMediaPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
90  /* Start threads */
91  StartThreads();
92  }
93 
94  /****************************************/
95  /****************************************/
96 
/*
 * Body of the space teardown method.
 * NOTE(review): the signature line (listing line 97) is missing from this
 * extract; the body ends by calling CSpace::Destroy(), so this is presumably
 * the Destroy() override -- confirm.
 *
 * Steps: request cancellation of every worker thread, join them all (logging
 * a warning for any thread whose join result is not PTHREAD_CANCELED), free
 * the thread array and the per-thread launch data, destroy the mutexes and
 * condition variables, then destroy the base space.
 * Raises an ARGoS exception if pthread_cancel() or pthread_join() fails.
 *
 * NOTE(review): if THROW_ARGOSEXCEPTION fires inside the join loop,
 * ppJoinResult is not freed and the remaining cleanup below is not executed.
 */
98  /* Destroy the threads to update the controllable entities */
99  int nErrors;
100  if(m_ptThreads != NULL) {
101  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
102  if((nErrors = pthread_cancel(m_ptThreads[i]))) {
103  THROW_ARGOSEXCEPTION("Error canceling threads " << ::strerror(nErrors));
104  }
105  }
/* One result slot per thread, filled in by pthread_join() */
106  void** ppJoinResult = new void*[CSimulator::GetInstance().GetNumThreads()];
107  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
108  if((nErrors = pthread_join(m_ptThreads[i], ppJoinResult + i))) {
109  THROW_ARGOSEXCEPTION("Error joining threads " << ::strerror(nErrors));
110  }
111  if(ppJoinResult[i] != PTHREAD_CANCELED) {
112  LOGERR << "[WARNING] Thread #" << i<< " not canceled" << std::endl;
113  }
114  }
115  delete[] ppJoinResult;
116  }
117  delete[] m_ptThreads;
118  /* Destroy the thread launch info */
119  if(m_psThreadData != NULL) {
120  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
121  delete m_psThreadData[i];
122  }
123  }
124  delete[] m_psThreadData;
125  pthread_mutex_destroy(&m_tStartSenseControlPhaseMutex);
126  pthread_mutex_destroy(&m_tStartActPhaseMutex);
127  pthread_mutex_destroy(&m_tStartPhysicsPhaseMutex);
128  pthread_mutex_destroy(&m_tStartMediaPhaseMutex);
129  pthread_mutex_destroy(&m_tFetchTaskMutex);
130  pthread_cond_destroy(&m_tStartSenseControlPhaseCond);
131  pthread_cond_destroy(&m_tStartActPhaseCond);
132  pthread_cond_destroy(&m_tStartPhysicsPhaseCond);
133  pthread_cond_destroy(&m_tStartMediaPhaseCond);
134  pthread_cond_destroy(&m_tFetchTaskCond);
135 
136  /* Destroy the base space */
137  CSpace::Destroy();
138  }
139 
140  /****************************************/
141  /****************************************/
142 
/*
 * Body of the per-step update method.
 * NOTE(review): the signature line (listing line 143) is missing from this
 * extract; the body ends by calling CSpace::Update(), so this is presumably
 * the Update() override -- confirm.
 *
 * Sets the idle counter of every phase to the number of threads, then
 * delegates to CSpace::Update().
 */
144  /* Reset the idle thread count */
145  m_unSenseControlPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
146  m_unActPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
147  m_unPhysicsPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
148  m_unMediaPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
149  /* Update the space */
150  CSpace::Update();
151  }
152 
153  /****************************************/
154  /****************************************/
155 
/*
 * MAIN_START_PHASE(PHASE): starts the given phase for the worker threads.
 * While holding m_tStart<PHASE>PhaseMutex it sets m_un<PHASE>PhaseIdleCounter
 * to 0 and m_unTaskIndex to 0, then broadcasts on m_tStart<PHASE>PhaseCond.
 * A counter of 0 differs from the thread count, which is the condition
 * THREAD_WAIT_FOR_START_OF(PHASE) waits for.
 * NOTE(review): m_unTaskIndex is reset here under the phase mutex, whereas
 * THREAD_PERFORM_TASK accesses it under m_tFetchTaskMutex.
 */
156 #define MAIN_START_PHASE(PHASE) \
157  pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
158  m_un ## PHASE ## PhaseIdleCounter = 0; \
159  m_unTaskIndex = 0; \
160  pthread_cond_broadcast(&m_tStart ## PHASE ## PhaseCond); \
161  pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex);
162 
/*
 * MAIN_WAIT_FOR_END_OF(PHASE): blocks on m_tStart<PHASE>PhaseCond until
 * m_un<PHASE>PhaseIdleCounter has reached the number of threads.
 * THREAD_PERFORM_TASK increments that counter and broadcasts on the same
 * condition variable each time a worker finds no task left.
 * The while loop re-checks the counter after every wake-up.
 */
163 #define MAIN_WAIT_FOR_END_OF(PHASE) \
164  pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
165  while(m_un ## PHASE ## PhaseIdleCounter < CSimulator::GetInstance().GetNumThreads()) { \
166  pthread_cond_wait(&m_tStart ## PHASE ## PhaseCond, &m_tStart ## PHASE ## PhaseMutex); \
167  } \
168  pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex);
169 
/*
 * Body of the method that runs the Act phase: signals the worker threads to
 * start it through MAIN_START_PHASE(Act).
 * NOTE(review): the signature line (listing line 170) and listing line 173
 * are missing from this extract. By analogy with the other phases, line 173
 * is presumably MAIN_WAIT_FOR_END_OF(Act) -- confirm against the original.
 */
171  /* Act phase */
172  MAIN_START_PHASE(Act);
174  }
175 
176  /****************************************/
177  /****************************************/
178 
/*
 * Body of the method that runs the Physics phase.
 * NOTE(review): the signature line (listing line 179) is missing from this
 * extract.
 *
 * Starts the phase on the worker threads and waits until all of them are
 * idle again; afterwards, on the calling thread, asks each physics engine
 * whether an entity transfer is needed and performs it if so.
 */
180  /* Physics phase */
181  MAIN_START_PHASE(Physics);
182  MAIN_WAIT_FOR_END_OF(Physics);
183  /* Perform entity transfer from engine to engine, if needed */
184  for(size_t i = 0; i < m_ptPhysicsEngines->size(); ++i) {
185  if((*m_ptPhysicsEngines)[i]->IsEntityTransferNeeded()) {
186  (*m_ptPhysicsEngines)[i]->TransferEntities();
187  }
188  }
189  }
190 
191  /****************************************/
192  /****************************************/
193 
/*
 * Body of the method that runs the Media phase: starts it on the worker
 * threads and waits until all of them are idle again.
 * NOTE(review): the signature line (listing line 194) is missing from this
 * extract.
 */
195  /* Media phase */
196  MAIN_START_PHASE(Media);
197  MAIN_WAIT_FOR_END_OF(Media);
198  }
199 
200  /****************************************/
201  /****************************************/
202 
/*
 * Body of the method that runs the Sense/control phase: starts it on the
 * worker threads and waits until all of them are idle again.
 * NOTE(review): the signature line (listing line 203) is missing from this
 * extract.
 */
204  /* Sense/control phase */
205  MAIN_START_PHASE(SenseControl);
206  MAIN_WAIT_FOR_END_OF(SenseControl);
207  }
208 
209  /****************************************/
210  /****************************************/
211 
/*
 * Allocates the thread handle array and the launch-data array (both sized to
 * CSimulator::GetNumThreads()) and creates one worker thread per slot.
 * Each thread receives its own SThreadLaunchData(i, this) as argument.
 * Raises an ARGoS exception if pthread_create() fails.
 * NOTE(review): listing line 223 (the start-routine argument of
 * pthread_create) is missing from this extract; presumably it is
 * LaunchThreadBalanceLength -- confirm against the original file.
 */
212  void CSpaceMultiThreadBalanceLength::StartThreads() {
213  int nErrors;
214  /* Create the threads to update the controllable entities */
215  m_ptThreads = new pthread_t[CSimulator::GetInstance().GetNumThreads()];
216  m_psThreadData = new SThreadLaunchData*[CSimulator::GetInstance().GetNumThreads()];
217  for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
218  /* Create the struct with the info to launch the thread */
219  m_psThreadData[i] = new SThreadLaunchData(i, this);
220  /* Create the thread */
221  if((nErrors = pthread_create(m_ptThreads + i,
222  NULL,
224  reinterpret_cast<void*>(m_psThreadData[i])))) {
225  THROW_ARGOSEXCEPTION("Error creating thread: " << ::strerror(nErrors));
226  }
227  }
228  }
229 
230  /****************************************/
231  /****************************************/
232 
/*
 * THREAD_WAIT_FOR_START_OF(PHASE): worker-side wait for a phase to begin.
 * Blocks on m_tStart<PHASE>PhaseCond for as long as
 * m_un<PHASE>PhaseIdleCounter equals the number of threads;
 * MAIN_START_PHASE(PHASE) ends the wait by setting the counter to 0 and
 * broadcasting. After releasing the mutex it calls pthread_testcancel(),
 * giving the thread an explicit cancellation point.
 */
233 #define THREAD_WAIT_FOR_START_OF(PHASE) \
234  pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
235  while(m_un ## PHASE ## PhaseIdleCounter == CSimulator::GetInstance().GetNumThreads()) { \
236  pthread_cond_wait(&m_tStart ## PHASE ## PhaseCond, &m_tStart ## PHASE ## PhaseMutex); \
237  } \
238  pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex); \
239  pthread_testcancel();
240 
/*
 * THREAD_PERFORM_TASK(PHASE, TASKVEC, SNIPPET): worker-side task loop.
 *
 * PHASE:   phase name used to build the mutex/cond/counter identifiers.
 * TASKVEC: container whose size() bounds the shared task index.
 * SNIPPET: statement(s) executed for each claimed task; it can refer to the
 *          claimed index through the variable unTaskIndex, which must be
 *          declared in the enclosing scope.
 *
 * Each iteration locks m_tFetchTaskMutex. If m_unTaskIndex is still below
 * TASKVEC.size(), the worker copies it into unTaskIndex, increments the
 * shared index, releases the mutex and runs SNIPPET. Otherwise it releases
 * the mutex, increments m_un<PHASE>PhaseIdleCounter under the phase mutex,
 * broadcasts on the phase condition variable (which MAIN_WAIT_FOR_END_OF
 * waits on) and leaves the loop.
 * pthread_testcancel() is called after every mutex release and around
 * SNIPPET, always with no mutex locked by this macro.
 */
241 #define THREAD_PERFORM_TASK(PHASE, TASKVEC, SNIPPET) \
242  while(1) { \
243  pthread_mutex_lock(&m_tFetchTaskMutex); \
244  if(m_unTaskIndex < (TASKVEC).size()) { \
245  unTaskIndex = m_unTaskIndex; \
246  ++m_unTaskIndex; \
247  pthread_mutex_unlock(&m_tFetchTaskMutex); \
248  pthread_testcancel(); \
249  { \
250  SNIPPET; \
251  } \
252  pthread_testcancel(); \
253  } \
254  else { \
255  pthread_mutex_unlock(&m_tFetchTaskMutex); \
256  pthread_testcancel(); \
257  pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
258  ++m_un ## PHASE ## PhaseIdleCounter; \
259  pthread_cond_broadcast(&m_tStart ## PHASE ## PhaseCond); \
260  pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex); \
261  pthread_testcancel(); \
262  break; \
263  } \
264  } \
265  pthread_testcancel();
266 
/*
 * Main loop of a worker thread; called from LaunchThreadBalanceLength().
 *
 * The visible code loops forever over the phases in the order Act, Physics,
 * Media, SenseControl. The task snippets shown are:
 *   - Act:          Act() on each enabled controllable entity;
 *   - Physics:      Update() on each physics engine;
 *   - Media:        Update() on each medium (task vector *m_ptMedia);
 *   - SenseControl: Sense() then ControlStep() on each enabled controllable
 *                   entity.
 * unTaskIndex is the local variable that THREAD_PERFORM_TASK assigns the
 * claimed task index to.
 *
 * NOTE(review): listing lines 271-272, 274, 278, 280, 283-284, 290 and 292
 * are missing from this extract, so several macro invocations appear
 * truncated. Presumably they are the THREAD_PERFORM_TASK( openers, the
 * remaining THREAD_WAIT_FOR_START_OF calls and the task-vector arguments --
 * confirm against the original file.
 */
267  void CSpaceMultiThreadBalanceLength::SlaveThread() {
268  /* Task index */
269  size_t unTaskIndex;
270  while(1) {
273  Act,
275  if(m_vecControllableEntities[unTaskIndex]->IsEnabled()) m_vecControllableEntities[unTaskIndex]->Act();
276  );
277  THREAD_WAIT_FOR_START_OF(Physics);
279  Physics,
281  (*m_ptPhysicsEngines)[unTaskIndex]->Update();
282  );
285  Media,
286  *m_ptMedia,
287  (*m_ptMedia)[unTaskIndex]->Update();
288  );
289  THREAD_WAIT_FOR_START_OF(SenseControl);
291  SenseControl,
293  if(m_vecControllableEntities[unTaskIndex]->IsEnabled()) {
294  m_vecControllableEntities[unTaskIndex]->Sense();
295  m_vecControllableEntities[unTaskIndex]->ControlStep();
296  }
297  );
298  }
299  }
300 
301  /****************************************/
302  /****************************************/
303 
304 }
CMedium::TVector * m_ptMedia
A pointer to the list of media.
Definition: space.h:454
CARGoSLog LOG(std::cout, SLogColor(ARGOS_LOG_ATTRIBUTE_BRIGHT, ARGOS_LOG_COLOR_GREEN))
Definition: argos_log.h:179
#define THROW_ARGOSEXCEPTION(message)
This macro throws an ARGoS exception with the passed message.
UInt32 GetNumThreads() const
Returns the number of threads used during the experiment.
Definition: simulator.h:260
#define MAIN_START_PHASE(PHASE)
ticpp::Element TConfigurationNode
The ARGoS configuration XML node.
virtual void Destroy()
Destroys the space and all its entities.
Definition: space.cpp:85
#define MAIN_WAIT_FOR_END_OF(PHASE)
void CollectThreadResourceUsage()
Definition: profiler.cpp:172
unsigned int UInt32
32-bit unsigned integer.
Definition: datatypes.h:97
CPhysicsEngine::TVector * m_ptPhysicsEngines
A pointer to the list of physics engines.
Definition: space.h:451
virtual void Init(TConfigurationNode &t_tree)
Initializes the space using the section of the XML configuration file.
Definition: space.cpp:37
#define THREAD_PERFORM_TASK(PHASE, TASKVEC, SNIPPET)
CARGoSLog LOGERR(std::cerr, SLogColor(ARGOS_LOG_ATTRIBUTE_BRIGHT, ARGOS_LOG_COLOR_RED))
Definition: argos_log.h:180
virtual void Init(TConfigurationNode &t_tree)
Initializes the space using the section of the XML configuration file.
CProfiler & GetProfiler()
Returns a reference to the profiler.
Definition: simulator.h:174
void * LaunchThreadBalanceLength(void *p_data)
#define THREAD_WAIT_FOR_START_OF(PHASE)
bool IsProfiling() const
Returns true if ARGoS is being profiled.
Definition: simulator.h:182
virtual void Update()
Updates the space.
Definition: space.cpp:121
CControllableEntity::TVector m_vecControllableEntities
A vector of controllable entities.
Definition: space.h:445
The namespace containing all the ARGoS related code.
Definition: ci_actuator.h:12
virtual void Destroy()
Destroys the space and all its entities.
The core class of ARGoS.
Definition: simulator.h:62
static CSimulator & GetInstance()
Returns the instance of the CSimulator class.
Definition: simulator.cpp:78