defsplit_seq(self, sequence): ''' Split the sequence according to rank and processor number. ''' starts = [i for i in range(0, len(sequence), len(sequence)//self.size)] ends = starts[1: ] + [len(sequence)] start, end = list(zip(starts, ends))[self.rank]
return sequence[start: end]
defsplit_size(self, size): ''' Split a size number(int) to sub-size number. ''' if size < self.size: warn_msg = ('Splitting size({}) is smaller than process ' + 'number({}), more processor would be ' + 'superflous').format(size, self.size) self._logger.warning(warn_msg) splited_sizes = [1]*size + [0]*(self.size - size) elif size % self.size != 0: residual = size % self.size splited_sizes = [size // self.size]*self.size for i in range(residual): splited_sizes[i] += 1 else: splited_sizes = [size // self.size]*self.size
return splited_sizes[self.rank]
defmerge_seq(self, seq): ''' Gather data in sub-process to root process. ''' if self.size == 1: return seq
defmaster_only(func): ''' Decorator to limit a function to be called only in master process in MPI env. ''' @wraps(func) def_call_in_master_proc(*args, **kwargs): if mpi.is_master: return func(*args, **kwargs)
# Enter evolution iteration. for g in range(ng): # Scatter jobs to all processes. local_indvs = [] local_size = mpi.split_size(self.population.size // 2)
# Fill the new population. for _ in range(local_size): # Select father and mother. parents = self.selection.select(self.population, fitness=self.fitness) # Crossover. children = self.crossover.cross(*parents) # Mutation. children = [self.mutation.mutate(child) for child in children] # Collect children. local_indvs.extend(children)
# Gather individuals from all processes. indvs = mpi.merge_seq(local_indvs) # The next generation. self.population.individuals = indvs