mpi_utilities.cpp
/* This file is part of Cloudy and is copyright (C)1978-2017 by Gary J. Ferland and
 * others. For conditions of distribution and use see copyright notice in license.txt */
#include "cddefines.h"
#include "save.h"
#include "dynamics.h"
#include "grid.h"
#if defined(__unix) || defined(__APPLE__)
#include <cstddef>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#else
#define pid_t int
#define fork() TotalInsanityAsStub<pid_t>()
#define wait(X) TotalInsanityAsStub<pid_t>()
#endif
#ifdef __linux__
#include <sys/vfs.h>
// these come from /usr/include/linux/magic.h, but from linux kernel 3.7
// onwards, the file seems to have moved to /usr/include/uapi/linux/magic.h...
// to avoid the mess we copy the constants here
static const long NFS_SUPER_MAGIC = 0x6969L;
static const long SMB_SUPER_MAGIC = 0x517BL;
static const long AUTOFS_SUPER_MAGIC = 0x0187L;
// this one is not included in my header file (sshfs is also based on fuse)
static const long FUSE_SUPER_MAGIC = 0x65735546L;
#endif

STATIC void GridGatherOutput(const string&,long,const vector<long>&);
STATIC void GridGatherOutputSequential(const string&,long);
STATIC void GridGatherOutputParallel(const string&,long,const vector<long>&);
STATIC bool lgIsRemote(const string&);
STATIC void check_grid_file(const string&,int,int);

#ifndef MPI_ENABLED

// stand-in definitions so that serial (non-MPI) builds still link

int MPI_SUCCESS = 0;
int MPI_ERR_INTERN = -1;
MPI_File MPI_FILE_NULL = 0;

int total_insanity(MPI_File, int, MPI_Status*)
{
    return TotalInsanityAsStub<int>();
}

#endif
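
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the original mpi_utilities.cpp: the block
// above gives the MPI names harmless definitions so that serial builds still
// link, while actually reaching such a stub is a bug. A minimal stand-alone
// version of that pattern could look as follows; the helper name CalledMpiStub
// and its message are assumptions, not Cloudy's TotalInsanityAsStub.
#include <cstdio>
#include <cstdlib>

template<class T>
T CalledMpiStub()
{
    // a serial build should never reach an MPI-only code path
    std::fprintf( stderr, "fatal: an MPI stub routine was called in a serial build\n" );
    std::abort();
}

// a serial build could then map an MPI-only call onto the stub, e.g.
//     #define MPI_Barrier(Z) CalledMpiStub<int>()
// ---------------------------------------------------------------------------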

// NB NB this routine cannot throw any exceptions as it is executed outside
// the try{} block -- this includes mechanisms like ASSERT and cdEXIT!
void load_balance::init( unsigned int nJobs, unsigned int nCPU )
{
    if( nJobs == 0 )
        return;

    bool lgMPI = cpu.i().lgMPI();

    p_jobs.resize( nJobs );
    p_rank = cpu.i().nRANK();
    p_ptr = p_rank;
    p_ncpu = min(nCPU,nJobs);
    // the master rank will now set up a random sequence for the jobs
    // this way we hope to get statistical load balancing of the ranks
    if( p_rank == 0 )
    {
        for( unsigned int i=0; i < nJobs; ++i )
            p_jobs[i] = i;

        if( p_ncpu > 1 )
        {
            // This may or may not seed the random number generator used by
            // random_shuffle. There is no portable C++ interface to do this :-(
            srand( unsigned( time(NULL) ) );
            random_shuffle( p_jobs.begin(), p_jobs.end() );
        }
    }
    // now broadcast the random sequence to the other ranks...
    if( lgMPI )
        MPI_Bcast( &p_jobs[0], nJobs, MPI_type(p_jobs[0]), 0, MPI_COMM_WORLD );
    else
        for( unsigned int i=1; i < p_ncpu; ++i )
        {
            fflush(NULL);
            pid_t pid = fork();
            if( pid < 0 )
            {
                fprintf( ioQQQ, "creating the child process failed\n" );
                // this _should_ be exit() not cdEXIT()!
                exit(EXIT_FAILURE);
            }
            else if( pid == 0 )
            {
                /* this is child process */
                p_rank = i;
                p_ptr = p_rank;
                cpu.i().set_nRANK( p_rank );
                return;
            }
        }
}
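
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the original mpi_utilities.cpp: how a
// shuffled job list gives statistical load balancing. Every rank ends up with
// the same permutation (the master shuffles and broadcasts it above); rank r
// then works through entries r, r+ncpu, r+2*ncpu, ... The stride-by-ncpu pick
// is inferred from the p_ptr/p_ncpu members and is an assumption here, and
// std::shuffle stands in for the deprecated random_shuffle.
#include <algorithm>
#include <cstdio>
#include <random>
#include <vector>

int main()
{
    const unsigned int nJobs = 10, nCPU = 3;
    std::vector<int> jobs( nJobs );
    for( unsigned int i=0; i < nJobs; ++i )
        jobs[i] = int(i);

    // the "master" shuffles once; in the real code the permutation is then
    // broadcast so that all ranks agree on the same job order
    std::mt19937 rng( 42 );
    std::shuffle( jobs.begin(), jobs.end(), rng );

    for( unsigned int rank=0; rank < nCPU; ++rank )
    {
        std::printf( "rank %u runs jobs:", rank );
        for( unsigned int p=rank; p < nJobs; p += nCPU )
            std::printf( " %d", jobs[p] );
        std::printf( "\n" );
    }
}
// ---------------------------------------------------------------------------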

void load_balance::finalize( exit_type exit_status )
{
    // wait for all jobs to finish
    if( cpu.i().lgMPI() )
        MPI_Barrier( MPI_COMM_WORLD );
    else
    {
        if( p_rank == 0 )
            for( unsigned int i=1; i < p_ncpu; ++i )
                (void)wait(NULL);
        else
            // this _should_ be exit() not cdEXIT()!
            exit(exit_status);
    }
}
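
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the original mpi_utilities.cpp: the POSIX
// fork()/wait() pattern that init() and finalize() implement for non-MPI grid
// runs. The parent forks one child per extra "rank" and later reaps them; each
// child leaves the fork loop, does its share of the work and exits. The rank
// count and messages are made up for the illustration.
#include <cstdio>
#include <cstdlib>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

int main()
{
    const unsigned int ncpu = 4;
    unsigned int rank = 0;
    for( unsigned int i=1; i < ncpu; ++i )
    {
        std::fflush(NULL);               // avoid duplicating buffered output in the children
        pid_t pid = fork();
        if( pid < 0 )
            std::exit(EXIT_FAILURE);     // could not create the child process
        else if( pid == 0 )
        {
            rank = i;                    // child: adopt its rank and stop forking
            break;
        }
    }

    std::printf( "process %d works as rank %u\n", int(getpid()), rank );

    if( rank == 0 )
        for( unsigned int i=1; i < ncpu; ++i )
            (void)wait(NULL);            // parent: wait for all children to finish
    else
        std::exit(EXIT_SUCCESS);         // child: terminate once its work is done
}
// ---------------------------------------------------------------------------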

// gather and post-process the output files produced by the individual grid points
void process_output()
{
    DEBUG_ENTRY( "process_output()" );

    // NOTE: when this routine is called all file handles have already been closed

    try
    {
        string main_input = save.chRedirectPrefix + ".in";
        string main_output = save.chRedirectPrefix + ".out";

        // balance work over the ranks
        // rank n will process file numbers i with bound[n] <= i < bound[n+1]
        // in non-MPI runs, this will result in:
        // bound[0] = 0; bound[1] = grid.totNumModels;
        int nCPU = cpu.i().lgMPI() ? cpu.i().nCPU() : 1;
        int nRANK = cpu.i().lgMPI() ? cpu.i().nRANK() : 0;
        long stride = grid.totNumModels/nCPU;
        vector<long> bound( nCPU+1, stride );
        long remainder = grid.totNumModels - stride*nCPU;
        for( long i=1; i <= remainder; ++i )
            ++bound[i];
        bound[0] = 0;
        for( long i=1; i <= nCPU; ++i )
            bound[i] += bound[i-1];

        ASSERT( bound[nCPU] == grid.totNumModels );
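        // worked example: for grid.totNumModels == 10 and nCPU == 3 the code above
        // gives stride == 3 and remainder == 1, hence bound == { 0, 4, 7, 10 };
        // rank 0 then handles models 0..3, rank 1 handles 4..6, rank 2 handles 7..9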

        // first process main output files
        if( !grid.lgKeepMainOutputSeparate )
            GridGatherOutput( main_output, grid.totNumModels, bound );

        // remove input files for individual grid points
        for( long j=0; j < grid.totNumModels; ++j )
        {
            if( j >= bound[nRANK] && j < bound[nRANK+1] )
            {
                string in_name = GridPointPrefix(j) + save.chRedirectPrefix + ".in";
                remove( in_name.c_str() );
            }
        }

        fstream main_input_handle;
        open_data( main_input_handle, main_input.c_str(), mode_r, AS_LOCAL_ONLY );
        string line;

        int ipPun = 0;

        while( getline( main_input_handle, line ) )
        {
            string caps_line;
            // create all caps version
            string::const_iterator p = line.begin();
            while( p != line.end() )
                caps_line.push_back( toupper(*p++) );
            if( caps_line.compare( 0, 4, "SAVE" ) == 0 || caps_line.compare( 0, 4, "PUNC" ) == 0 )
            {
                ASSERT( ipPun < save.nsave );
                string fnam = save.chFilenamePrefix;
                string::size_type p = line.find( '"' );
                fnam += line.substr( ++p );
                fnam.erase( fnam.find( '"' ) );
                // first do a minimal check on the validity of the save files
                for( int j=0; j < grid.totNumModels; ++j )
                    if( j >= bound[nRANK] && j < bound[nRANK+1] )
                        check_grid_file( fnam, j, ipPun );
                // and concatenate the output if necessary
                if( save.lgSaveToSeparateFiles[ipPun] )
                {
                    if( cpu.i().lgMaster() )
                    {
                        // open in binary mode in case we are writing a FITS file
                        FILE *dest = open_data( fnam.c_str(), "ab" );
                        // keep the save files for each grid point separate;
                        // the main save file contains the save header, so
                        // salvage it by prepending it to the first save file --
                        // this gives the same behavior as in non-MPI runs
                        string gridnam = GridPointPrefix(0) + fnam;
                        append_file( dest, gridnam.c_str() );
                        fclose( dest );
                        // this will overwrite the old file gridnam
                        rename( fnam.c_str(), gridnam.c_str() );
                    }
                }
                else
                {
                    GridGatherOutput( fnam, grid.totNumModels, bound );
                }
                if( caps_line.find( "XSPE", 4 ) != string::npos )
                {
                    if( cpu.i().lgMaster() )
                    {
                        FILE *dest = open_data( fnam.c_str(), "ab" );
                        // dest points to an empty file, so generate the complete FITS file now
                        ASSERT( save.FITStype[ipPun] >= 0 &&
                                save.FITStype[ipPun] < NUM_OUTPUT_TYPES );
                        saveFITSfile( dest, save.FITStype[ipPun] );
                        fseek( dest, 0, SEEK_END );
                        ASSERT( ftell(dest)%2880 == 0 );
                        fclose( dest );
                    }
                }
                ++ipPun;
            }
        }
    }
    catch( ... )
    {
        fprintf( ioQQQ, "PROBLEM - an internal error occurred while post-processing the grid output\n" );
    }
}
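
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the original mpi_utilities.cpp: how the loop
// above recovers a save file name from a SAVE command. The text between the
// first pair of double quotes is appended to the filename prefix; the command
// line and prefix value below are made up, the prefix merely standing in for
// save.chFilenamePrefix.
#include <cstdio>
#include <string>
using namespace std;

int main()
{
    string line = "save overview \"model.ovr\" last";
    string prefix = "grid_";

    string fnam = prefix;
    string::size_type p = line.find( '"' );    // first double quote
    fnam += line.substr( ++p );                // copy everything past it...
    fnam.erase( fnam.find( '"' ) );            // ...and cut at the closing quote

    printf( "%s\n", fnam.c_str() );            // prints: grid_model.ovr
}
// ---------------------------------------------------------------------------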

STATIC void GridGatherOutput(const string& basenam,
                             long nfiles,
                             const vector<long>& bound)
{
    if( cpu.i().lgMPI() && !lgIsRemote(basenam) )
        GridGatherOutputParallel( basenam, nfiles, bound );
    else
    {
        if( cpu.i().lgMaster() )
            GridGatherOutputSequential( basenam, nfiles );
    }
}

STATIC void GridGatherOutputSequential(const string& basenam,
                                       long nfiles)
{
    // open in binary mode in case we are writing a FITS file
    FILE* output_handle = open_data( basenam.c_str(), "ab" );
    for( long j=0; j < nfiles; ++j )
    {
        string gridnam = GridPointPrefix(j) + basenam;
        append_file( output_handle, gridnam.c_str() );
        remove( gridnam.c_str() );
    }
    fclose( output_handle );
}

STATIC void GridGatherOutputParallel(const string& basenam,
                                     long nfiles,
                                     const vector<long>& bound)
{
    // determine the total amount of data each rank has to copy
    // by summing the individual file sizes
    MPI_Offset mySize = 0;
    for( long j=0; j < nfiles; ++j )
    {
        if( j >= bound[cpu.i().nRANK()] && j < bound[cpu.i().nRANK()+1] )
        {
            string gridnam = GridPointPrefix(j) + basenam;
            FILE* fh = open_data( gridnam.c_str(), "r", AS_LOCAL_ONLY );
            fseek( fh, 0, SEEK_END );
            mySize += static_cast<MPI_Offset>( ftell(fh) );
            fclose(fh);
        }
    }

    // broadcast the computed amounts to all ranks so that each
    // rank can compute the offset where it needs to start writing
    vector<MPI_Offset> offset(cpu.i().nCPU());
    for( int i=0; i < cpu.i().nCPU(); ++i )
    {
        MPI_Offset myCopy = mySize;
        // directly using &offset[i] below instead of the detour via
        // &myCopy leads to segfaults for reasons that I cannot fathom...
        MPI_Bcast( &myCopy, 1, MPI_type(myCopy), i, MPI_COMM_WORLD );
        offset[i] = myCopy;
    }

    MPI_File output_handle = open_data( basenam.c_str(), mpi_mode_a );

    // compute the offset where each rank needs to start writing
    MPI_Offset totalSize = 0;
    (void)MPI_File_get_size( output_handle, &totalSize );
    for( int j=0; j < cpu.i().nCPU(); ++j )
    {
        MPI_Offset tmp = offset[j];
        offset[j] = totalSize;
        totalSize += tmp;
    }
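    // worked example: with three ranks holding 100, 250 and 50 bytes of output
    // and an empty destination file, the loop above turns offset == { 100, 250, 50 }
    // into { 0, 100, 350 } and totalSize into 400, so each rank writes its data
    // directly behind the data of the ranks before it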

    // now gather the output and remove the individual files
    (void)MPI_File_set_view( output_handle, offset[cpu.i().nRANK()],
                             MPI_CHAR, MPI_CHAR, const_cast<char*>("native"),
                             MPI_INFO_NULL );
    for( long j=0; j < nfiles; ++j )
    {
        if( j >= bound[cpu.i().nRANK()] && j < bound[cpu.i().nRANK()+1] )
        {
            string gridnam = GridPointPrefix(j) + basenam;
            append_file( output_handle, gridnam.c_str() );
            remove( gridnam.c_str() );
        }
    }
    MPI_File_close( &output_handle );
}

// determine if a file resides on a remote share
// this is needed to determine if MPI-IO can operate safely on the file
#ifdef __linux__
STATIC bool lgIsRemote(const string& fnam)
{
    struct statfs buf;
    int res = statfs( fnam.c_str(), &buf );
    if( res != 0 )
        return true;
    // parallel file systems do not count as remote since MPI-IO is supported on those
    if( buf.f_type == NFS_SUPER_MAGIC ||
        buf.f_type == SMB_SUPER_MAGIC ||
        buf.f_type == AUTOFS_SUPER_MAGIC ||
        buf.f_type == FUSE_SUPER_MAGIC )
        return true;
    else
        return false;
}
#else
STATIC bool lgIsRemote(const string&)
{
    // we do not know how to determine this, so we assume the worst
    return true;
}
#endif

// check that the save file of grid point j ends in a newline and, where
// appropriate, in a GRID_DELIMIT marker; append them if they are missing
STATIC void check_grid_file( const string& fnam, int j, int ipPun )
{
    DEBUG_ENTRY( "check_grid_file()" );

    // these are binary files, don't touch them...
    if( save.lgFITS[ipPun] )
        return;

    bool lgForceNoDelimiter = false;
    // in these cases there should not be a GRID_DELIMIT string...
    if( !save.lgHashEndIter[ipPun] || !save.lg_separate_iterations[ipPun] ||
        dynamics.lgTimeDependentStatic || strcmp( save.chHashString, "TIME_DEP" ) == 0 ||
        strcmp( save.chHashString, "\n" ) == 0 )
        lgForceNoDelimiter = true;

    bool lgAppendDelimiter = true;
    bool lgAppendNewline = false;
    string gridnam = GridPointPrefix(j) + fnam;
    fstream str;
    open_data( str, gridnam.c_str(), mode_r, AS_LOCAL_ONLY_TRY );
    if( str.is_open() )
    {
        str.seekg( 0, ios_base::end );
        if( str.good() && str.tellg() > 0 )
        {
            // check if the file ends in a newline
            str.seekg( -1, ios_base::cur );
            char chr;
            str.get( chr );
            lgAppendNewline = ( chr != '\n' );
            // check if the GRID_DELIMIT string is present
            string line;
            str.seekg( 0, ios_base::beg );
            while( getline( str, line ) )
            {
                if( line.find( "GRID_DELIMIT" ) != string::npos )
                    lgAppendDelimiter = false;
            }
        }
        str.close();
    }
    if( lgForceNoDelimiter )
        lgAppendDelimiter = false;
    if( lgAppendNewline || lgAppendDelimiter )
    {
        open_data( str, gridnam.c_str(), mode_a, AS_LOCAL_ONLY_TRY );
        if( str.is_open() )
        {
            if( lgAppendNewline )
                str << endl;
            if( lgAppendDelimiter )
            {
                str << save.chHashString << " GRID_DELIMIT -- grid";
                str << setfill( '0' ) << setw(9) << j << endl;
            }
            str.close();
        }
    }
}
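
// for grid point j == 9 the block above appends a line of the form
//     <save.chHashString> GRID_DELIMIT -- grid000000009
// so that the end of each grid point's output can be recognized in the merged file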

// append the contents of the file <source> to the already opened file <dest>
void append_file( FILE *dest, const char *source )
{
    DEBUG_ENTRY( "append_file()" );

    FILE *src = open_data( source, "rb", AS_LOCAL_ONLY_TRY );
    if( src == NULL )
        return;

    // limited testing shows that using a 4 KiB buffer should
    // give performance that is at least very close to optimal
    // tests were done by concatenating 10 copies of a 62.7 MiB file
    const size_t BUF_SIZE = 4096;
    char buf[BUF_SIZE];

    while( ! feof(src) )
    {
        size_t nb = fread( buf, sizeof(char), BUF_SIZE, src );
        fwrite( buf, sizeof(char), nb, dest );
    }
    fclose(src);
    return;
}
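
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the original mpi_utilities.cpp: typical use
// of the FILE* overload of append_file() defined above, concatenating several
// per-grid-point files onto one destination. The file names are made up.
#include <cstdio>

void append_file( FILE *dest, const char *source );   // the routine defined above

void concat_example()
{
    // open in append mode so that existing content (e.g. a save header) is kept
    FILE *dest = fopen( "model.ovr", "ab" );
    if( dest == NULL )
        return;
    append_file( dest, "point0_model.ovr" );
    append_file( dest, "point1_model.ovr" );
    fclose( dest );
}
// ---------------------------------------------------------------------------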

// same as above, but appending to an MPI file handle instead of a FILE pointer
void append_file( MPI_File dest, const char *source )
{
    DEBUG_ENTRY( "append_file()" );

    FILE *src = open_data( source, "rb", AS_LOCAL_ONLY_TRY );
    if( src == NULL )
        return;

    // use a larger buffer for parallel file systems
    const size_t BUF_SIZE = 32768;
    char buf[BUF_SIZE];

    while( ! feof(src) )
    {
        size_t nb = fread( buf, sizeof(char), BUF_SIZE, src );
        MPI_Status status;
        (void)MPI_File_write( dest, buf, nb, MPI_CHAR, &status );
    }
    fclose(src);
    return;
}