Tpetra parallel linear algebra  Version of the Day
Tpetra_Details_iallreduce.cpp
1 // @HEADER
2 // ***********************************************************************
3 //
4 // Tpetra: Templated Linear Algebra Services Package
5 // Copyright (2008) Sandia Corporation
6 //
7 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
8 // the U.S. Government retains certain rights in this software.
9 //
10 // Redistribution and use in source and binary forms, with or without
11 // modification, are permitted provided that the following conditions are
12 // met:
13 //
14 // 1. Redistributions of source code must retain the above copyright
15 // notice, this list of conditions and the following disclaimer.
16 //
17 // 2. Redistributions in binary form must reproduce the above copyright
18 // notice, this list of conditions and the following disclaimer in the
19 // documentation and/or other materials provided with the distribution.
20 //
21 // 3. Neither the name of the Corporation nor the names of the
22 // contributors may be used to endorse or promote products derived from
23 // this software without specific prior written permission.
24 //
25 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
26 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
28 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
29 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
30 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
31 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
32 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
33 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
34 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
35 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 //
37 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
38 //
39 // ************************************************************************
40 // @HEADER
41 
43 
#include "Tpetra_Details_iallreduce.hpp"

#ifdef HAVE_TPETRACORE_MPI
#  include "Teuchos_DefaultMpiComm.hpp" // only needs to be in .cpp file
#endif // HAVE_TPETRACORE_MPI
#include "Teuchos_DefaultSerialComm.hpp" // only needs to be in .cpp file
48 
49 #ifdef HAVE_TPETRACORE_MPI
50 namespace { // (anonymous)
51 std::string getMpiErrorString (const int errCode) {
52  // Space for storing the error string returned by MPI.
53  // Leave room for null termination, since I don't know if MPI does this.
54  char errString [MPI_MAX_ERROR_STRING+1];
55  int errStringLen = MPI_MAX_ERROR_STRING; // output argument
56  (void) MPI_Error_string (errCode, errString, &errStringLen);
57  // errStringLen on output is the number of characters written.
58  // I'm not sure (the MPI 3.0 Standard doesn't say) if this
59  // includes the '\0', so I'll make sure. We reserved space for
60  // the extra '\0' if needed.
61  if (errString[errStringLen-1] != '\0') {
62  errString[errStringLen] = '\0';
63  }
64  return std::string (errString); // This copies the original string.
65 }
66 } // namespace (anonymous)
67 #endif // HAVE_TPETRACORE_MPI
68 
69 namespace Tpetra {
70 namespace Details {
71 namespace Impl {
72 
73 #ifdef HAVE_TPETRACORE_MPI
// Default constructor: represents "no outstanding communication
// request" by holding MPI_REQUEST_NULL.
MpiCommRequest::
MpiCommRequest () :
  req_ (MPI_REQUEST_NULL)
{}
78 
// Constructor that wraps the given raw MPI_Request (e.g., the
// output argument of MPI_Iallreduce).  This object takes
// responsibility for completing or canceling the request.
MpiCommRequest::
MpiCommRequest (const MPI_Request req) :
  req_ (req)
{}
83 
84 void
85 MpiCommRequest::
86 waitWithStatus (MPI_Status& status)
87 {
88  if (req_ != MPI_REQUEST_NULL) {
89  MPI_Request req = req_;
90  const int err = MPI_Wait (&req, &status);
91  TEUCHOS_TEST_FOR_EXCEPTION
92  (err != MPI_SUCCESS, std::runtime_error,
93  "MpiCommRequest::waitWithStatus: MPI_Wait failed with error \""
94  << getMpiErrorString (err));
95  // MPI_Wait should set the MPI_Request to MPI_REQUEST_NULL on
96  // success. We'll do it here just to be conservative.
97  req_ = MPI_REQUEST_NULL;
98  }
99 }
100 
101 void
102 MpiCommRequest::
103 wait ()
104 {
105  if (req_ != MPI_REQUEST_NULL) {
106  MPI_Request req = req_;
107  const int err = MPI_Wait (&req, MPI_STATUS_IGNORE);
108  TEUCHOS_TEST_FOR_EXCEPTION
109  (err != MPI_SUCCESS, std::runtime_error,
110  "MpiCommRequest::wait: MPI_Wait failed with error \""
111  << getMpiErrorString (err));
112  // MPI_Wait should set the MPI_Request to MPI_REQUEST_NULL on
113  // success. We'll do it here just to be conservative.
114  req_ = MPI_REQUEST_NULL;
115  }
116 }
117 
// Cancel the pending request (if any).  Throws std::runtime_error
// if MPI_Cancel reports an error.  After a successful cancel, this
// completes the request via wait(), as MPI requires.
void
MpiCommRequest::
cancel ()
{
  if (req_ != MPI_REQUEST_NULL) {
    const int err = MPI_Cancel (&req_);
    TEUCHOS_TEST_FOR_EXCEPTION
      (err != MPI_SUCCESS, std::runtime_error,
       "MpiCommRequest::cancel: MPI_Cancel failed with the following error: "
       << getMpiErrorString (err));

    // Wait on the request.  MPI requires doing this after cancel, and
    // promises that a wait after cancel is a local operation.
    this->wait ();

    // The returned status may still be useful; for example, one may
    // call MPI_Test_cancelled to test an MPI_Status from a
    // nonblocking send.  For now, we'll ignore it.
  }
}
138 
// Destructor: if the request is still pending, cancel it and (on
// successful cancel) complete it with MPI_Wait.  Deliberately never
// throws, since this is a destructor.
MpiCommRequest::
~MpiCommRequest ()
{
  if (req_ != MPI_REQUEST_NULL) {
    // We're in a destructor, so don't throw errors.  However, if
    // MPI_Cancel fails, it's probably a bad idea to call MPI_Wait.
    const int err = MPI_Cancel (&req_);
    if (err == MPI_SUCCESS) {
      // The MPI_Cancel succeeded.  Now wait on the request.  Ignore
      // any reported error, since we can't do anything about those in
      // the destructor (other than kill the program).  If successful,
      // MPI_Wait will set the MPI_Request to MPI_REQUEST_NULL.  We
      // ignore the returned MPI_Status, since if users let the
      // request fall out of scope, then they must not care about the
      // status.
      //
      // mfh 21 Oct 2012: The MPI standard requires completing a
      // canceled request by calling a function like MPI_Wait,
      // MPI_Test, or MPI_Request_free.  MPI_Wait on a canceled
      // request behaves like a local operation (it does not
      // communicate or block waiting for communication).  One could
      // also call MPI_Request_free instead of MPI_Wait, but
      // MPI_Request_free is intended more for persistent requests
      // (created with functions like MPI_Recv_init).
      (void) MPI_Wait (&req_, MPI_STATUS_IGNORE);
    }
  }
}
167 
168 #endif // HAVE_TPETRACORE_MPI
169 
170 std::shared_ptr<CommRequest>
171 emptyCommRequest ()
172 {
173  return std::shared_ptr<CommRequest> (new DeferredActionCommRequest ());
174 }
175 
// Default constructor: the deferred action is a no-op, and has not
// yet been "taken," so the first wait() call (harmlessly) runs it.
DeferredActionCommRequest::
DeferredActionCommRequest () :
  action_ ([] () {}), // do nothing
  actionTaken_ (false)
{}
181 
183 DeferredActionCommRequest (std::function<void () > action) :
184  action_ (action),
185  actionTaken_ (false)
186 {}
187 
188 void
191 {
192  if (! actionTaken_) {
193  action_ ();
194  actionTaken_ = true;
195  }
196 }
197 
198 void
201 {
202  actionTaken_ = true;
203 }
204 
205 #ifdef HAVE_TPETRACORE_MPI
206 
// Nonblocking all-reduce over raw, type-erased buffers.  Returns a
// CommRequest on which the caller must wait for the reduction to
// complete.  With MPI_VERSION >= 3 this calls MPI_Iallreduce;
// otherwise it defers a blocking MPI_Allreduce until the returned
// request's wait() is called.
//
// sendbuf/recvbuf: raw input/output buffers; sendbuf == recvbuf
//   selects MPI_IN_PLACE (valid only for intracommunicators).
// count, mpiDatatype: number and MPI type of entries to reduce.
// mpiDatatypeNeedsFree: if true, the datatype is freed after being
//   handed to MPI_Iallreduce (MPI-3 path); see NOTE(review) below
//   about the fall-back path.
// op, comm: reduction operation and communicator.
std::shared_ptr<CommRequest>
iallreduceRawVoid (const void* sendbuf,
                   void* recvbuf,
                   const int count,
                   MPI_Datatype mpiDatatype,
                   const bool mpiDatatypeNeedsFree,
                   const Teuchos::EReductionType op,
                   MPI_Comm comm)
{
  // Convert the Teuchos reduction enum to the corresponding MPI_Op.
  MPI_Op rawOp = ::Teuchos::Details::getMpiOpForEReductionType (op);

#if MPI_VERSION >= 3
  const bool useMpi3 = true;
#else
  const bool useMpi3 = false;
#endif // MPI_VERSION >= 3

  // Fix for #852: always build the fall-back (MPI_VERSION < 3)
  // implementation.
  if (useMpi3) {
#if MPI_VERSION >= 3
    MPI_Request rawRequest = MPI_REQUEST_NULL;
    int err = MPI_SUCCESS;
    if (sendbuf == recvbuf) {
      // Fix for #850.  This only works if rawComm is an
      // intracommunicator.  Intercommunicators don't have an in-place
      // option for collectives.
      err = MPI_Iallreduce (MPI_IN_PLACE, recvbuf, count, mpiDatatype,
                            rawOp, comm, &rawRequest);
    }
    else {
      err = MPI_Iallreduce (sendbuf, recvbuf, count, mpiDatatype,
                            rawOp, comm, &rawRequest);
    }
    TEUCHOS_TEST_FOR_EXCEPTION
      (err != MPI_SUCCESS, std::runtime_error,
       "MPI_Iallreduce failed with the following error: "
       << getMpiErrorString (err));
    if (mpiDatatypeNeedsFree) {
      // As long as the MPI_Datatype goes into MPI_Iallreduce, it's OK
      // to free it, even if the MPI_Iallreduce has not yet completed.
      // There's no sense in checking the error code here.
      (void) MPI_Type_free (&mpiDatatype);
    }
    return std::shared_ptr<CommRequest> (new MpiCommRequest (rawRequest));
#else
    // Unreachable: useMpi3 is false when MPI_VERSION < 3.
    TEUCHOS_TEST_FOR_EXCEPTION
      (true, std::logic_error, "Should never get here. "
       "Please report this bug to the Tpetra developers.");
#endif // MPI_VERSION >= 3
  }
  else { // ! useMpi3
    // We don't have MPI_Iallreduce.  The next best thing is to defer an
    // MPI_Allreduce call until wait.  We do this by returning a
    // "DeferredActionCommRequest," which is just a wrapped
    // std::function.
    //
    // NOTE (mfh 12 Nov 2016, 14 Nov 2016) One issue with this approach
    // is that we have to make sure that the MPI_Datatype won't go away
    // before the MPI_Allreduce gets called.  We handle this for now by
    // calling MPI_Type_dup and stashing the destructor in the request.
    // (Don't use the MPI_COMM_SELF trick here, unless you first check
    // whether you've seen that MPI_Datatype before -- otherwise you'll
    // get memory growth linear in the number of iallreduce calls.)
    //
    // NOTE(review): when mpiDatatypeNeedsFree is true, this path dups
    // the datatype at wait() time and frees only the duplicate; the
    // original mpiDatatype is never freed here, unlike in the MPI-3
    // path above.  Presumably the caller retains ownership in this
    // configuration -- verify against the callers in the header.
    return std::shared_ptr<CommRequest> (new DeferredActionCommRequest ([=] () {
      // It could be that this action got deferred beyond
      // MPI_Finalize.  In that case, do nothing.
      int mpiInitialized = 0;
      (void) MPI_Initialized (&mpiInitialized);
      int mpiFinalized = 0;
      (void) MPI_Finalized (&mpiFinalized);
      if (mpiFinalized == 0 && mpiInitialized != 0) {
        // FIXME (mfh 14 Nov 2016) Unfortunately, there is no
        // MPI_Op_dup, so I can't guarantee that the input MPI_Op
        // will still exist to the point where it is actually
        // used.
        //
        // FIXME (mfh 14 Nov 2016) Best practice would be to
        // duplicate the input MPI_Comm, so that we can ensure its
        // survival to this point.  However, we can't guarantee
        // survival of the input MPI_Op, so we might as well just
        // not bother.
        if (mpiDatatypeNeedsFree) {
          // Copy the original MPI_Datatype, so that we can safely
          // defer this call past survival of the original.
          MPI_Datatype dupDatatype;
          (void) MPI_Type_dup (mpiDatatype, &dupDatatype);
#if MPI_VERSION >= 3
          if (sendbuf == recvbuf) {
            (void) MPI_Allreduce (MPI_IN_PLACE, recvbuf, count, dupDatatype,
                                  rawOp, comm);
          }
          else {
            (void) MPI_Allreduce (sendbuf, recvbuf, count, dupDatatype,
                                  rawOp, comm);
          }
#else // MPI_VERSION < 3
          if (sendbuf == recvbuf) {
            (void) MPI_Allreduce (MPI_IN_PLACE, recvbuf,
                                  count, dupDatatype, rawOp, comm);
          }
          else {
            // OpenMPI 1.6.5 insists on void*, not const void*, for sendbuf.
            (void) MPI_Allreduce (const_cast<void*> (sendbuf), recvbuf,
                                  count, dupDatatype, rawOp, comm);
          }
#endif // MPI_VERSION >= 3
          (void) MPI_Type_free (&dupDatatype);
        }
        else {
#if MPI_VERSION >= 3
          if (sendbuf == recvbuf) {
            (void) MPI_Allreduce (MPI_IN_PLACE, recvbuf, count, mpiDatatype,
                                  rawOp, comm);
          }
          else {
            (void) MPI_Allreduce (sendbuf, recvbuf, count, mpiDatatype,
                                  rawOp, comm);
          }
#else // MPI_VERSION < 3
          if (sendbuf == recvbuf) {
            (void) MPI_Allreduce (MPI_IN_PLACE, recvbuf,
                                  count, mpiDatatype, rawOp, comm);
          }
          else {
            // OpenMPI 1.6.5 insists on void*, not const void*, for sendbuf.
            (void) MPI_Allreduce (const_cast<void*> (sendbuf), recvbuf,
                                  count, mpiDatatype, rawOp, comm);
          }
#endif // MPI_VERSION >= 3
        }
      }
    }));
  } // useMpi3
}
342 
343 #endif // HAVE_TPETRACORE_MPI
344 
345 } // namespace Impl
346 } // namespace Details
347 } // namespace Tpetra
348 
349 
Tpetra::Details::Impl::DeferredActionCommRequest::cancel
void cancel()
Cancel the pending communication request, without taking the specified action.
Definition: Tpetra_Details_iallreduce.cpp:200
Details
Implementation details of Tpetra.
Tpetra::Details::Impl::DeferredActionCommRequest::wait
void wait()
Wait on this communication request to complete.
Definition: Tpetra_Details_iallreduce.cpp:190
Tpetra
Namespace Tpetra contains the class and methods constituting the Tpetra library.
Tpetra::Details::Impl::DeferredActionCommRequest::DeferredActionCommRequest
DeferredActionCommRequest()
Default constructor (take no action on wait).
Definition: Tpetra_Details_iallreduce.cpp:177
Tpetra_Details_iallreduce.hpp
Declaration of Tpetra::Details::iallreduce.