Source code for pysisyphus.dynamics.mdp

# [1]
#     Minimum dynamic path
#     Unke, 2019

from collections import namedtuple
import operator
import re

import numpy as np

from pysisyphus.constants import AU2KJPERMOL
from pysisyphus.dynamics.driver import md, MDResult
from pysisyphus.dynamics.helpers import (
from pysisyphus.helpers_pure import highlight_text

[docs]def parse_raw_term_func(raw_term_func): funcs = { "<":, ">":, "<=": operator.le, ">=":, "==": operator.eq, } def comp_closure(indices, op, ref_value): a_ind, b_ind = indices func = funcs[op] def comp_func(coords3d): a = coords3d[a_ind] b = coords3d[b_ind] dist = np.linalg.norm(a - b) return func(dist, ref_value) return comp_func operator_re = re.compile("([<>=]+)") mobj = operator_re.split(raw_term_func) if mobj is None: print(f"Could not parse term_func '{raw_term_func}!'") return None indices, op, ref_value = mobj ref_value = float(ref_value) indices = [int(ind) for ind in indices.split(",")] return comp_closure(indices, op, ref_value)
[docs]def parse_raw_term_funcs(raw_term_funcs): comp_funcs = {} for k, v in raw_term_funcs.items(): comp_func = parse_raw_term_func(v) if comp_func: comp_funcs[k] = comp_func return comp_funcs
[docs]def run_md(geom, dt, steps, v0=None, term_funcs=None, external=False): if external and hasattr(geom.calculator, "run_md"): t = dt * steps t_ps = t * 1e-3 md_kwargs = { "atoms": geom.atoms, "coords": geom.coords, "t": t, "dt": dt, "velocities": v0, "dump": dt, } print("Running MD with external calculator implementation.") if term_funcs is not None: print("Termination functions are not supported in external MD!") geoms = geom.calculator.run_md(**md_kwargs) md_result = MDResult( coords=[geom.coords for geom in geoms], t_ps=t_ps, step=int(t / dt - 1), terminated=None, T=None, E_tot=None, ) else: md_kwargs = { "v0": v0, "steps": steps, "dt": dt, "term_funcs": term_funcs, "verbose": False, "remove_com_v": False, } print("Running MD with internal implementation.") md_result = md(geom, **md_kwargs) return md_result
MDPResult = namedtuple( "MDResult", "ascent_xs md_init_plus md_init_minus " "md_fin_plus md_fin_minus " "md_fin_plus_term md_fin_minus_term", )
[docs]def mdp( geom, steps, dt, term_funcs=None, steps_init=None, E_excess=0.0, displ_length=0.1, epsilon=5e-4, ascent_alpha=0.05, max_ascent_steps=25, max_init_trajs=10, dump=True, seed=None, external_md=False, ): # Sanity checks and forcing some types dt = float(dt) assert dt > 0.0 steps = int(steps) t = dt * steps # assert t > dt if steps_init is None: steps_init = steps // 10 print(f"No 'steps_init' provided! Using {steps_init}") E_excess = float(E_excess) assert E_excess >= 0.0 displ_length = float(displ_length) assert displ_length >= 0.0 if term_funcs is None: term_funcs = {} for k, v in term_funcs.items(): if callable(v): continue elif isinstance(v, str): term_funcs[k] = parse_raw_term_func(v) else: raise Exception(f"Invalid term function '{k}: {v}' encountered!") print(highlight_text("Minimum dynamic path calculation")) if seed is None: # 2**32 - 1 seed = np.random.randint(4294967295) np.random.seed(seed) print(f"Using seed {seed} to initialize the random number generator.\n") E_TS = E_tot = E_TS + E_excess # Distribute E_excess evenly on E_pot and E_kin E_pot_diff = 0.5 * E_excess E_pot_desired = E_TS + E_pot_diff print(f"E_TS={E_TS:.6f} au") # Determine transition vector w, v = np.linalg.eigh(geom.hessian) assert w[0] < -1e-8 trans_vec = v[:, 0] # Disable removal of translation/rotation for analytical potentials remove_com_v = remove_rot_v = geom.cart_coords.size > 3 if E_excess == 0.0: print("MDP without excess energy.") # Without excess energy we have to do an initial displacement along # the transition vector to get a non-vanishing gradient. initial_displacement = displ_length * trans_vec x0_plus = geom.coords + initial_displacement x0_minus = geom.coords - initial_displacement v0_zero = np.zeros_like(geom.coords) md_kwargs = { "v0": v0_zero.copy(), "t": t, "dt": dt, "term_funcs": term_funcs, "external": external_md, } geom.coords = x0_plus md_fin_plus = run_md(geom, **md_kwargs) geom.coords = x0_minus md_fin_minus = run_md(geom, **md_kwargs) if dump: dump_coords(geom.atoms, md_fin_plus.coords, "mdp_plus.trj") dump_coords(geom.atoms, md_fin_minus.coords, "mdp_minus.trj") mdp_result = MDPResult( ascent_xs=None, md_init_plus=None, md_init_minus=None, md_fin_plus=md_fin_plus, md_fin_minus=md_fin_minus, ) return mdp_result print(f"E_excess={E_excess:.6f} au, ({E_excess*AU2KJPERMOL:.1f} kJ/mol)") print(f"E_pot,desired=E_TS + {E_pot_diff*AU2KJPERMOL:.1f} kJ/mol") print() # Generate random vector perpendicular to transition vector perp_vec = np.random.rand(*trans_vec.shape) # Zero last element if we have an analytical surface if perp_vec.size == 3: perp_vec[2] = 0 # Orthogonalize vector perp_vec = perp_vec - (perp_vec @ trans_vec) * trans_vec perp_vec /= np.linalg.norm(perp_vec) # Initial displacement from x_TS to x, generating a point with # non-vanishing gradient. x = geom.coords + epsilon * perp_vec geom.coords = x # Do steepest ascent until E_tot is reached E_pot = ascent_xs = list() for i in range(max_ascent_steps): ascent_xs.append(geom.coords.copy()) ascent_converged = E_pot >= E_pot_desired if ascent_converged: break gradient = geom.gradient E_pot = direction = gradient / np.linalg.norm(gradient) step = ascent_alpha * direction new_coords = geom.coords + step geom.coords = new_coords # calc = geom.calculator # class Opt: # pass # _opt = Opt() # _opt.coords = np.array(ascent_xs) # calc.plot_opt(_opt, show=True) assert ascent_converged, "Steepest ascent didn't converge!" assert (E_tot - E_pot) > 0.0, ( "Potential energy after steepst ascent is greater than the desired " f"total energy ({E_pot:.6f} > {E_tot:.6f}). Maybe try a smaller epsilon? " f"The current value Ɛ={epsilon:.6f} may be too big!" ) ascent_xs = np.array(ascent_xs) if dump: dump_coords(geom.atoms, ascent_xs, "mdp_ee_ascent.trj") x0 = geom.coords.copy() print(highlight_text("Runninig initialization trajectories", level=1)) for i in range(max_init_trajs): # Determine random momentum vector for the given kinetic energy E_kin = E_tot - E_pot T = temperature_for_kinetic_energy(len(geom.atoms), E_kin) v0 = get_mb_velocities_for_geom( geom, T, remove_com_v=remove_com_v, remove_rot_v=remove_rot_v ).flatten() # Zero last element if we have an analytical surface if v0.size == 3: v0[2] = 0 # Run initial MD to check if both trajectories run towards different # basins of attraction. # First MD with positive v0 md_init_kwargs = { "v0": v0.copy(), "steps": steps_init, "dt": dt, "external": external_md, } geom.coords = x0.copy() md_init_plus = run_md(geom, **md_init_kwargs) # Second MD with negative v0 geom.coords = x0.copy() md_init_kwargs["v0"] = -v0.copy() md_init_minus = run_md(geom, **md_init_kwargs) dump_coords(geom.atoms, md_init_plus.coords, f"mdp_ee_init_plus_{i:02d}.trj") dump_coords(geom.atoms, md_init_minus.coords, f"mdp_ee_init_minus_{i:02d}.trj") # Check if both MDs run into different basins of attraction. # We (try to) do this by calculating the overlap between the # transition vector and the normalized vector defined by the # difference between x0 and the endpoint of the respective # test trajectory. Both overlaps should have different sings. end_plus = md_init_plus.coords[-1] pls = end_plus - x0 pls /= np.linalg.norm(pls) end_minus = md_init_minus.coords[-1] minus = end_minus - x0 minus /= np.linalg.norm(minus) p = trans_vec @ pls m = trans_vec @ minus init_trajs_converged = np.sign(p) != np.sign(m) if init_trajs_converged: print("Trajectories ran into different basins. Breaking.") break if dump: dump_coords(geom.atoms, md_init_plus.coords, "mdp_ee_init_plus.trj") dump_coords(geom.atoms, md_init_minus.coords, "mdp_ee_init_minus.trj") assert init_trajs_converged print(f"Ran 2*{i+1} initialization trajectories.") print() # Run actual trajectories, using the supplied termination functions if possible. print(highlight_text("Running actual full trajectories.", level=1)) def print_status(terminated, step): if terminated: msg = f"\tTerminated by '{terminated}' in step {step}." else: msg = "\tMax time steps reached!" print(msg) # "Production"/Final MDs md_fin_kwargs = { "v0": v0.copy(), "steps": steps, "dt": dt, "term_funcs": term_funcs, "external": external_md, } # MD with positive v0. geom.coords = x0.copy() md_fin_plus = run_md(geom, **md_fin_kwargs) print_status(md_fin_plus.terminated, md_fin_plus.step) # MD with negative v0. geom.coords = x0.copy() md_fin_kwargs["v0"] = -v0 md_fin_minus = run_md(geom, **md_fin_kwargs) print_status(md_fin_minus.terminated, md_fin_minus.step) md_fin_plus_term = md_fin_plus.terminated md_fin_minus_term = md_fin_minus.terminated if dump: dump_coords(geom.atoms, md_fin_plus.coords, "mdp_ee_fin_plus.trj") dump_coords(geom.atoms, md_fin_minus.coords, "mdp_ee_fin_minus.trj") mdp_result = MDPResult( ascent_xs=ascent_xs, md_init_plus=md_init_plus, md_init_minus=md_init_minus, md_fin_plus=md_fin_plus, md_fin_minus=md_fin_minus, md_fin_plus_term=md_fin_plus_term, md_fin_minus_term=md_fin_minus_term, ) return mdp_result