# [1] http://dx.doi.org/10.1063/1.1323224
# Improved tangent estimate in the nudged elastic band method
# for finding minimum energy paths and saddle points
# Henkelman, Jonsson, 2000
from copy import copy
import logging
import sys
from distributed import Client, LocalCluster
import numpy as np
import psutil
from scipy.interpolate import interp1d, splprep, splev
from pysisyphus.helpers import align_coords, get_coords_diffs
from pysisyphus.helpers_pure import hash_arr, rms
from pysisyphus.modefollow import geom_lanczos
# from pysisyphus.calculators.parallel import distributed_calculations
from pysisyphus.cos.distributed import distributed_calculations
[docs]
class ChainOfStates:
logger = logging.getLogger("cos")
valid_coord_types = ("cart", "cartesian", "dlc")
def __init__(
self,
images,
fix_first=True,
fix_last=True,
align_fixed=True,
climb=False,
climb_rms=5e-3,
climb_lanczos=False,
climb_lanczos_rms=5e-3,
climb_fixed=False,
ts_opt=False,
ts_opt_rms=2.5e-3,
energy_min_mix=False,
scheduler=None,
cluster=False,
cluster_kwargs=None,
progress=False,
):
assert len(images) >= 2, "Need at least 2 images!"
self.images = list(images)
self.fix_first = fix_first
self.fix_last = fix_last
self.align_fixed = align_fixed
self.climb = climb
self.climb_rms = climb_rms
self.climb_lanczos = climb_lanczos
self.climb_lanczos_rms = min(self.climb_rms, climb_lanczos_rms)
self.ts_opt = ts_opt
self.ts_opt_rms = ts_opt_rms
# I really wonder what made me pick 'climb_fixed = True' in e9f039a0 ...
# ... now i know! Sometimes it hampers the convergence, when the CI index
# flips between multiple images.
self.climb_fixed = climb_fixed
self.energy_min_mix = energy_min_mix
# Must not be lower than climb_rms
self.scheduler = scheduler
# Providing only cluster_kwargs also enables dask
self._cluster = bool(cluster) or (cluster_kwargs is not None)
if cluster_kwargs is None:
cluster_kwargs = dict()
# _cluster_kwargs = {
# # Only one calculation / worker
# "threads_per_worker": 1,
# "n_workers": psutil.cpu_count(logical=False),
# }
ncores = psutil.cpu_count(logical=False)
_cluster_kwargs = {
# One worker per cluster with multiple threads
"threads_per_worker": ncores,
"n_workers": 1,
# Register available cores as resource
"resources": {
"CPU": ncores,
},
}
_cluster_kwargs.update(cluster_kwargs)
self.cluster_kwargs = _cluster_kwargs
self.progress = progress
# Can't call self.log before counter is initialized ...
self._external_scheduler = scheduler is not None
self.counter = 0
self._coords = None
self._forces = None
self._energy = None
self.coords_length = self.images[0].coords.size
self.cart_coords_length = self.images[0].cart_coords.size
self.zero_vec = np.zeros(self.coords_length)
"""
It was a rather unfortunate choice to fill/grow self.coords_list and
self.all_true_forces in different methods. self.prepare_opt_cycle()
appends to self.coords_list, while self.calculate_forces() appends to
self.all_true_forces().
After a succsessful COS optimization both lists differ in length;
self.all_true_forces has 1 additional item, compared to self.coords_list,
as self.calculate_forces() is called in Optimizer.prepare_opt() once.
Afterwards, self.coords_list and self.all_true_forces grow in a consistent
manner.
Two choices can be made: keep this discrepancy in mind and omit/neglect
the first item in self.coords_list, or grow another list in
self.calculate_forces(). For now, we will go with the latter option.
"""
# coords_list & force_list are updated in prepare_opt_cycle
self.coords_list = list()
self.forces_list = list()
# all_energies & all_true_forces are updated in calculate_forces()
self.all_energies = list()
self.all_true_forces = list()
# See the multiline comment above
self.all_cart_coords = list()
self.lanczos_tangents = dict()
self.prev_lanczos_hash = None
# Start climbing immediateley with climb_rms == -1
self.started_climbing = self.climb_rms == -1
if self.started_climbing:
self.log("Will start climbing immediately.")
self.started_climbing_lanczos = self.climb_lanczos_rms == -1
if self.started_climbing_lanczos:
self.log("Will use Lanczos algorithm immediately.")
self.started_ts_opt = self.ts_opt_rms == -1
if self.started_ts_opt:
self.log("Will use TS image(s) immediately.")
self.fixed_climb_indices = None
self.fixed_ts_indices = None
# Use original forces for these images
self.org_forces_indices = list()
img0 = self.images[0]
self.image_atoms = copy(img0.atoms)
self.coord_type = img0.coord_type
assert (
self.coord_type in self.valid_coord_types
), f"Invalid coord_type! Supported types are: {self.valid_coord_types}"
assert all(
[img.coord_type == self.coord_type for img in self.images]
), "coord_type of images differ!"
try:
self.typed_prims = img0.internal.typed_prims
except AttributeError:
self.typed_prims = None
@property
def nimages(self) -> int:
return len(self.images)
@property
def calculator(self):
try:
calc = self.images[0].calculator
except IndexError:
calc = None
return calc
[docs]
def propagate(self, force_unique_overlap_data_fns=True):
"""Propagate chkfiles and root information along COS.
Does an energy calculation at every image and tries to propagate
the converged wavefunction to the next image.
When excited states should be tracked it is assumed that the correct root is
set at the first image. In most cases, e.g. in COS calculations started from
YAML input the initial root will be the same for all images. When the correct
root switches between the first image (with the correct root) and the last image,
then using the same root for all images will be wrong. This is also corrected
here. From the second image on the excited state (ES) overlaps are calculated
between the current image and the image before it and the root with the highest
overlap to the root on the previous image is set on the current image. As we
start from the correct root at the first image this will ensure that the correct
root is selected at all images.
If requested, unique names for the dumped overlap data HDF5 will be picked.
"""
try:
track = self.images[0].calculator.track
except AttributeError:
track = False
# To avoid cyclic import
from pysisyphus.calculators.OverlapCalculator import (
track_root_between_ovlp_cals,
)
# If requested, check if all calculators dump their overlap data into unique
# HDF5 files. If they don't, update the filenames.
# Won't really work for Growing-String calculations ...
if track and force_unique_overlap_data_fns:
nimages = len(self.images)
cur_dump_fns = set([image.calculator.dump_fn for image in self.images])
if len(cur_dump_fns) != nimages:
for i, image in enumerate(self.images):
calc = image.calculator
new_dump_fn = calc.dump_fn.with_name(
f"overlap_data_{calc.calc_number:03d}.h5"
)
calc.dump_fn = new_dump_fn
self.log(f"Updated dump_fn of image {i} to {new_dump_fn.name}")
assert (
len(set([image.calculator.dump_fn for image in self.images])) == nimages
)
if track:
for image in self.images:
image.calculator.ovlp_with = "previous"
# Run an energy calculation on all calculators sequentially,
# then propagate chkfiles and root-information along them.
for i, image in enumerate(self.images):
# Do an energy calculation. Don't worry when the calculated/selected root
# is actually the wrong one. This will be correct below and from the second
# iteration on, the roots will be correct.
image.energy
self.log(f"Calculated energy for image {i}.")
calc = image.calculator
# Try to set chkfiles on the next calculator
try:
next_calc = self.images[i + 1].calculator
next_calc.set_chkfiles(calc.get_chkfiles())
self.log(f"Set chkfiles of image {i} at image {i+1}.")
except IndexError:
self.log("Last image. No further image to set chkfiles on!")
except AttributeError:
self.log("Calculator does not implement set_chkfiles/get_chkfiles!")
# Update root information when it is a tracking calculation
if track and i > 0:
prev_calc = self.images[i - 1].calculator
# prev_root = prev_calc.root
# root = calc.root
new_root = track_root_between_ovlp_cals(prev_calc, calc)
calc.root = new_root
self.log(
f"Calculated ES overlaps between image {i-1} and image {i}. "
f"New root at image {i} is {new_root}."
)
# self.log(
# f"Root at image {i-1}: {prev_root}, previous root at image {i}: {root}, "
# f"new root at image {i}: {new_root}."
# )
# Clear the lists storing infomration about the calculated roots.
# If we don't reset them then the first reference cycle will be a cycle with a
# potentially erroneous root, leading to undesired root flips.
if track:
for i, image in enumerate(self.images):
image.calculator.clear_stored_calculations()
image.clear()
print(f"Using root {image.calculator.root} for image {i}.")
self.log("Finished propagating chkfiles/roots along COS.")
# TODO: return roots?!
@property
def is_analytical_2d(self):
try:
ia2d = self.images[0].calculator.analytical_2d
except AttributeError:
ia2d = False
return ia2d
[docs]
def log(self, message):
self.logger.debug(f"Counter {self.counter+1:03d}, {message}")
[docs]
def get_fixed_indices(self):
fixed = list()
if self.fix_first:
fixed.append(0)
if self.fix_last:
fixed.append(len(self.images) - 1)
return fixed
[docs]
def get_dask_local_cluster(self):
cluster = LocalCluster(**self.cluster_kwargs)
link = cluster.dashboard_link
self.log(f"Created {cluster}. Dashboard is available at {link}")
return cluster
@property
def use_dask(self):
return self.scheduler is not None
[docs]
def init_dask(self):
# Return dummy when scheduler address is already present or dask is disabled.
if self._external_scheduler or not self._cluster:
# cluster = ClusterDummy(self.scheduler)
cluster = ClusterDummy() # self.scheduler)
# Create LocalCluster and return it if dask is enabled but no scheduler address
# is set.
# It seems like we can't save the LocalCluster object in the ChainOfStates object,
# because this leads to strange errors while object pickling.
elif self.scheduler is None:
cluster = self.get_dask_local_cluster()
self.scheduler = cluster.scheduler_address
else:
raise Exception("How did I end up here?")
return cluster
[docs]
def exit_dask(self, cluster):
# Don't do anything is cluster/scheduler is externally managed
if not self._external_scheduler:
cluster.close()
self.scheduler = None
self._external_scheduler = False
@property
def moving_indices(self):
"""Returns the indices of the images that aren't fixed and can be
optimized."""
fixed = self.get_fixed_indices()
return [i for i in range(len(self.images)) if i not in fixed]
@property
def last_index(self):
return len(self.images) - 1
@property
def moving_images(self):
return [self.images[i] for i in self.moving_indices]
@property
def max_image_num(self):
return len(self.images)
@property
def image_inds(self):
return list(range(self.max_image_num))
[docs]
def zero_fixed_vector(self, vector):
fixed = self.get_fixed_indices()
for i in fixed:
vector[i] = self.zero_vec
return vector
[docs]
def clear(self):
self._energy = None
self._forces = None
self._hessian = None
try:
self._tangents = None
except AttributeError:
# TODO: move this to another logging level?!
self.log("There are no tangents to reset.")
# @property
# def freeze_atoms(self):
# image_freeze_atoms = [image.freeze_atoms for image in self.images]
# lens = [len(fa) for fa in image_freeze_atoms]
# len0 = lens[0]
# assert all([len_ == len0 for len_ in lens])
# return image_freeze_atoms[0]
@property
def atoms(self):
atoms_ = self.images[0].atoms
return len(self.images) * atoms_
[docs]
def set_vector(self, name, vector, clear=False):
vec_per_image = vector.reshape(-1, self.coords_length)
assert len(self.images) == len(vec_per_image)
for i in self.moving_indices:
setattr(self.images[i], name, vec_per_image[i])
if clear:
self.clear()
@property
def coords(self):
"""Return a flat 1d array containing the coordinates of all images."""
all_coords = [image.coords for image in self.images]
# Note: why does this getter set self._coords? ... I wrote this line 6 years ago.
self._coords = np.concatenate(all_coords)
return self._coords
@coords.setter
def coords(self, coords):
"""Distribute the flat 1d coords array over all images."""
self.set_vector("coords", coords, clear=True)
@property
def cart_coords(self):
"""Return a flat 1d array containing the cartesian coordinates of all
images."""
return np.concatenate([image.cart_coords for image in self.images])
@property
def coords3d(self):
assert self.images[0].coord_type == "cart"
return self.coords.reshape(-1, 3)
@property
def image_coords(self):
return np.array([image.coords for image in self.images])
[docs]
def set_coords_at(self, i, coords):
"""Called from helpers.procrustes with cartesian coordinates.
Then tries to set cartesian coordinate as self.images[i].coords
which will raise an error when coord_type != "cart".
"""
assert self.images[i].coord_type in ("cart", "cartesian"), (
"ChainOfStates.set_coords_at() has to be reworked to support "
"internal coordiantes. Try to set 'align: False' in the 'opt' "
"section of the .yaml input file."
)
if i in self.moving_indices:
self.images[i].coords = coords
# When dealing with a fixed image don't set coords through the
# property, which would result in resetting the image's calculated
# data. Instead, assign coords directly. This only occurs when
# aligning the fixed images.
elif self.align_fixed:
self.images[i]._coords = coords
@property
def energy(self):
self._energy = np.array([image.energy for image in self.images])
return self._energy
@energy.setter
def energy(self, energies):
"""This is needed for some optimizers like CG and BFGS."""
assert len(self.images) == len(energies)
for i in self.moving_indices:
self.images[i].energy = energies[i]
self._energy = energies
[docs]
def par_image_calc(self, image):
image.calc_energy_and_forces()
return image
[docs]
def concurrent_force_calcs(self, images_to_calculate, image_indices):
client = self.get_dask_client()
self.log(f"Doing concurrent force calculations using {client}")
calculated_images = distributed_calculations(
client,
images_to_calculate,
self.par_image_calc,
logger=self.logger,
)
# Set calculated images
for ind, image in zip(image_indices, calculated_images):
self.images[ind] = image
client.close()
[docs]
def calculate_forces(self):
# Determine the number of images for which we have to do calculations.
# There may also be calculations for fixed images, as they need an
# energy value. But every fixed image only needs a calculation once.
images_to_calculate = self.moving_images
image_indices = self.moving_indices
if self.fix_first and (self.images[0]._energy is None):
images_to_calculate = [self.images[0]] + images_to_calculate
image_indices = [0] + list(image_indices)
if self.fix_last and (self.images[-1]._energy is None):
images_to_calculate = images_to_calculate + [self.images[-1]]
image_indices = list(image_indices) + [-1]
assert len(images_to_calculate) <= len(self.images)
# Parallel calculation with dask
if self.use_dask:
self.concurrent_force_calcs(images_to_calculate, image_indices)
# Serial calculation
else:
for image in images_to_calculate:
image.calc_energy_and_forces()
# Poor mans progress bar ;)
if self.progress:
print(".", end="")
sys.stdout.flush()
if self.progress:
print("\r", end="")
self.set_zero_forces_for_fixed_images()
self.counter += 1
# EnergyMin calculator carries out multiple calculations at the same
# geometry, e.g., in different spin states or different electronic states
# and picks the one with the lowest energy.
if self.energy_min_mix:
# Will be None for calculators that already mix
all_energies = np.array([image.all_energies for image in self.images])
energy_diffs = np.diff(all_energies, axis=1).flatten()
calc_inds = all_energies.argmin(axis=1)
mix_at = []
for i, calc_ind in enumerate(calc_inds[:-1]):
next_ind = calc_inds[i + 1]
if (
(calc_ind != next_ind)
and (i not in self.org_forces_indices)
and (i + 1 not in self.org_forces_indices)
):
min_diff_offset = energy_diffs[[i, i + 1]].argmin()
mix_at.append(i + min_diff_offset)
for ind in mix_at:
self.images[ind].calculator.mix = True
# Recalculate correct energy and forces
print(
f"Switch after calc_ind={calc_ind} at index {ind}. Recalculating."
)
self.images[ind].calc_energy_and_forces()
self.org_forces_indices.append(ind)
calc_ind = calc_inds[ind]
energies = [image.energy for image in self.images]
forces = np.array([image.forces for image in self.images])
self.all_energies.append(energies)
self.all_true_forces.append(forces.copy())
self.all_cart_coords.append(self.cart_coords.copy())
return {
"energies": energies,
"forces": forces,
}
@property
def forces(self):
self.set_zero_forces_for_fixed_images()
forces = [image.forces for image in self.images]
self._forces = np.concatenate(forces)
self.counter += 1
return self._forces
@forces.setter
def forces(self, forces):
self.set_vector("forces", forces)
@property
def perpendicular_forces(self):
indices = range(len(self.images))
perp_forces = [self.get_perpendicular_forces(i) for i in indices]
return np.array(perp_forces).flatten()
[docs]
def get_perpendicular_forces(self, i):
"""[1] Eq. 12"""
# Our goal in optimizing a ChainOfStates is minimizing the
# perpendicular force. Always return zero perpendicular
# forces for fixed images, so that they don't interfere
# with the convergence check.
if i not in self.moving_indices:
return self.zero_vec
forces = self.images[i].forces
tangent = self.get_tangent(i)
perp_forces = forces - forces.dot(tangent) * tangent
return perp_forces
@property
def gradient(self):
return -self.forces
@gradient.setter
def gradient(self, gradient):
self.forces = -gradient
@property
def masses_rep(self):
return np.array([image.masses_rep for image in self.images]).flatten()
@property
def results(self):
tmp_results = list()
for image in self.images:
res = image.results
res["coords"] = image.coords
res["cart_coords"] = image.cart_coords
tmp_results.append(res)
return tmp_results
[docs]
def set_zero_forces_for_fixed_images(self):
"""This is always done in cartesian coordinates, independent
of the actual coord_type of the images as setting forces only
work with cartesian forces."""
zero_forces = np.zeros_like(self.images[0].cart_coords)
if self.fix_first:
self.images[0].cart_forces = zero_forces
self.log("Zeroed forces on fixed first image.")
if self.fix_last:
self.images[-1].cart_forces = zero_forces
self.log("Zeroed forces on fixed last image.")
[docs]
def get_tangent(
self, i, kind="upwinding", lanczos_guess=None, disable_lanczos=False
):
"""[1] Equations (8) - (11)"""
# Converge to lowest curvature mode at the climbing image.
# In the current implementation the given kind may be overwritten when
# Lanczos iterations are enabled and there are climbing images. By
# setting 'disable_lanczos=True' the provided kind is never overwritten.
if (
not disable_lanczos
and self.started_climbing_lanczos
# and (i in self.get_climbing_indices())
and (i == self.get_hei_index())
):
kind = "lanczos"
tangent_kinds = ("upwinding", "simple", "bisect", "lanczos")
assert kind in tangent_kinds, "Invalid kind! Valid kinds are: {tangent_kinds}"
prev_index = max(i - 1, 0)
next_index = min(i + 1, len(self.images) - 1)
prev_image = self.images[prev_index]
ith_image = self.images[i]
next_image = self.images[next_index]
# If (i == 0) or (i == len(self.images)-1) then one
# of this tangents is zero.
tangent_plus = next_image - ith_image
tangent_minus = ith_image - prev_image
# Handle first and last image
if i == 0:
return tangent_plus / np.linalg.norm(tangent_plus)
elif i == (len(self.images) - 1):
return tangent_minus / np.linalg.norm(tangent_minus)
# [1], Eq. (1)
if kind == "simple":
tangent = next_image - prev_image
# [1], Eq. (2)
elif kind == "bisect":
first_term = tangent_minus / np.linalg.norm(tangent_minus)
sec_term = tangent_plus / np.linalg.norm(tangent_plus)
tangent = first_term + sec_term
# Upwinding tangent from [1] Eq. (8) and so on
elif kind == "upwinding":
prev_energy = prev_image.energy
ith_energy = ith_image.energy
next_energy = next_image.energy
next_energy_diff = abs(next_energy - ith_energy)
prev_energy_diff = abs(prev_energy - ith_energy)
delta_energy_max = max(next_energy_diff, prev_energy_diff)
delta_energy_min = min(next_energy_diff, prev_energy_diff)
# Uphill
if next_energy > ith_energy > prev_energy:
tangent = tangent_plus
# Downhill
elif next_energy < ith_energy < prev_energy:
tangent = tangent_minus
# Minimum or Maximum
else:
if next_energy >= prev_energy:
tangent = (
tangent_plus * delta_energy_max
+ tangent_minus * delta_energy_min
)
# next_energy < prev_energy
else:
tangent = (
tangent_plus * delta_energy_min
+ tangent_minus * delta_energy_max
)
elif kind == "lanczos":
# Calculating a lanczos tangent is costly, so we store the
# tangent in a dictionary. The current coordinates are
# stringified with precision=4 and then hashed. The tangent
# is stored/looked up with this hash.
cur_hash = hash_arr(ith_image.coords, precision=4)
try:
tangent = self.lanczos_tangents[cur_hash]
self.log(
"Returning previously calculated Lanczos tangent with "
f"hash={cur_hash}"
)
except KeyError:
# Try to use previous Lanczos tangent
guess = lanczos_guess
if (guess is None) and (self.prev_lanczos_hash is not None):
guess = self.lanczos_tangents[self.prev_lanczos_hash]
self.log(
f"Using tangent with hash={self.prev_lanczos_hash} "
"as initial guess for Lanczos algorithm."
)
w_min, tangent, _ = geom_lanczos(
ith_image, guess=guess, logger=self.logger
)
self.lanczos_tangents[cur_hash] = tangent
# Update hash
self.prev_lanczos_hash = cur_hash
tangent /= np.linalg.norm(tangent)
return tangent
[docs]
def get_tangents(self):
return np.array([self.get_tangent(i) for i in range(len(self.images))])
[docs]
def as_xyz(self, comments=None):
return "\n".join([image.as_xyz() for image in self.images])
[docs]
def get_dask_client(self):
return Client(self.scheduler)
[docs]
def get_hei_index(self, energies=None):
"""Return index of highest energy image."""
if energies is None:
energies = [image.energy for image in self.images]
return np.argmax(energies)
[docs]
def get_full_cycles(self) -> np.ndarray:
"""Return array of integers that indexes self.all_true_forces/self.all_cart_coords.
When the ChainOfStates is not yet fully grown this list will be empty.
The items of this list can be used to index self.all_true_forces and
related lists, to extract image coordinate & forces data for all
cycles when the COS was already fully grown.
This data can then be used to, e.g., construct a (TS)-Hessian for an
selected image."""
try:
fully_grown = self.fully_grown
except AttributeError:
fully_grown = True
if not fully_grown:
return np.empty(0)
full_size = self.cart_coords_length * self.nimages
all_sizes = np.array([true_forces.size for true_forces in self.all_true_forces])
mask = all_sizes == full_size
indices = np.arange(len(all_sizes))
full_cycles = indices[mask]
return full_cycles
[docs]
def prepare_opt_cycle(self, last_coords, last_energies, last_forces):
"""Implements additional logic in preparation of the next
optimization cycle.
Should be called by the optimizer at the beginning of a new
optimization cycle. Can be used to implement additional logic
as needed for AdaptiveNEB etc.
"""
self.coords_list.append(last_coords)
self.forces_list.append(last_forces)
messages = list()
# Check if we can start climbing.
already_climbing = self.started_climbing
if self.climb and not already_climbing:
self.started_climbing = self.compare_image_rms_forces(self.climb_rms)
if self.started_climbing:
msg = "Will use climbing image(s) in next cycle."
self.log(msg)
messages.append(msg)
# Fix climbing index/indices if not already set, but requested.
if already_climbing and self.climb_fixed and (self.fixed_climb_indices is None):
self.fixed_climb_indices = self.get_climbing_indices()
# Check if we can start to converge the HEI tangent using the Lanczos algorithm.
already_climbing_lanczos = self.started_climbing_lanczos
if (
self.climb_lanczos
and self.started_climbing
and not already_climbing_lanczos
):
self.started_climbing_lanczos = self.compare_image_rms_forces(
self.climb_lanczos_rms
)
if self.started_climbing_lanczos:
msg = "Will use Lanczos algorithm for HEI tangent in next cycle."
self.log(msg)
messages.append(msg)
# Starting TS optimization does not influence the return value. Expand on this?!
if self.ts_opt and not self.started_ts_opt:
self.started_ts_opt = self.compare_image_rms_forces(self.ts_opt_rms)
if self.started_ts_opt:
msg = "Will use TS image(s) in next cycle. Disabled any climbing/Lanczos image."
self.climb_lanczos = False
self.climb = False
self.log(msg)
messages.append(msg)
reset_flag = not already_climbing and self.started_climbing
return reset_flag, messages
[docs]
def compare_image_rms_forces(self, ref_rms):
"""Compare rms(forces) value of an image against a reference value.
Used to decide if we designate an image as climbing image or a
TS node.
Only initiate climbing on a sufficiently converged MEP.
This can be determined from a supplied threshold for the
RMS force (rms_force) or from a multiple of the
RMS force convergence threshold (rms_multiple, default).
"""
rms_forces = rms(self.perpendicular_forces)
# Only start climbing when the COS is fully grown. This
# attribute may not be defined in all subclasses, so it
# defaults to True here.
try:
fully_grown = self.fully_grown
except AttributeError:
fully_grown = True
start_climbing = (rms_forces <= ref_rms) and fully_grown
return start_climbing
[docs]
def get_climbing_indices(self):
# Index of the highest energy image (HEI)
hei_index = self.get_hei_index()
move_inds = self.moving_indices
# Don't climb if not yet enabled or requested.
if not (self.climb and self.started_climbing):
climb_indices = tuple()
elif self.fixed_climb_indices is not None:
climb_indices = self.fixed_climb_indices
_ = "index" if len(climb_indices) == 1 else "indices"
self.log(f"Returning fixed climbing {_}.")
# Do one image climbing (C1) neb if explicitly requested or
# the HEI is the first or last item in moving_indices.
elif self.climb == "one" or ((hei_index == 1) or (hei_index == move_inds[-1])):
climb_indices = (hei_index,)
# We can do two climbing (C2) neb if the highest energy image (HEI)
# is in moving_indices but not the first or last item in this list.
# elif self.climb != "one" and hei_index in move_inds[1:-1]:
elif hei_index in move_inds[1:-1]:
climb_indices = (hei_index - 1, hei_index + 1)
# Don't climb when the HEI is the first or last image of the whole NEB.
else:
climb_indices = tuple()
self.log("Want to climb but can't. HEI is first or last image!")
# self.log(f"Climbing indices: {climb_indices}")
return climb_indices
[docs]
def get_climbing_forces(self, ind):
climbing_image = self.images[ind]
ci_forces = climbing_image.forces
tangent = self.get_tangent(ind)
climbing_forces = ci_forces - 2 * ci_forces.dot(tangent) * tangent
return climbing_forces, climbing_image.energy
[docs]
def set_climbing_forces(self, forces):
# Avoids calling the other methods with their logging output etc.
if not (self.climb and self.started_climbing):
return forces
for i in self.get_climbing_indices():
climb_forces, climb_en = self.get_climbing_forces(i)
forces[i] = climb_forces
norm = np.linalg.norm(climb_forces)
self.log(
f"Climbing with image {i}, E = {climb_en:.6f} au, "
f"norm(forces)={norm:.6f}"
)
return forces
[docs]
def get_ts_image_indices(self):
if not self.started_ts_opt:
ts_images = tuple()
else:
if self.fixed_ts_indices is None:
hei_index = self.get_hei_index()
self.fixed_ts_indices = (hei_index,)
self.log(f"Fixed TS image to index {hei_index}.")
ts_images = self.fixed_ts_indices
return ts_images
[docs]
def get_splined_hei(self):
self.log("Splining HEI")
# Interpolate energies
cart_coords = np.array([image.cart_coords for image in self.images])
if not self.is_analytical_2d:
cart_coords = align_coords(cart_coords)
coord_diffs = get_coords_diffs(cart_coords)
self.log(f"\tCoordinate differences: {coord_diffs}")
energies = np.array(self.energy)
energies_spline = interp1d(coord_diffs, energies, kind="cubic")
x_fine = np.linspace(0, 1, 500)
energies_fine = energies_spline(x_fine)
# Determine index that yields the highest energy
hei_ind = energies_fine.argmax()
hei_x = x_fine[hei_ind]
self.log(f"Found splined HEI at x={hei_x:.4f}")
hei_frac_index = hei_x * (len(self.images) - 1)
hei_energy = energies_fine[hei_ind]
reshaped = cart_coords.reshape(-1, self.cart_coords_length)
# To use splprep we have to transpose the coords.
transp_coords = reshaped.transpose()
tcks, us = zip(
*[
splprep(transp_coords[i : i + 9], s=0, k=3, u=coord_diffs)
for i in range(0, len(transp_coords), 9)
]
)
# Reparametrize mesh
hei_coords = np.vstack(
[
# WTF, Black? This looks horrible.
splev(
[
hei_x,
],
tck,
)
for tck in tcks
]
)
hei_coords = hei_coords.flatten()
# Actually it looks like that splined tangents are really bad approximations
# to the actual imaginary mode. The Cartesian upwinding tangent is usually
# much much better. In 'run_tsopt_from_cos' we actually mix two "normal" tangents
# to obtain the HEI tangent.
hei_tangent = np.vstack(
[
# WTF, Black? This looks horrible.
splev(
[
hei_x,
],
tck,
der=1,
)
for tck in tcks
]
).T
hei_tangent = hei_tangent.flatten()
hei_tangent /= np.linalg.norm(hei_tangent)
return hei_coords, hei_energy, hei_tangent, hei_frac_index
[docs]
def get_image_calc_counter_sum(self):
return sum([image.calculator.calc_counter for image in self.images])
[docs]
def describe(self):
imgs = self.images
img = imgs[0]
return f"ChainOfStates, {len(imgs)} images, ({img.sum_formula}, {len(img.atoms)} atoms) per image"
def __str__(self):
name = self.__class__.__name__
nimages = len(self.images)
return f"{name}({nimages} images)"