[Debian-astro-commits] [gyoto] 42/221: * rename Scenery::is_worker to am_worker * give raytrace task earlier in Scenery::raytrace() * merge workers and manager into a team, use broadcast instead of P2P whenever appropriate
Thibaut Jean-Claude Paumard
thibaut at moszumanska.debian.org
Fri May 22 20:52:32 UTC 2015
This is an automated email from the git hooks/post-receive script.
thibaut pushed a commit to branch master
in repository gyoto.
commit 7b1e3a433b633ae92823a9b8fe11cdbbf10dc453
Author: Thibaut Paumard <paumard at users.sourceforge.net>
Date: Mon Oct 13 09:12:08 2014 +0200
* rename Scenery::is_worker to am_worker
* give raytrace task earlier in Scenery::raytrace()
* merge workers and manager into a team, use broadcast instead of P2P whenever appropriate
---
bin/gyoto-mpi-worker.C | 20 ++++++-----
bin/gyoto.C | 7 ++--
include/GyotoScenery.h | 6 ++--
lib/Scenery.C | 92 ++++++++++++++++++++++++--------------------------
4 files changed, 61 insertions(+), 64 deletions(-)
diff --git a/bin/gyoto-mpi-worker.C b/bin/gyoto-mpi-worker.C
index 701d14d..2a1cee1 100644
--- a/bin/gyoto-mpi-worker.C
+++ b/bin/gyoto-mpi-worker.C
@@ -75,9 +75,8 @@ int main(int argc, char** argv) {
MPI_Comm parent_c;
MPI_Comm_get_parent(&parent_c);
- mpi::intercommunicator manager(parent_c,mpi::comm_take_ownership);
mpi::communicator world;
- mpi::communicator team=manager.merge(true);
+ mpi::communicator team=mpi::intercommunicator(parent_c,mpi::comm_take_ownership).merge(true);
string pluglist= getenv("GYOTO_PLUGINS")?
getenv("GYOTO_PLUGINS"):
@@ -89,12 +88,15 @@ int main(int argc, char** argv) {
int rk=world.rank();
- //Gyoto::debug(1);
+ sc = new Scenery();
+ sc -> mpi_world_ = &world;
+ sc -> mpi_env_ = &env;
+ sc -> mpi_team_ = &team;
Scenery::mpi_tag task=Scenery::give_task;
- Scenery::is_worker=true;
+ Scenery::am_worker=true;
while (task != Scenery::terminate) {
- manager.recv(0, Scenery::give_task, task);
+ sc->mpiTask(task);
switch (task) {
case Scenery::read_scenery: {
std::string parfile;
@@ -102,9 +104,9 @@ int main(int argc, char** argv) {
curmsg = "In gyoto-mpi-worker.C: Error in Factory creation: ";
curretval = 1;
sc = Factory(const_cast<char*>(parfile.c_str())).getScenery();
- sc -> mpi_manager_ = &manager;
- sc -> mpi_world_ = &world;
- sc -> mpi_env_ = &env;
+ sc -> mpi_world_ = &world;
+ sc -> mpi_env_ = &env;
+ sc -> mpi_team_ = &team;
break;
}
case Scenery::raytrace:
@@ -113,11 +115,13 @@ int main(int argc, char** argv) {
sc -> rayTrace(0, 0, 0, 0, NULL, NULL);
break;
case Scenery::terminate:
+ sc = NULL;
break;
default:
std::cerr << "unknown task" << endl;
}
}
+ team.barrier();
return 0;
}
diff --git a/bin/gyoto.C b/bin/gyoto.C
index 0f21e56..8fe4142 100644
--- a/bin/gyoto.C
+++ b/bin/gyoto.C
@@ -93,7 +93,7 @@ int main(int argc, char** argv) {
double tobs=0., tmin=0., fov=0., dist=0., paln=0., incl=0., arg=0.;
size_t res=0, nthreads=0, nprocs=0;
// bool xtobs=0, xtmin=0, xfov=0, xres=0, xdist=0, xpaln=0, xincl=0, xarg=0;
- bool xtobs=0, xtmin=0, xfov=0, xres=0, xdist=0, xpaln=0, xincl=0, xarg=0, xnthreads=0, xnprocs;
+ bool xtobs=0, xtmin=0, xfov=0, xres=0, xdist=0, xpaln=0, xincl=0, xarg=0, xnthreads=0, xnprocs=0;
bool ipct=0;
long ipctdims[3]={0, 0, 0};
double ipcttime;
@@ -251,6 +251,7 @@ int main(int argc, char** argv) {
if (xnthreads) scenery -> nThreads ( nthreads );
#ifdef HAVE_MPI
if (xnprocs) {
+ cerr << "nprocs="<< nprocs<< endl;
scenery -> mpiSpawn(nprocs);
scenery -> mpiClone();
}
@@ -479,10 +480,6 @@ int main(int argc, char** argv) {
delete [] impactcoords;
}
-#ifdef HAVE_MPI
- if (xnprocs) scenery -> mpiTerminate();
-#endif
-
if (debug()) cerr << "DEBUG: gyoto.C: scenery==NULL" << endl;
scenery = NULL;
diff --git a/include/GyotoScenery.h b/include/GyotoScenery.h
index 399c822..1ad05cc 100644
--- a/include/GyotoScenery.h
+++ b/include/GyotoScenery.h
@@ -213,15 +213,15 @@ class Gyoto::Scenery : protected Gyoto::SmartPointee {
public:
boost::mpi::environment * mpi_env_;
boost::mpi::communicator * mpi_world_;
- boost::mpi::intercommunicator * mpi_workers_;
- boost::mpi::intercommunicator * mpi_manager_;
- static bool is_worker;
+ boost::mpi::communicator * mpi_team_;
+ static bool am_worker;
void mpiSpawn(int nbchildren);
void mpiTerminate (bool keep_env=false);
void mpiClone();
enum mpi_tag {give_task, read_scenery, terminate,
raytrace, raytrace_done, ready,
impactcoords, noimpactcoords};
+ void mpiTask(mpi_tag &tag);
# endif
// Constructors - Destructor
diff --git a/lib/Scenery.C b/lib/Scenery.C
index 2827320..084494d 100644
--- a/lib/Scenery.C
+++ b/lib/Scenery.C
@@ -52,7 +52,7 @@ Scenery::Scenery() :
screen_(NULL), delta_(GYOTO_DEFAULT_DELTA),
quantities_(0), ph_(), nthreads_(0)
#ifdef HAVE_MPI
- , mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL), mpi_manager_(NULL)
+ , mpi_env_(NULL), mpi_world_(NULL), mpi_team_(NULL)
#endif
{}
@@ -62,7 +62,7 @@ Scenery::Scenery(SmartPointer<Metric::Generic> met,
screen_(scr), delta_(GYOTO_DEFAULT_DELTA),
quantities_(0), ph_(), nthreads_(0)
#ifdef HAVE_MPI
- , mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL), mpi_manager_(NULL)
+ , mpi_env_(NULL), mpi_world_(NULL), mpi_team_(NULL)
#endif
{
metric(met);
@@ -76,7 +76,7 @@ Scenery::Scenery(const Scenery& o) :
quantities_(o.quantities_), ph_(o.ph_),
nthreads_(o.nthreads_)
#ifdef HAVE_MPI
- , mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL), mpi_manager_(NULL)
+ , mpi_env_(NULL), mpi_world_(NULL), mpi_team_(NULL)
#endif
{
if (o.screen_()) {
@@ -92,7 +92,7 @@ Scenery::~Scenery() {
# endif
screen_ = NULL;
# ifdef HAVE_MPI
- if (!Scenery::is_worker) mpiTerminate();
+ if (!Scenery::am_worker) mpiTerminate();
# endif
}
@@ -270,22 +270,23 @@ void Scenery::rayTrace(size_t imin, size_t imax,
if (data) setPropertyConverters(data);
#ifdef HAVE_MPI
- if (mpi_workers_ || Scenery::is_worker) {
+ if (mpi_team_) {
// We are in an MPI content, either the manager or a worker.
// dispatch over workers and monitor
+ if (!am_worker) {
+ mpi_tag tag=raytrace;
+ mpiTask(tag);
+ }
+
size_t ij[2]={imin, jmin};
size_t nbnuobs=0;
- Quantity_t quantities = GYOTO_QUANTITY_NONE;
+ Quantity_t quantities = (am_worker || !data)?GYOTO_QUANTITY_NONE:*data;
+ bool has_ipct=am_worker?false:bool(impactcoords);
- if (mpi_workers_) {
- if (data) quantities=Quantity_t(*data);
- for (int w=0; w<mpi_workers_->remote_size(); ++w)
- mpi_workers_->send(w, raytrace, quantities);
- } else {
- mpi_manager_->recv(0, raytrace, quantities);
- }
+ mpi::broadcast(*mpi_team_, quantities, 0);
+ mpi::broadcast(*mpi_team_, has_ipct, 0);
if (quantities & (GYOTO_QUANTITY_SPECTRUM | GYOTO_QUANTITY_BINSPECTRUM)) {
if (!spr) throwError("Spectral quantity requested but "
@@ -301,7 +302,7 @@ void Scenery::rayTrace(size_t imin, size_t imax,
size_t offset=1;
size_t curquant=0;
- if (Scenery::is_worker) {
+ if (Scenery::am_worker) {
// set all converters to the trivial one, conversion is
// performed in the manager.
intensity_converter_ = NULL;
@@ -353,14 +354,10 @@ void Scenery::rayTrace(size_t imin, size_t imax,
}
mpi::status s;
- if (mpi_workers_) { // We are the manager
- int working = mpi_workers_->remote_size();
+ if (!am_worker) { // We are the manager
+ int working = mpi_team_->size()-1;
// First tell the workers to join our task force
// The corresponding recv is in gyoto-scenery-worker.c
- for (int w=0; w<working; ++w) {
- mpi_workers_->send(w, give_task, raytrace);
- mpi_workers_->send(w, Scenery::impactcoords, bool(impactcoords));
- }
size_t *ijr = new size_t[working*2];
@@ -372,7 +369,7 @@ void Scenery::rayTrace(size_t imin, size_t imax,
// Wait for worker to ask for task.
// tag may be raytrace_done if worker has result to report,
// give_task if worker has no data yet.
- s = mpi_workers_ -> recv(mpi::any_source, mpi::any_tag, vect, nelt);
+ s = mpi_team_ -> recv(mpi::any_source, mpi::any_tag, vect, nelt);
w = s.source();
@@ -414,10 +411,10 @@ void Scenery::rayTrace(size_t imin, size_t imax,
ijr[2*w]=ij[0];
ijr[2*w+1]=ij[1];
size_t cell=(ijr[2*w+1]-1)*npix+ijr[2*w]-1;
- mpi_workers_ -> send(w, raytrace, ij, 2);
+ mpi_team_ -> send(w, raytrace, ij, 2);
if (impactcoords) {
- mpi_workers_ -> send(w, Scenery::impactcoords, impactcoords+cell*16, 16);
+ mpi_team_ -> send(w, Scenery::impactcoords, impactcoords+cell*16, 16);
}
if (++ij[0]>imax) {
@@ -425,34 +422,32 @@ void Scenery::rayTrace(size_t imin, size_t imax,
if (++ij[1]<=jmax) ij[0]=imin;
}
} else {
- mpi_workers_ -> send(w, raytrace_done, ij, 2);
+ mpi_team_ -> send(w, raytrace_done, ij, 2);
--working;
}
}
delete [] ijr;
} else {
// We are a worker, do we need to query for impactcoords?
- bool has_ipct;
double ipct[16];
- mpi_manager_->recv(0, Scenery::impactcoords, has_ipct);
if (has_ipct) impactcoords=&ipct[0];
// First send dummy result, using tag "give_tag".
// Manager will ignore the results and send first coordinates.
- mpi_manager_->send(0, give_task, vect, nelt);
+ mpi_team_->send(0, give_task, vect, nelt);
while (true) {
// Receive new coordinates to work on.
- s = mpi_manager_->recv(0, mpi::any_tag, ij, 2);
+ s = mpi_team_->recv(0, mpi::any_tag, ij, 2);
if (s.tag()==raytrace_done) {
break;
}
// Receive impactcoords if needed
if (has_ipct)
- s = mpi_manager_->recv(0, Scenery::impactcoords, impactcoords, 16);
+ s = mpi_team_->recv(0, Scenery::impactcoords, impactcoords, 16);
locdata->init(nbnuobs);
(*this)(ij[0], ij[1], locdata, impactcoords, &ph_);
// send result
- mpi_manager_->send(0, raytrace_done, vect, nelt);
+ mpi_team_->send(0, raytrace_done, vect, nelt);
}
}
delete locdata;
@@ -835,7 +830,7 @@ SmartPointer<Scenery> Gyoto::Scenery::Subcontractor(FactoryMessenger* fmp) {
}
#ifdef HAVE_MPI
- if (!Scenery::is_worker && mpi) {
+ if (!Scenery::am_worker && mpi) {
sc -> mpiSpawn(mpi);
sc -> mpiClone();
}
@@ -847,7 +842,7 @@ SmartPointer<Scenery> Gyoto::Scenery::Subcontractor(FactoryMessenger* fmp) {
#endif
#ifdef HAVE_MPI
-bool Gyoto::Scenery::is_worker=false;
+bool Gyoto::Scenery::am_worker=false;
void Gyoto::Scenery::mpiSpawn(int nbchildren) {
@@ -861,30 +856,29 @@ void Gyoto::Scenery::mpiSpawn(int nbchildren) {
setenv("PATH", PATH.c_str(), 1);
- if (mpi_workers_) {
- if (mpi_workers_->size()==nbchildren) return;
+ if (mpi_team_) {
+ if (mpi_team_->size()==nbchildren+1) return;
mpiTerminate(true);
}
if (!mpi_env_) mpi_env_ = new mpi::environment();
if (!mpi_world_) mpi_world_ = new mpi::communicator();
MPI_Comm children_c;
- MPI_Comm_spawn("gyoto-mpi-worker", MPI_ARGV_NULL, nbchildren,
+ MPI_Comm_spawn(const_cast<char*>("gyoto-mpi-worker"),
+ MPI_ARGV_NULL, nbchildren,
MPI_INFO_NULL, 0, MPI_COMM_SELF, &children_c,
MPI_ERRCODES_IGNORE);
- mpi_workers_ = new mpi::intercommunicator (children_c, mpi::comm_take_ownership);
- int size;
- MPI_Comm_remote_size(children_c, &size);
+ mpi_team_ = new mpi::communicator(mpi::intercommunicator (children_c, mpi::comm_take_ownership).merge(false));
}
void Gyoto::Scenery::mpiTerminate(bool keep_env) {
- if (mpi_workers_) {
- for (int i=0; i < mpi_workers_->remote_size(); ++i) {
- mpi_workers_->send(i, give_task, terminate);
- }
- delete mpi_workers_;
- mpi_workers_=NULL;
+ if (mpi_team_) {
+ mpi_tag tag=terminate;
+ mpiTask(tag);
+ mpi_team_->barrier();
+ delete mpi_team_;
+ mpi_team_=NULL;
}
if (mpi_world_ && !keep_env) {
delete mpi_world_;
@@ -901,11 +895,13 @@ void Gyoto::Scenery::mpiClone()
std::string xmldata=
Gyoto::Factory(this).format();
int errcode;
- for (int i=0; i < mpi_workers_->remote_size(); ++i) {
- mpi_workers_->send(i, give_task, read_scenery);
- }
- broadcast(mpi_workers_->merge(false), xmldata, 0);
+ mpi_tag tag=read_scenery;
+ mpiTask(tag);
+ broadcast(*mpi_team_, xmldata, 0);
+}
+void Gyoto::Scenery::mpiTask(mpi_tag &tag) {
+ mpi::broadcast(*mpi_team_, tag, 0);
}
#endif
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-astro/packages/gyoto.git
More information about the Debian-astro-commits mailing list