41 #include "Tpetra_Distributor.hpp"
42 #include "Tpetra_Details_gathervPrint.hpp"
43 #include "Teuchos_StandardParameterEntryValidators.hpp"
44 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
// Fragment: maps a Details::EDistributorSendType enum value to its string
// name ("Isend", "Rsend", "Send", "Ssend").  NOTE(review): the function
// signature and the return statements inside each branch are missing from
// this excerpt; only the dispatch skeleton is visible.
52 if (sendType == DISTRIBUTOR_ISEND) {
55 else if (sendType == DISTRIBUTOR_RSEND) {
58 else if (sendType == DISTRIBUTOR_SEND) {
61 else if (sendType == DISTRIBUTOR_SSEND) {
// Unrecognized enum value: throw rather than return a bogus name.
65 TEUCHOS_TEST_FOR_EXCEPTION(
true, std::invalid_argument,
"Invalid "
66 "EDistributorSendType enum value " << sendType <<
".");
// Fragment: switch cases mapping Details::EDistributorHowInitialized to a
// human-readable string.  NOTE(review): the enclosing switch statement and
// function signature are missing from this excerpt.
74 case Details::DISTRIBUTOR_NOT_INITIALIZED:
75 return "Not initialized yet";
76 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
77 return "By createFromSends";
78 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
79 return "By createFromRecvs";
80 case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
81 return "By createReverseDistributor";
82 case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
83 return "By copy constructor";
// Fragment: builds the list of valid "Send type" parameter values, in the
// same order as the EDistributorSendType enums registered with
// setStringToIntegralParameter below.  NOTE(review): the function name line
// and the return statement are missing from this excerpt.
90 Teuchos::Array<std::string>
93 Teuchos::Array<std::string> sendTypes;
94 sendTypes.push_back (
"Isend");
95 sendTypes.push_back (
"Rsend");
96 sendTypes.push_back (
"Send");
97 sendTypes.push_back (
"Ssend");
// File-scope defaults for Distributor's parameters.  These seed both the
// constructors' initializer lists and getValidParameters() below, so the
// two stay consistent.
// Debug output is off by default.
107 const bool tpetraDistributorDebugDefault =
false;
// No barrier between posting receives and sends by default ("Rsend"
// requires the barrier; see the check in setParameterList).
109 const bool barrierBetween_default =
false;
// Use distinct MPI tags per code path by default, to avoid message
// collisions between overlapping communication phases.
111 const bool useDistinctTags_default =
true;
// Return the MPI message tag for the given code path: the path-specific tag
// when "Use distinct tags" is enabled, otherwise the communicator's tag.
// NOTE(review): the closing brace is missing from this excerpt.
114 int Distributor::getTag (
const int pathTag)
const {
115 return useDistinctTags_ ? pathTag : comm_->getTag ();
119 #ifdef TPETRA_DISTRIBUTOR_TIMERS
// Create (or look up) Teuchos timers for each communication phase:
// doPosts(3) / doPosts(4) and their recv/barrier/send subphases, plus
// doWaits.  getNewTimer registers the timer globally so TimeMonitor
// reports can aggregate them.  Only compiled in when
// TPETRA_DISTRIBUTOR_TIMERS is defined.
120 void Distributor::makeTimers () {
121 const std::string name_doPosts3 =
"Tpetra::Distributor: doPosts(3)";
122 const std::string name_doPosts4 =
"Tpetra::Distributor: doPosts(4)";
123 const std::string name_doWaits =
"Tpetra::Distributor: doWaits";
124 const std::string name_doPosts3_recvs =
"Tpetra::Distributor: doPosts(3): recvs";
125 const std::string name_doPosts4_recvs =
"Tpetra::Distributor: doPosts(4): recvs";
126 const std::string name_doPosts3_barrier =
"Tpetra::Distributor: doPosts(3): barrier";
127 const std::string name_doPosts4_barrier =
"Tpetra::Distributor: doPosts(4): barrier";
128 const std::string name_doPosts3_sends =
"Tpetra::Distributor: doPosts(3): sends";
129 const std::string name_doPosts4_sends =
"Tpetra::Distributor: doPosts(4): sends";
131 timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
132 timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
133 timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
134 timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
135 timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
136 timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
137 timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
138 timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
139 timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
141 #endif // TPETRA_DISTRIBUTOR_TIMERS
// Shared initialization called by every constructor: installs the output
// stream (falling back to std::cerr when none is given), applies the
// parameter list if present, and emits a "ctor done" debug line.
// NOTE(review): several interior lines (setParameterList call, timer setup,
// the verbose-guard close, closing brace) are missing from this excerpt.
144 Distributor::init (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
145 const Teuchos::RCP<Teuchos::FancyOStream>& out,
146 const Teuchos::RCP<Teuchos::ParameterList>& plist)
// Default the output stream to std::cerr so debug printing always has a
// valid target.
149 this->out_ = out.is_null () ?
150 Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out;
151 if (! plist.is_null ()) {
155 #ifdef TPETRA_DISTRIBUTOR_TIMERS
157 #endif // TPETRA_DISTRIBUTOR_TIMERS
// Sanity check: if we intend to print, the stream must exist (we just set
// it above, so a null here is an internal logic error).
159 if (verbose || debug_) {
160 TEUCHOS_TEST_FOR_EXCEPTION
161 (out_.is_null (), std::logic_error,
"Tpetra::Distributor::init: "
162 "verbose and/or debug_ are true but out_ (pointer to the output "
163 "stream) is NULL. Please report this bug to the Tpetra developers.");
166 Teuchos::OSTab tab (out_);
167 std::ostringstream os;
168 os << comm_->getRank ()
169 <<
": Distributor ctor done" << std::endl;
// Four constructor fragments follow.  Each sets the same default member
// values (mirroring the file-scope *_default constants above) and then
// delegates to init() with a different combination of output stream and
// parameter list.  NOTE(review): the constructor signatures and some
// initializer-list entries are missing from this excerpt.
//
// Constructor 1: comm only — init with null stream and null plist.
176 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
177 , sendType_ (
Details::DISTRIBUTOR_SEND)
178 , barrierBetween_ (barrierBetween_default)
179 , debug_ (tpetraDistributorDebugDefault)
180 , selfMessage_ (false)
184 , totalReceiveLength_ (0)
185 , lastRoundBytesSend_ (0)
186 , lastRoundBytesRecv_ (0)
187 , useDistinctTags_ (useDistinctTags_default)
189 init (comm, Teuchos::null, Teuchos::null);
// Constructor 2: comm + output stream — init with null plist.
193 const Teuchos::RCP<Teuchos::FancyOStream>& out)
195 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
196 , sendType_ (
Details::DISTRIBUTOR_SEND)
197 , barrierBetween_ (barrierBetween_default)
198 , debug_ (tpetraDistributorDebugDefault)
199 , selfMessage_ (false)
203 , totalReceiveLength_ (0)
204 , lastRoundBytesSend_ (0)
205 , lastRoundBytesRecv_ (0)
206 , useDistinctTags_ (useDistinctTags_default)
208 init (comm, out, Teuchos::null);
// Constructor 3: comm + parameter list — init with null stream.
212 const Teuchos::RCP<Teuchos::ParameterList>& plist)
214 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
215 , sendType_ (
Details::DISTRIBUTOR_SEND)
216 , barrierBetween_ (barrierBetween_default)
217 , debug_ (tpetraDistributorDebugDefault)
218 , selfMessage_ (false)
222 , totalReceiveLength_ (0)
223 , lastRoundBytesSend_ (0)
224 , lastRoundBytesRecv_ (0)
225 , useDistinctTags_ (useDistinctTags_default)
227 init (comm, Teuchos::null, plist);
// Constructor 4: comm + output stream + parameter list.
231 const Teuchos::RCP<Teuchos::FancyOStream>& out,
232 const Teuchos::RCP<Teuchos::ParameterList>& plist)
234 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
235 , sendType_ (
Details::DISTRIBUTOR_SEND)
236 , barrierBetween_ (barrierBetween_default)
237 , debug_ (tpetraDistributorDebugDefault)
238 , selfMessage_ (false)
242 , totalReceiveLength_ (0)
243 , lastRoundBytesSend_ (0)
244 , lastRoundBytesRecv_ (0)
245 , useDistinctTags_ (useDistinctTags_default)
247 init (comm, out, plist);
// Copy constructor fragment: member-wise copy of the communication plan,
// with howInitialized_ recorded as "by copy".  NOTE(review): the signature
// line and closing brace are missing from this excerpt.
251 : comm_ (distributor.comm_)
252 , out_ (distributor.out_)
253 , howInitialized_ (
Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
254 , sendType_ (distributor.sendType_)
255 , barrierBetween_ (distributor.barrierBetween_)
256 , debug_ (distributor.debug_)
257 , selfMessage_ (distributor.selfMessage_)
258 , numSends_ (distributor.numSends_)
259 , procsTo_ (distributor.procsTo_)
260 , startsTo_ (distributor.startsTo_)
261 , lengthsTo_ (distributor.lengthsTo_)
262 , maxSendLength_ (distributor.maxSendLength_)
263 , indicesTo_ (distributor.indicesTo_)
264 , numReceives_ (distributor.numReceives_)
265 , totalReceiveLength_ (distributor.totalReceiveLength_)
266 , lengthsFrom_ (distributor.lengthsFrom_)
267 , procsFrom_ (distributor.procsFrom_)
268 , startsFrom_ (distributor.startsFrom_)
269 , indicesFrom_ (distributor.indicesFrom_)
270 , reverseDistributor_ (distributor.reverseDistributor_)
271 , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
272 , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
273 , useDistinctTags_ (distributor.useDistinctTags_)
275 using Teuchos::ParameterList;
276 using Teuchos::parameterList;
// Deep-copy the source's parameter list so the two Distributors do not
// share (and mutate) the same list.
286 RCP<const ParameterList> rhsList = distributor.getParameterList ();
287 if (! rhsList.is_null ()) {
288 this->setMyParamList (parameterList (* rhsList));
291 #ifdef TPETRA_DISTRIBUTOR_TIMERS
293 #endif // TPETRA_DISTRIBUTOR_TIMERS
// Same out_ sanity check as in init(); message text is shared with init.
296 if (verbose || debug_) {
297 TEUCHOS_TEST_FOR_EXCEPTION
298 (out_.is_null (), std::logic_error,
"Tpetra::Distributor::init: "
299 "verbose and/or debug_ are true but out_ (pointer to the output "
300 "stream) is NULL. Please report this bug to the Tpetra developers.");
301 Teuchos::OSTab tab (out_);
304 std::ostringstream os;
305 os << comm_->getRank ()
306 <<
": Distributor copy ctor done" << std::endl;
// swap() fragment: exchanges every member of the communication plan with
// rhs, then swaps the two objects' parameter lists.  NOTE(review): the
// function signature and closing brace are missing from this excerpt.
312 using Teuchos::ParameterList;
313 using Teuchos::parameterList;
316 std::swap (comm_, rhs.comm_);
317 std::swap (out_, rhs.out_);
318 std::swap (howInitialized_, rhs.howInitialized_);
319 std::swap (sendType_, rhs.sendType_);
320 std::swap (barrierBetween_, rhs.barrierBetween_);
321 std::swap (debug_, rhs.debug_);
322 std::swap (selfMessage_, rhs.selfMessage_);
323 std::swap (numSends_, rhs.numSends_);
324 std::swap (procsTo_, rhs.procsTo_);
325 std::swap (startsTo_, rhs.startsTo_);
326 std::swap (lengthsTo_, rhs.lengthsTo_);
327 std::swap (maxSendLength_, rhs.maxSendLength_);
328 std::swap (indicesTo_, rhs.indicesTo_);
329 std::swap (numReceives_, rhs.numReceives_);
330 std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
331 std::swap (lengthsFrom_, rhs.lengthsFrom_);
332 std::swap (procsFrom_, rhs.procsFrom_);
333 std::swap (startsFrom_, rhs.startsFrom_);
334 std::swap (indicesFrom_, rhs.indicesFrom_);
335 std::swap (reverseDistributor_, rhs.reverseDistributor_);
336 std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
337 std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
338 std::swap (useDistinctTags_, rhs.useDistinctTags_);
// Swap parameter lists by cross-assignment.  If both objects share the
// SAME list object, deep-copy it first so they end up with independent
// lists rather than aliasing each other.
342 RCP<ParameterList> lhsList = this->getNonconstParameterList ();
343 RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
344 if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
345 rhsList = parameterList (*rhsList);
347 if (! rhsList.is_null ()) {
348 this->setMyParamList (rhsList);
350 if (! lhsList.is_null ()) {
351 rhs.setMyParamList (lhsList);
// setParameterList() fragment: validates the incoming list against
// getValidParameters(), reads each option, enforces the Rsend/barrier
// invariant, and commits the settings.  NOTE(review): the function
// signature and some interior lines are missing from this excerpt.
380 using Teuchos::FancyOStream;
381 using Teuchos::getIntegralValue;
382 using Teuchos::includesVerbLevel;
383 using Teuchos::OSTab;
384 using Teuchos::ParameterList;
385 using Teuchos::parameterList;
// Fill in defaults and reject unknown parameters up front.
390 plist->validateParametersAndSetDefaults (*validParams);
392 const bool barrierBetween =
393 plist->get<
bool> (
"Barrier between receives and sends");
395 getIntegralValue<Details::EDistributorSendType> (*plist,
"Send type");
396 const bool useDistinctTags = plist->get<
bool> (
"Use distinct tags");
397 const bool debug = plist->get<
bool> (
"Debug");
// "Enable MPI CUDA RDMA support" is deprecated and must not be false;
// Tpetra now always assumes it is true (see the message text below).
402 const bool enable_cuda_rdma =
403 plist->get<
bool> (
"Enable MPI CUDA RDMA support");
404 TEUCHOS_TEST_FOR_EXCEPTION
405 (! enable_cuda_rdma, std::invalid_argument,
"Tpetra::Distributor::"
406 "setParameterList: " <<
"You specified \"Enable MPI CUDA RDMA "
407 "support\" = false. This is no longer valid. You don't need to "
408 "specify this option any more; Tpetra assumes it is always true. "
409 "This is a very light assumption on the MPI implementation, and in "
410 "fact does not actually involve hardware or system RDMA support. "
411 "Tpetra just assumes that the MPI implementation can tell whether a "
412 "pointer points to host memory or CUDA device memory.");
// Ready sends require the matching receives to be posted first; the
// inter-phase barrier is the only general way to guarantee that, so
// Rsend without the barrier is rejected.
419 TEUCHOS_TEST_FOR_EXCEPTION(
420 ! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
421 std::invalid_argument,
"Tpetra::Distributor::setParameterList: " << endl
422 <<
"You specified \"Send type\"=\"Rsend\", but turned off the barrier "
423 "between receives and sends." << endl <<
"This is invalid; you must "
424 "include the barrier if you use ready sends." << endl <<
"Ready sends "
425 "require that their corresponding receives have already been posted, "
426 "and the only way to guarantee that in general is with a barrier.");
// All checks passed: commit the settings, then remember the list.
429 sendType_ = sendType;
430 barrierBetween_ = barrierBetween;
431 useDistinctTags_ = useDistinctTags;
436 this->setMyParamList (plist);
// getValidParameters() fragment: builds the canonical parameter list, with
// defaults taken from the file-scope *_default constants so construction
// and validation agree.  NOTE(review): the function name line and a few
// interior lines are missing from this excerpt.
439 Teuchos::RCP<const Teuchos::ParameterList>
442 using Teuchos::Array;
443 using Teuchos::ParameterList;
444 using Teuchos::parameterList;
446 using Teuchos::setStringToIntegralParameter;
448 const bool barrierBetween = barrierBetween_default;
449 const bool useDistinctTags = useDistinctTags_default;
450 const bool debug = tpetraDistributorDebugDefault;
453 const std::string defaultSendType (
"Send");
// Enum values must be listed in the same order as the names returned by
// distributorSendTypes() above.
454 Array<Details::EDistributorSendType> sendTypeEnums;
455 sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
456 sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
457 sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
458 sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
460 RCP<ParameterList> plist = parameterList (
"Tpetra::Distributor");
461 plist->set (
"Barrier between receives and sends", barrierBetween,
462 "Whether to execute a barrier between receives and sends in do"
463 "[Reverse]Posts(). Required for correctness when \"Send type\""
464 "=\"Rsend\", otherwise correct but not recommended.");
465 setStringToIntegralParameter<Details::EDistributorSendType> (
"Send type",
466 defaultSendType,
"When using MPI, the variant of send to use in "
467 "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
468 plist->set (
"Use distinct tags", useDistinctTags,
"Whether to use distinct "
469 "MPI message tags for different code paths. Highly recommended"
470 " to avoid message collisions.");
471 plist->set (
"Debug", debug,
"Whether to print copious debugging output on "
// Deprecated knob kept for backward compatibility; must stay true (see
// the check in setParameterList).
473 plist->set (
"Enable MPI CUDA RDMA support",
true,
"Assume that MPI can "
474 "tell whether a pointer points to host memory or CUDA device "
475 "memory. You don't need to specify this option any more; "
476 "Tpetra assumes it is always true. This is a very light "
477 "assumption on the MPI implementation, and in fact does not "
478 "actually involve hardware or system RDMA support.");
486 Teuchos::setupVerboseObjectSublist (&*plist);
487 return Teuchos::rcp_const_cast<const ParameterList> (plist);
// One-line accessor bodies.  NOTE(review): each accessor's signature line
// is missing from this excerpt; the member returned identifies it.
// Accessor returning totalReceiveLength_.
492 {
return totalReceiveLength_; }
// Accessor returning numReceives_.
495 {
return numReceives_; }
// Accessor returning selfMessage_ (whether this process messages itself).
498 {
return selfMessage_; }
// Accessor returning numSends_.
501 {
return numSends_; }
// Accessor returning maxSendLength_.
504 {
return maxSendLength_; }
// Accessor returning procsFrom_.
507 {
return procsFrom_; }
// Accessor returning lengthsFrom_.
510 {
return lengthsFrom_; }
// Accessor returning lengthsTo_.
516 {
return lengthsTo_; }
// getReverse() fragment: lazily builds the reverse Distributor on first
// request, then returns the cached instance.  NOTE(review): the function
// name line and closing brace are missing from this excerpt.
518 Teuchos::RCP<Distributor>
520 if (reverseDistributor_.is_null ()) {
521 createReverseDistributor ();
// Postcondition check: creation must have populated the cache.
523 TEUCHOS_TEST_FOR_EXCEPTION
524 (reverseDistributor_.is_null (), std::logic_error,
"The reverse "
525 "Distributor is null after createReverseDistributor returned. "
526 "Please report this bug to the Tpetra developers.");
527 return reverseDistributor_;
// createReverseDistributor() fragment: constructs the reverse plan by
// exchanging the send-side and receive-side data (procsTo_ <-> procsFrom_,
// lengthsTo_ <-> lengthsFrom_, etc.).  NOTE(review): the return type line,
// opening brace, and several interior lines are missing from this excerpt.
532 Distributor::createReverseDistributor()
const
534 reverseDistributor_ = Teuchos::rcp (
new Distributor (comm_, out_));
535 reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
536 reverseDistributor_->sendType_ = sendType_;
537 reverseDistributor_->barrierBetween_ = barrierBetween_;
538 reverseDistributor_->debug_ = debug_;
// Our total send length becomes the reverse plan's total receive length.
// NOTE(review): the accumulate seed 0 is an int, so size_t lengths are
// summed into int before widening — consider size_t(0) to avoid overflow
// on very large plans.
543 size_t totalSendLength =
544 std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);
// The largest length received from another process becomes the reverse
// plan's maxSendLength_ (self-messages are excluded, as in the forward
// plan's maxSendLength_).
549 size_t maxReceiveLength = 0;
550 const int myProcID = comm_->getRank();
551 for (
size_t i=0; i < numReceives_; ++i) {
552 if (procsFrom_[i] != myProcID) {
554 if (lengthsFrom_[i] > maxReceiveLength) {
555 maxReceiveLength = lengthsFrom_[i];
// Mirror the plan: every "to" member takes the forward "from" data and
// vice versa.
564 reverseDistributor_->selfMessage_ = selfMessage_;
565 reverseDistributor_->numSends_ = numReceives_;
566 reverseDistributor_->procsTo_ = procsFrom_;
567 reverseDistributor_->startsTo_ = startsFrom_;
568 reverseDistributor_->lengthsTo_ = lengthsFrom_;
569 reverseDistributor_->maxSendLength_ = maxReceiveLength;
570 reverseDistributor_->indicesTo_ = indicesFrom_;
571 reverseDistributor_->numReceives_ = numSends_;
572 reverseDistributor_->totalReceiveLength_ = totalSendLength;
573 reverseDistributor_->lengthsFrom_ = lengthsTo_;
574 reverseDistributor_->procsFrom_ = procsTo_;
575 reverseDistributor_->startsFrom_ = startsTo_;
576 reverseDistributor_->indicesFrom_ = indicesTo_;
585 reverseDistributor_->lastRoundBytesSend_ = 0;
586 reverseDistributor_->lastRoundBytesRecv_ = 0;
588 reverseDistributor_->useDistinctTags_ = useDistinctTags_;
// Do not cache a reverse-of-the-reverse; it would alias this object.
601 reverseDistributor_->reverseDistributor_ = Teuchos::null;
// doWaits() fragment: waits on all outstanding communication requests from
// a previous doPosts(), clears the request list, and (at the end) recurses
// into the reverse Distributor if one exists.  NOTE(review): the function
// signature and several interior lines are missing from this excerpt.
606 using Teuchos::Array;
607 using Teuchos::CommRequest;
608 using Teuchos::FancyOStream;
609 using Teuchos::includesVerbLevel;
610 using Teuchos::is_null;
611 using Teuchos::OSTab;
613 using Teuchos::waitAll;
616 Teuchos::OSTab tab (out_);
618 #ifdef TPETRA_DISTRIBUTOR_TIMERS
619 Teuchos::TimeMonitor timeMon (*timer_doWaits_);
620 #endif // TPETRA_DISTRIBUTOR_TIMERS
622 const int myRank = comm_->getRank ();
626 std::ostringstream os;
627 os << myRank <<
": doWaits: # reqs = "
628 << requests_.size () << endl;
// Nothing to wait on when no posts are outstanding.
632 if (requests_.size() > 0) {
633 waitAll (*comm_, requests_());
635 #ifdef HAVE_TEUCHOS_DEBUG
// waitAll nulls out each completed request; verify that postcondition.
637 for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
638 it != requests_.end(); ++it)
640 TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
641 Teuchos::typeName(*
this) <<
"::doWaits(): Communication requests "
642 "should all be null aftr calling Teuchos::waitAll() on them, but "
643 "at least one request is not null.");
645 #endif // HAVE_TEUCHOS_DEBUG
// Empty the request list so a later doWaits() is a no-op.
648 requests_.resize (0);
651 #ifdef HAVE_TEUCHOS_DEBUG
// Global debug check: no process may still hold outstanding posts.
653 const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
654 int globalSizeNonzero = 0;
655 Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
657 Teuchos::outArg (globalSizeNonzero));
658 TEUCHOS_TEST_FOR_EXCEPTION(
659 globalSizeNonzero != 0, std::runtime_error,
660 "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
661 "a nonzero number of outstanding posts. There should be none at this "
662 "point. Please report this bug to the Tpetra developers.");
664 #endif // HAVE_TEUCHOS_DEBUG
667 std::ostringstream os;
668 os << myRank <<
": doWaits done" << endl;
// NOTE(review): this tail presumably belongs to doReverseWaits() — the
// enclosing signature is missing from this excerpt; confirm in the full
// source.
675 if (! reverseDistributor_.is_null()) {
676 reverseDistributor_->doWaits();
// description() fragment: builds the compact one-line YAML-ish summary
// string ("\"Tpetra::Distributor\": {...}").  NOTE(review): the function
// signature, the label guard, and the return statement are missing from
// this excerpt.
681 std::ostringstream out;
683 out <<
"\"Tpetra::Distributor\": {";
684 const std::string label = this->getObjectLabel ();
686 out <<
"Label: " << label <<
", ";
688 out <<
"How initialized: "
692 << DistributorSendTypeEnumToString (sendType_)
693 <<
", Barrier between receives and sends: "
694 << (barrierBetween_ ?
"true" :
"false")
695 <<
", Use distinct tags: "
696 << (useDistinctTags_ ?
"true" :
"false")
697 <<
", Debug: " << (debug_ ?
"true" :
"false")
// Build this process's contribution to describe() as a string: the send
// and receive plan arrays, with detail controlled by the verbosity level.
// Returning a string (rather than writing to a stream) lets the caller
// gather per-process output.  NOTE(review): the return type line and
// opening brace are missing from this excerpt.
704 localDescribeToString (
const Teuchos::EVerbosityLevel vl)
const
706 using Teuchos::toString;
707 using Teuchos::VERB_HIGH;
708 using Teuchos::VERB_EXTREME;
// Nothing to report at low verbosity or without a communicator.
712 if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
713 return std::string ();
// Write into an ostringstream via a FancyOStream so OSTab indentation
// applies.
716 auto outStringP = Teuchos::rcp (
new std::ostringstream ());
717 auto outp = Teuchos::getFancyOStream (outStringP);
718 Teuchos::FancyOStream& out = *outp;
720 const int myRank = comm_->getRank ();
721 const int numProcs = comm_->getSize ();
722 out <<
"Process " << myRank <<
" of " << numProcs <<
":" << endl;
723 Teuchos::OSTab tab1 (out);
// HIGH and EXTREME print the send plan; EXTREME adds the index arrays.
727 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
728 out <<
"procsTo: " << toString (procsTo_) << endl;
729 out <<
"lengthsTo: " << toString (lengthsTo_) << endl;
732 if (vl == VERB_EXTREME) {
733 out <<
"startsTo: " << toString (startsTo_) << endl;
734 out <<
"indicesTo: " << toString (indicesTo_) << endl;
736 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
739 out <<
"lengthsFrom: " << toString (lengthsFrom_) << endl;
740 out <<
"startsFrom: " << toString (startsFrom_) << endl;
741 out <<
"procsFrom: " << toString (procsFrom_) << endl;
745 return outStringP->str ();
// describe() fragment: multi-line, verbosity-controlled description.
// Prints global info plus each process's localDescribeToString() output,
// then recurses into the reverse Distributor.  NOTE(review): the function
// signature and several interior lines (rank-0 guards, gather of lclStr)
// are missing from this excerpt.
751 const Teuchos::EVerbosityLevel verbLevel)
const
754 using Teuchos::VERB_DEFAULT;
755 using Teuchos::VERB_NONE;
756 using Teuchos::VERB_LOW;
757 using Teuchos::VERB_MEDIUM;
758 using Teuchos::VERB_HIGH;
759 using Teuchos::VERB_EXTREME;
// VERB_DEFAULT maps to VERB_LOW per Teuchos::Describable convention.
760 const Teuchos::EVerbosityLevel vl =
761 (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;
763 if (vl == VERB_NONE) {
771 if (comm_.is_null ()) {
774 const int myRank = comm_->getRank ();
775 const int numProcs = comm_->getSize ();
// Tabs are held in RCPs so indentation persists across the scopes below.
784 Teuchos::RCP<Teuchos::OSTab> tab0, tab1;
790 tab0 = Teuchos::rcp (
new Teuchos::OSTab (out));
793 out <<
"\"Tpetra::Distributor\":" << endl;
794 tab1 = Teuchos::rcp (
new Teuchos::OSTab (out));
796 const std::string label = this->getObjectLabel ();
798 out <<
"Label: " << label << endl;
800 out <<
"Number of processes: " << numProcs << endl
801 <<
"How initialized: "
805 out <<
"Parameters: " << endl;
806 Teuchos::OSTab tab2 (out);
807 out <<
"\"Send type\": "
808 << DistributorSendTypeEnumToString (sendType_) << endl
809 <<
"\"Barrier between receives and sends\": "
810 << (barrierBetween_ ?
"true" :
"false") << endl
811 <<
"\"Use distinct tags\": "
812 << (useDistinctTags_ ?
"true" :
"false") << endl
813 <<
"\"Debug\": " << (debug_ ?
"true" :
"false") << endl;
// Per-process detail, built locally then printed (gather logic elided in
// this excerpt).
819 const std::string lclStr = this->localDescribeToString (vl);
823 out <<
"Reverse Distributor:";
824 if (reverseDistributor_.is_null ()) {
825 out <<
" null" << endl;
829 reverseDistributor_->describe (out, vl);
// computeReceives() fragment: given the send plan (procsTo_, lengthsTo_),
// discovers the receive plan (numReceives_, procsFrom_, lengthsFrom_,
// startsFrom_, indicesFrom_).  Strategy: (1) reduce+scatter a 0/1 vector
// to learn how many messages this process will receive; (2) post that many
// any-source irecvs for the message lengths; (3) send each of our lengths
// to its target; (4) waitAll and read source ranks from the statuses;
// (5) sort and build the offset arrays.  NOTE(review): the return type
// line, opening brace, and several interior lines are missing from this
// excerpt.
834 Distributor::computeReceives ()
836 using Teuchos::Array;
837 using Teuchos::ArrayRCP;
839 using Teuchos::CommStatus;
840 using Teuchos::CommRequest;
841 using Teuchos::ireceive;
844 using Teuchos::REDUCE_SUM;
845 using Teuchos::receive;
846 using Teuchos::reduce;
847 using Teuchos::scatter;
849 using Teuchos::waitAll;
852 Teuchos::OSTab tab (out_);
853 const int myRank = comm_->getRank();
854 const int numProcs = comm_->getSize();
// Distinct tag for this code path (see getTag) so these messages cannot
// collide with other Distributor phases.
857 const int pathTag = 2;
858 const int tag = this->getTag (pathTag);
862 std::ostringstream os;
863 os << myRank <<
": computeReceives: "
864 "{selfMessage_: " << (selfMessage_ ?
"true" :
"false")
865 <<
", tag: " << tag <<
"}" << endl;
// toProcsFromMe[p] == 1 iff this process sends at least one message to p.
875 Array<int> toProcsFromMe (numProcs, 0);
876 #ifdef HAVE_TEUCHOS_DEBUG
877 bool counting_error =
false;
878 #endif // HAVE_TEUCHOS_DEBUG
879 for (
size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
880 #ifdef HAVE_TEUCHOS_DEBUG
// A repeated target process would mean the send plan is malformed.
881 if (toProcsFromMe[procsTo_[i]] != 0) {
882 counting_error =
true;
884 #endif // HAVE_TEUCHOS_DEBUG
885 toProcsFromMe[procsTo_[i]] = 1;
887 #ifdef HAVE_TEUCHOS_DEBUG
889 "Tpetra::Distributor::computeReceives: There was an error on at least "
890 "one process in counting the number of messages send by that process to "
891 "the other processs. Please report this bug to the Tpetra developers.",
893 #endif // HAVE_TEUCHOS_DEBUG
896 std::ostringstream os;
897 os << myRank <<
": computeReceives: Calling reduce and scatter" << endl;
// Sum the 0/1 vectors on the root, then scatter row p back to process p:
// each process learns how many messages it will receive.
954 Array<int> numRecvsOnEachProc;
955 if (myRank == root) {
956 numRecvsOnEachProc.resize (numProcs);
958 int numReceivesAsInt = 0;
959 reduce<int, int> (toProcsFromMe.getRawPtr (),
960 numRecvsOnEachProc.getRawPtr (),
961 numProcs, REDUCE_SUM, root, *comm_);
962 scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
963 &numReceivesAsInt, 1, root, *comm_);
964 numReceives_ = static_cast<size_t> (numReceivesAsInt);
970 lengthsFrom_.assign (numReceives_, 0);
971 procsFrom_.assign (numReceives_, 0);
// Self-messages are handled locally below, not via MPI.
987 const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
993 Array<RCP<CommRequest<int> > > requests (actualNumReceives);
994 Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
995 Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
// Receive from any source: we know how many messages are coming, but not
// from whom (the -1 branch is the non-MPI fallback).
1000 const int anySourceProc = MPI_ANY_SOURCE;
1002 const int anySourceProc = -1;
1006 std::ostringstream os;
1007 os << myRank <<
": computeReceives: Posting "
1008 << actualNumReceives <<
" irecvs" << endl;
1013 for (
size_t i = 0; i < actualNumReceives; ++i) {
// Each irecv gets its own one-element buffer for the incoming length.
1018 lengthsFromBuffers[i].resize (1);
1019 lengthsFromBuffers[i][0] = as<size_t> (0);
1020 requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc, tag, *comm_);
1022 std::ostringstream os;
1023 os << myRank <<
": computeReceives: "
1024 "Posted any-proc irecv w/ specified tag " << tag << endl;
1030 std::ostringstream os;
1031 os << myRank <<
": computeReceives: "
1032 "posting " << numSends_ <<
" sends" << endl;
// Tell each target process how many packets we will send it.
1043 for (
size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
1044 if (procsTo_[i] != myRank) {
1048 const size_t*
const lengthsTo_i = &lengthsTo_[i];
1049 send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
1051 std::ostringstream os;
1052 os << myRank <<
": computeReceives: "
1053 "Posted send to Proc " << procsTo_[i] <<
" w/ specified tag "
// Self-message: record the length/source directly in the last slot.
1065 lengthsFrom_[numReceives_-1] = lengthsTo_[i];
1066 procsFrom_[numReceives_-1] = myRank;
1071 std::ostringstream os;
1072 os << myRank <<
": computeReceives: waitAll on "
1073 << requests.size () <<
" requests" << endl;
// Wait for all length messages; the statuses reveal the source ranks.
1082 waitAll (*comm_, requests (), statuses ());
1083 for (
size_t i = 0; i < actualNumReceives; ++i) {
1084 lengthsFrom_[i] = *lengthsFromBuffers[i];
1085 procsFrom_[i] = statuses[i]->getSourceRank ();
// Sort by source rank, keeping lengths aligned with their rank.
1091 sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
// NOTE(review): accumulate seed 0 is an int; size_t lengths are summed
// into int before widening — consider size_t(0) to avoid overflow.
1094 totalReceiveLength_ = std::accumulate (lengthsFrom_.begin(), lengthsFrom_.end(), 0);
// Receives are contiguous: indicesFrom_ is the identity permutation.
1095 indicesFrom_.clear ();
1096 indicesFrom_.reserve (totalReceiveLength_);
1097 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1098 indicesFrom_.push_back(i);
// startsFrom_ is the running prefix of lengthsFrom_.
1101 startsFrom_.clear ();
1102 startsFrom_.reserve (numReceives_);
1103 for (
size_t i = 0, j = 0; i < numReceives_; ++i) {
1104 startsFrom_.push_back(j);
1105 j += lengthsFrom_[i];
1113 std::ostringstream os;
1114 os << myRank <<
": computeReceives: done" << endl;
// createFromSends() fragment: builds the send plan from a list of target
// process IDs (one per export).  Counts exports per process into `starts`,
// then takes a fast path (exports already grouped by process: no index
// buffer, indicesTo_ empty) or a slow path (builds indicesTo_ as a
// permutation and compacts `starts` into per-process offsets).  Finishes
// by calling computeReceives (elided here) and returns
// totalReceiveLength_.  NOTE(review): the function signature and many
// interior lines are missing from this excerpt.
1122 using Teuchos::outArg;
1123 using Teuchos::REDUCE_MAX;
1124 using Teuchos::reduceAll;
1127 Teuchos::OSTab tab (out_);
1128 const size_t numExports = exportProcIDs.size();
1129 const int myProcID = comm_->getRank();
1130 const int numProcs = comm_->getSize();
1133 std::ostringstream os;
1134 os << myProcID <<
": createFromSends" << endl;
// starts[p] counts the exports destined for process p (extra slot used
// later for the prefix-sum upper bound).
1186 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1189 size_t numActive = 0;
1190 int needSendBuff = 0;
1192 #ifdef HAVE_TPETRA_DEBUG
1194 #endif // HAVE_TPETRA_DEBUG
1195 for (
size_t i = 0; i < numExports; ++i) {
1196 const int exportID = exportProcIDs[i];
1197 if (exportID >= numProcs) {
1198 #ifdef HAVE_TPETRA_DEBUG
1200 #endif // HAVE_TPETRA_DEBUG
// Negative IDs mark exports to skip; only nonnegative ones count.
1203 else if (exportID >= 0) {
// A process ID seen again after an intervening different ID means the
// exports are not grouped; a send buffer will be needed.
1217 if (needSendBuff==0 && starts[exportID] > 1 && exportID != exportProcIDs[i-1]) {
1224 #ifdef HAVE_TPETRA_DEBUG
// Globally check for out-of-range process IDs before proceeding.
1231 reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1232 TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
1233 Teuchos::typeName(*
this) <<
"::createFromSends(): Process " << gbl_badID
1234 <<
", perhaps among other processes, got a bad send process ID.");
1249 #endif // HAVE_TPETRA_DEBUG
1251 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
1253 int global_needSendBuff;
1254 reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1255 outArg (global_needSendBuff));
1257 global_needSendBuff != 0, std::runtime_error,
1258 "::createFromSends: Grouping export IDs together by process rank often "
1259 "improves performance.");
// Sending to ourself is tracked separately from MPI sends.
1265 if (starts[myProcID] != 0) {
1266 selfMessage_ =
true;
1269 selfMessage_ =
false;
1272 #ifdef HAVE_TEUCHOS_DEBUG
1273 bool index_neq_numActive =
false;
1274 bool send_neq_numSends =
false;
// Fast path: exports already grouped by process, so no permutation
// buffer is needed (indicesTo_ stays empty).
1276 if (! needSendBuff) {
1281 for (
int i = 0; i < numProcs; ++i) {
1289 indicesTo_.resize(0);
1292 procsTo_.assign(numSends_,0);
1293 startsTo_.assign(numSends_,0);
1294 lengthsTo_.assign(numSends_,0);
// Walk the grouped export list, recording each run's start and target.
1301 size_t index = 0, procIndex = 0;
1302 for (
size_t i = 0; i < numSends_; ++i) {
1303 while (exportProcIDs[procIndex] < 0) {
1306 startsTo_[i] = procIndex;
1307 int procID = exportProcIDs[procIndex];
1308 procsTo_[i] = procID;
1309 index += starts[procID];
1310 procIndex += starts[procID];
1312 #ifdef HAVE_TEUCHOS_DEBUG
1313 if (index != numActive) {
1314 index_neq_numActive =
true;
// Sort sends by target rank, keeping startsTo_ aligned.
1320 if (numSends_ > 0) {
1321 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
// maxSendLength_ excludes the self-message.
1325 for (
size_t i = 0; i < numSends_; ++i) {
1326 int procID = procsTo_[i];
1327 lengthsTo_[i] = starts[procID];
1328 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1329 maxSendLength_ = lengthsTo_[i];
// Slow path: exports not grouped.  Count nonzero buckets, prefix-sum
// starts into offsets, scatter export indices into indicesTo_, then
// compact per-process data.
1340 if (starts[0] == 0 ) {
1346 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1348 i != starts.end(); ++i)
1350 if (*i != 0) ++numSends_;
// Shift-style prefix sum over starts (performed in reverse).
1356 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1357 i=starts.rbegin()+1;
1358 i != starts.rend(); ++i)
1367 indicesTo_.resize(numActive);
// Scatter: place each export's index at its process's next slot,
// advancing starts[p] as we go.
1369 for (
size_t i = 0; i < numExports; ++i) {
1370 if (exportProcIDs[i] >= 0) {
1372 indicesTo_[starts[exportProcIDs[i]]] = i;
1374 ++starts[exportProcIDs[i]];
// Undo the advance: shift starts back down one process.
1386 for (
int proc = numProcs-1; proc != 0; --proc) {
1387 starts[proc] = starts[proc-1];
1390 starts[numProcs] = numActive;
1397 procsTo_.resize(numSends_);
1398 startsTo_.resize(numSends_);
1399 lengthsTo_.resize(numSends_);
// Compact: one entry per process with a nonempty bucket.
1406 for (
int proc = 0; proc < numProcs; ++proc ) {
1407 if (starts[proc+1] != starts[proc]) {
1408 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1409 startsTo_[snd] = starts[proc];
1411 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1412 maxSendLength_ = lengthsTo_[snd];
1414 procsTo_[snd] = proc;
1418 #ifdef HAVE_TEUCHOS_DEBUG
1419 if (snd != numSends_) {
1420 send_neq_numSends =
true;
1424 #ifdef HAVE_TEUCHOS_DEBUG
1426 "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1428 "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
// numSends_ counts only MPI sends; drop the self-message from the count.
1431 if (selfMessage_) --numSends_;
1437 std::ostringstream os;
1438 os << myProcID <<
": createFromSends: done" << endl;
1444 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
// Callers use the return value to size their receive buffers.
1446 return totalReceiveLength_;
// createFromSendsAndRecvs() fragment: builds BOTH the send plan (from
// exportProcIDs, same two-path logic as createFromSends) and the receive
// plan (directly from remoteProcIDs, which must be sorted by rank),
// avoiding the communication that computeReceives() would need.
// NOTE(review): the function signature (first parameter line) and many
// interior lines are missing from this excerpt.
1452 const Teuchos::ArrayView<const int>& remoteProcIDs)
1461 howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
1464 int myProcID = comm_->getRank ();
1465 int numProcs = comm_->getSize();
1467 const size_t numExportIDs = exportProcIDs.size();
// starts[p] counts exports destined for process p.
1468 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1470 size_t numActive = 0;
1471 int needSendBuff = 0;
1473 for(
size_t i = 0; i < numExportIDs; i++ )
// Any decrease in the ID sequence means exports are not grouped by rank.
1475 if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
1477 if( exportProcIDs[i] >= 0 )
1479 ++starts[ exportProcIDs[i] ];
1484 selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
// Slow path: exports not grouped — build indicesTo_ permutation, then
// compact (mirrors createFromSends' slow path).
1490 if (starts[0] == 0 ) {
1496 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1498 i != starts.end(); ++i)
1500 if (*i != 0) ++numSends_;
1506 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1507 i=starts.rbegin()+1;
1508 i != starts.rend(); ++i)
1517 indicesTo_.resize(numActive);
1519 for (
size_t i = 0; i < numExportIDs; ++i) {
1520 if (exportProcIDs[i] >= 0) {
1522 indicesTo_[starts[exportProcIDs[i]]] = i;
1524 ++starts[exportProcIDs[i]];
1527 for (
int proc = numProcs-1; proc != 0; --proc) {
1528 starts[proc] = starts[proc-1];
1531 starts[numProcs] = numActive;
1532 procsTo_.resize(numSends_);
1533 startsTo_.resize(numSends_);
1534 lengthsTo_.resize(numSends_);
1537 for (
int proc = 0; proc < numProcs; ++proc ) {
1538 if (starts[proc+1] != starts[proc]) {
1539 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1540 startsTo_[snd] = starts[proc];
1542 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1543 maxSendLength_ = lengthsTo_[snd];
1545 procsTo_[snd] = proc;
// Fast path: exports already grouped — no permutation buffer needed.
1555 for (
int i = 0; i < numProcs; ++i) {
1563 indicesTo_.resize(0);
1566 procsTo_.assign(numSends_,0);
1567 startsTo_.assign(numSends_,0);
1568 lengthsTo_.assign(numSends_,0);
1575 size_t index = 0, procIndex = 0;
1576 for (
size_t i = 0; i < numSends_; ++i) {
1577 while (exportProcIDs[procIndex] < 0) {
1580 startsTo_[i] = procIndex;
1581 int procID = exportProcIDs[procIndex];
1582 procsTo_[i] = procID;
1583 index += starts[procID];
1584 procIndex += starts[procID];
1589 if (numSends_ > 0) {
1590 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1594 for (
size_t i = 0; i < numSends_; ++i) {
1595 int procID = procsTo_[i];
1596 lengthsTo_[i] = starts[procID];
1597 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1598 maxSendLength_ = lengthsTo_[i];
1604 numSends_ -= selfMessage_;
// Receive side: collapse the (sorted) remoteProcIDs into the list of
// distinct source ranks.
1605 std::vector<int> recv_list;
1606 recv_list.reserve(numSends_);
1609 for(
int i=0; i<remoteProcIDs.size(); i++) {
1610 if(remoteProcIDs[i]>last_pid) {
1611 recv_list.push_back(remoteProcIDs[i]);
1612 last_pid = remoteProcIDs[i];
// The direct construction only works if remoteProcIDs is sorted; a
// decrease is a caller error.
1614 else if (remoteProcIDs[i]<last_pid)
1615 throw std::runtime_error(
"Tpetra::Distributor:::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
1617 numReceives_ = recv_list.size();
1619 procsFrom_.assign(numReceives_,0);
1620 lengthsFrom_.assign(numReceives_,0);
1621 indicesFrom_.assign(numReceives_,0);
1622 startsFrom_.assign(numReceives_,0);
// For each distinct source rank, measure its run length in
// remoteProcIDs.  NOTE(review): the assignment of jlast is on a line
// elided from this excerpt — confirm in the full source.
1624 for(
size_t i=0,j=0; i<numReceives_; ++i) {
1626 procsFrom_[i] = recv_list[i];
1628 for( ; j<(size_t)remoteProcIDs.size() &&
1629 remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
1630 lengthsFrom_[i] = j-jlast;
1632 totalReceiveLength_ = remoteProcIDs.size();
// Receives are contiguous: indicesFrom_ is the identity permutation.
1633 indicesFrom_.clear ();
1634 indicesFrom_.reserve (totalReceiveLength_);
1635 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1636 indicesFrom_.push_back(i);
// numReceives_ counts only MPI receives; drop the self-message.
1639 numReceives_-=selfMessage_;