def split_seq(self, sequence): ''' Split the sequence according to rank and processor number. ''' starts = [i for i in range(0, len(sequence), len(sequence)//self.size)] ends = starts[1: ] + [len(sequence)] start, end = list(zip(starts, ends))[self.rank]
return sequence[start: end]
def split_size(self, size): ''' Split a size number(int) to sub-size number. ''' if size < self.size: warn_msg = ('Splitting size({}) is smaller than process ' + 'number({}), more processor would be ' + 'superflous').format(size, self.size) self._logger.warning(warn_msg) splited_sizes = [1]*size + [0]*(self.size - size) elif size % self.size != 0: residual = size % self.size splited_sizes = [size // self.size]*self.size for i in range(residual): splited_sizes[i] += 1 else: splited_sizes = [size // self.size]*self.size
return splited_sizes[self.rank]
def merge_seq(self, seq): ''' Gather data in sub-process to root process. ''' if self.size == 1: return seq
mpi_comm = MPI.COMM_WORLD merged_seq= mpi_comm.allgather(seq) return list(chain(*merged_seq)) |