Source code for pysisyphus.run

import argparse
from collections import namedtuple
from collections.abc import Callable
import copy
import datetime
import itertools as it
import os
from math import modf
from pathlib import Path
import platform
from pprint import pprint
import re
import shutil
import sys
import textwrap
import time
from typing import Union

import distributed
import numpy as np
import scipy as sp
import yaml

from pysisyphus import __version__
from pysisyphus.calculators import *
from pysisyphus.config import OUT_DIR_DEFAULT, p_DEFAULT, T_DEFAULT
from pysisyphus.cos import *
from pysisyphus.cos.GrowingChainOfStates import GrowingChainOfStates
from pysisyphus.color import bool_color
from pysisyphus.exceptions import HEIIsFirstOrLastException
from pysisyphus.dynamics import (
    get_mb_velocities_for_geom,
    mdp,
    md,
    get_colvar,
    Gaussian,
)
from pysisyphus.drivers import (
    relaxed_1d_scan,
    run_afir_paths,
    run_opt,
    run_precontr,
    run_perf,
    print_perf_results,
)
from pysisyphus.drivers.barriers import do_endopt_ts_barriers

from pysisyphus.Geometry import Geometry
from pysisyphus.helpers import (
    confirm_input,
    shake_coords,
    print_barrier,
    get_tangent_trj_str,
)
from pysisyphus.helpers_pure import (
    find_closest_sequence,
    merge_sets,
    recursive_extract,
    recursive_update,
    highlight_text,
    approx_float,
    results_to_json,
)
from pysisyphus.intcoords import PrimitiveNotDefinedException
from pysisyphus.intcoords.setup import get_bond_mat
from pysisyphus.init_logging import init_logging
from pysisyphus.intcoords.PrimTypes import PrimTypes, normalize_prim_inputs
from pysisyphus.intcoords.helpers import form_coordinate_union
from pysisyphus.intcoords.setup import get_bond_sets
from pysisyphus.interpolate import interpolate_all
from pysisyphus.irc import *
from pysisyphus.io import save_hessian
from pysisyphus.stocastic import *
from pysisyphus.thermo import can_thermoanalysis
from pysisyphus.trj import get_geoms, dump_geoms, standardize_geoms
from pysisyphus.xyzloader import write_geoms_to_trj
from pysisyphus.yaml_mods import get_loader, UNITS


CALC_DICT = {
    "afir": AFIR,
    "composite": Composite,
    "conical": ConicalIntersection,
    "dftb+": DFTBp,
    "dimer": Dimer,
    "dummy": Dummy,
    "energymin": EnergyMin,
    "ext": ExternalPotential,
    "g09": Gaussian09,
    "g16": Gaussian16,
    "ipiserver": IPIServer,
    "mopac": MOPAC,
    "multi": MultiCalc,
    "obabel": OBabel,
    "oniom": ONIOM,
    "openmolcas": OpenMolcas,
    "orca": ORCA,
    "orca5": ORCA5,
    "psi4": Psi4,
    "pyxtb": PyXTB,
    "remote": Remote,
    "turbomole": Turbomole,
    "xtb": XTB,
}

try:
    from pysisyphus.calculators.PySCF import PySCF

    CALC_DICT["pyscf"] = PySCF
except ImportError:
    pass

try:
    from pysisyphus.calculators.QCEngine import QCEngine

    CALC_DICT["qcengine"] = QCEngine
except ImportError:
    pass

COS_DICT = {
    "neb": NEB.NEB,
    "aneb": AdaptiveNEB.AdaptiveNEB,
    "feneb": FreeEndNEB.FreeEndNEB,
    "szts": SimpleZTS.SimpleZTS,
    "gs": GrowingString.GrowingString,
    "fs": FreezingString.FreezingString,
}

IRC_DICT = {
    "dvv": DampedVelocityVerlet,
    "euler": Euler,
    "eulerpc": EulerPC,
    "gs": GonzalezSchlegel,
    "imk": IMKMod,
    "lqa": LQA,
    "modekill": ModeKill,
    "rk4": RK4,
}

STOCASTIC_DICT = {
    "frag": FragmentKick,
    "kick": Kick,
}
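
# Editor's note -- illustrative sketch, not part of the original module:
# the *_DICT mappings above implement a simple string -> class dispatch.
# A 'type' entry from the YAML input selects the class and the remaining
# keys become keyword arguments, roughly like so (input values are
# hypothetical; the real lookup happens inside get_calc_closure() below,
# which also injects 'base_name' and 'calc_number'):
#
#     conf = {"type": "xtb", "charge": 0, "mult": 1}
#     calc_cls = CALC_DICT[conf.pop("type")]
#     calc = calc_cls(**conf)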


def parse_args(args):
    parser = argparse.ArgumentParser()

    action_group = parser.add_mutually_exclusive_group(required=True)
    action_group.add_argument(
        "yaml", nargs="?", help="Start pysisyphus with input from a YAML file."
    )
    action_group.add_argument(
        "--clean", action="store_true", help="Ask for confirmation before cleaning."
    )
    action_group.add_argument(
        "--fclean",
        action="store_true",
        help="Force cleaning without prior confirmation.",
    )
    action_group.add_argument(
        "--bibtex",
        action="store_true",
        help="Print bibtex string for pysisyphus paper.",
    )
    action_group.add_argument(
        "-v", "--version", action="store_true", help="Print pysisyphus version."
    )

    run_type_group = parser.add_mutually_exclusive_group(required=False)
    run_type_group.add_argument(
        "--restart",
        action="store_true",
        help="Continue a previously crashed/aborted/... pysisyphus run.",
    )
    run_type_group.add_argument(
        "--cp",
        "--copy",
        nargs="+",
        help="Copy .yaml file and corresponding geometries from the 'geom' section "
        "to a new directory. The first argument is interpreted as destination. Any "
        "remaining (optional) arguments are files that are also copied.",
    )

    parser.add_argument(
        "--scheduler", default=None, help="Address of the dask scheduler."
    )

    return parser.parse_args(args)
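
# Example usage (editor's sketch, not part of the module): parse_args()
# mirrors the command-line interface. Typical invocations, with a
# hypothetical input file and scheduler address:
#
#     args = parse_args(["input.yaml", "--scheduler", "127.0.0.1:8786"])
#     args = parse_args(["--fclean"])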
def get_calc_closure(base_name, calc_key, calc_kwargs, iter_dict=None, index=None):
    if iter_dict is None:
        iter_dict = dict()
    if index is None:
        index = 0

    # Maps YAML input to actual Calculator-class arguments
    calc_map = {
        "calculator": "calculator",
        "calc": "calculator",  # shortcut for 'calculator'
        # calc1/calc2 are used for ConicalIntersection and EnergyMin.
        "calculator1": "calculator1",
        "calc1": "calculator1",  # shortcut for 'calculator1'
        "calculator2": "calculator2",
        "calc2": "calculator2",  # shortcut for 'calculator2'
    }

    def calc_getter(**add_kwargs):
        nonlocal index

        kwargs_copy = copy.deepcopy(calc_kwargs)

        # Some calculators are just wrappers, modifying forces from actual
        # calculators, e.g. AFIR and Dimer. If we find one of the keys from
        # 'calc_map' in 'calc_kwargs' we create the actual calculator and
        # assign it to the corresponding value in 'calc_map'.
        for key, val in calc_map.items():
            if key in kwargs_copy:
                # Use a different base_name to distinguish the calculator(s)
                actual_base_name = val
                actual_kwargs = kwargs_copy.pop(key)
                actual_key = actual_kwargs.pop("type")
                # Pass 'index' along, to avoid recreating calculators with
                # the same name.
                actual_calc = get_calc_closure(
                    actual_base_name, actual_key, actual_kwargs, index=index
                )()
                kwargs_copy[val] = actual_calc

        kwargs_copy["base_name"] = base_name
        kwargs_copy.update(add_kwargs)
        kwargs_copy["calc_number"] = index
        for key, iter_ in iter_dict.items():
            kwargs_copy[key] = next(iter_)
        index += 1
        return CALC_DICT[calc_key](**kwargs_copy)

    return calc_getter
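
# Example (editor's sketch): building a factory for a wrapper calculator.
# The nested 'calc' key follows 'calc_map' above; the AFIR parameters and
# fragment indices are hypothetical.
#
#     kwargs = {
#         "calc": {"type": "xtb"},         # actual calculator, wrapped by AFIR
#         "fragment_indices": [[0, 1]],    # hypothetical fragments
#         "gamma": 100,                    # hypothetical AFIR parameter
#     }
#     calc_getter = get_calc_closure("calculator", "afir", kwargs)
#     calc0 = calc_getter()  # calc_number == 0
#     calc1 = calc_getter()  # calc_number == 1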
def run_tsopt_from_cos(
    cos,
    tsopt_key,
    tsopt_kwargs,
    calc_getter=None,
    ovlp_thresh=0.4,
    coordinate_union="bonds",
):
    print(highlight_text("Running TS-optimization from COS"))

    # Later we want a Cartesian HEI tangent, so if not already present we
    # create a Cartesian COS object to obtain the tangent from.
    atoms = cos.images[0].atoms
    if cos.coord_type != "cart":
        cart_images = list()
        for image in cos.images:
            cart_image = Geometry(atoms, image.cart_coords)
            cart_image.energy = image.energy
            cart_images.append(cart_image)
        cart_cos = ChainOfStates.ChainOfStates(cart_images)
    # Just continue using the Cartesian COS object
    else:
        cart_cos = cos

    hei_kind = tsopt_kwargs.pop("hei_kind", "splined")
    # Use plain, unsplined, HEI
    if hei_kind == "plain":
        hei_index = cos.get_hei_index()
        hei_image = cos.images[hei_index]
        # Select the Cartesian tangent from the COS
        cart_hei_tangent = cart_cos.get_tangent(hei_index)
    # Use splined HEI
    elif hei_kind == "splined":
        # The splined HEI tangent is usually very bad for the purpose of
        # selecting an imaginary mode to follow uphill. So we construct a
        # better HEI tangent by mixing the two tangents closest to the
        # splined HEI.
        hei_coords, hei_energy, splined_hei_tangent, hei_index = cos.get_splined_hei()
        hei_image = Geometry(atoms, hei_coords)
        close_to_first = hei_index < 0.5
        close_to_last = hei_index > len(cos.images) - 1.5
        if close_to_first or close_to_last:
            close_to = "first" if close_to_first else "last"
            print(
                f"Splined HEI is too close to the {close_to} image. "
                "Aborting TS optimization!"
            )
            raise HEIIsFirstOrLastException

        # The hei_index is a float. We split off the decimal part and mix
        # the two nearest tangents accordingly.
        frac, floor = modf(hei_index)
        # Indices of the two nearest images with integer indices.
        floor = int(floor)
        ceil = floor + 1
        floor_tangent = cart_cos.get_tangent(floor)
        ceil_tangent = cart_cos.get_tangent(ceil)
        print(f"Creating mixed HEI tangent, using tangents at images {(floor, ceil)}.")
        print("Overlap of splined HEI tangent with these tangents:")
        for ind, tang in ((floor, floor_tangent), (ceil, ceil_tangent)):
            print(f"\t{ind:02d}: {splined_hei_tangent.dot(tang):.6f}")
        # When frac is big, e.g. 0.9, the tangent resembles the tangent at
        # 'ceil', so we mix in only (1-frac) == (1-0.9) == 0.1 of the
        # 'floor' tangent.
        cart_hei_tangent = (1 - frac) * floor_tangent + frac * ceil_tangent
        cart_hei_tangent /= np.linalg.norm(cart_hei_tangent)
        # print(f"\t(1-{frac:.4f})*t({floor})+{frac:.4f}*t({ceil}): "
        #       f"{splined_hei_tangent.dot(cart_hei_tangent):.6f}"
        # )
    else:
        raise Exception(f"Invalid hei_kind='{hei_kind}'!")

    print(f"Index of {hei_kind} highest energy image (HEI) is {hei_index:.2f}.")
    print()

    # When the COS was optimized in internal coordinates the united
    # primitive indices are already present and we just keep on using them.
    try:
        typed_prims = hei_image.internal.typed_prims
    # If the COS was optimized in Cartesians we have to generate a new set
    # of primitive internals.
    except AttributeError:

        def get_int_geom(geom):
            return Geometry(geom.atoms, geom.cart_coords, coord_type="redund")

        internal_geom1 = get_int_geom(cos.images[0])
        internal_geom2 = get_int_geom(cos.images[-1])
        typed_prims = form_coordinate_union(internal_geom1, internal_geom2)

    coord_kwargs = dict()
    if coordinate_union == "all":
        coord_kwargs["typed_prims"] = typed_prims
        union_msg = "Using full coordinate union for TS guess. Probably a bad idea!"
    elif coordinate_union in ("bonds", "bonds_bends_dihedrals"):
        # Only keep actual bonds ...
        valid_prim_types = (PrimTypes.BOND,)
        # ... and bends and dihedrals, if requested
        if coordinate_union == "bonds_bends_dihedrals":
            valid_prim_types += (PrimTypes.BEND, PrimTypes.PROPER_DIHEDRAL)
        coord_kwargs["define_prims"] = [
            tp for tp in typed_prims if tp[0] in valid_prim_types
        ]
        union_msg = f"Kept primitive types: {valid_prim_types}"
    else:
        union_msg = "No coordinate union."
    print(union_msg)

    ts_geom_kwargs = tsopt_kwargs.pop("geom")
    ts_coord_type = ts_geom_kwargs.pop("type")
    if ts_coord_type != "cart":
        ts_geom_kwargs["coord_kwargs"].update(coord_kwargs)
    ts_geom = Geometry(
        hei_image.atoms,
        hei_image.cart_coords,
        coord_type=ts_coord_type,
        **ts_geom_kwargs,
    )

    # Convert tangent from whatever coordinates to redundant internals.
    # When the HEI was splined the tangent will be in Cartesians.
    if ts_coord_type in ("cart", "cartesian"):
        ref_tangent = cart_hei_tangent
    elif ts_coord_type in ("redund", "dlc", "tric"):
        ref_tangent = ts_geom.internal.B @ cart_hei_tangent
    else:
        raise Exception(f"Invalid coord_type='{ts_coord_type}'!")
    ref_tangent /= np.linalg.norm(ref_tangent)

    # Dump HEI data
    #
    # Cartesian tangent and an animated .trj file
    cart_hei_fn = "cart_hei_tangent"
    np.savetxt(cart_hei_fn, cart_hei_tangent)
    trj = get_tangent_trj_str(
        ts_geom.atoms, ts_geom.cart_coords, cart_hei_tangent, points=10
    )
    trj_fn = cart_hei_fn + ".trj"
    with open(trj_fn, "w") as handle:
        handle.write(trj)
    print(f"Wrote animated HEI tangent to {trj_fn}\n")

    # Print HEI information (coords & tangent)
    print(f"{hei_kind.capitalize()} HEI (TS guess)")
    print(ts_geom.as_xyz())
    print()
    dummy = Geometry(atoms, cart_hei_tangent)
    print(f"{hei_kind.capitalize()} Cartesian HEI tangent")
    print(dummy.as_xyz())
    print()

    # Write out HEI information (coords & tangent)
    hei_xyz_fn = f"{hei_kind}_hei.xyz"
    with open(hei_xyz_fn, "w") as handle:
        handle.write(ts_geom.as_xyz())
    print(f"Wrote {hei_kind} HEI coordinates to '{hei_xyz_fn}'")

    ts_calc = calc_getter()

    def wrapped_calc_getter():
        return ts_calc

    ts_geom.set_calculator(ts_calc)
    tsopt_kwargs["prefix"] = "ts"

    if tsopt_key == "dimer":
        # Right now Dimer optimization is restricted to Cartesian rotations
        # and translations, even though translation in internals would be
        # possible.
        ts_geom = Geometry(hei_image.atoms, hei_image.cart_coords)
        dimer_kwargs = tsopt_kwargs.pop("dimer_kwargs", {})
        dimer_kwargs.update(
            {
                "N_raw": cart_hei_tangent,
                "base_name": "dimer",
            }
        )

        def wrapped_calc_getter():
            return Dimer(ts_calc, **dimer_kwargs)

        tsopt_key = "plbfgs"
        # When calling run_opt we pass the Hessian as additional argument,
        # so it is not recalculated unnecessarily. As no Hessian is
        # available for the dimer method we set it to None.
        cart_hessian = None
    # Determine which imaginary mode has the highest overlap with the
    # splined HEI tangent.
    else:
        print(f"Calculating Hessian at {hei_kind} TS guess.")
        # Calculate Hessian
        cart_hessian = ts_geom.cart_hessian
        # Continue with the Hessian in whatever coordinate system is
        # actually in use
        H = ts_geom.hessian
        eigvals, eigvecs = np.linalg.eigh(H)
        neg_inds = eigvals < -1e-4
        if sum(neg_inds) == 0:
            raise Exception("No negative eigenvalues found at splined HEI. Exiting!")
        eigval_str = np.array2string(eigvals[neg_inds], precision=6)
        print(f"Negative eigenvalues at splined HEI:\n{eigval_str}")
        neg_eigvals = eigvals[neg_inds]
        neg_eigvecs = eigvecs.T[neg_inds]
        ovlps = [np.abs(imag_mode.dot(ref_tangent)) for imag_mode in neg_eigvecs]
        print("Overlaps between HEI tangent and imaginary modes:")
        for i, ov in enumerate(ovlps):
            print(f"\t{i:02d}: {ov:.6f}")
        max_ovlp_ind = np.argmax(ovlps)
        max_ovlp = ovlps[max_ovlp_ind]
        print(
            f"Imaginary mode {max_ovlp_ind} has highest overlap ({max_ovlp:.2%}) "
            "with splined HEI tangent."
        )
        rel_ovlps = np.array(ovlps) / max(ovlps)
        similar_inds = rel_ovlps > 0.80
        # Only 1 big overlap is present
        if (max_ovlp >= ovlp_thresh) and (similar_inds.sum() == 1):
            ovlp_root = np.argmax(ovlps)
        # Multiple big and similar overlaps are present.
        elif (max_ovlp >= ovlp_thresh) and (similar_inds.sum() > 1):
            # Will yield the first occurrence of True, which corresponds to
            # a similar overlap with the most negative eigenvalue.
            ovlp_root = similar_inds.argmax()
            neg_eigval = neg_eigvals[ovlp_root]
            verbose_inds = np.arange(neg_eigvals.size)[similar_inds]
            print(
                f"Overlaps {verbose_inds} are very similar! Falling back to the "
                f"one with the most negative eigenvalue {neg_eigval:.6f} "
                f"(mode {ovlp_root})."
            )
        # Fall back to the most negative eigenvalue when all overlaps are
        # too low.
        else:
            ovlp_root = neg_eigvals.argmin()
            neg_eigval = neg_eigvals[ovlp_root]
            print(
                f"Highest overlap {max_ovlp:.6f} is below the threshold "
                f"of {ovlp_thresh:.6f}.\nFalling back to mode {ovlp_root} with most "
                f"negative eigenvalue {neg_eigval:.6f}."
            )
        root = tsopt_kwargs.get("root", None)
        if root is None:
            # Use mode with highest overlap as initial root
            tsopt_kwargs["root"] = ovlp_root
        else:
            print(
                f"Initial root={root} given, neglecting root {ovlp_root} "
                "determined from overlaps."
            )

    opt_result = run_opt(
        ts_geom,
        calc_getter=wrapped_calc_getter,
        opt_key=tsopt_key,
        opt_kwargs=tsopt_kwargs,
        cart_hessian=cart_hessian,
        title="TS-Optimization",
        copy_final_geom="ts_opt.xyz",
    )
    ts_geom = opt_result.geom
    ts_opt = opt_result.opt

    # Restore original calculator for Dimer calculations
    if tsopt_key == "dimer":
        ts_geom.set_calculator(ts_calc)

    print("Optimized TS coords:")
    print(ts_geom.as_xyz())
    # Include ts_ prefix
    ts_opt_fn = ts_opt.get_path_for_fn("opt.xyz")
    with open(ts_opt_fn, "w") as handle:
        handle.write(ts_geom.as_xyz())
    print(f"Wrote TS geometry to '{ts_opt_fn}'\n")

    ts_energy = ts_geom.energy
    first_cos_energy = cos.images[0].energy
    last_cos_energy = cos.images[-1].energy
    print_barrier(ts_energy, first_cos_energy, "TS", "first COS image")
    print_barrier(ts_energy, last_cos_energy, "TS", "last COS image")
    print()
    return opt_result
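
# The tangent mixing above, in compact form (editor's sketch with a
# hypothetical HEI index): for a fractional index the two neighboring image
# tangents are combined linearly and renormalized.
#
#     from math import modf
#     frac, floor = modf(4.3)              # splined HEI between images 4 and 5
#     floor, ceil = int(floor), int(floor) + 1
#     # mixed = (1 - frac) * t(floor) + frac * t(ceil); then divide by its norm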
def run_calculations(
    geoms: list[Geometry],
    calc_getter: Callable,
    scheduler: Union[str, distributed.LocalCluster] = None,
    assert_track: bool = False,
    run_func: str = None,
    one_calculator: bool = False,
) -> tuple[list[Geometry], list[dict]]:
    """Run calculations for all given geometries.

    Sometimes one just wants to run a series of calculations for a list of
    geometries, w/o any optimization or IRC integration etc. This function
    is executed when the YAML input lacks any opt/tsopt/irc/... section.

    Parameters
    ----------
    geoms
        List of geometries. Depending on the values of the other arguments
        all geometries must be/don't have to be compatible (same atom
        order). When only one calculator is utilized all geometries must
        be compatible.
    calc_getter
        Function that returns a new calculator.
    scheduler
        Optional; address or distributed.LocalCluster that is utilized to
        carry out all calculations in parallel.
    assert_track
        Whether it is asserted that ES tracking is enabled for all
        calculators. I seem to have added this flag > 5 years ago and
        today I can't tell you why I did this.
    run_func
        By default 'run_calculation()' will be called for the calculator,
        but with run_func another method name can be specified, e.g.,
        'get_hessian'.
    one_calculator
        Defaults to False. When enabled, all calculations are carried out
        with the same calculator object. All geometries must be compatible
        (same atoms and same atom order) and parallel calculations are not
        possible. This is a useful option for excited state calculations
        along a path, e.g., from a COS calculation, as all ES data will be
        dumped into one 'overlap_data.h5' file.

    Returns
    -------
    geoms
        List of geometries that were used for the calculations.
    all_results
        List of calculation results from the different calculations.
    """
    print(highlight_text("Running calculations"))

    func_name = "run_calculation" if run_func is None else run_func

    def par_calc(geom):
        return getattr(geom.calculator, func_name)(geom.atoms, geom.coords)

    if one_calculator:
        geom0 = geoms[0]
        for other_geom in geoms[1:]:
            geom0.assert_compatibility(other_geom)
        print("Doing all calculations with the same calculator.")
        calc = calc_getter()
        # Overwrite calc_getter to always return the same calculator
        calc_getter = lambda: calc
        if scheduler:
            print(
                "Parallel calculations are not possible with only one "
                "calculator. Switching to serial mode."
            )
            scheduler = None

    for geom in geoms:
        geom.set_calculator(calc_getter())

    if assert_track:
        assert all(
            [geom.calculator.track for geom in geoms]
        ), "'track: True' must be present in calc section."

    if scheduler:
        client = distributed.Client(scheduler, pure=False, silence_logs=False)
        results_futures = client.map(par_calc, geoms)
        all_results = client.gather(results_futures)
    else:
        all_results = list()
        i_fmt = "02d"
        for i, geom in enumerate(geoms):
            print(highlight_text(f"Calculation {i:{i_fmt}}", level=1))
            start = time.time()
            print(geom)
            results = getattr(geom.calculator, func_name)(geom.atoms, geom.cart_coords)
            # The results dict of MultiCalc will contain keys that can't be
            # dumped yet, so we skip the JSON dump when a KeyError is raised.
            try:
                as_json = results_to_json(results)
                calc = geom.calculator
                # Decrease the counter, because it was increased by 1 in
                # the course of the calculation.
                json_fn = calc.make_fn("results", counter=calc.calc_counter - 1)
                with open(json_fn, "w") as handle:
                    handle.write(as_json)
                msg = f"Dumped JSON results to '{json_fn}'."
            except KeyError:
                msg = "Skipped JSON dump of calculation results!"
            print(msg)

            hessian_results = recursive_extract(results, "hessian")
            energy_results = recursive_extract(results, "energy")
            for (*rest, _), hessian in hessian_results.items():
                energy_key = (*rest, "energy")
                energy = energy_results[energy_key]
                prefix = ("_".join(rest) if rest else calc.name) + "_"
                h5_fn = f"{prefix}hessian.h5"
                save_hessian(
                    h5_fn,
                    geom,
                    cart_hessian=hessian,
                    energy=energy,
                )
                print(f"Dumped hessian to '{h5_fn}'.")
            all_results.append(results)
            if i < (len(geoms) - 1):
                try:
                    cur_calculator = geom.calculator
                    next_calculator = geoms[i + 1].calculator
                    next_calculator.set_chkfiles(cur_calculator.get_chkfiles())
                    msg = (
                        f"Set chkfiles of calculator {i:{i_fmt}} on "
                        f"calculator {i+1:{i_fmt}}"
                    )
                except AttributeError:
                    msg = "Calculator does not support set/get_chkfiles!"
                print(msg)
            end = time.time()
            diff = end - start
            print(f"Calculation took {diff:.1f} s.\n")
            sys.stdout.flush()
    print()

    for geom, results in zip(geoms, all_results):
        try:
            geom.set_results(results)
        except KeyError:
            pass
    return geoms, all_results
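
# Example usage (editor's sketch): plain energy calculations for all
# geometries of a trajectory file; 'path.trj' and the empty XTB settings
# are hypothetical.
#
#     geoms = get_geoms("path.trj", coord_type="cart")
#     calc_getter = get_calc_closure("calculator", "xtb", {})
#     geoms, results = run_calculations(geoms, calc_getter, one_calculator=True)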
def run_stocastic(stoc):
    # Fragment
    stoc.run()
    print()
    return stoc
def run_md(geom, calc_getter, md_kwargs):
    print(highlight_text("Running Molecular Dynamics"))

    calc = calc_getter()
    geom.set_calculator(calc)

    T_init_vel = md_kwargs.pop("T_init_vel")
    steps = md_kwargs.pop("steps")
    dt = md_kwargs.pop("dt")
    seed = md_kwargs.pop("seed", None)

    _gaussian = md_kwargs.pop("gaussian", {})
    gaussians = list()
    for g_name, g_kwargs in _gaussian.items():
        # Create collective variable
        cv_kwargs = g_kwargs.pop("colvar")
        cv_key = cv_kwargs.pop("type")
        colvar = get_colvar(cv_key, cv_kwargs)
        # Create & append Gaussian
        g_w = g_kwargs.pop("w")
        g_s = g_kwargs.pop("s")
        g_stride = g_kwargs.pop("stride")
        gau = Gaussian(w=g_w, s=g_s, colvar=colvar, dump_name=g_name)
        gaussians.append((g_name, gau, g_stride))

    remove_com_v = md_kwargs.get("remove_com_v")
    v0 = get_mb_velocities_for_geom(
        geom, T_init_vel, seed=seed, remove_com_v=remove_com_v, remove_rot_v=False
    ).flatten()

    md_result = md(geom, v0=v0, steps=steps, dt=dt, gaussians=gaussians, **md_kwargs)

    # from pysisyphus.xyzloader import coords_to_trj
    # trj_fn = "md.trj"
    # _ = coords_to_trj(
    #     trj_fn, geom.atoms, md_result.coords[::md_kwargs["dump_stride"]]
    # )
    print()
    return md_result
def run_scan(geom, calc_getter, scan_kwargs, callback=None):
    print(highlight_text("Relaxed Scan") + "\n")
    assert (
        geom.coord_type != "cart"
    ), "Internal coordinates are required for coordinate scans."

    type_ = scan_kwargs["type"]
    indices = scan_kwargs["indices"]
    steps = scan_kwargs["steps"]
    start = scan_kwargs.get("start", None)
    end = scan_kwargs.get("end", None)
    step_size = scan_kwargs.get("step_size", None)
    symmetric = scan_kwargs.get("symmetric", False)
    # The scan runs from 'start' to 'start + steps*step_size'. When 'end'
    # is given instead of 'step_size', the step size is computed as
    #     (end - start) / steps .
    #
    # So we always require steps and either end or step_size.
    # bool(a) != bool(b) amounts to a logical XOR.
    assert (steps > 0) and (
        bool(end) != bool(step_size)
    ), "Please specify either 'end' or 'step_size'!"
    if symmetric:
        assert step_size and (
            start is None
        ), "'symmetric: True' requires 'step_size' and 'start == None'!"

    constrain_prims = normalize_prim_inputs(((type_, *indices),))
    constr_prim = constrain_prims[0]
    start_was_none = start is None
    if start_was_none:
        try:
            constr_ind = geom.internal.get_index_of_typed_prim(constr_prim)
        # Recreate geom with appropriate primitive
        except PrimitiveNotDefinedException:
            geom = geom.copy(coord_kwargs={"define_prims": constrain_prims})
            constr_ind = geom.internal.get_index_of_typed_prim(constr_prim)
        # The given indices may not correspond exactly to a typed primitive,
        # as they may be reversed. So we fetch the actual typed primitive.
        constr_prim = geom.internal.typed_prims[constr_ind]
        start = geom.coords[constr_ind]

    if step_size is None:
        step_size = (end - start) / steps

    opt_kwargs = scan_kwargs["opt"].copy()
    opt_key = opt_kwargs.pop("type")

    def wrapper(geom, start, step_size, steps, pref=None):
        return relaxed_1d_scan(
            geom,
            calc_getter,
            [
                constr_prim,
            ],
            start,
            step_size,
            steps,
            opt_key,
            opt_kwargs,
            pref=pref,
            callback=callback,
        )

    if not symmetric:
        scan_geoms, scan_vals, scan_energies = wrapper(geom, start, step_size, steps)
    else:
        # Negative direction
        print(highlight_text("Negative direction", level=1) + "\n")
        minus_geoms, minus_vals, minus_energies = wrapper(
            geom, start, -step_size, steps, pref="minus"
        )
        init_geom = minus_geoms[0].copy()
        # Positive direction. Compared to the negative direction we start
        # at a displaced geometry and reduce the number of steps by 1.
        print(highlight_text("Positive direction", level=1) + "\n")
        plus_start = start + step_size
        # Do one step less, as we already start from the optimized geometry
        plus_steps = steps - 1
        plus_geoms, plus_vals, plus_energies = wrapper(
            init_geom, plus_start, step_size, plus_steps, pref="plus"
        )
        scan_geoms = minus_geoms[::-1] + plus_geoms
        scan_vals = np.concatenate((minus_vals[::-1], plus_vals))
        scan_energies = np.concatenate((minus_energies[::-1], plus_energies))

    trj = "\n".join([geom.as_xyz() for geom in scan_geoms])
    with open("relaxed_scan.trj", "w") as handle:
        handle.write(trj)
    scan_data = np.stack((scan_vals, scan_energies), axis=1)
    np.savetxt("relaxed_scan.dat", scan_data)
    return scan_geoms, scan_vals, scan_energies
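
# Example input (editor's sketch): a relaxed scan over the bond between
# atoms 0 and 1, as the kwargs would arrive here after YAML parsing. All
# values are hypothetical.
#
#     scan_kwargs = {
#         "type": "BOND",
#         "indices": [0, 1],
#         "steps": 10,
#         "step_size": 0.1,   # alternatively give 'end'
#         "opt": {"type": "rfo", "thresh": "gau_loose", "dump": False},
#     }
#     scan_geoms, scan_vals, scan_energies = run_scan(geom, calc_getter, scan_kwargs)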
def run_preopt(first_geom, last_geom, calc_getter, preopt_key, preopt_kwargs):
    strict = preopt_kwargs.pop("strict", False)
    geom_kwargs = preopt_kwargs.pop("geom")
    coord_type = geom_kwargs.pop("type")

    def recreate_geom(geom):
        return Geometry(geom.atoms, geom.coords, coord_type=coord_type, **geom_kwargs)

    first_geom = recreate_geom(first_geom)
    last_geom = recreate_geom(last_geom)

    opt_results = list()
    for geom, key in ((first_geom, "first"), (last_geom, "last")):
        # Backup original geometry for RMSD calculation
        org_geom = geom.copy(coord_type="cart")

        prefix = f"{key}_pre"
        opt_kwargs = preopt_kwargs.copy()
        opt_kwargs.update(
            {
                "prefix": prefix,
                "h5_group_name": prefix,
            }
        )
        opt_result = run_opt(
            geom, calc_getter, preopt_key, opt_kwargs, title=f"{key} preoptimization"
        )
        opt = opt_result.opt
        opt_results.append(opt_result)

        # In strict mode, abort when the optimization neither converged nor
        # was stopped manually; a manual stop continues with the next
        # pre-optimization.
        if strict and not opt.stopped and not opt.is_converged:
            print(f"Problem in preoptimization of {key}. Exiting!")
            sys.exit()
        print(f"Preoptimization of {key} geometry converged!")
        opt_fn = opt.get_path_for_fn("opt.xyz")
        shutil.move(opt.final_fn, opt_fn)
        print(f"Saved final preoptimized structure to '{opt_fn}'.")
        rmsd = org_geom.rmsd(opt_result.geom)
        print(f"RMSD with initial geometry: {rmsd:.6f} au")
        print()
    return opt_results
def run_irc(geom, irc_key, irc_kwargs, calc_getter):
    print(highlight_text("Running IRC") + "\n")

    calc = calc_getter()
    calc.base_name = "irc"
    geom.set_calculator(calc, clear=False)

    # Recreate geometry with Cartesian coordinates if needed.
    if geom.coord_type != "cart":
        geom = geom.copy_all(coord_type="cart")

    irc = IRC_DICT[irc_key](geom, **irc_kwargs)
    irc.run()
    return irc
def do_rmsds(xyz, geoms, end_fns, end_geoms, preopt_map=None, similar_thresh=0.25):
    if len(end_fns) != 2 or len(end_geoms) != 2:
        return
    end_fns = [str(fn) for fn in end_fns]
    max_end_len = max(len(s) for s in end_fns)

    if len(geoms) == 1:
        return
    elif len(geoms) > 2:
        geoms = (geoms[0], geoms[-1])
    assert len(geoms) == 2

    if isinstance(xyz, str):
        xyz = (f"{xyz}, first entry", f"{xyz}, last entry")
    elif (not isinstance(xyz, str)) and len(xyz) >= 2:
        xyz = (xyz[0], xyz[-1])
    assert len(xyz) == 2
    # Append original filenames, if supplied
    if preopt_map is not None:
        xyz = [f"{preopt}/{preopt_map[preopt]}" for preopt in xyz]
    max_len = max(len(s) for s in xyz)

    print(highlight_text("RMSDs After End Optimizations"))
    print()

    start_cbms = [get_bond_mat(geom) for geom in geoms]
    end_cbms = [get_bond_mat(geom) for geom in end_geoms]
    for i, start_geom in enumerate(geoms):
        fn = xyz[i]
        found_similar = False
        print(f"start geom {i:>2d} ({fn:>{max_len}s})")
        # Condensed bond mat
        start_cbm = start_cbms[i]
        for j, end_geom in enumerate(end_geoms):
            end_fn = end_fns[j]
            # Compare bond matrices
            cbm_match = (start_cbm == end_cbms[j]).all()
            cbm_str = "(bond matrices match)" if cbm_match else ""
            rmsd = start_geom.rmsd(end_geom)
            similar_str = ""
            if rmsd < similar_thresh:
                found_similar = True
                similar_str = " (similar)"
            print(
                f"\tend geom {j:>2d} ({end_fn:>{max_end_len}s}): "
                f"RMSD={rmsd:>8.6f} au{similar_str} " + cbm_str
            )
        if not found_similar:
            print(f"\tRMSDs of end geometries are dissimilar to '{fn}'!")
    print()
def run_endopt(irc, endopt_key, endopt_kwargs, calc_getter):
    print(highlight_text("Optimizing reaction path ends"))

    # Gather geometries that shall be optimized and appropriate keys.
    to_opt = list()
    if irc.forward:
        coords = irc.all_coords[0]
        to_opt.append((coords, "forward"))
    if irc.backward:
        coords = irc.all_coords[-1]
        to_opt.append((coords, "backward"))
    if irc.downhill:
        coords = irc.all_coords[-1]
        to_opt.append((coords, "downhill"))

    def to_frozensets(sets):
        return [frozenset(_) for _ in sets]

    separate_fragments = endopt_kwargs.pop("fragments", False)
    total = separate_fragments in ("total", False)
    # Convert to array for easy indexing with the fragment lists
    atoms = np.array(irc.atoms)

    fragments_to_opt = list()
    # Expand endpoints into fragments if requested
    for coords, key in to_opt:
        base_name = f"{key}_end"
        c3d = coords.reshape(-1, 3)
        fragments = list()
        fragment_names = list()
        # Detect separate fragments if requested.
        if separate_fragments:
            bond_sets = to_frozensets(get_bond_sets(atoms.tolist(), c3d))
            # Sort atom indices, so the atoms don't become totally scrambled.
            fragments.extend([sorted(frag) for frag in merge_sets(bond_sets)])
            # Disable higher fragment counts. I'm looking forward to the day
            # this ever occurs and someone complains :)
            assert len(fragments) < 10, "Something probably went wrong"
            fragment_names.extend(
                [f"{base_name}_frag{i:02d}" for i, _ in enumerate(fragments)]
            )
            print(f"Found {len(fragments)} fragment(s) at {base_name}")
            for frag_name, frag in zip(fragment_names, fragments):
                print(f"\t{frag_name}: {len(frag)} atoms")

        # Optimize the whole geometries, without splitting them into
        # fragments. Skip this optimization if separate fragments are
        # requested and only one fragment is present, which would result
        # in twice the same optimization.
        skip_one_frag = separate_fragments and len(fragments) == 1
        if total and not skip_one_frag:
            # Dummy fragment containing all atom indices.
            fragments.extend(
                [
                    range(len(atoms)),
                ]
            )
            fragment_names.extend(
                [
                    base_name,
                ]
            )
        elif skip_one_frag:
            print(
                f"Only one fragment present for '{key}'. Skipping optimization "
                "of total system."
            )

        fragment_keys = [key] * len(fragments)
        fragment_atoms = [tuple(atoms[list(frag)]) for frag in fragments]
        fragment_coords = [c3d[frag].flatten() for frag in fragments]
        fragments_to_opt.extend(
            list(zip(fragment_keys, fragment_names, fragment_atoms, fragment_coords))
        )
        print()
    to_opt = fragments_to_opt

    geom_kwargs = endopt_kwargs.pop("geom")
    coord_type = geom_kwargs.pop("type")

    results = {k: list() for k in ("forward", "backward", "downhill")}
    for key, name, atoms, coords in to_opt:
        geom = Geometry(
            atoms,
            coords,
            coord_type=coord_type,
            **geom_kwargs,
        )
        initial_fn = f"{name}_initial.xyz"
        with open(initial_fn, "w") as handle:
            handle.write(geom.as_xyz())

        def wrapped_calc_getter():
            calc = calc_getter()
            calc.base_name = name
            return calc

        opt_kwargs = endopt_kwargs.copy()
        opt_kwargs.update(
            {
                "prefix": name,
                "h5_group_name": name,
                "dump": True,
            }
        )
        try:
            opt_result = run_opt(
                geom,
                wrapped_calc_getter,
                endopt_key,
                opt_kwargs,
                title=f"{name} Optimization",
                level=1,
            )
        except Exception as err:
            print(f"{err}\nOptimization crashed!")
            continue
        final_fn = opt_result.opt.final_fn
        opt_fn = f"{name}_opt.xyz"
        shutil.move(final_fn, opt_fn)
        print(f"Moved '{final_fn.name}' to '{opt_fn}'.\n")
        results[key].append(opt_result)
    print()
    return results["forward"], results["backward"], results["downhill"]
def run_mdp(geom, calc_getter, mdp_kwargs):
    cwd = Path(".").resolve()
    for i in range(3):
        out_dir = cwd / f"outdir{i:02d}"
        try:
            os.mkdir(out_dir)
        except FileExistsError:
            print(out_dir, "exists")
        add_kwargs = {
            "out_dir": out_dir,
        }
        calc = calc_getter(**add_kwargs)
        geom.set_calculator(calc)
        mdp_result = mdp(geom, **mdp_kwargs)
        break
    return mdp_result
def copy_yaml_and_geometries(run_dict, yaml_fn, dest_and_add_cp, new_yaml_fn=None):
    destination, *copy_also = dest_and_add_cp
    src_path = Path(yaml_fn).resolve().parent
    destination = Path(destination)
    try:
        print(f"Trying to create directory '{destination}' ... ", end="")
        os.mkdir(destination)
        print("done")
    except FileExistsError:
        print("already exists")
    if "geom" in run_dict:
        xyzs = run_dict["geom"]["fn"]
        try:
            xyzs = list(xyzs.values())
        except AttributeError:
            pass
    else:
        xyzs = run_dict["xyz"]
    print("Copying:")
    # Copy geometries. When newlines are present we have an inline
    # xyz-formatted string instead of filenames.
    if not "\n" in xyzs:
        if isinstance(xyzs, str):
            xyzs = [
                xyzs,
            ]
        for xyz in xyzs:
            if xyz.startswith("lib:"):
                continue
            shutil.copy(src_path / xyz, destination)
            print("\t", xyz)
    else:
        print("Found inline xyz formatted string. No files to copy!")
    # Copy additional files
    for src in copy_also:
        try:
            shutil.copy(src_path / src, destination)
            print(f"\t{src}")
        except FileNotFoundError:
            print(f"\tCould not find '{src}'. Skipping!")

    # Update yaml_fn to match destination
    yaml_dest_fn = Path(destination.stem).with_suffix(".yaml")
    shutil.copy(yaml_fn, destination / yaml_dest_fn)
    print("\t", yaml_fn)
def get_defaults(conf_dict, T_default=T_DEFAULT, p_default=p_DEFAULT):
    # Defaults
    dd = {
        "assert": None,
        "afir": None,
        "barrier": None,
        "calc": {
            "type": "dummy",
        },
        "cos": None,
        "endopt": None,
        "geom": None,
        "glob": None,
        "interpol": None,
        "irc": None,
        "md": None,
        "mdp": None,
        "opt": None,
        "perf": None,
        "precontr": None,
        "preopt": None,
        "scan": None,
        "stocastic": None,
        "shake": None,
        "tsopt": None,
    }

    mol_opt_defaults = {
        "dump": True,
        "max_cycles": 150,
        "overachieve_factor": 5,
        "type": "rfo",
        "do_hess": False,
        "T": T_default,
        "p": p_default,
    }
    cos_opt_defaults = {
        "type": "qm",
        "align": True,
        "dump": True,
    }

    if "interpol" in conf_dict:
        dd["interpol"] = {
            "align": True,
        }

    if "cos" in conf_dict:
        dd["cos"] = {
            "type": "neb",
            "fix_first": True,
            "fix_last": True,
        }
        dd["opt"] = cos_opt_defaults.copy()
    # Use a different, more powerful, optimizer when we are not dealing
    # with a COS-optimization.
    elif "opt" in conf_dict:
        dd["opt"] = mol_opt_defaults.copy()
    elif "stocastic" in conf_dict:
        dd["stocastic"] = {
            "type": "frag",
        }

    def get_opt_geom_defaults():
        return {
            "type": "redund",
            "coord_kwargs": {},
        }

    if "tsopt" in conf_dict:
        dd["tsopt"] = mol_opt_defaults.copy()
        dd["tsopt"].update(
            {
                "type": "rsprfo",
                "h5_group_name": "tsopt",
                "prefix": "ts",
            }
        )
        if "cos" in conf_dict:
            dd["tsopt"]["geom"] = get_opt_geom_defaults()

    if "precontr" in conf_dict:
        dd["precontr"] = {
            "prefix": "precontr",
        }

    if "preopt" in conf_dict:
        # We can't just copy dd["opt"], because there will probably be
        # some COS-specific optimizer settings, but we just want to
        # optimize the (molecular) endpoints.
        dd["preopt"] = mol_opt_defaults.copy()
        dd["preopt"].update(
            {
                # Optimization specific; we are a bit more cautious here.
                "max_cycles": 100,
                "thresh": "gau_loose",
                "trust_max": 0.3,
                "geom": get_opt_geom_defaults(),
                # Preopt specific
                "strict": False,
            }
        )
        # dd["preopt"]["geom"]["type"] = "tric"

    if "endopt" in conf_dict:
        dd["endopt"] = mol_opt_defaults.copy()
        dd["endopt"].update(
            {
                "thresh": "gau",
                "fragments": False,
                "geom": get_opt_geom_defaults(),
            }
        )

    if "barriers" in conf_dict:
        dd["barriers"] = {
            "T": T_default,
            "p": p_default,
            "solv_calc": {},
            "do_standard_state_corr": True,
        }

    if "shake" in conf_dict:
        dd["shake"] = {
            "scale": 0.1,
            "seed": None,
        }

    if "irc" in conf_dict:
        dd["irc"] = {
            "type": "eulerpc",
            "rms_grad_thresh": 1e-3,
        }

    if "assert" in conf_dict:
        dd["assert"] = {}

    if "geom" in conf_dict:
        dd["geom"] = {
            "type": "cart",
            "coord_kwargs": {},
        }

    if "mdp" in conf_dict:
        dd["mdp"] = {}

    if "scan" in conf_dict:
        dd["scan"] = {
            "opt": mol_opt_defaults.copy(),
            "symmetric": False,
        }
        dd["scan"]["opt"]["dump"] = False

    if "md" in conf_dict:
        md_T = T_default
        dd["md"] = {
            "T": md_T,
            "T_init_vel": md_T,
            "dt": 0.5,
            "thermostat": "csvr_2",
            "timecon": 50,
            "print_stride": 100,
            "dump_stride": 10,
            "remove_com_v": True,
        }

    if "perf" in conf_dict:
        dd["perf"] = {
            "mems": 2500,
            "repeat": 3,
        }

    if "afir" in conf_dict:
        dd["afir"] = {}

    return dd
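
# Example (editor's sketch): get_defaults() only fills in defaults for the
# sections that are present in the user input; absent sections stay None.
# The input values are hypothetical.
#
#     user_conf = {"geom": {"fn": "input.xyz"}, "opt": {}}
#     dd = get_defaults(user_conf)
#     assert dd["opt"]["type"] == "rfo"   # molecular-optimizer defaults
#     assert dd["cos"] is None            # no 'cos' section was given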
def get_last_calc_cycle():
    def keyfunc(path):
        return re.match(r"image_\d+.(\d+).out", str(path))[1]

    cwd = Path(".")
    calc_logs = [str(cl) for cl in cwd.glob("image_*.*.out")]
    calc_logs = sorted(calc_logs, key=keyfunc)
    grouped = it.groupby(calc_logs, key=keyfunc)
    # Find the last completely finished cycle.
    last_length = 0
    last_calc_cycle = 0
    for calc_cycle, group in grouped:
        cycle_length = len(list(group))
        if cycle_length < last_length:
            # When this is True we have a cycle with fewer items than the
            # last one, that is, an unfinished cycle.
            break
        last_length = cycle_length
        last_calc_cycle = int(calc_cycle)
    if last_calc_cycle == 0:
        print("Can't find any old calculator logs.")
    print(f"Last calculation counter is {last_calc_cycle}.")
    return last_calc_cycle
# Take care to insert a ',' after each string!
VALID_KEYS = {
    "assert",
    "afir",
    "barriers",
    "calc",
    "cos",
    "endopt",
    "geom",
    "interpol",
    "irc",
    "md",
    "mdp",
    "opt",
    "perf",
    "precontr",
    "preopt",
    "scan",
    "shake",
    "stocastic",
    "tsopt",
}
def setup_run_dict(run_dict):
    org_dict = run_dict.copy()

    # Load defaults to have a sane baseline
    run_dict = get_defaults(run_dict)

    # Update nested entries that are dicts by themselves
    key_set = set(org_dict.keys())
    assert (
        key_set <= VALID_KEYS
    ), f"Found invalid keys in YAML input: {key_set - VALID_KEYS}"
    for key in key_set & VALID_KEYS:
        try:
            # Recursive update, because there may be nested dicts
            recursive_update(run_dict[key], org_dict[key])
        except TypeError:
            print(f"Using default values for '{key}' section.")
    return run_dict
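
# Example (editor's sketch): defaults and user input are merged
# recursively, so a user only overrides the keys that differ. The input
# values are hypothetical.
#
#     run_dict = setup_run_dict({"opt": {"thresh": "gau"}, "geom": {"fn": "x.xyz"}})
#     # run_dict["opt"]["thresh"] == "gau" comes from the user, while e.g.
#     # run_dict["opt"]["max_cycles"] == 150 still comes from the defaults.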
RunResult = namedtuple(
    "RunResult",
    (
        "preopt_first_geom preopt_last_geom "
        "cos cos_opt "
        "ts_geom ts_opt "
        "end_geoms irc irc_geom "
        "mdp_result "
        "opt_geom opt "
        "calced_geoms calced_results "
        "stocastic calc_getter "
        "scan_geoms scan_vals scan_energies "
        "perf_results "
    ),
)
def main(run_dict, restart=False, yaml_dir="./", scheduler=None):
    # Dump run_dict
    run_dict_copy = run_dict.copy()
    run_dict_copy["version"] = __version__
    with open("RUN.yaml", "w") as handle:
        yaml.dump(run_dict_copy, handle)

    if run_dict["interpol"]:
        interpol_key = run_dict["interpol"].pop("type")
        interpol_kwargs = run_dict["interpol"]
    # Preoptimization prior to COS optimization
    if run_dict["preopt"]:
        preopt_key = run_dict["preopt"].pop("type")
        preopt_kwargs = run_dict["preopt"]
    # Optimization of fragments after IRC integration
    if run_dict["endopt"]:
        endopt_key = run_dict["endopt"].pop("type")
        endopt_kwargs = run_dict["endopt"]
    if run_dict["opt"]:
        opt_key = run_dict["opt"].pop("type")
        opt_kwargs = run_dict["opt"]
    if run_dict["cos"]:
        cos_key = run_dict["cos"].pop("type")
        cos_kwargs = run_dict["cos"]
        cos_kwargs["scheduler"] = scheduler
    if run_dict["stocastic"]:
        stoc_key = run_dict["stocastic"].pop("type")
        stoc_kwargs = run_dict["stocastic"]
    if run_dict["tsopt"]:
        tsopt_key = run_dict["tsopt"].pop("type")
        tsopt_kwargs = run_dict["tsopt"]
    if run_dict["irc"]:
        irc_key = run_dict["irc"].pop("type")
        irc_kwargs = run_dict["irc"]
    if run_dict["afir"]:
        afir_key = run_dict["afir"].pop("type")
        afir_kwargs = run_dict["afir"]

    # Handle geometry input. This section must always be present.
    geom_kwargs = run_dict["geom"]
    xyz = geom_kwargs.pop("fn")
    coord_type = geom_kwargs.pop("type")
    union = geom_kwargs.pop("union", None)

    ####################
    # CALCULATOR SETUP #
    ####################

    # Prepare calculator
    calc_key = run_dict["calc"].pop("type")
    calc_kwargs = run_dict["calc"]
    calc_run_func = calc_kwargs.pop("run_func", None)
    calc_one_calc = calc_kwargs.pop("one_calculator", False)
    calc_kwargs["out_dir"] = calc_kwargs.get("out_dir", yaml_dir / OUT_DIR_DEFAULT)
    calc_base_name = calc_kwargs.get("base_name", "calculator")
    if calc_key in ("oniom", "ext"):
        geoms = get_geoms(xyz, quiet=True)
        iter_dict = {
            "geom": iter(geoms),
        }
    elif calc_key == "multi":
        geoms = get_geoms(xyz, quiet=True)
        iter_dict = {
            "base_name": iter([geom.name for geom in geoms]),
        }
    else:
        iter_dict = None
    calc_getter = get_calc_closure(
        calc_base_name, calc_key, calc_kwargs, iter_dict=iter_dict
    )
    # Create a second function that returns the actual, unwrapped
    # calculator. This may be useful if we later want to drop the wrapper
    # and use the actual calculator.
    if "calc" in calc_kwargs:
        act_calc_kwargs = calc_kwargs["calc"].copy()
        act_calc_key = act_calc_kwargs.pop("type")
        act_calc_getter = get_calc_closure(
            "act_calculator", act_calc_key, act_calc_kwargs
        )
    try:
        solv_calc_kwargs = run_dict["barriers"].pop("solv_calc")
        solv_calc_key = solv_calc_kwargs.pop("type")
        solv_calc_getter = get_calc_closure(
            "solv_calculator", solv_calc_key, solv_calc_kwargs
        )
    except KeyError:
        solv_calc_getter = None

    ##################
    # GEOMETRY SETUP #
    ##################

    # Initial loading of geometries from file(s)
    geoms = get_geoms(xyz, coord_type="cart")

    # ------------------------+
    # Preconditioning of      |
    # Translation & Rotation  |
    # ------------------------+

    if run_dict["precontr"]:
        ptr_geom0, ptr_geom_m1 = run_precontr(
            geoms[0], geoms[-1], prefix=run_dict["precontr"]["prefix"]
        )
        geoms[0] = ptr_geom0
        geoms[-1] = ptr_geom_m1

    # -----------------------+
    # Preoptimization of     |
    # first & last image(s)  |
    # -----------------------+

    # Preoptimization only makes sense with a subsequent COS run.
    if run_dict["preopt"] and (run_dict["cos"] or run_dict["afir"]):
        assert len(geoms) > 1
        # Preopt should be expanded to support > 2 fragments with AFIR
        if run_dict["afir"] and len(geoms) > 2:
            raise Exception("Currently, only the first & last geometry are optimized!")
        first_opt_result, last_opt_result = run_preopt(
            geoms[0],
            geoms[-1],
            calc_getter,
            preopt_key,
            preopt_kwargs,
        )
        # Update with (pre)optimized geometries. 'preopt_first_geom' and
        # 'preopt_last_geom' are assigned here so they can later be
        # collected into 'run_result'.
        geoms[0] = preopt_first_geom = first_opt_result.geom.copy(coord_type=coord_type)
        geoms[-1] = preopt_last_geom = last_opt_result.geom.copy(coord_type=coord_type)

    if run_dict["interpol"]:
        # Interpolate. Will return the original geometries for between = 0.
        geoms = interpolate_all(geoms, kind=interpol_key, **interpol_kwargs)
        dump_geoms(geoms, "interpolated")
    # Recreate geometries with the desired coordinate type and keyword
    # arguments.
    geoms = standardize_geoms(geoms, coord_type, geom_kwargs, union=union)

    # Create COS objects and supply a function that yields new calculators,
    # as needed for growing COS classes, where images are added over time.
    if run_dict["cos"]:
        cos_cls = COS_DICT[cos_key]
        if issubclass(cos_cls, GrowingChainOfStates) or isinstance(
            cos_cls, type(FreezingString)
        ):
            cos_kwargs["calc_getter"] = get_calc_closure("image", calc_key, calc_kwargs)
        geom = COS_DICT[cos_key](geoms, **cos_kwargs)
    elif len(geoms) == 1:
        geom = geoms[0]

    if run_dict["stocastic"]:
        stoc_kwargs["calc_kwargs"] = calc_kwargs
        stocastic = STOCASTIC_DICT[stoc_key](geom, **stoc_kwargs)
        stocastic = run_stocastic(stocastic)
    elif run_dict["md"]:
        md_kwargs = run_dict["md"].copy()
        run_md(geom, calc_getter, md_kwargs)
    elif run_dict["scan"]:
        scan_kwargs = run_dict["scan"]
        scan_geoms, scan_vals, scan_energies = run_scan(geom, calc_getter, scan_kwargs)
    elif run_dict["perf"]:
        perf_results = run_perf(geom, calc_getter, **run_dict["perf"])
        print_perf_results(perf_results)
    elif run_dict["afir"]:
        ts_guesses, afir_paths = run_afir_paths(
            afir_key,
            geoms,
            calc_getter,
            **afir_kwargs,
        )
    # This case will handle most pysisyphus runs. A full run encompasses
    # the following steps:
    #
    # (0. Preoptimization, already handled)
    #  1. (COS-)Optimization
    #  2. TS-Optimization by TSHessianOptimizer or Dimer method
    #  3. IRC integration
    #  4. Optimization of IRC endpoints
    #
    # Everything can be chained. All functions operate on the 'geom'
    # object, which is propagated through all function calls.
    #
    # All keys are present in 'run_dict', but most of the corresponding
    # values will be set to None.
    elif any(
        [run_dict[key] is not None for key in ("opt", "tsopt", "irc", "mdp", "endopt")]
    ):
        #######
        # OPT #
        #######

        if run_dict["opt"]:
            if run_dict["shake"]:
                print(highlight_text("Shake coordinates"))
                shaked_coords = shake_coords(geom.coords, **run_dict["shake"])
                geom.coords = shaked_coords
                print(f"Shaken coordinates:\n{geom.as_xyz()}")
            opt_result = run_opt(
                geom, calc_getter, opt_key, opt_kwargs, print_thermo=True
            )
            opt_geom = opt_result.geom
            opt = opt_result.opt

            # Keep a backup of the optimized geometry
            if isinstance(opt_geom, ChainOfStates.ChainOfStates):
                # Set some variables that are later collected into RunResult
                cos = opt_geom
                cos_opt = opt
                # copy() is not present for ChainOfStates objects, so we
                # just keep using the COS object under a different name.
                geom = opt_geom
            else:
                geom = opt_geom.copy()

        #########
        # TSOPT #
        #########

        abort = False
        if run_dict["tsopt"]:
            # Use a separate implementation for TS-optimizations started
            # from COS-optimizations.
            try:
                if isinstance(geom, ChainOfStates.ChainOfStates):
                    ts_calc_getter = get_calc_closure(tsopt_key, calc_key, calc_kwargs)
                    ts_opt_result = run_tsopt_from_cos(
                        geom, tsopt_key, tsopt_kwargs, ts_calc_getter
                    )
                else:
                    ts_opt_result = run_opt(
                        geom,
                        calc_getter,
                        tsopt_key,
                        tsopt_kwargs,
                        title="TS-Optimization",
                        copy_final_geom="ts_opt.xyz",
                    )
                ts_geom = ts_opt_result.geom
                ts_opt = ts_opt_result.opt
                geom = ts_geom.copy_all()
            except HEIIsFirstOrLastException:
                abort = True

        #######
        # IRC #
        #######

        ran_irc = False
        if (not abort) and run_dict["irc"]:
            # After a Dimer run we continue with the actual calculator and
            # not the Dimer calculator.
            if calc_key == "dimer":
                calc_getter = act_calc_getter
            irc_geom = geom.copy()
            irc = run_irc(geom, irc_key, irc_kwargs, calc_getter)
            # The IRC geom won't have a calculator, so we set the
            # appropriate values here.
            irc_geom.energy = irc.ts_energy
            irc_geom.cart_hessian = irc.init_hessian
            ran_irc = True

        ##########
        # ENDOPT #
        ##########

        # Run 'endopt' when a previous IRC calculation was done
        if run_dict["endopt"]:
            if not ran_irc:
                _, irc_geom, _ = geoms  # IRC geom should correspond to the TS

                class DummyIRC:
                    def __init__(self, geoms):
                        first, _, last = geoms
                        self.atoms = copy.copy(first.atoms)
                        self.forward = True
                        self.backward = True
                        self.downhill = False
                        self.all_coords = [
                            geom.cart_coords.copy() for geom in (first, last)
                        ]
                        self.hessian_init = "dummy"

                irc = DummyIRC(geoms)

            do_thermo = run_dict["endopt"].get("do_hess", False) and can_thermoanalysis
            T = run_dict["endopt"]["T"]
            p = run_dict["endopt"]["p"]

            # Order is forward, backward, downhill
            endopt_results = run_endopt(irc, endopt_key, endopt_kwargs, calc_getter)
            # Determine "left" and "right" geoms
            #
            # Only downhill
            if irc.downhill:
                left_results = endopt_results[2]
            # Only backward
            elif irc.backward and not irc.forward:
                left_results = endopt_results[1]
            # Forward and backward run
            else:
                left_results = endopt_results[0]
            left_geoms = [result.geom for result in left_results]
            left_fns = [result.fn for result in left_results]
            # Use 'backward' results; might be empty.
            right_geoms = [result.geom for result in endopt_results[1]]
            right_fns = [result.fn for result in endopt_results[1]]
            end_geoms = left_geoms + right_geoms
            if run_dict["cos"]:
                end_fns = left_fns + right_fns
                do_rmsds(xyz, geoms, end_fns, end_geoms)

            # Try to compute barriers.
            barrier_kwargs = {
                "do_thermo": do_thermo,
                "T": T,
                "p": p,
                "calc_getter": calc_getter,
                "solv_calc_getter": solv_calc_getter,
            }
            barrier_kwargs_ = run_dict.get("barriers", {})
            barrier_kwargs.update(barrier_kwargs_)
            do_endopt_ts_barriers(
                irc_geom,
                left_geoms,
                right_geoms,
                left_fns=left_fns,
                right_fns=right_fns,
                **barrier_kwargs,
            )

            # Dump TS and endopt geoms to .trj, but only when we did not
            # optimize separate fragments.
            if len(left_geoms) == 1 and len(right_geoms) in (0, 1):
                trj_fn = "left_ts_right_geoms.trj"
                write_geoms_to_trj(
                    list(it.chain(left_geoms, [irc_geom], right_geoms)),
                    trj_fn,
                    comments=list(it.chain(left_fns, ["IRC start"], right_fns)),
                )
                print(f"Wrote optimized end-geometries and TS to '{trj_fn}'")

        if run_dict["mdp"]:
            mdp_kwargs = run_dict["mdp"]
            mdp_result = run_mdp(geom, calc_getter, mdp_kwargs)
    # Fallback when no specific job type was specified
    else:
        calced_geoms, calced_results = run_calculations(
            geoms,
            calc_getter,
            scheduler,
            run_func=calc_run_func,
            one_calculator=calc_one_calc,
        )

    # We can't use locals() in the dict comprehension, as it would run in
    # its own local scope.
    locals_ = locals()
    results = {key: locals_.get(key, None) for key in RunResult._fields}
    run_result = RunResult(**results)
    return run_result
def check_asserts(results, run_dict):
    print(highlight_text("Asserting results"))

    assert_ = run_dict["assert"]
    keys = list(assert_.keys())
    objs_attrs = [key.split(".") for key in keys]
    ref_vals = [assert_[k] for k in keys]
    matches = list()
    for i, ((obj, attr), ref_val) in enumerate(zip(objs_attrs, ref_vals)):
        cur_val = getattr(getattr(results, obj), attr)
        matched = approx_float(cur_val, ref_val)
        print(f"{i:02d}: {obj}.{attr}")
        print(f"\tReference: {ref_val}")
        print(f"\t  Current: {cur_val}")
        print(f"\t  Matches: {bool_color(matched)}")
        matches.append(matched)
    assert all(matches)
    print()
def do_clean(force=False):
    """Deletes files from previous runs in the cwd.

    A similar function could be used to store everything ...
    """
    cwd = Path(".").resolve()
    rm_globs = (
        "cycle*.trj",
        "interpolated.trj",
        "interpolated.image*.xyz",
        "calculator.log",
        "optimizer.log",
        "tsoptimizer.log",
        "wfoverlap.log",
        "host_*.calculator.log",
        "host_*.wfoverlap.log",
        "wfo_*.out",
        "optimization.trj",
        "cos.log",
        "*.gradient",
        "optimizer_results.yaml",
        # ORCA specific
        "*.orca.gbw",
        "*.orca.cis",
        "*.orca.engrad",
        "*.orca.hessian",
        "*.orca.inp",
        "*.orca.hess",
        "*.orca.molden",
        # OpenMOLCAS specific
        "calculator*.out",
        "calculator*.JobIph",
        "calculator*.RasOrb",
        "*rasscf.molden",
        # Turbomole specific
        "calculator_*.control",
        "calculator_*.coord",
        "calculator_*.mos",
        "calculator_*.ciss_a",
        "calculator*.sing_a",
        "*wavefunction.molden",
        "*input.xyz",
        "*.coord",
        # PySCF specific
        "calculator*.chkfile",
        "*.pyscf.out",
        "*.chkfile",
        # WFOverlap specific
        "wfo_*.*.out",
        # XTB specific
        "image*.grad",
        "calculator*.grad",
        "calculator*.xcontrol",
        "calculator*.charges",
        "image_*",
        "splined_ts_guess.xyz",
        "splined_hei_tangent",
        "cart_hei_tangent.trj",
        "dimer_ts.xyz",
        "dimer_pickle",
        "interpolated.geom_*.xyz",
        # Wavefunction overlap
        "wfo_*",
        "image*.molden",
        "jmol.spt",
        "overlap_data.h5",
        "*_CDD.png",
        "*_CDD.cub",
        "internal_coords.log",
        "hei_tangent",
        "optimization.trj",
        "splined_hei.xyz",
        "ts_opt.xyz",
        "final_geometry.xyz",
        "calculated_init_hessian",
        "cur_out",
        # HDF5 files
        "optimization.h5",
        "afir.h5",
        # Optimization files
        "*_optimization.trj",
        # Preopt files
        "first_*",
        "last_*",
        # TSOpt
        "rsirfo*",
        # IRC files
        "irc_*",
        "irc.log",
        "finished_*",
        # IRC/Endopt files
        "backward_*",
        "forward_*",
        # Misc
        "*imaginary_mode_*.trj",
        "cart_hei_tangent",
        "ts_calculated_init_cart_hessian",
        "calculated_final_cart_hessian",
        "*final_geometry.xyz",
        "*final_geometries.trj",
        "current_geometry.xyz",
        "*current_geometries.trj",
        "hess_calc_cyc*.h5",
        "ts_hess_calc_cyc*.h5",
        "hess_init_irc.h5",
        "final_hessian.h5",
        "ts_current_geometry.xyz",
        "dimer_*",
        "plain_hei_tangent",
        "plain_hei.xyz",
        "hess_calc_irc*.h5",
        "rebuilt_primitives.xyz",
        "RUN.yaml",
        "middle_for_preopt.trj",
        "relaxed_scan.trj",
        "too_similar.trj",
        # MDP
        "mdp_ee_ascent.trj",
        "mdp_ee_fin_*.trj",
        "mdp_ee_init_*.trj",
        "aligned.geom*xyz",
        "cos_hei.trj",
        # Dimer
        "calculator_*.N",
        "calculator_*.N.trj",
        "dimer.log",
        "*.gfnff_topo",
        # DFTB+
        "*.detailed.out",
        "*.geometry.gen",
        "*.dftb_in.hsd",
        "*.EXC.DAT",
        "*.XplusY.DAT",
        "*.dftb.out",
        "rsprfo_*",
        "reparametrized.trj",
        "end_geoms_and_ts.trj",
        "left_ts_right_geoms.trj",
        "ts_final_hessian.h5",
        "third_deriv.h5",
        "*.ao_ovlp_rec",
        # MOPAC
        "*.mopac.aux",
        "*.mopac.arc",
        "*.mopac.mop",
        "*.mopac.out",
    )
    to_rm_paths = list()
    for glob in rm_globs:
        to_rm_paths.extend(list(cwd.glob(glob)))
    to_rm_strs = [str(p) for p in to_rm_paths]
    for s in to_rm_strs:
        print(s)

    def delete():
        for p in to_rm_paths:
            try:
                os.remove(p)
                print(f"Deleted {p}")
            except FileNotFoundError:
                pass
        try:
            os.unlink("cur_out")
        except FileNotFoundError:
            pass

    if force:
        delete()
        return
    # If we don't force the cleaning, ask for confirmation first
    elif to_rm_paths and confirm_input("Delete these files?"):
        delete()
    else:
        print("No files found for removal.")
def run_from_dict(
    run_dict,
    cwd=None,
    set_defaults=True,
    yaml_fn=None,
    cp=None,
    scheduler=None,
    clean=False,
    fclean=False,
    version=False,
    restart=False,
):
    if cwd is None:
        cwd = Path(".")

    print_header()

    # Citation
    citation = (
        "If pysisyphus benefitted your research please cite:\n\n"
        "\thttps://doi.org/10.1002/qua.26390\n\nGood luck!\n"
    )
    print(citation)

    init_logging(cwd, scheduler)

    # Load defaults etc.
    if set_defaults:
        run_dict = setup_run_dict(run_dict)
    sys.stdout.flush()

    if cp:
        copy_yaml_and_geometries(run_dict, yaml_fn, cp)
        return
    elif clean:
        do_clean()
        return
    elif fclean:
        do_clean(force=True)
        return
    # Return after the header was printed
    elif version:
        return

    run_dict_without_none = {k: v for k, v in run_dict.items() if v is not None}
    pprint(run_dict_without_none)
    print()
    sys.stdout.flush()

    run_result = main(run_dict, restart, cwd, scheduler)

    if run_dict["assert"] is not None:
        print()
        check_asserts(run_result, run_dict)
    return run_result
def load_run_dict(yaml_fn):
    with open(yaml_fn) as handle:
        yaml_str = handle.read()
    try:
        loader = get_loader()
        try:
            run_dict = yaml.load(yaml_str, Loader=loader)
        except yaml.constructor.ConstructorError as err:
            mobj = re.compile(r"for the tag '\!(\w+)'").search(err.problem)
            if mobj:
                err_unit = mobj.group(1)
                best_match, _ = find_closest_sequence(err_unit, UNITS)
                print(
                    f"Unknown unit!\nKnown units are\n'{UNITS}'.\n"
                    f"Did you mean '{best_match}', instead of '{err_unit}'?\n"
                )
            raise err
        assert isinstance(run_dict, dict)
    except (AssertionError, yaml.parser.ParserError) as err:
        print(err)
        if not yaml_fn.lower().endswith(".yaml"):
            print("Are you sure that you supplied a YAML file?")
        sys.exit(1)
    return run_dict
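
# Example (editor's sketch): a minimal YAML input as consumed by
# load_run_dict(); the filename and calculator choice are hypothetical.
#
#     # input.yaml
#     # geom:
#     #   fn: benzene.xyz
#     # calc:
#     #   type: xtb
#
#     run_dict = load_run_dict("input.yaml")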
def run():
    start_time = datetime.datetime.now()
    args = parse_args(sys.argv[1:])

    # Defaults
    run_dict = {}
    yaml_dir = Path(".")

    if args.yaml:
        run_dict = load_run_dict(args.yaml)
        yaml_dir = Path(os.path.abspath(args.yaml)).parent
    elif args.bibtex:
        print_bibtex()
        return

    run_kwargs = {
        "cwd": yaml_dir,
        "set_defaults": True,
        "yaml_fn": args.yaml,
        "cp": args.cp,
        "scheduler": args.scheduler,
        "clean": args.clean,
        "fclean": args.fclean,
        "version": args.version,
        "restart": args.restart,
    }
    run_result = run_from_dict(run_dict, **run_kwargs)

    end_time = datetime.datetime.now()
    duration = end_time - start_time
    # Only keep hh:mm:ss
    duration_hms = str(duration).split(".")[0]
    print(f"pysisyphus run took {duration_hms} h.")
    return 0
if __name__ == "__main__":
    run()