[Debian-astro-commits] [gyoto] 42/221: * rename Scenery::is_worker to am_worker * give raytrace task earlier in Scenery::raytrace() * merge workers and manager into a team, use broadcast instead of P2P whenever appropriate

Thibaut Jean-Claude Paumard thibaut at moszumanska.debian.org
Fri May 22 20:52:32 UTC 2015


This is an automated email from the git hooks/post-receive script.

thibaut pushed a commit to branch master
in repository gyoto.

commit 7b1e3a433b633ae92823a9b8fe11cdbbf10dc453
Author: Thibaut Paumard <paumard at users.sourceforge.net>
Date:   Mon Oct 13 09:12:08 2014 +0200

    * rename Scenery::is_worker to am_worker
    * give raytrace task earlier in Scenery::raytrace()
    * merge workers and manager into a team, use broadcast instead of P2P whenever appropriate
---
 bin/gyoto-mpi-worker.C | 20 ++++++-----
 bin/gyoto.C            |  7 ++--
 include/GyotoScenery.h |  6 ++--
 lib/Scenery.C          | 92 ++++++++++++++++++++++++--------------------------
 4 files changed, 61 insertions(+), 64 deletions(-)

diff --git a/bin/gyoto-mpi-worker.C b/bin/gyoto-mpi-worker.C
index 701d14d..2a1cee1 100644
--- a/bin/gyoto-mpi-worker.C
+++ b/bin/gyoto-mpi-worker.C
@@ -75,9 +75,8 @@ int main(int argc, char** argv) {
   MPI_Comm parent_c;
   MPI_Comm_get_parent(&parent_c);
 
-  mpi::intercommunicator manager(parent_c,mpi::comm_take_ownership);
   mpi::communicator world;
-  mpi::communicator team=manager.merge(true);
+  mpi::communicator team=mpi::intercommunicator(parent_c,mpi::comm_take_ownership).merge(true);
 
   string pluglist= getenv("GYOTO_PLUGINS")?
     getenv("GYOTO_PLUGINS"):
@@ -89,12 +88,15 @@ int main(int argc, char** argv) {
 
   int rk=world.rank();
 
-  //Gyoto::debug(1);
+  sc = new Scenery();
+  sc -> mpi_world_   = &world;
+  sc -> mpi_env_     = &env;
+  sc -> mpi_team_    = &team;
 
   Scenery::mpi_tag task=Scenery::give_task;
-  Scenery::is_worker=true;
+  Scenery::am_worker=true;
   while (task != Scenery::terminate) {
-    manager.recv(0, Scenery::give_task, task);
+    sc->mpiTask(task);
     switch (task) {
     case Scenery::read_scenery: {
       std::string parfile;
@@ -102,9 +104,9 @@ int main(int argc, char** argv) {
       curmsg = "In gyoto-mpi-worker.C: Error in Factory creation: ";
       curretval = 1;
       sc = Factory(const_cast<char*>(parfile.c_str())).getScenery();
-      sc -> mpi_manager_ = &manager;
-      sc -> mpi_world_ = &world;
-      sc -> mpi_env_ = &env;
+      sc -> mpi_world_   = &world;
+      sc -> mpi_env_     = &env;
+      sc -> mpi_team_    = &team;
      break;
     }
     case Scenery::raytrace:
@@ -113,11 +115,13 @@ int main(int argc, char** argv) {
       sc -> rayTrace(0, 0, 0, 0, NULL, NULL);
       break;
     case Scenery::terminate:
+      sc = NULL;
       break;
     default:
       std::cerr << "unknown task" << endl;
     }
   }
 
+  team.barrier();
   return 0;
 }
diff --git a/bin/gyoto.C b/bin/gyoto.C
index 0f21e56..8fe4142 100644
--- a/bin/gyoto.C
+++ b/bin/gyoto.C
@@ -93,7 +93,7 @@ int main(int argc, char** argv) {
   double tobs=0., tmin=0., fov=0., dist=0., paln=0., incl=0., arg=0.;
   size_t res=0, nthreads=0, nprocs=0;
   //  bool  xtobs=0, xtmin=0, xfov=0, xres=0, xdist=0, xpaln=0, xincl=0, xarg=0;
-  bool  xtobs=0, xtmin=0, xfov=0, xres=0, xdist=0, xpaln=0, xincl=0, xarg=0, xnthreads=0, xnprocs;
+  bool  xtobs=0, xtmin=0, xfov=0, xres=0, xdist=0, xpaln=0, xincl=0, xarg=0, xnthreads=0, xnprocs=0;
   bool  ipct=0;
   long  ipctdims[3]={0, 0, 0};
   double ipcttime;
@@ -251,6 +251,7 @@ int main(int argc, char** argv) {
     if (xnthreads)  scenery -> nThreads    ( nthreads  );
 #ifdef HAVE_MPI
     if (xnprocs) {
+      cerr << "nprocs="<< nprocs<< endl;
       scenery -> mpiSpawn(nprocs);
       scenery -> mpiClone();
     }
@@ -479,10 +480,6 @@ int main(int argc, char** argv) {
       delete [] impactcoords;
     }
 
-#ifdef HAVE_MPI
-    if (xnprocs) scenery -> mpiTerminate();
-#endif
-
     if (debug()) cerr << "DEBUG: gyoto.C: scenery==NULL" << endl;
     scenery = NULL;
 
diff --git a/include/GyotoScenery.h b/include/GyotoScenery.h
index 399c822..1ad05cc 100644
--- a/include/GyotoScenery.h
+++ b/include/GyotoScenery.h
@@ -213,15 +213,15 @@ class Gyoto::Scenery : protected Gyoto::SmartPointee {
  public:
   boost::mpi::environment * mpi_env_;
   boost::mpi::communicator * mpi_world_;
-  boost::mpi::intercommunicator * mpi_workers_;
-  boost::mpi::intercommunicator * mpi_manager_;
-  static bool is_worker;
+  boost::mpi::communicator * mpi_team_;
+  static bool am_worker;
   void mpiSpawn(int nbchildren);
   void mpiTerminate (bool keep_env=false);
   void mpiClone();
   enum mpi_tag {give_task, read_scenery, terminate,
 		raytrace, raytrace_done, ready,
 		impactcoords, noimpactcoords};
+  void mpiTask(mpi_tag &tag);
 # endif
 
   // Constructors - Destructor
diff --git a/lib/Scenery.C b/lib/Scenery.C
index 2827320..084494d 100644
--- a/lib/Scenery.C
+++ b/lib/Scenery.C
@@ -52,7 +52,7 @@ Scenery::Scenery() :
   screen_(NULL), delta_(GYOTO_DEFAULT_DELTA),
   quantities_(0), ph_(), nthreads_(0)
 #ifdef HAVE_MPI
-  , mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL), mpi_manager_(NULL)
+  , mpi_env_(NULL), mpi_world_(NULL), mpi_team_(NULL)
 #endif
 {}
 
@@ -62,7 +62,7 @@ Scenery::Scenery(SmartPointer<Metric::Generic> met,
   screen_(scr), delta_(GYOTO_DEFAULT_DELTA),
   quantities_(0), ph_(), nthreads_(0)
 #ifdef HAVE_MPI
-  , mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL), mpi_manager_(NULL)
+  , mpi_env_(NULL), mpi_world_(NULL), mpi_team_(NULL)
 #endif
 {
   metric(met);
@@ -76,7 +76,7 @@ Scenery::Scenery(const Scenery& o) :
   quantities_(o.quantities_), ph_(o.ph_), 
   nthreads_(o.nthreads_)
 #ifdef HAVE_MPI
-  , mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL), mpi_manager_(NULL)
+  , mpi_env_(NULL), mpi_world_(NULL), mpi_team_(NULL)
 #endif
 {
   if (o.screen_()) {
@@ -92,7 +92,7 @@ Scenery::~Scenery() {
 # endif
   screen_ = NULL;
 # ifdef HAVE_MPI
-  if (!Scenery::is_worker) mpiTerminate();
+  if (!Scenery::am_worker) mpiTerminate();
 # endif
  }
 
@@ -270,22 +270,23 @@ void Scenery::rayTrace(size_t imin, size_t imax,
   if (data) setPropertyConverters(data);
 
 #ifdef HAVE_MPI
-  if (mpi_workers_ || Scenery::is_worker) {
+  if (mpi_team_) {
     // We are in an MPI content, either the manager or a worker.
     // dispatch over workers and monitor
 
+    if (!am_worker) {
+      mpi_tag tag=raytrace;
+      mpiTask(tag);
+    }
+
     size_t ij[2]={imin, jmin};
 
     size_t nbnuobs=0;
-    Quantity_t quantities = GYOTO_QUANTITY_NONE;
+    Quantity_t quantities = (am_worker || !data)?GYOTO_QUANTITY_NONE:*data;
+    bool has_ipct=am_worker?false:bool(impactcoords);
     
-    if (mpi_workers_) {
-      if (data) quantities=Quantity_t(*data);
-      for (int w=0; w<mpi_workers_->remote_size(); ++w)
-	mpi_workers_->send(w, raytrace, quantities);
-    } else {
-      mpi_manager_->recv(0, raytrace, quantities);
-    }
+    mpi::broadcast(*mpi_team_, quantities, 0);
+    mpi::broadcast(*mpi_team_, has_ipct, 0);
 
     if (quantities & (GYOTO_QUANTITY_SPECTRUM | GYOTO_QUANTITY_BINSPECTRUM)) {
       if (!spr) throwError("Spectral quantity requested but "
@@ -301,7 +302,7 @@ void Scenery::rayTrace(size_t imin, size_t imax,
     size_t offset=1;
     size_t curquant=0;
 
-    if (Scenery::is_worker) {
+    if (Scenery::am_worker) {
       // set all converters to the trivial one, conversion is
       // performed in the manager.
       intensity_converter_ = NULL;
@@ -353,14 +354,10 @@ void Scenery::rayTrace(size_t imin, size_t imax,
     }
 
     mpi::status s;
-    if (mpi_workers_) { // We are the manager
-      int working = mpi_workers_->remote_size();
+    if (!am_worker) { // We are the manager
+      int working = mpi_team_->size()-1;
       // First tell the workers to join our task force
       // The corresponding recv is in gyoto-scenery-worker.c
-      for (int w=0; w<working; ++w) {
-	mpi_workers_->send(w, give_task, raytrace);
-	mpi_workers_->send(w, Scenery::impactcoords, bool(impactcoords));
-      }
 
       size_t *ijr = new size_t[working*2];
 
@@ -372,7 +369,7 @@ void Scenery::rayTrace(size_t imin, size_t imax,
 	// Wait for worker to ask for task.
 	// tag may be raytrace_done if worker has result to report, 
 	// give_task if worker has no data yet.
-	s = mpi_workers_ -> recv(mpi::any_source, mpi::any_tag, vect, nelt);
+	s = mpi_team_ -> recv(mpi::any_source, mpi::any_tag, vect, nelt);
 
 	w = s.source();
 	
@@ -414,10 +411,10 @@ void Scenery::rayTrace(size_t imin, size_t imax,
 	  ijr[2*w]=ij[0];
 	  ijr[2*w+1]=ij[1];
 	  size_t cell=(ijr[2*w+1]-1)*npix+ijr[2*w]-1;
-	  mpi_workers_ -> send(w, raytrace, ij, 2);
+	  mpi_team_ -> send(w, raytrace, ij, 2);
 
 	  if (impactcoords) {
-	    mpi_workers_ -> send(w, Scenery::impactcoords, impactcoords+cell*16, 16);
+	    mpi_team_ -> send(w, Scenery::impactcoords, impactcoords+cell*16, 16);
 	  }
 
 	  if (++ij[0]>imax) {
@@ -425,34 +422,32 @@ void Scenery::rayTrace(size_t imin, size_t imax,
 	    if (++ij[1]<=jmax) ij[0]=imin;
 	  }
 	} else {
-	  mpi_workers_ -> send(w, raytrace_done, ij, 2);
+	  mpi_team_ -> send(w, raytrace_done, ij, 2);
 	  --working;
 	}
       }
       delete [] ijr;
     } else {
       // We are a worker, do we need to query for impactcoords?
-      bool has_ipct;
       double ipct[16];
-      mpi_manager_->recv(0, Scenery::impactcoords, has_ipct);
       if (has_ipct) impactcoords=&ipct[0];
 
       // First send dummy result, using tag "give_tag".
       // Manager will ignore the results and send first coordinates.
-      mpi_manager_->send(0, give_task, vect, nelt);
+      mpi_team_->send(0, give_task, vect, nelt);
       while (true) {
 	// Receive new coordinates to work on.
-	s = mpi_manager_->recv(0, mpi::any_tag, ij, 2);
+	s = mpi_team_->recv(0, mpi::any_tag, ij, 2);
 	if (s.tag()==raytrace_done) {
 	  break;
 	}
 	// Receive impactcoords if needed
 	if (has_ipct)
-	  s = mpi_manager_->recv(0, Scenery::impactcoords, impactcoords, 16);
+	  s = mpi_team_->recv(0, Scenery::impactcoords, impactcoords, 16);
 	locdata->init(nbnuobs);
 	(*this)(ij[0], ij[1], locdata, impactcoords, &ph_);
 	// send result
-	mpi_manager_->send(0, raytrace_done, vect, nelt);
+	mpi_team_->send(0, raytrace_done, vect, nelt);
       }
     }
     delete locdata;
@@ -835,7 +830,7 @@ SmartPointer<Scenery> Gyoto::Scenery::Subcontractor(FactoryMessenger* fmp) {
   }
 #ifdef HAVE_MPI
 
-  if (!Scenery::is_worker && mpi) {
+  if (!Scenery::am_worker && mpi) {
     sc -> mpiSpawn(mpi);
     sc -> mpiClone();
   }
@@ -847,7 +842,7 @@ SmartPointer<Scenery> Gyoto::Scenery::Subcontractor(FactoryMessenger* fmp) {
 #endif
 
 #ifdef HAVE_MPI
-bool Gyoto::Scenery::is_worker=false;
+bool Gyoto::Scenery::am_worker=false;
 
 void Gyoto::Scenery::mpiSpawn(int nbchildren) {
 
@@ -861,30 +856,29 @@ void Gyoto::Scenery::mpiSpawn(int nbchildren) {
 
   setenv("PATH", PATH.c_str(), 1);
 
-  if (mpi_workers_) {
-    if (mpi_workers_->size()==nbchildren) return;
+  if (mpi_team_) {
+    if (mpi_team_->size()==nbchildren+1) return;
     mpiTerminate(true);
   }
   if (!mpi_env_)   mpi_env_   = new mpi::environment();
   if (!mpi_world_) mpi_world_ = new mpi::communicator();
 
   MPI_Comm children_c;
-  MPI_Comm_spawn("gyoto-mpi-worker", MPI_ARGV_NULL, nbchildren,
+  MPI_Comm_spawn(const_cast<char*>("gyoto-mpi-worker"),
+		 MPI_ARGV_NULL, nbchildren,
                  MPI_INFO_NULL, 0, MPI_COMM_SELF, &children_c,
                  MPI_ERRCODES_IGNORE);
 
-  mpi_workers_ = new mpi::intercommunicator (children_c, mpi::comm_take_ownership); 
-  int size;
-  MPI_Comm_remote_size(children_c, &size);
+  mpi_team_ = new mpi::communicator(mpi::intercommunicator (children_c, mpi::comm_take_ownership).merge(false));
 }
 
 void Gyoto::Scenery::mpiTerminate(bool keep_env) {
-  if (mpi_workers_) {
-    for (int i=0; i < mpi_workers_->remote_size(); ++i) {
-      mpi_workers_->send(i, give_task, terminate);
-    }
-    delete mpi_workers_;
-    mpi_workers_=NULL;
+  if (mpi_team_) {
+    mpi_tag tag=terminate;
+    mpiTask(tag);
+    mpi_team_->barrier();
+    delete mpi_team_;
+    mpi_team_=NULL;
   }
   if (mpi_world_ && !keep_env) {
     delete mpi_world_;
@@ -901,11 +895,13 @@ void Gyoto::Scenery::mpiClone()
   std::string xmldata=
     Gyoto::Factory(this).format();
   int errcode;
-  for (int i=0; i < mpi_workers_->remote_size(); ++i) {
-    mpi_workers_->send(i, give_task, read_scenery);
-  }
-  broadcast(mpi_workers_->merge(false), xmldata, 0);
+  mpi_tag tag=read_scenery;
+  mpiTask(tag);
+  broadcast(*mpi_team_, xmldata, 0);
+}
 
+void Gyoto::Scenery::mpiTask(mpi_tag &tag) {
+  mpi::broadcast(*mpi_team_, tag, 0);
 }
 
 #endif

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-astro/packages/gyoto.git



More information about the Debian-astro-commits mailing list