import bisect
import itertools as it
import logging
import time

import numpy as np
import rmsd
from scipy.spatial.distance import pdist

from pysisyphus.calculators.XTB import XTB
from pysisyphus.helpers import check_for_end_sign
from pysisyphus.helpers_pure import highlight_text
from pysisyphus.intcoords.setup import get_pair_covalent_radii
from pysisyphus.optimizers.RFOptimizer import RFOptimizer
from pysisyphus.stocastic.align import matched_rmsd
from pysisyphus.xyzloader import make_trj_str_from_geoms

[docs] class Pipeline: def __init__(self, geom, calc_getter=None, seed=None, max_cycles=5, cycle_size=15, rmsd_thresh=.1, energy_thresh=1e-3, energy_range=.125, compare_num=25, break_after=2, calc_kwargs=None): self.initial_geom = geom self.calc_getter = calc_getter self.max_cycles = max_cycles self.cycle_size = cycle_size if seed is None: self.seed = int(time.time()) else: self.seed = seed self.rmsd_thresh = rmsd_thresh self.energy_thresh = energy_thresh self.energy_range = energy_range self.compare_num = compare_num self.break_after = break_after self.calc_kwargs = { "charge": 0, "mult": 1, } if calc_kwargs is not None: self.calc_kwargs.update(calc_kwargs) np.random.seed(self.seed) self.logger = logging.getLogger("stocastic") self.is_analytical2d = len(geom.atoms) == 1 self.log(f"Seed: {self.seed}") self.coords_size = self.initial_geom.coords.size self.calc_counter = 0 self.cur_cycle = 0 self.cur_micro_cycle = 0 # Indicates if the next cycle is the last one self.break_in = self.break_after self.new_geoms = [] self.new_energies = [] self.initial_coords3d = self.initial_geom.coords3d self.atoms = self.initial_geom.atoms def __str__(self): return f"{self.__class__.__name__}(seed={self.seed})"
[docs] def log(self, message): """Write a log message. Wraps the logger variable. Parameters ---------- message : str Message to be logged. """ self.logger.debug(f"{message}")
[docs] def get_valid_index_set(self, to_intersect): return set(range(len(self.new_energies))) & set(to_intersect)
[docs] def atoms_are_too_close(self, geom, factor=.7): """Determine if atoms are too close.""" dist_mat = pdist(geom.coords3d) cov_rad_mat = get_pair_covalent_radii(geom.atoms) too_close = dist_mat < factor*cov_rad_mat return any(too_close)
[docs] def geom_is_close_in_energy(self, geom): energy = i = bisect.bisect_left(self.new_energies, energy) # Determine if there are neighbours that are close in energy # as we insert left/before the most similary energy the indices # of the (to be) neighbours in the current self.new_energies list # are i-1 and i. valid_inds = self.get_valid_index_set((i-1, i)) diffs = [abs(self.new_energies[j] - energy) for j in valid_inds] return len(diffs) > 0 and min(diffs) < self.energy_thresh
[docs] def geom_is_new(self, geom): """Determine if geometry is not already known.""" if len(self.new_geoms) == 0: self.log("Found first geometry!") return True i = bisect.bisect_left(self.new_energies, to_intersect = range(i-self.compare_num, i+self.compare_num) valid_inds = np.array(list(self.get_valid_index_set(to_intersect))) new_energies = np.array(self.new_energies)[valid_inds] # Restrict geometries for RMSD comparison to an energy range # around the energy of the geometry to check. in_range = np.abs(new_energies - < self.energy_range valid_inds = valid_inds[in_range] # If this evalutes to True the energy of the current in geometry is # quite different and we add the geometry. if valid_inds.size == 0: # print("Energy of geometry is very different from the remaining " # "ones. Adding geometry!") reason = "different energy." is_new = True # Otherwise check the RMSD values for the remaining geometries that # are close in energy. else: rmsds = [matched_rmsd(geom, self.new_geoms[i])[0] for i in valid_inds] rmsds = np.array(rmsds) is_new = rmsds.min() > self.rmsd_thresh reason = f"different RMSD (min(RMSD) = {rmsds.min():.3f})" if is_new: self.log(f"Found new geometry based on {reason}") return is_new
[docs] def geom_is_valid(self, geom): """Filter out geometries that are None, or were the atoms are too close or when they are already known.""" valid = (geom is not None and not self.geom_is_close_in_energy(geom) ) if not self.is_analytical2d: valid = (valid and not self.atoms_are_too_close(geom) and self.geom_is_new(geom) ) return valid
[docs] def get_input_geom(self, geom): raise Exception("Implement me!")
""" def run_new_geom(self, geom): input_geom = self.get_input_geom(geom) input_coords = input_geom.coords.copy() # Check if the geometry is similar to an already known starting # geometry. overlaps = # print("overlaps with already known starting coordinates") # print(overlaps) max_overlap_ind = overlaps.argmax() max_overlap = overlaps[max_overlap_ind] similar_fn = f"similar_{self.similar_ind:03d}.trj" # print(f"max overlap is {max_overlap:.1f}, {similar_fn}, index {max_overlap_ind}") max_coords = self.starting_coords[max_overlap_ind] self.similar_ind += 1 rmsds = list() for sc in self.starting_coords: sc3d = sc.reshape(-1, 3) rm = rmsd.kabsch_rmsd(new_coords.reshape(-1,3), sc3d) rmsds.append(rm) rmsds = np.array(rmsds) self.starting_coords.append(new_coords) """
[docs] def get_unique_geometries(self, geoms): geom_num = len(geoms) rmsds = np.full((geom_num, geom_num), np.inf) for i, j in it.combinations(range(geom_num), 2): coords1 = geoms[i].coords.reshape(-1, 3) coords2 = geoms[j].coords.reshape(-1, 3) rmsds[i, j] = rmsd.kabsch_rmsd(coords1, coords2) is_, js = np.where(rmsds < self.rmsd_thresh) similar_inds = np.unique(js) unique_geoms = [geoms[i] for i in range(geom_num) if i not in similar_inds] return unique_geoms
[docs] def run_geom_opt(self, geom): if self.calc_getter is not None: calc = self.calc_getter(calc_number=self.calc_counter) geom.set_calculator(calc) opt_kwargs = { "gdiis": False, "thresh": "gau_loose", "overachieve_factor": 2, "max_cycles": 75, } opt = RFOptimizer(geom, **opt_kwargs) opt_geom = geom if opt.is_converged else None else: calc = XTB(calc_number=self.calc_counter, **self.calc_kwargs) opt_result = calc.run_opt(geom.atoms, geom.coords, keep=False) try: opt_geom = opt_result.opt_geom except AttributeError: opt_geom = None self.calc_counter += 1 return opt_geom
[docs] def write_geoms_to_trj(self, geoms, fn, comments=None): with open(fn, "w") as handle: handle.write( make_trj_str_from_geoms(geoms, comments, energy_comments=True) )
[docs] def run(self): for self.cur_cycle in range(self.max_cycles): cycle_start = time.time() self.log(highlight_text(f"Cycle {self.cur_cycle}")) input_geoms = [self.get_input_geom(self.initial_geom) for _ in range(self.cycle_size)] # Write input geometries to disk self.write_geoms_to_trj(input_geoms, f"cycle_{self.cur_cycle:03d}_input.trj") # Run optimizations on input geometries calc_start = time.time() opt_geoms = list() for i, geom in enumerate(input_geoms, 1): print(f"Optimizing geometry {i:03d}/{self.cycle_size:03d}", end="\r") opt_geoms.append(self.run_geom_opt(geom)) print() calc_end = time.time() calc_duration = calc_end - calc_start self.log(f"Optimizations took {calc_duration:.0f} s.") kept_geoms = list() for geom in opt_geoms: # Do all the filtering and reject all invalid geometries if not self.geom_is_valid(geom): continue energy = i = bisect.bisect_left(self.new_energies, energy) self.new_energies.insert(i, energy) self.new_geoms.insert(i, geom) kept_geoms.append(geom) if i == 0 and len(self.new_energies) > 1: last_minimum = self.new_energies[1] diff = abs(energy - last_minimum) self.log(f"It is a new global minimum at {energy:.4f} au! " f"Last one was {diff:.4f} au higher.") kept_num = len(kept_geoms) trj_filtered_fn = f"cycle_{self.cur_cycle:03d}.trj" # Sort by energy kept_geoms = sorted(kept_geoms, key=lambda g: if kept_geoms: self.write_geoms_to_trj(kept_geoms, trj_filtered_fn) self.log(f"Kicks in cycle {self.cur_cycle} produced " f"{kept_num} new geometries.") self.break_in = self.break_after elif self.break_in == 0: self.log("Didn't find any new geometries in the last " f"{self.break_after} cycles. Exiting!") break else: self.log(f"Cycle {self.cur_cycle} produced no new geometries.") self.break_in -= 1 cycle_end = time.time() cycle_duration = cycle_end - cycle_start self.log(f"Cycle {i} took {cycle_duration:.0f} s.") self.log("") if check_for_end_sign(): break self.log(f"Run produced {len(self.new_energies)} geometries!") # Return empty list of nothing was found if not self.new_energies: return [] fn = "final.trj" self.write_geoms_to_trj(self.new_geoms, fn) # self.new_energies = np.array(new_energies) np.savetxt("energies.dat", self.new_energies) first_geom = self.new_geoms[0] first_geom.standard_orientation() = self.new_energies[0] if self.is_analytical2d: return self.new_geoms matched_geoms = [first_geom, ] for geom, energy in zip(self.new_geoms[1:], self.new_energies): rmsd, (_, matched_geom) = matched_rmsd(first_geom, geom) = energy matched_geoms.append(matched_geom) fn_matched = "final_matched.trj" self.write_geoms_to_trj(matched_geoms, fn_matched) return matched_geoms