# [1] https://doi.org/10.26434/chemrxiv-2022-253hc-v2
# Identifying the Marcus dimension of electron transfer from
# ab initio calculations
# Šrut, Lear, Krewald, 2022 on chemrxiv
# [2] https://doi.org/10.1039/D3SC01402A
# Identifying the Marcus dimension of electron transfer from
# ab initio calculations
# Šrut, Lear, Krewald, 2023, actual published version
import datetime
from collections.abc import Callable
import functools
from pathlib import Path
import sys
import time
from typing import List, Optional, Tuple
from distributed import Client
import matplotlib.pyplot as plt
import numpy as np
from pysisyphus import plot_helpers
from pysisyphus.constants import BOHR2ANG, C
from pysisyphus.exceptions import (
CalculationFailedException,
RunAfterCalculationFailedException,
)
from pysisyphus.marcus_dim.config import RMS_THRESH, FIT_RESULTS_FN
import pysisyphus.marcus_dim.types as mdtypes
from pysisyphus.dynamics.wigner import get_wigner_sampler, plot_normal_coords
from pysisyphus.Geometry import Geometry
from pysisyphus.helpers import (
get_tangent_trj_str,
get_fragment_xyzs,
check_for_end_sign,
)
from pysisyphus.helpers_pure import (
approx_float,
eigval_to_wavenumber,
highlight_text,
rms,
)
from pysisyphus.TablePrinter import TablePrinter
from pysisyphus.wavefunction.pop_analysis import (
iao_charges,
mulliken_charges,
)
from pysisyphus.xyzloader import make_xyz_str
_CORR_THRESH = 0.10
########################################
# Plot & report expansion coefficients #
########################################
[docs]
def create_marcus_coefficient_plot(nqs, coeffs, batch=None, ncalcs=None):
fig, ax = plot_helpers.fig_ax_from_canvas_agg()
ax.bar(np.arange(nqs) + 6, coeffs)
ax.set_xlabel("Mode")
ax.set_ylabel("Coefficient")
title_frags = list()
if batch is not None:
title_frags.append(f"Batch {batch}")
if ncalcs is not None:
title_frags.append(f"{ncalcs} calculations")
title = ", ".join(title_frags)
fig.suptitle(title)
return fig, ax
[docs]
def report_marcus_coefficients(coeffs, nus, drop_first, nhighest=10):
sort_inds = np.abs(coeffs).argsort()
highest_inds = sort_inds[-nhighest:][::-1]
coeff_table = TablePrinter(
header=("internal", "0-based", "1-based", "coeff. b", "ṽ / cm⁻¹"),
col_fmts=("int_short", "int_short", "int_short", "float", "float_short"),
)
print("10 highest (absolute) Normal mode expansion coefficients:")
coeff_table.print_header()
for hi in highest_inds:
coeff_table.print_row(
(hi, hi + drop_first, hi + drop_first + 1, coeffs[hi], nus[hi])
)
##############################
# Plot & report correlations #
##############################
[docs]
def create_correlation_plots(
qs, nus, corrs, depos, drop_first, batch, out_dir=".", corr_thresh=_CORR_THRESH
):
out_dir = Path(out_dir)
fig, ax = plot_helpers.fig_ax_from_canvas_agg()
# Normal coordinate limits to get sensible y-axis limits
deposmin = depos.min()
deposmax = depos.max()
deposfit = np.linspace(deposmin, deposmax, 2)
qmin = qs.min()
qmax = qs.max()
print(f"Correlation plots with ρ >= {corr_thresh:.6f}:")
# One plot per normal coordinate
nplots = 0
for i, (q, nu) in enumerate(zip(qs.T, nus)):
corr = corrs[i]
if abs(corr) < corr_thresh:
continue
print(f"\t... processing mode {i:03d}, ρ={corr: >12.6f}")
ax.scatter(depos, q)
# Linear fit
coef = np.polynomial.polynomial.polyfit(depos, q, deg=1, full=False)
poly = np.polynomial.polynomial.Polynomial(coef, domain=(deposmin, deposmax))
ax.plot(deposfit, poly(deposfit), c="red")
ax.set_xlim(deposmin, deposmax)
ax.set_ylim(qmin, qmax)
ax.set_xlabel("Δepos")
ax.set_ylabel(f"$q_{{{i + drop_first}}}$")
title = rf"Batch {batch:02d}: Mode {i:03d}, {nu: >8.2f} cm⁻¹, $\rho$ = {corr:> 8.4f}"
ax.set_title(title)
fn = f"correlation_batch_{batch:02d}_mode_{i:03d}.svg"
fig.savefig(out_dir / fn)
ax.cla()
nplots += 1
print(f"Created {nplots} correlation plots.\n")
plt.close(fig)
[docs]
def report_correlations(nus, corrs, drop_first, corr_thresh=_CORR_THRESH):
corrs_above = np.where(np.abs(corrs) >= corr_thresh)[0]
corrs_above_sorted = np.abs(corrs[corrs_above]).argsort()[::-1]
nabove = len(corrs_above)
print(f"Found {nabove} modes with correlation coefficients >= {corr_thresh:.4f}.\n")
corr_table = TablePrinter(
header=("internal", "0-based", "1-based", "corr. ρ", "ṽ / cm⁻¹"),
col_fmts=("int_short", "int_short", "int_short", "float", "float_short"),
)
corr_table.print_header()
for cas in corrs_above_sorted:
ca = corrs_above[cas]
corr_table.print_row(
(ca, ca + drop_first, ca + drop_first + 1, corrs[ca], nus[ca])
)
print()
[docs]
def get_marcus_dim(cart_displs, qs, properties):
"""Determine Marcus dimension from normal coordinates and properties.
Parameters
----------
cart_displs
2d array with shape (3N, nmodes) holding normal modes, with N being the
number of atoms.
masses
1d array with shape (N, ) holding atomic masses.
qs
2d array with shape (nsamples, nmodes) holding the normal coordinates
for each sample.
properties
1d array with shape (nsamples, ) holding the calculated properties, e.g,.
electron position (centroid of spin density) or exciation energy.
Returns
-------
corrs
Pearson correlation coefficients.
coeffs
Normal mode expansion coefficients.
marcus_dim
Normalized Marcus dimension in unweighted Cartesiand coordinates
of shape (3*natoms, ).
"""
# Only use non-NaN values
mask = ~np.isnan(properties)
qs = qs[mask]
properties = properties[mask]
# Number of normal modes
nmodes = cart_displs.shape[1]
corrs = np.zeros(nmodes)
for i, q in enumerate(qs.T):
corr = np.corrcoef(properties, q)[0, 1]
corrs[i] = corr
# Do least-squares.
print("Solving least squares 'qs . coeffs = depos' for 'coeffs'")
# coeffs, residual, rank, singvals = np.linalg.lstsq(...)
coeffs, *_ = np.linalg.lstsq(qs, properties, rcond=None)
# Construct Marcus dimension as linear combination of normal modes
# according to coefficients b from least-squares fit ...
marcus_dim = np.einsum("i,ki->k", coeffs, cart_displs)
# ... and renormalize vector.
marcus_dim /= np.linalg.norm(marcus_dim)
# Marcus dimension in normal coordinates.
# Normalize coefficients for convenience
marcus_dim_q = coeffs / np.linalg.norm(coeffs)
return corrs, coeffs, marcus_dim, marcus_dim_q
[docs]
def get_displaced_coordinates(wigner_sampler, cart_displs, coords_eq):
"""Displaced (normal) coordinates from Wigner sampling."""
# Sample from wigner distribution
sample = wigner_sampler()
displ_coords = sample.coords3d.flatten()
# Calculate displacements from equilibrium geometry and transform to normal coordinates.
norm_coords = cart_displs.T @ (displ_coords - coords_eq).flatten()
return displ_coords, norm_coords
POP_FUNCS = {
mdtypes.PopKind.IAO: iao_charges,
mdtypes.PopKind.MULLIKEN: mulliken_charges,
}
[docs]
def epos_from_wf(wf, fragments, pop_kind: mdtypes.PopKind = mdtypes.PopKind.IAO):
"""Fragment mulliken charges using intrinsic atomic orbital."""
pop_func = POP_FUNCS[pop_kind]
pop_ana = pop_func(wf)
assert approx_float(pop_ana.tot_charge, wf.charge)
def sum_spin_pop(spin_pop):
per_frags = [spin_pop[frag] for frag in fragments]
frag_sums = [pf.sum() for pf in per_frags]
weights = np.arange(1, len(frag_sums) + 1)
# Centroid of spin density; basically eq. (3) from [2].
epos = (weights * frag_sums).sum()
return frag_sums, epos
_, tot_epos = sum_spin_pop(pop_ana.spin_pop) # alpha + beta
_, alpha_epos = sum_spin_pop(pop_ana.alpha_spin_pop) # alpha only
return tot_epos, alpha_epos
[docs]
def epos_property(geom, fragments, pop_kind):
wf = geom.calculator.get_stored_wavefunction()
# Only use alpha part
# TODO: make this toggable?
_, alpha_epos = epos_from_wf(wf, fragments, pop_kind=pop_kind)
property = alpha_epos
return property
[docs]
def epos_property_iao(geom, fragments):
return epos_property(geom, fragments, pop_kind=mdtypes.PopKind.IAO)
[docs]
def epos_property_mulliken(geom, fragments):
return epos_property(geom, fragments, pop_kind=mdtypes.PopKind.MULLIKEN)
[docs]
def en_exc_property(geom, fragments):
gs_energy, *es_energies = geom.all_energies
assert len(es_energies) > 0
property = es_energies[0] - gs_energy
return property
[docs]
def calculate_property(
i, coords, geom, calc_getter, pal, fragments, prop_eq, property_func
):
displ_geom = geom.copy()
displ_geom.coords = coords
displ_geom.set_calculator(calc_getter(pal=pal, calc_number=i, base_name="displ"))
prop = np.nan
try:
# TODO: move actual calculation into property functions?!
# displ_geom.all_energies
# TODO: do a different calculation for DEXC
displ_geom.calc_wavefunction()
prop = property_func(displ_geom, fragments)
# Subtract property at equilibrium geometry
# If we would not subtract the equilibrium property we would have to pad
# the normal coordinate matrix with an additional column.
prop = prop - prop_eq
except CalculationFailedException as err:
print(err)
print(f"Calculation {i:03d} failed!")
except RunAfterCalculationFailedException as err:
print(err)
print(f"Postprocessing of calculation {i:03d} failed!")
return prop
[docs]
def get_mass(marcus_dim: np.ndarray, masses: np.ndarray) -> float:
"""Mass of fictious particle moving along Marcus dimension.
Eq. (2) in SI of [2].
Parameters
----------
marcus_dim
Array of shape (3N, ) or (N, 3) with N being the number of atoms.
masses
Array of shape (N, ) containing the atom masses in atomic mass units.
Returns
-------
mass
Mass of fictious particle moving along Marcus dimension.
"""
marcus_dim3d = marcus_dim.reshape(-1, 3)
natoms, _ = marcus_dim3d.shape
assert natoms == len(masses)
return (marcus_dim3d**2 * masses[:, None]).sum()
[docs]
def get_wavenumber(
marcus_dim: np.ndarray, nus: np.ndarray, eigvecs: np.ndarray, masses: np.ndarray
) -> float:
"""Wavenumber of Marcus dimension.
Eq. (3) in SI of [2].
The angular frequency is obtained from the square root of the quotient between
force constants and reduced mass.
ω = sqrt(k/μ)
The 'normal' frequency ν is obtained as
ω_i = 2π ν_i = sqrt(k_i/μ_i)
ν_i = 1/2π * sqrt(k_i/μ_i)
ν_i = sqrt(k_i/(4π²μ_i)) (1)
Force constant k_i is
k_i = 4π²ν_i²μ_i (2)
The general ideo of Eq. (3) in the SI of [2] seems to be to calculate the
force constants of all normal modes in the numerator (Ω² C^T M C; compare to
ν² μ in (2)), then contract these values with the Marcus dimension in normal
coordinates. This number is then divided by the mass of the Marcus dimension.
The square root of this fraction yields a frequency with unit 1/s!
Parameters
----------
marcus_dim
Array of shape (3N, ) or (N, 3) with N being the number of atoms.
nus
Array of wavenumbers of shape (N, ).
eigvecs
Array of eigenvectors of the projected mass-weighted Hessian with shape (3N, 3N).
masses
Array of shape (N, ) containing the atom masses in atomic mass units.
Returns
-------
nu_marcus
Wavenumber along Marcus dimension in cm⁻¹.
"""
masses_rep = np.repeat(masses, 3)
a = eigvecs.T @ (np.sqrt(masses_rep) * marcus_dim)
a = a / np.linalg.norm(a)
nus_m = nus * 100 # Wavenumber in 1/m
freqs_s = C * nus_m
# Matrix multiplications with diagonal matrices are replaced by
# element-wise multiplications.
force_constant = (
a[None, :]
[docs]
@ ((freqs_s**2)[:, None] * eigvecs.T)
@ (masses_rep[:, None] * eigvecs)
@ a
)
mass = get_mass(marcus_dim, masses)
freq_marcus = np.sqrt(force_constant / mass)
nu_marcus = float(freq_marcus / C / 100)
return nu_marcus
def get_wavenumber_from_coeffs(coeffs: np.ndarray, nus: np.ndarray) -> float:
"""Wavenumber in cm⁻¹ of Marcus dimension from fit coefficients."""
coeffs2 = coeffs**2
coeff_norm = (coeffs2).sum()
marcus_nu = (coeffs2 * nus).sum() / coeff_norm
return marcus_nu
[docs]
def dump_marcus_dim_xyz_trj(
atoms, coords3d_eq: np.ndarray, marcus_dim: np.ndarray, out_path: Path
):
# XYZ representation of Marcus dimension and animation
marcus_dim_xyz = make_xyz_str(atoms, BOHR2ANG * marcus_dim.reshape(-1, 3))
xyz_path = out_path.with_suffix(".xyz")
with open(xyz_path, "w") as handle:
handle.write(marcus_dim_xyz)
print(f"Current Marcus dimension:\n{marcus_dim_xyz}\n")
marcus_dim_trj = get_tangent_trj_str(
atoms, coords3d_eq.flatten(), marcus_dim, comment="Marcus dimension"
)
# Dump animated Marcus dimension
trj_path = out_path.with_suffix(".trj")
with open(trj_path, "w") as handle:
handle.write(marcus_dim_trj)
return marcus_dim_xyz, marcus_dim_trj
[docs]
def setup_calc_client(geom, scheduler):
if scheduler is not None:
client = Client(scheduler)
pal = 1
sched_info = client.scheduler_info()
n_workers = len(sched_info["workers"])
calc_msg = f"Running {n_workers} calculations in parallel with {pal=} each."
else:
client = None
pal = geom.calculator.pal
calc_msg = f"Running calculations in serial with {pal=}."
return client, pal, calc_msg
[docs]
def run_batch_calcs(batch_size, start_ind, sampler, client, calc_func):
# Get and store displaced coordinates and normal coordinates
batch_displ_coords = list()
batch_norm_coords = list()
for i in range(batch_size):
displ_coords, norm_coords = sampler()
batch_displ_coords.append(displ_coords)
batch_norm_coords.append(norm_coords)
# Loop over cacluclations in current batch
batch_range = range(start_ind, start_ind + batch_size)
if client is not None:
futures = client.map(calc_func, batch_range, batch_displ_coords, pure=False)
batch_properties = client.gather(futures)
else:
batch_properties = list()
for i, displ_coords in enumerate(batch_displ_coords, start_ind):
if i % 5 == 0:
print(f"\t{i}")
sys.stdout.flush()
prop = calc_func(i, displ_coords)
batch_properties.append(prop)
return batch_displ_coords, batch_norm_coords, batch_properties
[docs]
def get_batch_calc_func(
geom,
calc_getter: Callable,
property: mdtypes.Property,
fragments: List[Tuple[int]],
scheduler,
cart_displs: np.ndarray,
temperature: float,
seed=None,
out_dir=Path("."),
):
assert temperature >= 0.0, f"{temperature=} must be positive!"
# Dump fragments
fragments_trj = "\n".join(
get_fragment_xyzs(geom, fragments, with_geom=True, with_dummies=True)
)
fragment_trj_fn = "fragments.trj"
with open(out_dir / fragment_trj_fn, "w") as handle:
handle.write(fragments_trj)
print(f"Wrote fragments to '{fragment_trj_fn}'")
property_funcs = {
property.EPOS_IAO: epos_property_iao,
property.EPOS_MULLIKEN: epos_property_mulliken,
property.EEXC: en_exc_property,
}
property_func = property_funcs[property]
try:
charge = geom.calculator.charge
mult = geom.calculator.mult
except AttributeError:
charge = None
mult = None
if (charge is not None) and (mult is not None):
print(f" Charge: {charge}")
print(f" Multiplicity: {mult}")
print(f" Temperature: {temperature:.2f} K")
print(f" Property: {property}")
# Calculate wavefunction at equilibrium geometry
print("Starting calculation at equilibrium geometry")
# TODO: figure out why this is even needed here ... probably because the calculations aren't done
# inside the property function(s).
gs_energy_eq, *_ = geom.all_energies
# TODO: using 'geom.calc_wavefunction()' leads to a pickling-error later on
prop_eq = property_func(geom, fragments)
print("Finished calculation at equilibrium geometry")
eq_results = {"property_eq": prop_eq, "gs_energy_eq": gs_energy_eq}
client, pal, calc_msg = setup_calc_client(geom, scheduler)
print(calc_msg)
calculate_property_wrapped = functools.partial(
calculate_property,
geom=geom,
calc_getter=calc_getter,
pal=pal,
fragments=fragments,
prop_eq=prop_eq,
property_func=property_func,
)
# Function that create displaced geometries by drawing from a Wigner distribution
wigner_sampler, seed = get_wigner_sampler(geom, temperature=temperature, seed=seed)
sampler_wrapped = functools.partial(
get_displaced_coordinates,
wigner_sampler=wigner_sampler,
cart_displs=cart_displs,
coords_eq=geom.cart_coords,
)
print(f"Seed for Wigner sampling: {seed}")
batch_calc_func = functools.partial(
run_batch_calcs,
sampler=sampler_wrapped,
client=client,
calc_func=calculate_property_wrapped,
)
return eq_results, batch_calc_func
[docs]
def get_batch_calc_func_from_npz(npz_fn):
data = np.load(npz_fn)
displ_coords = data["cart_coords"]
norm_coords = data["normal_coordinates"]
properties = data["properties"]
end_ind = data["end_ind"]
ncalcs = end_ind + 1
def batch_calc_func(batch_size, start_ind):
end_ind = start_ind + batch_size
batch_displ_coords = displ_coords[start_ind:end_ind]
batch_norm_coords = norm_coords[start_ind:end_ind]
batch_properties = properties[start_ind:end_ind]
return batch_displ_coords, batch_norm_coords, batch_properties
return batch_calc_func, ncalcs
[docs]
def fit_marcus_dim_batched(
atoms,
coords3d_eq,
masses,
batch_calc_func: Callable[[int, int], tuple[np.ndarray, np.ndarray, np.ndarray]],
nus,
cart_displs,
to_save: dict,
batch_size: int = 25,
max_batches: int = 20,
rms_thresh: float = RMS_THRESH,
correlations: bool = False,
corr_thresh: float = _CORR_THRESH,
out_dir=".",
):
assert batch_size > 0
assert max_batches > 0
assert rms_thresh > 0.0
ncart_coords, nmodes = cart_displs.shape
drop_first = ncart_coords - nmodes
ncalcs_max = max_batches * batch_size
assert ncalcs_max > 0, "max_batches * batch_size must be greater than zero!"
out_dir = Path(out_dir)
# Arrays holding displace Cartesian coordinates, normal coordinates and properties
all_displ_coords = np.zeros((ncalcs_max, ncart_coords))
all_norm_coords = np.zeros((ncalcs_max, nmodes))
all_properties = np.zeros(ncalcs_max)
if not out_dir.exists():
out_dir.mkdir()
to_save.update(
{
"cart_coords": all_displ_coords,
"normal_coordinates": all_norm_coords,
"properties": all_properties,
"rms_thresh": rms_thresh,
"max_batches": max_batches,
}
)
print(f" Batch size: {batch_size}")
print(f" Max batches: {max_batches}")
print(f"rms threshold: {rms_thresh}")
print(f"Doing at most {ncalcs_max} calculations")
results_fn = out_dir / FIT_RESULTS_FN
# Start calculation batches
prev_coeffs = None
prev_marcus_dim = None
rms_coeffs = None
rms_marcus_dim = None
for batch in range(max_batches):
batch_str = f"batch_{batch:02d}"
start_ind = batch * batch_size
end_ind = start_ind + batch_size
now = datetime.datetime.now()
print(highlight_text(f"Batch {batch}") + "\n")
print(f"Starting at index {start_ind} on {now.strftime('%c')}")
print(f"Doing calculations with indices {start_ind} to {end_ind - 1}.")
sys.stdout.flush()
# Run the actual calculatiions
batch_dur = time.time()
batch_displ_coords, batch_norm_coords, batch_properties = batch_calc_func(
batch_size, start_ind
)
batch_dur = time.time() - batch_dur
# and store the results of this batch in the respective arrays
all_displ_coords[start_ind:end_ind] = batch_displ_coords
all_norm_coords[start_ind:end_ind] = batch_norm_coords
all_properties[start_ind:end_ind] = batch_properties
calc_dur = batch_dur / batch_size
print(f"Did {batch_size} calculations.")
print(f"Full batch took {batch_dur:.2f} s, {calc_dur:.2f} s / calculation")
print(f"Total number of calculations done until now: {end_ind}")
sys.stdout.flush()
# Calculate Marcus dimension using least-squares from normal coordinates and properties
corrs, coeffs, marcus_dim, marcus_dim_q = get_marcus_dim(
cart_displs,
all_norm_coords[:end_ind],
all_properties[:end_ind],
)
# Report expansion coefficients and rms values
report_marcus_coefficients(coeffs, nus, drop_first)
# Dump Marcus dimension and .trj file
marcus_dim_trj_fn = out_dir / f"marcus_dim_{batch_str}.trj"
# Suppress the 2nd argument (marcus_dim_trj)
marcus_dim_xyz, _ = dump_marcus_dim_xyz_trj(
atoms, coords3d_eq, marcus_dim, marcus_dim_trj_fn
)
# Property change along Marcus dimension from fitted coefficients
prop_change_along_marcus_dim = marcus_dim_q.dot(coeffs)
# Mass and wavenumber of Marcus dimension mode
mass_marcus = get_mass(marcus_dim, masses)
nu_marcus = get_wavenumber_from_coeffs(coeffs, nus)
print(f"Mass along Marcus dimension: {mass_marcus:.6f} amu")
print(f"Wavenumber of Marcus dimension: {nu_marcus:.6f} cm⁻¹")
# Dump results
to_save["coeffs"] = coeffs
to_save["marcus_dim"] = marcus_dim
to_save["marcus_dim_q"] = marcus_dim_q
to_save["mass_marcus"] = mass_marcus
to_save["nu_marcus"] = nu_marcus
to_save["prop_change_along_marcus_dim"] = prop_change_along_marcus_dim
to_save["end_ind"] = end_ind
sys.stdout.flush()
print()
if prev_coeffs is not None:
rms_coeffs = rms(prev_coeffs - coeffs)
rms_marcus_dim = rms(prev_marcus_dim - marcus_dim)
print(
f"rms(Δcoeffs)={rms_coeffs:.6f}, rms(ΔMarcus dim)={rms_marcus_dim:.6f}"
)
rms_converged = rms_marcus_dim <= rms_thresh
# Will result in scalar entries
to_save["rms_coeffs"] = rms_coeffs
to_save["rms_marcus_dim"] = rms_marcus_dim
to_save["rms_converged"] = rms_converged
else:
rms_converged = False
# Save the actual data here
np.savez(results_fn, **to_save)
# Plot of expansion coefficients
fig, _ = create_marcus_coefficient_plot(
nmodes, coeffs, batch=batch, ncalcs=end_ind
)
plot_fn = out_dir / f"marcus_coeffs_{batch_str}.svg"
fig.savefig(plot_fn)
print(f"Saved expansion coefficent plot to '{plot_fn}'.\n")
plt.close(fig)
# Plot summary of utilized normal coordinates
fig, _ = plot_normal_coords(
all_norm_coords[:end_ind], fig_ax_func=plot_helpers.fig_ax_from_canvas_agg
)
fig.savefig(out_dir / "normal_coords.svg", dpi=200)
plt.close(fig)
# Correlations between normal modes and properties
if correlations:
report_correlations(nus, corrs, drop_first, corr_thresh=corr_thresh)
create_correlation_plots(
all_norm_coords[:end_ind],
nus,
corrs,
all_properties[:end_ind],
drop_first,
batch=batch,
out_dir=out_dir,
corr_thresh=corr_thresh,
)
sys.stdout.flush()
# Convergence check
if (rms_marcus_dim is not None) and rms_converged:
print(
f"\nCalculation of Marcus dimension converged after {end_ind} calculations!"
)
break
sign = check_for_end_sign()
if sign in ("stop", "converged"):
print(f"Found '{sign}' sign. Breaking.")
break
prev_coeffs = coeffs
prev_marcus_dim = marcus_dim
print()
else:
print("Reached maximum number of requested batches!")
# End of loop
# Write final Marcus dimension
with open(out_dir / "marcus_dim.xyz", "w") as handle:
handle.write(marcus_dim_xyz)
# Report failed calculations
calc_indices = np.arange(end_ind)
failed_mask = np.isnan(all_properties[:end_ind])
for i in calc_indices[failed_mask]:
print(f"Calculation {i:03d} failed!")
print(f"{failed_mask.sum()}/{end_ind} calculations failed.")
print()
print(f"Final Marcus dimension:\n{marcus_dim_xyz}\n")
print(f"Mass along final Marcus dimension: {mass_marcus:.6f} amu")
print(f"Wavenumber of final Marcus dimension: {nu_marcus:.6f} cm⁻¹")
return to_save
[docs]
def fit_marcus_dim(
geom: Geometry,
calc_kwargs: Optional[dict] = None,
batch_kwargs: Optional[dict] = None,
batch_calc_func: Optional[
Callable[[int, int], tuple[np.ndarray, np.ndarray, np.ndarray]]
] = None,
out_dir=".",
):
if batch_kwargs is None:
batch_kwargs = {}
if calc_kwargs is None:
calc_kwargs = dict()
calc_kwargs = calc_kwargs.copy()
_calc_kwargs = {
"scheduler": None,
"seed": None,
}
_calc_kwargs.update(calc_kwargs)
out_dir = Path(out_dir)
nus, eigvals, eigvecs, cart_displs = geom.get_normal_modes()
to_save = {
# Property key
"property": str(property),
# Equilibrium geometry
"cart_coords_eq": geom.cart_coords,
# Samples
"hessian": geom.cart_hessian,
"mw_hessian": geom.mw_hessian,
"linear": geom.is_linear,
"masses": geom.masses,
"eigvals": eigvals,
"eigvecs": eigvecs,
"cart_displs": cart_displs,
"nus": nus,
}
if batch_calc_func is None:
eq_results, batch_calc_func = get_batch_calc_func(
geom,
cart_displs=cart_displs,
out_dir=out_dir,
**calc_kwargs,
)
to_save.update(eq_results)
return fit_marcus_dim_batched(
geom.atoms,
geom.coords3d,
geom.masses,
batch_calc_func,
nus,
cart_displs,
to_save,
**batch_kwargs,
out_dir=out_dir,
)