MIDAPACK - MIcrowave Data Analysis PACKage 1.0beta
Parallel software tools for high performance CMB DA analysis
toeplitz.c
/*
** file: toeplitz.c, version 1.0beta, May 2012
**
** Project:  Midapack library, ANR MIDAS'09
** Purpose:  Provide Toeplitz algebra tools suitable for Cosmic Microwave Background (CMB)
**           data analysis.
**
** Authors:  Pierre Cargemel, Frederic Dauvergne, Maude Le Jeune, Antoine Rogier,
**           Radek Stompor (APC, Paris)
**
**
** Copyright (c) 2010-2012 APC CNRS Université Paris Diderot
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 3 of the License, or
** (at your option) any later version.
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, see http://www.gnu.org/licenses/gpl.html
**
** For more information about ANR MIDAS'09 project see
** http://www.apc.univ-paris7.fr/APC_CS/Recherche/Adamis/MIDAS09/index.html
**
** ACKNOWLEDGMENT: This work has been supported in part by the French National
** Research Agency (ANR) through COSINUS program (project MIDAS no. ANR-09-COSI-009).
**
**
** Revision 1.0  2012/05/01?  fd@APC
** Official release 1.0, including only the Toeplitz algebra module.
**
*/

#include "toeplitz.h"


//=========================================================================
//Global parameters

//Number of simultaneous FFTs performed by the nfft-optimized routines
int NFFT = 1;

//FFTW plan allocation flag (FFTW_ESTIMATE, FFTW_MEASURE, ...)
int FFTW_FLAG = FFTW_ESTIMATE;

//Global parameter just to know the rank for printing when VERBOSE mode is on
int PRINT_RANK = -1;


//=========================================================================

//Prints an error message corresponding to an error code, together with the
//source file and line where the error occurred, and returns the error code.
int print_error_message(int error_number, char const *file, int line)
{
  char str_mess[256];  //fixed-size buffer; the original malloc was never freed
  if(error_number == 1)
    snprintf(str_mess, sizeof(str_mess), "Error on line %d of %s. Toeplitz band width > vector size\n", line, file);
  else if(error_number == 2)
    snprintf(str_mess, sizeof(str_mess), "Error on line %d of %s. Bad allocation.\n", line, file);
  else if(error_number == 3)
    snprintf(str_mess, sizeof(str_mess), "Error on line %d of %s. Error at fftw multithread initialization.\n", line, file);
  else  //generic message (error code 7 and any unlisted code, which left the buffer uninitialized before)
    snprintf(str_mess, sizeof(str_mess), "Error on line %d of %s.\n", line, file);
  fprintf(stderr, "%s", str_mess);
  printf("%s", str_mess);
  return error_number;
}


//=========================================================================

//Computes an optimal block size for the sliding-window algorithm, based on
//the matrix size n and the half bandwidth lambda of the Toeplitz matrix.
int optimal_blocksize(int n, int lambda, int bs_flag) {
  int bs;  //computed optimal block size
  int min_bs;  //minimum block size used for the computation
  int min_pow2;  //minimum power of two index used for the block size computation

  min_bs = 3*lambda;
  min_pow2 = (int) ceil( log(min_bs)/log(2) );
  bs = pow(2, min_pow2);


  if (bs_flag==1) {  //flag to use a different formula to find an optimal block size
    while(bs < 2*(lambda-1)*log(bs+1) && bs<n)
      bs = bs*2;
  }

  if (bs > n)       //avoid a block size much bigger than the matrix. Happens mostly
    bs = 3*lambda;  //when the matrix is small compared to its bandwidth

  if(VERBOSE)
    printf("Computed optimal blocksize is %d (with lambda = %d)\n", bs, lambda);

  return bs;
}
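
//Worked example (illustrative only, not part of the library): with
//lambda = 150 the minimum block size is 3*150 = 450, so min_pow2 =
//ceil(log2(450)) = 9 and the returned block size is 2^9 = 512, provided
//the matrix is large enough. For a small matrix, say n = 400 < 512, the
//routine falls back to bs = 3*lambda = 450:
//
//  int bs = optimal_blocksize(100000, 150, 0);  //yields 512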


//=========================================================================

//Sets the process parameters (block size, number of simultaneous FFTs) and
//initializes the FFTW arrays and plans needed for the Toeplitz product.
int tpltz_init(int n, int lambda, int *nfft, int *blocksize, fftw_complex **T_fft, double *T, fftw_complex **V_fft, double **V_rfft, fftw_plan *plan_f, fftw_plan *plan_b)
{
  //initialize block size
  *blocksize = optimal_blocksize(n, lambda, 0);
  //initialize nfft
  *nfft = NFFT;
  //initialize fftw plan allocation flag
  int fftw_flag = FFTW_FLAG;

  //initialize fftw for omp threads
  fftw_init_omp_threads();
  //initialize fftw array and plan for T (and make it circulant first)
  circ_init_fftw(T, (*blocksize), lambda, T_fft);
  //initialize fftw array and plan for V
  rhs_init_fftw(nfft, (*blocksize), V_fft, V_rfft, plan_f, plan_b, fftw_flag);

  if (VERBOSE)
    printf("Initialization finished successfully\n");

  return 0;
}
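
//Usage sketch (illustrative only): the typical sequential calling sequence
//pairs tpltz_init with stmm and tpltz_cleanup. Here T holds the lambda
//Toeplitz coefficients and V the column-major n x m data matrix:
//
//  fftw_complex *T_fft, *V_fft;
//  double *V_rfft;
//  fftw_plan plan_f, plan_b;
//  int nfft, blocksize;
//
//  tpltz_init(n, lambda, &nfft, &blocksize, &T_fft, T, &V_fft, &V_rfft, &plan_f, &plan_b);
//  stmm(&V, n, m, 0, n*m, T_fft, lambda, V_fft, V_rfft, plan_f, plan_b, blocksize, nfft);
//  tpltz_cleanup(&T_fft, &V_fft, &V_rfft, &plan_f, &plan_b);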


//=========================================================================

//Initializes the FFTW multithreading support when the library is compiled
//with the fftw_MULTITHREADING flag; otherwise this is a no-op.
int fftw_init_omp_threads()
{
#ifdef fftw_MULTITHREADING
  int status;
  int n_thread;  //number of OpenMP threads to use for the FFTs (was undeclared before)

  //initialize fftw omp threads
  status = fftw_init_threads();
  if (status==0)
    return print_error_message (3, __FILE__, __LINE__);

  n_thread = omp_get_max_threads();  //equivalent to reading omp_get_num_threads()
                                     //inside a parallel region, without the race
  if (VERBOSE)
    printf("Using %d threads\n", n_thread);

  fftw_plan_with_nthreads(n_thread);
  if (VERBOSE)
    printf("Using multithreaded FFTs with %d threads\n", n_thread);
#endif

  return 0;
}


//=========================================================================

//Allocates the FFTW arrays for the data matrix V and creates the forward and
//backward plans for nfft simultaneous real FFTs of length fft_size.
int rhs_init_fftw(int *nfft, int fft_size, fftw_complex **V_fft, double **V_rfft, fftw_plan *plan_f, fftw_plan *plan_b, int fftw_flag)
{
  //allocate fftw arrays and plans for V
  *V_fft  = (fftw_complex*) fftw_malloc((*nfft)*(fft_size/2+1) * sizeof(fftw_complex) );
  *V_rfft = (double*) fftw_malloc((*nfft)*fft_size * sizeof(double) );
  if (*V_fft==0 || *V_rfft==0)
    return print_error_message (2, __FILE__, __LINE__);

  //advanced interface: nfft transforms stored as contiguous columns of length
  //fft_size (real side) and fft_size/2+1 (complex side)
  *plan_f = fftw_plan_many_dft_r2c(1, &fft_size, (*nfft), *V_rfft, &fft_size, 1, fft_size, *V_fft, NULL, 1, fft_size/2+1, fftw_flag );
  *plan_b = fftw_plan_many_dft_c2r(1, &fft_size, (*nfft), *V_fft, NULL, 1, fft_size/2+1, *V_rfft, &fft_size, 1, fft_size, fftw_flag );


  return 0;
}


//=========================================================================

//Builds the circulant extension of the Toeplitz band T and computes its
//Fourier transform T_fft, used afterwards for all the block products.
int circ_init_fftw(double *T, int fft_size, int lambda, fftw_complex **T_fft)
{
  //routine variable
  int i;
  int circ_fftw_flag = FFTW_ESTIMATE;
  //allocation for T_fft
  *T_fft = (fftw_complex*) fftw_malloc( (fft_size/2+1) * sizeof(fftw_complex) );
  if (*T_fft==0)
    return print_error_message (2, __FILE__, __LINE__);
  double *T_circ = (double*) (*T_fft);  //reuse the same buffer for the real input

  //inplace fft
  fftw_plan plan_f_T;
  plan_f_T = fftw_plan_dft_r2c_1d( fft_size, T_circ, *T_fft, circ_fftw_flag );

  //make T circulant (the complex buffer holds fft_size+2 doubles)
#pragma omp parallel for
  for(i=0; i<fft_size+2; i++)
    T_circ[i] = 0.0;

  T_circ[0] = T[0];
  for(i=1; i<lambda; i++) {
    T_circ[i] = T[i];
    T_circ[fft_size-i] = T[i];
  }

  fftw_execute(plan_f_T);
  fftw_destroy_plan(plan_f_T);

  return 0;
}
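
//Worked example (illustrative only): with lambda = 3, T = [t0, t1, t2] and
//fft_size = 8, the circulant embedding built above is
//  T_circ = [t0, t1, t2, 0, 0, 0, t2, t1],
//i.e. the band is mirrored at the end of the buffer so that the first column
//of the circulant matrix reproduces the Toeplitz band.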


//=========================================================================

//Releases the FFTW plans and arrays created by tpltz_init.
int tpltz_cleanup(fftw_complex **T_fft, fftw_complex **V_fft, double **V_rfft, fftw_plan *plan_f, fftw_plan *plan_b){
  fftw_destroy_plan(*plan_f);
  fftw_destroy_plan(*plan_b);
  fftw_free(*T_fft);
  fftw_free(*V_fft);
  fftw_free(*V_rfft);
#ifdef fftw_MULTITHREADING
  fftw_cleanup_threads();
#endif
  fftw_cleanup();
  return 0;  //the function is declared int but returned nothing before
}


//=========================================================================

//Copies a (nblockrow x nblockcol) submatrix from position (inrow, incol) of
//Vin to position (outrow, outcol) of Vout, scaled by norm. The matrices are
//stored column-major; set_zero_flag clears Vout first.
int copy_block(int ninrow, int nincol, double *Vin, int noutrow, int noutcol, double *Vout, int inrow, int incol, int nblockrow, int nblockcol, int outrow, int outcol, double norm, int set_zero_flag)
{
  int i, j, p, offsetIn, offsetOut;

  //do some size checks first
  if( (nblockcol > nincol) || (nblockrow > ninrow) || (nblockcol > noutcol) || (nblockrow > noutrow)) {
    printf("Error in routine copy_block. Bad size setup.\n");
    return print_error_message(7, __FILE__, __LINE__);
  }

  if(set_zero_flag) {
#pragma omp parallel for private(i)
    for(i=0;i<noutrow*noutcol;i++)  //memset would also work, but this loop threads easily
      Vout[i] = 0.0;
  }

  offsetIn = ninrow*incol+inrow;
  offsetOut = noutrow*outcol+outrow;

#pragma omp parallel for private(i,j,p)
  for(i=0;i<nblockcol*nblockrow;i++) {    //copy the block
    j = i/nblockrow;
    p = i%nblockrow;
    Vout[offsetOut+j*noutrow+p] = Vin[offsetIn+j*ninrow+p]*norm;
  }

  return 0;
}
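
//Usage sketch (illustrative only): copy a 2x3 block read from position
//(row 1, column 0) of a 4x3 matrix Vin into position (row 0, column 2) of a
//4x8 matrix Vout, without scaling and without zeroing Vout first:
//
//  copy_block(4, 3, Vin, 4, 8, Vout, 1, 0, 2, 3, 0, 2, 1.0, 0);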


//=========================================================================

//Performs the product of a circulant matrix (given by its Fourier transform
//C_fft) by a matrix of ncol columns, using the FFTW plans prepared before.
int scmm_direct(int fft_size, fftw_complex *C_fft, int ncol, double *V_rfft, double **CV, fftw_complex *V_fft, fftw_plan plan_f_V, fftw_plan plan_b_CV)
{
  //routine variables
  int sizeT = fft_size/2+1;
  int i, idx;
  double re;

  //perform forward FFT
  fftw_execute(plan_f_V); //input in V_rfft; output in V_fft

  //elementwise complex product in Fourier space
#pragma omp parallel for private(idx, re)
  for(i=0;i<ncol*sizeT;i++) {
    idx = i%sizeT;
    re = V_fft[i][0];  //save the real part before overwriting it (it was reused after being updated before)
    V_fft[i][0] = C_fft[idx][0]*re - C_fft[idx][1]*V_fft[i][1];
    V_fft[i][1] = C_fft[idx][0]*V_fft[i][1] + C_fft[idx][1]*re;
  }

  //perform backward FFTs
  fftw_execute(plan_b_CV); //input in V_fft; output in V_rfft

  return 0;
}
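
//Note (illustrative only): for each column v of the input, the routine above
//evaluates the circulant product in Fourier space as
//  C v = IFFT( FFT(c) .* FFT(v) ),
//where c is the first column of the circulant matrix and .* is the
//elementwise complex product coded in the loop. FFTW's c2r transform is
//unnormalized, hence the 1/blocksize scaling applied by the caller.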


//=========================================================================

//Performs the product of a circulant matrix by an arbitrary matrix of m
//columns, processing the columns by sets of nfft simultaneous FFTs.
int scmm_basic(double **V, int blocksize, int m, fftw_complex *C_fft, int lambda, double **CV, fftw_complex *V_fft, double *V_rfft, int nfft, fftw_plan plan_f_V, fftw_plan plan_b_CV)
{
  //routine variables
  int i,k;  //loop index
  int nloop = (int) ceil((1.0*m)/nfft);  //number of subblocks

  //loop over the sets of columns
  int ncol = min(nfft, m);  //number of columns to be copied from the data to the working
                            //matrix; equal to the number of simultaneous FFTs

#pragma omp parallel for private(i)
  for(i=0;i<blocksize*ncol;i++)
    V_rfft[i] = 0.0;  //memset would also work, but this loop threads easily

  for(k=0;k<nloop;k++) {  //main loop over the sets of columns
    if (k==nloop-1)  //on the last pass ncol may be smaller than nfft
      ncol = m-(nloop-1)*nfft;

    //init the fftw matrices:
    //extract a block of ncol full-length columns from the data matrix and embed it in a
    //bigger matrix, padding each column with lambda zeros. Note that all columns are
    //zero padded thanks to the zeroing loop above
    copy_block(blocksize, m, (*V), blocksize, ncol, V_rfft, 0, k*nfft, blocksize, ncol, 0, 0, 1.0, 0);
    //note: all nfft vectors below are ALWAYS transformed in a single go; if ncol < nfft
    //the extra useless work is done anyway

    scmm_direct(blocksize, C_fft, ncol, V_rfft, CV, V_fft, plan_f_V, plan_b_CV);
    //note: the parameter CV is not really used

    //extract the relevant part from the result, normalizing by 1/blocksize
    copy_block(blocksize, ncol, V_rfft, blocksize, m, (*CV), 0, 0, blocksize, ncol, 0, k*nfft, 1.0/((double) blocksize), 0);

  }  //end of loop over the column sets


  return 0;
}
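
//Worked example (illustrative only): with m = 10 data columns and nfft = 4
//simultaneous FFTs, nloop = ceil(10/4) = 3 passes are made, processing
//ncol = 4, 4 and finally 10 - 2*4 = 2 columns.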


//=========================================================================

//Core routine of the sliding-window (overlap-save) algorithm: multiplies a
//symmetric band Toeplitz matrix by a matrix of m columns, block by block.
int stmm_core(double **V, int n, int m, fftw_complex *T_fft, int blocksize, int lambda, fftw_complex *V_fft, double *V_rfft, int nfft, fftw_plan plan_f, fftw_plan plan_b, int flag_offset)
{

  //routine variables
  int status;
  int i,j,k,p;  //loop index
  int currentsize;
  int blocksize_eff = blocksize-2*lambda;  //useful part of a block after removing the overlaps
  int nbloc = ceil( (1.0*n)/blocksize_eff);  //number of subblocks of the slide/overlap algorithm

  if (flag_offset==1)
    nbloc = ceil((1.0*(n-2*lambda))/blocksize_eff);


  double *V_bloc, *TV_bloc;
  V_bloc  = (double *) calloc(blocksize*m, sizeof(double));
  TV_bloc = (double *) calloc(blocksize*m, sizeof(double));
  if((V_bloc==0)||(TV_bloc==0))
    return print_error_message(2, __FILE__, __LINE__);

  int offset=0;
  if (flag_offset==1)
    offset=lambda;

  //note: lambda-1 instead of lambda should be enough to work.
  //The same for the mpi routines; we generally use one extra term in the communications

  int iV = 0;  //"-lambda+offset";  //first index in V
  int iTV = offset;  //first index in TV

  //"k=0";
  //first subblock treated separately as it requires some padding. Prepare the block of the
  //data vector with the overlaps on both sides
  currentsize = min( blocksize-lambda+offset, n-iV);
  //note: if flag_offset=0, pad the first lambda elements with zeros (for the first
  //subblock only); if flag_offset=1 there is no padding with zeros
  copy_block( n, m, *V, blocksize, m, V_bloc, 0, 0, currentsize, m, lambda-offset, 0, 1.0, 0);

  //do block computation
  status = scmm_basic(&V_bloc, blocksize, m, T_fft, lambda, &TV_bloc, V_fft, V_rfft, nfft, plan_f, plan_b);

  if (status!=0) {
    printf("Error in stmm_core.");
    return print_error_message(7, __FILE__, __LINE__);
  }


  //copy the next chunk of the data matrix first, **before** overwriting the input,
  //because of the overlaps!
  iV = blocksize_eff-lambda+offset;

  if(nbloc > 1) {
    currentsize = min( blocksize, n-iV);  //not to overshoot

    int flag_reset = (currentsize!=blocksize);  //with flag_reset=1, always zero the block first
    copy_block( n, m, *V, blocksize, m, V_bloc, iV, 0, currentsize, m, 0, 0, 1.0, flag_reset);
  }

  //and now store the output back in V
  currentsize = min( blocksize_eff, n-iTV);  //to trim the extra rows
  copy_block( blocksize, m, TV_bloc, n, m, *V, lambda, 0, currentsize, m, iTV, 0, 1.0, 0);


  iTV += blocksize_eff;
  //now continue with all the other subblocks
  for(k=1;k<nbloc;k++) {

    //do block computation
    status = scmm_basic(&V_bloc, blocksize, m, T_fft, lambda, &TV_bloc, V_fft, V_rfft, nfft, plan_f, plan_b);
    if (status!=0) break;


    iV += blocksize_eff;
    //copy first the next subblock to process
    if(k != nbloc-1) {
      currentsize = min(blocksize, n-iV);  //not to overshoot

      int flag_resetk = (currentsize!=blocksize);  //with flag_reset=1, always zero the block first
      copy_block( n, m, *V, blocksize, m, V_bloc, iV, 0, currentsize, m, 0, 0, 1.0, flag_resetk);
    }

    //and then store the output in V
    currentsize = min( blocksize_eff, n-iTV);  //not to overshoot
    copy_block( blocksize, m, TV_bloc, n, m, *V, lambda, 0, currentsize, m, iTV, 0, 1.0, 0);
    iTV += blocksize_eff;

  }//end block computation


  free(V_bloc);
  free(TV_bloc);

  return status;
}
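
//Worked example (illustrative only): with blocksize = 512 and lambda = 50,
//each pass produces blocksize_eff = 512 - 2*50 = 412 useful rows, so a
//vector of length n = 10000 (with flag_offset = 0) is processed in
//nbloc = ceil(10000/412) = 25 overlapping subblocks.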



//=========================================================================

//Performs the stand-alone product of a Toeplitz matrix by a matrix, first
//reshaping the data into a single zero-padded vector and, when nfft>1, into
//a matrix shape optimized for nfft simultaneous FFTs.
int stmm_reshape(double **V, int n, int m, int id0, int l, fftw_complex *T_fft, int lambda, fftw_complex *V_fft, double *V_rfft, fftw_plan plan_f, fftw_plan plan_b, int blocksize, int nfft)
{

  //routine variables
  int i,j,k,p;  //loop index
  int idf       = id0+l-1;
  int cfirst    = id0/n;  //first column index
  int clast     = idf/n;  //last column index
  int clast_1   = (idf+1)/n;
  int m_eff     = clast - cfirst + 1 ;  //number of columns
  int rfirst    = id0%n;
  int rlast     = idf%n;

  int fft_size;
  int flag_nfft=1;  //flag to avoid the nfft reshaping when nfft=1. Active by default,
                    //so the shortcut is taken whenever nfft==1
  int flag_offset;

  if (l<lambda)  //test to avoid communication errors
    return print_error_message (1, __FILE__, __LINE__);


  //allocation for the vector shape V1
  double *V1;
  int v1_size;

  v1_size = l + (m_eff-1)*lambda;  //we have l = (m_eff-2)*n + (n-rfirst) + (rlast+1) (just for information)
  V1 = (double *) calloc(v1_size, sizeof(double));
  if (V1==0)
    return print_error_message (2, __FILE__, __LINE__);


  //reshape the matrix V into the vector shape V1, appending lambda zeros
  //at the end of each column
#pragma omp parallel for private(i,j,p)
  for(i=0;i<l;i++) {
    j = (i+rfirst)/n;
    p = (i+rfirst)%n;
    V1[p-rfirst+j*(n+lambda)] = (*V)[i];
  }
  //note: no copy is really needed if m=1. The memory use will be improved later


  //shortcut possible for nfft==1 with flag_nfft==1. flag_offset must also be set to 0
  //in the core routine to make it work correctly
  if (flag_nfft==1 && nfft==1) {
    flag_offset=0;
    int status = stmm_core(&V1, v1_size, nfft, T_fft, blocksize, lambda, V_fft, V_rfft, nfft, plan_f, plan_b, flag_offset);
    if (status!=0)
      return status;

  }
  else {  //perform the sliding-window algorithm on a matrix shape optimized for nfft

    fft_size = ceil(1.0*v1_size/nfft)+2*lambda;  //maybe use a better name, e.g. "fft_nnew"
    //  int blocksize_eff = blocksize-2*lambda;
    //  int nbloc = ceil(1.0*fft_size/blocksize_eff);  //just for information
    //  int fft_size = nbloc*blocksize_eff;  //if we want the sizes to correspond exactly
    //                                       //(not really useful)

    //reallocation of V with the matrix size computed above
    (*V) = realloc((*V), fft_size*nfft * sizeof(double));
    if (*V==NULL)
      return print_error_message (2, __FILE__, __LINE__);

    //convert the vector shape into a matrix shape optimized for nfft
    vect2nfftblock(V1, v1_size, (*V), fft_size, nfft, lambda);

    //perform the sequential multiplication by small blocks
    flag_offset=1;
    int status = stmm_core(V, fft_size, nfft, T_fft, blocksize, lambda, V_fft, V_rfft, nfft, plan_f, plan_b, flag_offset);
    if (status!=0)
      return status;

    nfftblock2vect((*V), fft_size, nfft, lambda, V1, v1_size);

  }//end else of if (flag_nfft==1)

  //reallocation of (*V) to its input size
  *V = realloc(*V, l * sizeof(double));

  //reshape the vector V1 back into the input matrix V, removing the lambda zeros
  //appended at the end of each column for the computation
#pragma omp parallel for private(j,p)
  for(i=0;i<l;i++) {
    j=(i+rfirst)/n;
    p=(i+rfirst)%n;
    (*V)[i] = V1[p-rfirst+j*(n+lambda)];
  }


  free(V1);

  //ps: we could implement these transformations as pointer functions to avoid many
  //copies of the data. This will be improved, together with the memory use, in
  //future developments


  return 0;
}
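
//Worked example (illustrative only): for n = 100, id0 = 250 and l = 430, the
//data span columns cfirst = 250/100 = 2 to clast = (250+430-1)/100 = 6, so
//m_eff = 5 and the zero-padded vector built above has size
//v1_size = 430 + (5-1)*lambda.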


//=========================================================================

//Converts the vector shape V1 into a matrix shape V2 with nfft columns of
//length fft_size, duplicating the lambda-element overlaps on the edges of
//each column and zero padding where needed.
int vect2nfftblock(double *V1, int v1_size, double *V2, int fft_size, int nfft, int lambda)
{
  int i,j,p;
  //reshape the vector V1 into a matrix shape (*V) for the nfft optimization
  // double *V2; V2 = (double *) calloc(fft_size*nfft, sizeof(double));
  //maybe use V2 instead to avoid the reallocation. Anyway, find a way to better
  //optimize the memory use

  //copy the lambda elements on the edges of each column of the reshaped matrix
  for (j=0; j<nfft; j++) {
    for (i=0; i<lambda; i++) {  //top overlap
      if (j*(fft_size-2*lambda)+i-lambda>=0 && j*(fft_size-2*lambda)+i-lambda<v1_size)
        V2[j*fft_size+i] = V1[j*(fft_size-2*lambda)+i-lambda];
      else
        V2[j*fft_size+i] = 0.0;
    }
    for (i=0; i<lambda; i++) {  //bottom overlap
      if ((j+1)*(fft_size-2*lambda)+i<v1_size)
        V2[fft_size-lambda+i+(j*fft_size)] = V1[(j+1)*(fft_size-2*lambda)+i];
      else
        V2[fft_size-lambda+i+(j*fft_size)] = 0.0;
    }
  }

  //copy the middle part of the nfft-optimized matrix shape (*V)
#pragma omp parallel for private(j,p)
  for(i=0;i<v1_size;i++) {
    j = i/(fft_size-2*lambda);
    p = i%(fft_size-2*lambda);
    V2[lambda+p+(j*fft_size)] = V1[i];
  }  //core copy

  for(i=v1_size;i<nfft*(fft_size-2*lambda);i++) {
    j = i/(fft_size-2*lambda);
    p = i%(fft_size-2*lambda);
    V2[lambda+p+(j*fft_size)] = 0.0;
  }  //pad the extra terms with zeros (nfft*fft_size != v1_size)


  return 0;
}


//=========================================================================

//Converts the nfft-optimized matrix shape V2 back into the vector shape V1,
//extracting the middle part of each column without the lambda edge elements.
int nfftblock2vect(double *V2, int fft_size, int nfft, int lambda, double *V1, int v1_size)
{
  int i,j,p;

#pragma omp parallel for private(j,p)
  for(i=0;i<v1_size;i++) {
    j = i/(fft_size-2*lambda);
    p = i%(fft_size-2*lambda);
    V1[i] = V2[lambda+p+(fft_size*j)];
  }

  //almost equivalent to
  //copy_block(fft_size, nfft, (*V), (fft_size-2*lambda), nfft, V1, lambda, 0, 0, 0, 1.0, 0);
  //not good if (fft_size-2*lambda)*nfft != v1_size,
  //         i.e. if ceil(1.0*v1_size/nfft) != v1_size/nfft;
  //we would need to extend the V1 memory to use this

  return 0;
}


//=========================================================================

//Multiplies a symmetric, band Toeplitz matrix by a (possibly incomplete)
//matrix of m columns, splitting the data into full "middle" columns treated
//by stmm_core and partial "edge" columns treated by stmm_reshape.
int stmm(double **V, int n, int m, int id0, int l, fftw_complex *T_fft, int lambda, fftw_complex *V_fft, double *V_rfft, fftw_plan plan_f, fftw_plan plan_b, int blocksize, int nfft)
{

  //routine variables
  int i,j,k,p;  //loop index
  int idf       = id0+l-1;
  int cfirst    = id0/n;  //first column index
  int clast     = idf/n;  //last column index
  int clast_1   = (idf+1)/n;
  int m_eff     = clast - cfirst + 1 ;  //number of columns
  int rfirst    = id0%n;
  int rlast     = idf%n;

  if (l<lambda)  //test to avoid communication errors
    return print_error_message (1, __FILE__, __LINE__);

  //the middle
  int nloop_middle;  //changing it to the number of full columns would improve the memory use
  if (m_eff>2)
    nloop_middle = (m_eff-2)/nfft;
  else
    nloop_middle = 0;

  int m_middle = nfft*nloop_middle;
  int vmiddle_size = n*m_middle;
  double *Vmiddle;


  if (nloop_middle==0) {
    stmm_reshape(V, n, m, id0, l, T_fft, lambda, V_fft, V_rfft, plan_f, plan_b, blocksize, nfft);

  }
  else { //(nloop_middle>0)

    int offset_middle = n-rfirst;  //size of the first partial column (using the row index
                                   //rfirst: id0 itself may exceed n, which "n-id0" missed)
    Vmiddle = (*V)+offset_middle;

    int flag_offset=0;
    stmm_core(&Vmiddle, n, m_middle, T_fft, blocksize, lambda, V_fft, V_rfft, nfft, plan_f, plan_b, flag_offset);


    //edges (first+last columns + extra column from the Euclidean division)
    int v1_size = min(l,(n-rfirst));
    int v2_size = l-(v1_size+vmiddle_size);
    if (v2_size<0)
      v2_size=0;

    int vedge_size = v1_size+v2_size;
    double *Vedge;
    Vedge = (double *) calloc(vedge_size, sizeof(double));


#pragma omp parallel for private(i)
    for(i=0;i<v1_size;i++)
      Vedge[i] = (*V)[i];


    if (v2_size>0) {
      int offset_edge = v1_size+vmiddle_size;
#pragma omp parallel for private(i)
      for(i=0;i<v2_size;i++)
        Vedge[v1_size+i] = (*V)[offset_edge+i];
    }

    stmm_reshape(&Vedge, n, m, id0, vedge_size, T_fft, lambda, V_fft, V_rfft, plan_f, plan_b, blocksize, nfft);

    //copy the result to V:
#pragma omp parallel for private(i)
    for(i=0;i<v1_size;i++)
      (*V)[i]=Vedge[i];

    if (v2_size>0) {
      int offset_edge2 = v1_size+vmiddle_size;
#pragma omp parallel for private(i)
      for(i=0;i<v2_size;i++)
        (*V)[offset_edge2+i]=Vedge[v1_size+i];
    }

    if (vmiddle_size>0) {
      int offset_middle2 = n-rfirst;
#pragma omp parallel for private(i)
      for(i=0;i<vmiddle_size;i++)
        (*V)[offset_middle2+i]=Vmiddle[i];
    }

    free(Vedge);

  } //end (nloop_middle>0)

  //ps: this loop can be improved by considering the number of full columns instead.
  //We can also improve the memory use and the number of copies

  return 0;
}
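
//Worked example (illustrative only): with n = 100, id0 = 250, l = 430 and
//nfft = 1, rfirst = 50, so the first partial column holds
//v1_size = min(430, 100-50) = 50 elements. With m_eff = 5 and
//nloop_middle = (5-2)/1 = 3, the middle part covers vmiddle_size = 3*100 = 300
//elements, and the remaining v2_size = 430 - (50+300) = 80 elements join the
//first 50 in the edge computation.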


//=========================================================================

//Multiplies a symmetric, band Toeplitz matrix by a matrix distributed over
//the processes of an MPI communicator; the boundary terms are first
//exchanged with the neighbouring processes.
int mpi_stmm(double **V, int n, int m, int id0, int l, double *T, int lambda, MPI_Comm comm)
{

  //mpi variables
  int rank;   //rank of the process
  int size;   //number of processes
  MPI_Status status;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);


  //routine variables
  int i,j,k;  //some indexes
  int idf = id0+l;  //first index of scattered V for rank "rank + 1"
  int cfirst = id0/n;  //first column index
  int clast  = idf/n;  //last column index
  int clast_r = (idf-1)/n;
  int m_eff  = clast_r - cfirst + 1 ;
  double *V1, *Lambda;

  //MPI communication conditions
  //communication is needed when columns are truncated
  int right = rank + 1;
  int left  = rank - 1;
  int v1_size = l + 2*lambda;  //size including the communicated terms
  if (rank==0 || cfirst*n==id0){  //no left communication
    v1_size -= lambda;
    left = MPI_PROC_NULL;
  }
  if (rank==(size-1) || clast*n==idf){  //no right communication
    v1_size -= lambda;
    right = MPI_PROC_NULL;
  }

  //init data to send
  Lambda=(double *) malloc(2*lambda * sizeof(double));
  if (Lambda==0)
    return print_error_message (2, __FILE__, __LINE__);

  for(i=0;i<lambda;i++) {
    Lambda[i]=(*V)[i];
    Lambda[i+lambda]=(*V)[i+l-lambda];
  }
  if (VERBOSE)
    printf("[rank %d] Left comm with %d | Right comm with %d\n", rank, left, right);

  //send and receive data
  MPI_Sendrecv_replace(Lambda, lambda, MPI_DOUBLE, left, MPI_USER_TAG, right, MPI_USER_TAG, comm, &status);  //1st comm
  MPI_Sendrecv_replace(Lambda+lambda, lambda, MPI_DOUBLE, right, MPI_USER_TAG, left, MPI_USER_TAG, comm, &status);  //2nd comm


  if (l<lambda)  //checked after the sendrecv to avoid blocking the communications of the other processes
    return print_error_message (1, __FILE__, __LINE__);

  //copy received data
  if(left==MPI_PROC_NULL && right==MPI_PROC_NULL)  // 0--0 : nothing to do
    V1 = *V;
  else if(left==MPI_PROC_NULL) {  // 0--1 : realloc
    *V = realloc(*V, v1_size * sizeof(double));
    if(*V == NULL)
      return print_error_message (2, __FILE__, __LINE__);
    V1 = *V;
  }
  else  // 1--1 or 1--0 : new allocation
    V1 = (double *) malloc(v1_size * sizeof(double));

  if (left!=MPI_PROC_NULL){
    for(i=0;i<lambda;i++)
      V1[i] = Lambda[i+lambda];
    id0 -= lambda;
  }
  if (right!=MPI_PROC_NULL){
    for(i=0;i<lambda;i++)
      V1[i+v1_size-lambda] = Lambda[i];
  }

  //copy the input matrix V
  int offset = 0;
  if (left!=MPI_PROC_NULL){
    offset = lambda;
#pragma omp parallel for
    for(i=offset;i<l+offset;i++)
      V1[i] = (*V)[i-offset];
  }

  free(Lambda);  //the communication buffer is not needed anymore (it was leaked before)

  fftw_complex *V_fft, *T_fft;
  double *V_rfft;
  fftw_plan plan_f, plan_b;


  //compute the matrix product
  int nfft, blocksize;
  tpltz_init(v1_size, lambda, &nfft, &blocksize, &T_fft, T, &V_fft, &V_rfft, &plan_f, &plan_b);

  if(VERBOSE)
    printf("[rank %d] Before middle-level call : blocksize=%d, nfft=%d\n", rank, blocksize, nfft);

  stmm(&V1, n, m, id0, v1_size, T_fft, lambda, V_fft, V_rfft, plan_f, plan_b, blocksize, nfft);

  tpltz_cleanup(&T_fft, &V_fft, &V_rfft, &plan_f, &plan_b);

  //copy the output matrix TV
  offset = 0;
  if (left!=MPI_PROC_NULL)
    offset = lambda;
  if(left==MPI_PROC_NULL && right==MPI_PROC_NULL)  // 0--0
    *V=V1;
  else if(left==MPI_PROC_NULL) {  // 0--1
    V1 = realloc(V1, l * sizeof(double));
    if(V1 == NULL)
      return print_error_message (2, __FILE__, __LINE__);
    *V = V1;
  }
  else {  // 1--0 or 1--1
#pragma omp parallel for
    for(i=offset;i<l+offset;i++)
      (*V)[i-offset] = V1[i];
  }

  if(left!=MPI_PROC_NULL)
    free(V1);

  return 0;
}
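
//Usage sketch (illustrative only): each process owns l consecutive elements
//of the distributed data, starting at global index id0 of the column-major
//n x m matrix V; T holds the lambda Toeplitz coefficients:
//
//  mpi_stmm(&V, n, m, id0, l, T, lambda, MPI_COMM_WORLD);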


//=========================================================================

//Multiplies a symmetric, block-diagonal, band Toeplitz matrix by a matrix
//distributed over the processes of an MPI communicator. Each Toeplitz block b
//is defined by its size n[b], its half bandwidth lambda[b] and the row index
//idv[b] of its first element on the global diagonal.
int mpi_stbmm(double **V, int *n, int m, int nrow, double *T, int nb_blocks_local, int nb_blocks_all, int *lambda, int *idv, int idp, int local_V_size, MPI_Comm comm)
{

  //MPI parameters
  int rank;   //process rank
  int size;   //number of processes

  MPI_Status status;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);

  PRINT_RANK=rank;

  FILE *file;
  file = stdout;

  //neighbours of the process, for the MPI communications needed when a block is shared
  int right = rank + 1;
  int left  = rank - 1;

  int i,j,k;  //some indexes


  //define the indices for each process
  int idv0, idvn;  //indices of the first and the last block of V for each process

  int *nnew;
  nnew = (int*) calloc( nb_blocks_local, sizeof(int));
  int idpnew;
  int local_V_size_new;


  int status_params = get_overlapping_blocks_params( nb_blocks_local, idv, n, local_V_size, nrow, idp, &idpnew, &local_V_size_new, nnew, &idv0, &idvn);

  if (VERBOSE==1)
    printf("status_params=%d\n", status_params);

  if( status_params == 0) {
    free( nnew);
    return(0);  //no work to be done
  }


  if (VERBOSE==1) {  //print the new parameter definitions on screen if VERBOSE
    fprintf(file, "new parameters definition:\n");
    fprintf(file, "[%d] idp=%d ; idpnew=%d\n", rank, idp, idpnew);  //the rank argument and newline were missing
    fprintf(file, "[%d] local_V_size=%d ; local_V_size_new=%d\n", rank, local_V_size, local_V_size_new);
    for(i=0;i<nb_blocks_local;i++)
      fprintf(file, "[%d] n[%d]=%d ; nnew[%d]=%d\n", rank, i, n[i], i, nnew[i]);
  }


  int vShft=idpnew-idp;  //new first element of relevance in V

  //define the column indices:
  //index of the first and the last column of V for the current process
  int idvm0 = idpnew/nrow;
  int idvmn = (idpnew+local_V_size_new-1)/nrow;
  //number of columns of V for the current process
  int ncol_rank = idvmn-idvm0+1;
  //number of blocks for the current process, with possible repetitions
  int nb_blocs_rank;

  if(ncol_rank == 1)  //empty process not allowed
    nb_blocs_rank = idvn - idv0 + 1;
  else
    nb_blocs_rank = (ncol_rank-2)*nb_blocks_local + (nb_blocks_local-idv0) + (idvn+1);  //in this case nb_blocks_local = nblocs_all

  if (VERBOSE==1) {  //print on screen
    fprintf(file, "[%d] nb_blocs_rank=%d, nb_blocks_local=%d\n", rank, nb_blocs_rank, nb_blocks_local);
  }

  //define the indices of the first and the last element in each block
  int idvp0 = idpnew%nrow-idv[idv0];  //index of the first element of the process in the first block
  int idvpn;  //reverse index of the last element of the process in the last block;
              //it is the number of remaining elements needed to fully complete the last block
  idvpn = idv[idvn]+nnew[idvn]-1 - (idpnew+local_V_size_new-1)%nrow ;


  //define the offsets for the first and last blocks of the process for V1
  int v1_size = local_V_size_new;  //length of the new vector V1
  int offset0=0;
  int offsetn=0;

  if(idvp0 != 0) {
    offset0 = min( idvp0, lambda[idv0]);
    v1_size += offset0;
  }

  if(idvpn != 0) {
    offsetn = min(idvpn, lambda[idvn]);
    v1_size += offsetn;
  }
  //ps: v1_size is not needed anymore; kept just for information


  //initialization of the send and receive data
  //buffer holding the boundary V data to be sent to the neighbours
  double *LambdaOut=(double *) calloc(lambda[idv0]+lambda[idvn], sizeof(double));
  //  double *LambdaOut=(double *) calloc(toSendLeft+toSendRight, sizeof(double));

  if (LambdaOut==0)
    return print_error_message (2, __FILE__, __LINE__);

  int toSendLeft = 0;

  if( offset0 != 0) {
    toSendLeft = min( idv[idv0]+nnew[idv0]-idpnew%nrow, lambda[idv0]);
    for(i=0;i<toSendLeft;i++)
      LambdaOut[i]=(*V)[i];
  }

  int toSendRight = 0;

  if( offsetn != 0) {
    toSendRight = min( (idpnew+local_V_size)%nrow-idv[idvn], lambda[idvn]);
    for(i=0;i<toSendRight;i++)
      LambdaOut[i+lambda[idv0]]=(*V)[i+local_V_size-toSendRight];
  }

  if(rank==0 || offset0==0)
    left = MPI_PROC_NULL;
  if(rank==size-1 || offsetn==0)
    right = MPI_PROC_NULL;

  double *LambdaIn=(double *) calloc(lambda[idv0]+lambda[idvn], sizeof(double));
  //  double *LambdaIn=(double *) calloc(offset0+offsetn, sizeof(double));
  if (LambdaIn==0)
    return print_error_message (2, __FILE__, __LINE__);

  //send and receive data
  MPI_Sendrecv( LambdaOut, toSendLeft, MPI_DOUBLE, left, MPI_USER_TAG, (LambdaIn+lambda[idv0]), offsetn, MPI_DOUBLE, right, MPI_USER_TAG, comm, &status);
  MPI_Sendrecv( (LambdaOut+lambda[idv0]), toSendRight, MPI_DOUBLE, right, MPI_USER_TAG, LambdaIn, offset0, MPI_DOUBLE, left, MPI_USER_TAG, comm, &status);

  free(LambdaOut);


  //size of the first and the last block for the current process
  int v0rank_size, vnrank_size;
  if (nb_blocs_rank == 1) {  //only one block
    v0rank_size = ((idpnew+local_V_size_new-1)%nrow +1) - idpnew%nrow + offset0 + offsetn;
    vnrank_size = 0;  //just for convenience - not really needed
  }
  else {  //more than one block
    v0rank_size = idv[idv0] + nnew[idv0] - idpnew%nrow + offset0;
    vnrank_size = ((idpnew+local_V_size_new-1)%nrow +1) - idv[idvn] + offsetn;
  }


  //---------------------------------------
  //initialization for the loop over the blocks

  int idv1=0;  //index in *V1

  int mid;  //local number of columns for the current block
  //index of the first element of the process inside the first block
  int offset_id0;
  offset_id0 = idvp0;

  //fftw variables
  fftw_complex *V_fft, *T_fft;
  double *V_rfft;
  fftw_plan plan_f, plan_b;
  //init local block vector
  double *V1block;
  int lambdaShft;

  //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  //loop over the blocks inside the process
  int nfft, blocksize;
  int iblock;  //index for the loop over the blocks
  //  int loopindex;
  int id;  //index of the current block

  int vblock_size;
  int id0block;

  for(iblock=idv0;iblock<idv0+nb_blocs_rank;iblock++) {
    id = iblock%nb_blocks_local;  //index of the current block

    if(nnew[id]>0) {  //the block is ok

      mid=1;  //always one!

      for( lambdaShft=k=0; k<id; k++)
        lambdaShft += lambda[k];


      //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
      //first case: first block of the process
      if(iblock==idv0) {
        if(VERBOSE)
          fprintf(file, "[%d] First block...\n", rank);

        vblock_size=v0rank_size;
        id0block=(offset_id0-offset0);

        V1block = (double *) calloc(vblock_size, sizeof(double));


#pragma omp parallel for
        for (j=0;j<offset0;j++)
          V1block[j] = LambdaIn[j];

        //if (nb_blocs_rank == 1) currentsize_middlepart = vblock_size-offset0-offsetn = local_V_size_new
        //else currentsize_middlepart = vblock_size-offset0
        int currentsize_middlepart=min(vblock_size-offset0, local_V_size_new);
#pragma omp parallel for
        for (j=0;j<currentsize_middlepart;j++)
          V1block[offset0+j] = (*V)[j+vShft];

        if (nb_blocs_rank == 1) {
#pragma omp parallel for
          for (j=0;j<offsetn;j++)
            V1block[vblock_size-offsetn+j] = LambdaIn[j+lambda[idv0]];
        }

        //init Toeplitz arrays
        tpltz_init(vblock_size, lambda[id], &nfft, &blocksize, &T_fft, (T+lambdaShft), &V_fft, &V_rfft, &plan_f, &plan_b);

        //Toeplitz computation
        if(VERBOSE)
          fprintf(file, "[%d] Before middle-level call : nfft = %d, blocksize = %d\n", rank, nfft, blocksize);
        stmm(&V1block, nnew[id], mid, id0block, vblock_size, T_fft, lambda[id], V_fft, V_rfft, plan_f, plan_b, blocksize, nfft);
        tpltz_cleanup(&T_fft, &V_fft, &V_rfft, &plan_f, &plan_b);


        int currentsize=min(vblock_size-offset0, local_V_size_new);
#pragma omp parallel for
        for (j=0;j<currentsize;j++)
          (*V)[vShft+j] = V1block[offset0+j];


        free(V1block);

      }//end (first case)

      //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
      //generic case: a block in the middle of the process
      else if(iblock!=idv0 && iblock!=idv0+nb_blocs_rank-1) {
        if(VERBOSE)
          fprintf(file, "[%d] generic block...\n", rank);  //the rank argument was missing

        vblock_size=nnew[id];
        id0block=0;

        V1block = (double *) calloc(vblock_size, sizeof(double));

        idv1 = idv[id]-idp%nrow - vShft + offset0 + nrow*( (iblock/nb_blocks_local) );

#pragma omp parallel for
        for (j=0;j<vblock_size;j++)
          V1block[j] = (*V)[j+idv1-offset0+vShft];

        //init Toeplitz arrays
        tpltz_init(nnew[id], lambda[id], &nfft, &blocksize, &T_fft, (T+lambdaShft), &V_fft, &V_rfft, &plan_f, &plan_b);
        //Toeplitz computation
        if(VERBOSE)
          fprintf(file, "[%d] Before middle-level call : nfft = %d, blocksize = %d\n", rank, nfft, blocksize);
        stmm(&V1block, nnew[id], mid, id0block, vblock_size, T_fft, lambda[id], V_fft, V_rfft, plan_f, plan_b, blocksize, nfft);
        tpltz_cleanup(&T_fft, &V_fft, &V_rfft, &plan_f, &plan_b);

#pragma omp parallel for
        for (j=0;j<vblock_size;j++)
          (*V)[j+idv[id] - idp%nrow + nrow*( (iblock/nb_blocks_local) )] = V1block[j];


        free(V1block);

      }//end (generic case)

      //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
      //last case: last block of the process
      else if(iblock==idv0+nb_blocs_rank-1 && iblock!= idv0) {
        if(VERBOSE)
          fprintf(file, "[%d] last block...\n", rank);  //the rank argument was missing

        vblock_size=vnrank_size;
        id0block=0;

        V1block = (double *) calloc(vblock_size, sizeof(double));

        idv1 = idv[id] - idp%nrow - vShft + offset0 + nrow*( (iblock/nb_blocks_local) );

#pragma omp parallel for
        for (j=0;j<vblock_size-offsetn;j++)
          V1block[j] = (*V)[j+idv1-offset0+vShft];
#pragma omp parallel for
        for (j=0;j<offsetn;j++)
          V1block[vblock_size-offsetn+j] = LambdaIn[j+lambda[idv0]];


        //init Toeplitz arrays
        tpltz_init(vblock_size, lambda[id], &nfft, &blocksize, &T_fft, (T+lambdaShft), &V_fft, &V_rfft, &plan_f, &plan_b);
        //Toeplitz computation
        if(VERBOSE)
          fprintf(file, "[%d] Before middle-level call : nfft = %d, blocksize = %d\n", rank, nfft, blocksize);
        stmm(&V1block, nnew[id], mid, id0block, vblock_size, T_fft, lambda[id], V_fft, V_rfft, plan_f, plan_b, blocksize, nfft);
        tpltz_cleanup(&T_fft, &V_fft, &V_rfft, &plan_f, &plan_b);


#pragma omp parallel for
        for (j=0;j<vnrank_size-offsetn;j++)
          (*V)[idv[id]-idp%nrow+j+nrow*( (iblock/nb_blocks_local) )] = V1block[j];


        free(V1block);

      }//end of last block
      else { break; }//error  //the generic case could be put here instead of between the first and last cases
      //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    }//end of if(nnew[id]>0)
  }//end of loop over the blocks

  free(LambdaIn);  //free the communication buffer and the work array (they were leaked before)
  free(nnew);

  return 0;
}
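
//Usage sketch (illustrative only, with made-up sizes): a symmetric
//block-diagonal Toeplitz matrix with two blocks of sizes 400 and 600
//(bandwidths 16 and 32) stacked on the diagonal of an nrow = 1000 system
//would be described as
//
//  int    n[2]      = {400, 600};
//  int    lambda[2] = {16, 32};
//  int    idv[2]    = {0, 400};  //row index of the first element of each block
//  double T[16+32];              //the two bands stored contiguously
//
//  mpi_stbmm(&V, n, m, 1000, T, 2, 2, lambda, idv, idp, local_V_size, MPI_COMM_WORLD);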


//====================================================================

//Loops over the blocks to find which ones overlap with the local data and
//computes the new parameters (idpnew, local_V_size_new, nnew) restricted to
//the relevant blocks.
int get_overlapping_blocks_params(int nbloc, int *idv, int *n, int local_V_size, int nrow, int idp, int *idpnew, int *local_V_size_new, int *nnew, int *ifirstBlock, int *ilastBlock)
{

  int ib, nblockOK=0, firstrow, lastrow, nfullcol_data;
  int idptmp;

  nfullcol_data = (local_V_size-(nrow-idp%nrow)-(idp+local_V_size)%nrow)/nrow;  //number of full columns in the input data

  //  int rank = PRINT_RANK;
  //  FILE *file;
  //  file = stdout;


  if( nfullcol_data > 0) {

    for( ib=0; ib<nbloc; ib++) {
      if( idv[ib] < nrow) {
        nnew[ib] = min( n[ib], nrow-idv[ib]);  //block used for the product
        nblockOK++;
      }
    }

  }
  else {  //no full column observed

    firstrow = idp%nrow;
    lastrow = (idp+local_V_size-1)%nrow;

    if( firstrow < lastrow) {  //just one column partially observed

      for( ib=0; ib<nbloc; ib++) {
        if( (idv[ib]+n[ib] > firstrow) && (idv[ib] < lastrow+1)) {
          nnew[ib] = min( n[ib], nrow-idv[ib]);  //block used for the product
          nblockOK++;
        }
      }

    }
    else {  //two columns partially observed

      for( ib=0; ib<nbloc; ib++) {
        if( (idv[ib]+n[ib] > firstrow) && (idv[ib] < nrow)) {  //intersects the first partial column
          nnew[ib] = min( n[ib], nrow-idv[ib]);  //block used for the product
          nblockOK++;
        }

        if( (idv[ib] < lastrow+1) && (idv[ib]+n[ib] > 0)) {  //intersects the second partial column
          nnew[ib] = min( n[ib], nrow-idv[ib]);  //block used for the product
          nblockOK++;  //may overcount but we do not care
        }  //could use else instead!
      }
    }
  }

  if (VERBOSE)
    printf("nblockOK=%d\n", nblockOK);


  if( nblockOK == 0) return(0);  //no blocks overlapping with the data

  //find the first and last relevant blocks for the beginning and end of the local data V

  //first block
  idptmp = idp;

  for( *ifirstBlock = -1; *ifirstBlock == -1; ) {
    for(ib=0;ib<nbloc;ib++) {
      if(nnew[ib] != 0 && idptmp%nrow < idv[ib]+nnew[ib]) break;
    }

    if (ib<nbloc && idv[ib] <= idptmp%nrow) {
      *ifirstBlock = ib;
      *idpnew = idptmp;
    }
    else if (ib<nbloc && idv[ib] > idptmp%nrow) {
      *ifirstBlock = ib;
      int extrabegining = idv[ib]-idp%nrow;
      //      *idpnew = idp+extrabegining;//idv[ib];
      int idvfirstcolumn = idptmp/nrow;
      *idpnew = idv[ib]+idvfirstcolumn*nrow;
    }
    else {  //ib=nbloc
      idptmp += (int) (nrow-idptmp%nrow);  //go to the first element of the next column
    }
  }


  //last block
  idptmp = idp+local_V_size-1;

  for( *ilastBlock = -1; *ilastBlock == -1; ) {
    for(ib=nbloc-1;ib>=0;ib--) {
      if(nnew[ib] != 0 && idv[ib] <= idptmp%nrow) break;
    }


    if (ib>=0 && idptmp%nrow < idv[ib]+nnew[ib]) {
      *ilastBlock = ib;
      *local_V_size_new = local_V_size-(*idpnew)+idp;
    }
    else if (ib>=0 && idv[ib]+nnew[ib] <= idptmp%nrow) {
      *ilastBlock = ib;
      int extraend = (local_V_size-1+idp)%nrow+1-(idv[ib]+nnew[ib]);
      //*local_V_size_new = (local_V_size+idp)%nrow-(idv[*ilastBlock]+nnew[*ilastBlock]);
      //idv[*ilastBlock]+nnew[*ilastBlock]-(*idpnew);
      *local_V_size_new = local_V_size-(*idpnew)+idp-extraend;

      int idvlastcolumn = idptmp/nrow;
      *local_V_size_new = idv[ib]+nnew[ib]+idvlastcolumn*nrow - (*idpnew);

    }
    else {
      idptmp = (int) idptmp - (idptmp%nrow)-1;  //go to the last element of the previous column
    }
  }


  return(1);
}


//====================================================================

//Multiplies a symmetric, block-diagonal, band Toeplitz matrix by a matrix
//distributed over MPI processes, ignoring the data gaps: the gaps are set to
//zero and, when worthwhile, the blocks are split into smaller "gappy" blocks
//around the significant gaps.
int mpi_gstbmm(double **V, int *n, int m, int nrow, double *T, int nb_blocks_local, int nb_blocks_all, int *lambda, int *idv, int id0p, int local_V_size, int *id0gap, int *lgap, int ngap, MPI_Comm comm)
{

  //MPI parameters
  int rank;  //process rank
  int size;  //number of processes

  MPI_Status status;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);

  int i,j,k;  //some indexes

  int flag_skip_build_gappy_blocks=0;

  FILE *file;
  file = stdout;

  //put zeros at the gap locations
  reset_gaps( V, id0p, local_V_size, m, nrow, id0gap, lgap, ngap);

  //allocation for the gappy structure of the diagonal-block Toeplitz matrix
  int nb_blocks_gappy;
  double *Tgappy;
  int *idvgappy;
  int *ngappy;
  int *lambdagappy;

  int nb_blockgappy_max;
  int Tgappysize_max;

  //some computations useful to determine the maximum possible size of the gappy variables
  int Tsize=0;
  int lambdamax=0;

  if (VERBOSE)
    fprintf(file, "[%d] flag_skip_build_gappy_blocks=%d\n", rank, flag_skip_build_gappy_blocks);

  if (flag_skip_build_gappy_blocks==1) {  //no gappy-blocks strategy: just put zeros at the gap locations

    //compute the product using only the input Toeplitz block structure with zeros at the gap locations
    mpi_stbmm(V, n, m, nrow, T, nb_blocks_local, nb_blocks_all, lambda, idv, id0p, local_V_size, comm);  //use the communicator passed as argument (MPI_COMM_WORLD was hardcoded before)

  }
  else {  //gappy-blocks strategy

    for(Tsize=i=0;i<nb_blocks_local;i++)
      Tsize += lambda[i];

    for(i=0;i<nb_blocks_local;i++) {
      if (lambda[i]>lambdamax)
        lambdamax = lambda[i];
    }

    //compute the maximum possible size of the gappy variables
    nb_blockgappy_max = nb_blocks_local+ngap;
    Tgappysize_max = Tsize + lambdamax*ngap;

    //allocation of the gappy variables with the maximum possible size
    Tgappy = (double *) calloc(Tgappysize_max,sizeof(double));
    ngappy = (int *) calloc(nb_blockgappy_max, sizeof(int));
    lambdagappy = (int *) calloc(nb_blockgappy_max, sizeof(int));
    idvgappy = (int *) calloc(nb_blockgappy_max, sizeof(int));


    //build the gappy Toeplitz block structure considering the significant gap locations,
    //i.e. skipping the gaps shorter than the minimum correlation distance. The
    //flag_param_distmin_fixed parameter also allows skipping the gaps shorter than a
    //fixed value: sometimes it is better to just put some zeros than to consider two
    //separate blocks.
    //ps: this criterion could depend on the local lambda in future improvements
    int flag_param_distmin_fixed=0;
    build_gappy_blocks(n, m, nrow, T, nb_blocks_local, nb_blocks_all, lambda, idv, id0gap, lgap, ngap, &nb_blocks_gappy, Tgappy, idvgappy, ngappy, lambdagappy, flag_param_distmin_fixed);

    if (VERBOSE) {
      fprintf(file, "[%d] nb_blocks_gappy=%d\n", rank, nb_blocks_gappy);
      for(i=0;i<nb_blocks_gappy;i++) {  //both prints inside the loop (the index was stale before)
        fprintf(file, "[%d] idvgappy[%d]=%d \n", rank, i, idvgappy[i]);
        fprintf(file, "[%d] ngappy[%d]=%d \n", rank, i, ngappy[i]);
      }
    }
    //ps: we could reallocate the gappy variables to their real size. Not sure it's worth it

    //compute the product using the freshly created gappy Toeplitz block structure
    mpi_stbmm(V, ngappy, m, nrow, Tgappy, nb_blocks_gappy, nb_blocks_gappy, lambdagappy, idvgappy, id0p, local_V_size, comm);

    free(Tgappy);
    free(ngappy);
    free(lambdagappy);
    free(idvgappy);

  } //end flag_skip_build_gappy_blocks==1


  //put zeros at the gap locations of V again. Indeed, since some gaps were just replaced
  //by zeros in the input, the computation creates spurious results there that must be
  //cleared afterwards
  reset_gaps( V, id0p, local_V_size, m, nrow, id0gap, lgap, ngap);


  return 0;
}


//====================================================================

//Puts zeros in V at the gap locations
int reset_gaps(double **V, int id0, int local_V_size, int m, int nrow, int *id0gap, int *lgap, int ngap)
{
  int i,j,k;

  for (j=0; j<m; j++) {
    for (k=0; k<ngap; k++)
      for (i=0; i<lgap[k]; i++)
        if (id0gap[k]+i+j*nrow>=id0 && id0gap[k]+i+j*nrow < id0+local_V_size)  //bounds check (the upper bound mixed up the offsets before)
          (*V)[id0gap[k]+i+j*nrow-id0] = 0.;
  }


  return 0;
}
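
//Worked example (illustrative only): with nrow = 1000, a single gap starting
//at id0gap[0] = 200 with length lgap[0] = 10 zeroes elements 200..209 of
//every column of V, restricted to the part the process actually owns.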

//====================================================================

//Builds the gappy Toeplitz block structure: reduces or splits the original
//blocks around the significant gaps and returns the new set of parameters
//(Tgappy, idvgappy, ngappy, lambdagappy) together with the number of gappy
//blocks.
int build_gappy_blocks(int *n, int m, int nrow, double *T, int nb_blocks_local, int nb_blocks_all, int *lambda, int *idv, int *id0gap, int *lgap, int ngap, int *nb_blocks_gappy_final, double *Tgappy, int *idvgappy, int *ngappy, int *lambdagappy, int flag_param_distmin_fixed)
{

  int i,j,k;
  int id,ib;
  int idtmp;
  int igapfirstblock, igaplastblock;

  int param_distmin=0;
  if (flag_param_distmin_fixed!=0)
    param_distmin = flag_param_distmin_fixed;

  int lambdaShft;

  int igaplastblock_prev=-1;
  int lambdaShftgappy=0;
  int offset_id = 0;

  int flag_igapfirstinside, flag_igaplastinside;
  int nbloc = nb_blocks_local;
  int nblocks_gappy=0;

  int idvtmp_firstblock;


  int nb_blockgappy_max = nb_blocks_local+ngap;
  int Tgappysize_max;

  int ngappy_tmp;
  int lgap_tmp;

  int flag_gapok=0;

  int distcorr_min;

  int Tsize=0;
  int lambdamax=0;
  for(i=0;i<nb_blocks_local;i++) {
    Tsize += lambda[i];

    if (lambda[i]>lambdamax)
      lambdamax = lambda[i];
  }

  Tgappysize_max = Tsize + lambdamax*ngap;

  int Tgappysize=0;
  int k_prev=-1;

  for (k=0;k<ngap;k++) {

    //find the block containing the gap beginning
    for( igapfirstblock = -1; igapfirstblock == -1; ) {
      idtmp = id0gap[k];

      for(ib=0;ib<nbloc;ib++) {
        if(n[ib] != 0 && idtmp%nrow < idv[ib]+n[ib]) break;
      }

      if (ib<nbloc && idv[ib] <= idtmp) {
        igapfirstblock = ib;  //the block contains id0gap
        flag_igapfirstinside = 1;
      }
      else if (ib<nbloc && idv[ib] > idtmp) {
        igapfirstblock = ib;  //first block after id0gap
        flag_igapfirstinside = 0;
      }
      else {  //ib=nbloc
        igapfirstblock = -2;  //no block defined
        flag_igapfirstinside = 0;
      }
    }

    //find the block containing the gap end - reverse way
    for( igaplastblock = -1; igaplastblock == -1; ) {
      idtmp = id0gap[k]+lgap[k]-1;

      for(ib=nbloc-1;ib>=0;ib--) {
        if(n[ib] != 0 && idv[ib] <= idtmp) break;
      }

      if (ib>=0 && idtmp < idv[ib]+n[ib]) {
        igaplastblock = ib;
        flag_igaplastinside = 1;
      }
      else if (ib>=0 && idv[ib]+n[ib] <= idtmp) {
        igaplastblock = ib;
        flag_igaplastinside = 0;
      }
      else {  //ib=-1
        igaplastblock = -2;  //no block defined
        flag_igaplastinside = 0;
      }
    }


    if (igapfirstblock==igaplastblock && igapfirstblock>=0)  //guard against the "no block" case (lambda[-2] was read before)
      distcorr_min = lambda[igapfirstblock];
    else
      distcorr_min = 0;


    //igapfirstblock != -2 && igaplastblock != -2 is not really needed, but it's a shortcut
    if (lgap[k]> max(distcorr_min, param_distmin) && igapfirstblock != -2 && igaplastblock != -2) {

      if (k_prev>=0)  //guard against reading id0gap[-1] on the first significant gap
        idvtmp_firstblock = max( idv[igapfirstblock] , id0gap[k_prev]+lgap[k_prev]);
      else
        idvtmp_firstblock = idv[igapfirstblock];

      //test if the gap is ok for block reduce/split
      if (igapfirstblock!=igaplastblock) {

        flag_gapok = 1;  //reduce the gap in each block. No problem if we add max() inside the ifs
      }
      else if (id0gap[k]-idvtmp_firstblock>=lambda[igapfirstblock] && idv[igaplastblock]+n[igaplastblock]-(id0gap[k]+lgap[k])>=lambda[igaplastblock]) {

        flag_gapok = 1;
      }
      else if (igapfirstblock==igaplastblock){

        int ngappyleft_tmp = id0gap[k]-idvtmp_firstblock;
        int leftadd = max(0,lambda[igapfirstblock] - ngappyleft_tmp);
        int ngappyright_tmp = idv[igaplastblock]+n[igaplastblock]-(id0gap[k]+lgap[k]);
        int rightadd = max(0,lambda[igapfirstblock] - ngappyright_tmp);
        int restgap = lgap[k] - (leftadd+rightadd);

        //  flag_gapok = (restgap>=0);
        flag_gapok = (restgap >= max(0, param_distmin));

      }
      else {
        flag_gapok = 0;
      }


      //create the gappy blocks if the criterion is fulfilled
      if (flag_gapok==1) {

        //copy the blocks before the gap
        for (id=igaplastblock_prev+1;id<igapfirstblock;id++) {

          for(lambdaShft=j=0; j<id; j++)  lambdaShft += lambda[j];

          for (i=0;i<lambda[id];i++)
            Tgappy[i+lambdaShftgappy] = T[i+lambdaShft];

          lambdagappy[nblocks_gappy] = lambda[id];
          ngappy[nblocks_gappy] = n[id];
          idvgappy[nblocks_gappy] = idv[id];
          nblocks_gappy = nblocks_gappy + 1;
          lambdaShftgappy = lambdaShftgappy + lambda[id];

        }

        //clear the last gappy block if it is the same block again - outside the "if" for
        //border cases with n[]==0
        if (igaplastblock_prev==igapfirstblock && k!= 0) {
          lambdaShftgappy=lambdaShftgappy-lambda[igapfirstblock];
          nblocks_gappy = nblocks_gappy - 1;
          //  idvtmp_firstblock = id0gap[k-1]+lgap[k-1];  //always exists because igaplastblock_prev!=-1,
          //  //so this is not the first turn - it replaces "idv[igapfirstblock]"
        }

        //reduce the first block if defined
        if (flag_igapfirstinside==1 && (id0gap[k]-idvtmp_firstblock)>0) {  //check if inside and not on the border - meaning n[] is not zero

          for(lambdaShft=j=0; j<igapfirstblock; j++)  lambdaShft += lambda[j];

          for (i=0;i<lambda[igapfirstblock];i++)  //copy the block band (the stale index id was used here before)
            Tgappy[i+lambdaShftgappy] = T[i+lambdaShft];

          lambdagappy[nblocks_gappy] = lambda[igapfirstblock];
          ngappy[nblocks_gappy] = id0gap[k]-idvtmp_firstblock;
          ngappy[nblocks_gappy] = max( ngappy[nblocks_gappy], lambda[igapfirstblock]);

          idvgappy[nblocks_gappy] = idvtmp_firstblock;
          nblocks_gappy = nblocks_gappy + 1;
          lambdaShftgappy = lambdaShftgappy + lambda[igapfirstblock];

        }

        //reduce the last block if defined
        if (flag_igaplastinside==1 && (idv[igaplastblock]+n[igaplastblock]-(id0gap[k]+lgap[k]))>0 ) {  //check if inside and not on the border - meaning n[] is not zero

          for(lambdaShft=j=0; j<igaplastblock; j++)  lambdaShft += lambda[j];

          for (i=0;i<lambda[igaplastblock];i++)  //duplicate the block band (the stale index id was used here before)
            Tgappy[i+lambdaShftgappy] = T[i+lambdaShft];

          lambdagappy[nblocks_gappy] = lambda[igaplastblock];
          ngappy[nblocks_gappy] = idv[igaplastblock]+n[igaplastblock]-(id0gap[k]+lgap[k]);
          int rightadd0 = max(0,lambda[igapfirstblock] - ngappy[nblocks_gappy]);

          ngappy[nblocks_gappy] = max( ngappy[nblocks_gappy] , lambda[igaplastblock]);

          //  idvgappy[nblocks_gappy] = id0gap[k]+lgap[k];
          idvgappy[nblocks_gappy] = id0gap[k]+lgap[k]-rightadd0;

          nblocks_gappy = nblocks_gappy + 1;
          lambdaShftgappy = lambdaShftgappy + lambda[igaplastblock];

        }

        igaplastblock_prev = igaplastblock;
        k_prev = k;

      }//end if (flag_gapok)
    }//end if (lgap[k]>param_distmin)
  }//end gap loop


  //now continue copying the remaining blocks
  for (id=igaplastblock_prev+1;id<nb_blocks_local;id++) {

    for(lambdaShft=j=0; j<id; j++)  lambdaShft += lambda[j];

    for (i=0;i<lambda[id];i++)
      Tgappy[i+lambdaShftgappy] = T[i+lambdaShft];

    lambdagappy[nblocks_gappy] = lambda[id];
    ngappy[nblocks_gappy] = n[id];
    idvgappy[nblocks_gappy] = idv[id];
    nblocks_gappy = nblocks_gappy + 1;
    lambdaShftgappy = lambdaShftgappy + lambda[id];

  }

  *nb_blocks_gappy_final = nblocks_gappy;  //just for output


  return 0;
}