from dataclasses import dataclass
from enum import Enum
import logging
import os
from pathlib import Path
import platform
import shutil
import subprocess
import tempfile
from typing import Callable, Optional
from natsort import natsorted
from pysisyphus import logger
from pysisyphus import helpers_pure
from pysisyphus.config import get_cmd, OUT_DIR_DEFAULT
from pysisyphus.constants import BOHR2ANG
from pysisyphus.exceptions import (
CalculationFailedException,
RunAfterCalculationFailedException,
)
from pysisyphus.finite_diffs import finite_difference_hessian_mp
from pysisyphus.helpers import geom_loader
from pysisyphus.wavefunction import Wavefunction
KeepKind = Enum("KeepKind", ["ALL", "LATEST", "NONE"])
HessKind = Enum("HessKind", ["ORG", "NUMERICAL"])
[docs]
@dataclass
class SetPlan:
key: str
name: Optional[str] = None
condition: Callable = lambda obj: True
# success: Optional[Callable] = None
fail: Optional[Callable] = None
def __post_init__(self):
if self.name is None:
self.name = self.key
[docs]
class Calculator:
conf_key = None
_set_plans = []
[docs]
def __init__(
self,
calc_number=0,
charge=0,
mult=1,
base_name="calculator",
pal=1,
mem=1000,
keep_kind="all",
check_mem=True,
retry_calc=0,
last_calc_cycle=None,
clean_after=True,
out_dir=OUT_DIR_DEFAULT,
force_num_hess=False,
num_hess_kwargs=None,
):
"""Base-class of all calculators.
Meant to be extended.
Parameters
----------
calc_number : int, default=0
Identifier of the Calculator. Used in distinguishing it from
other Calculators, e.g. in ChainOfStates calculations. Also
used in the creation of filenames.
charge : int, default=0
Molecular charge.
mult : int, default=1
Molecular multiplicity (1 = singlet, 2 = doublet, ...)
base_name : str, default=calculator
Generated filenames will start with this string.
pal : int, default=1
Positive integer that gives the number of physical cores to
use on 1 node.
mem : int, default=1000
Mememory per core in MB. The total amount of memory is given as
mem*pal.
check_mem : bool, default=True
Whether to adjust the requested memory if too much is requested.
retry_calc : int, default=0
Number of additional retries when calculation failed.
last_calc_cycle : int
Internal variable used in restarts.
clean_after : bool
Delete temporary directory were calculations were executed
after a calculation.
out_dir : str
Path that is prepended to generated filenames.
force_hess_kwargs : bool, default False
Force numerical Hessians.
num_hess_kwargs : dict
Keyword arguments for finite difference Hessian calculation.
"""
self.logger = logging.getLogger("calculator")
self.calc_number = calc_number
self.charge = int(charge)
self.mult = int(mult)
self.base_name = base_name
self.pal = int(pal)
assert self.pal > 0, "pal must be a non-negative integer!"
if check_mem:
mem = helpers_pure.check_mem(int(mem), pal, logger=self.logger)
self.mem = mem
self.keep_kind = KeepKind[keep_kind.upper()]
# Disasble retries if check_termination method is not implemented
self.retry_calc = int(retry_calc) if hasattr(self, "check_termination") else 0
assert self.retry_calc >= 0
try:
self.out_dir = Path(out_dir).resolve()
except TypeError:
self.out_dir = Path(OUT_DIR_DEFAULT).resolve()
self.out_dir.mkdir(parents=True, exist_ok=True)
if num_hess_kwargs is None:
num_hess_kwargs = dict()
self.num_hess_kwargs = num_hess_kwargs
# Extensions of the files to keep after running a calculation.
# Usually overridden in derived classes.
self.to_keep = ()
# How many calculations were already run
self.calc_counter = 0
# Handle restarts
if last_calc_cycle:
self.calc_counter = int(last_calc_cycle) + 1
self.reattach(int(last_calc_cycle))
self.log(f"Set {self.calc_counter} for this calculation")
self.clean_after = clean_after
self.inp_fn = "calc.inp"
self.out_fn = "calc.out"
# When this is set the run() method will use this path
# instead of creating a new one.
# Currently this is only used with the Turbomole calculator.
self.path_already_prepared = None
self.last_run_path = None
self.backup_dir = None
self.kept_history = dict()
self.build_set_plans()
# Backup original get_hessian method
self._org_get_hessian = self.get_hessian
self.hessian_kind = HessKind["ORG"]
if force_num_hess:
self.force_num_hessian()
[docs]
def get_cmd(self, key="cmd"):
assert self.conf_key, "'conf_key'-attribute is missing for this calculator!"
try:
return get_cmd(section=self.conf_key, key=key, use_defaults=True)
except KeyError:
logger.warning(
f"Failed to load key '{key}' from section '{self.conf_key}'!"
)
[docs]
@classmethod
def geom_from_fn(cls, fn, **kwargs):
geom = geom_loader(fn)
calc = cls(**kwargs)
geom.set_calculator(calc)
return geom
@property
def name(self):
return f"{self.base_name}_{self.calc_number:03d}"
[docs]
def log(self, message=""):
"""Write a log message.
Wraps the logger variable.
Parameters
----------
message : str
Message to be logged.
"""
self.logger.debug(f"{self.name}, cycle {self.calc_counter:03d}: {message}")
[docs]
def get_energy(self, atoms, coords, **prepare_kwargs):
"""Meant to be extended."""
raise Exception("Not implemented!")
[docs]
def get_forces(self, atoms, coords, **prepare_kwargs):
"""Meant to be extended."""
raise Exception("Not implemented!")
[docs]
def get_hessian(self, atoms, coords, **prepare_kwargs):
"""Get Hessian matrix. Fall back to numerical Hessian, if not overriden.
Preferrably, this method should provide an analytical Hessian."""
self.log(f"Calculator {self} has no native Hessian method.")
return self.get_num_hessian(atoms, coords, **prepare_kwargs)
[docs]
def get_num_hessian(self, atoms, coords, **prepare_kwargs):
self.log("Calculating numerical Hessian.")
results = self.get_energy(atoms, coords, **prepare_kwargs)
def grad_func(coords):
results = self.get_forces(atoms, coords, **prepare_kwargs)
gradient = -results["forces"]
return gradient
def callback(i):
self.log(f"Displacement {i}")
_num_hess_kwargs = {
"step_size": 0.005,
# Central difference with two displacements by default
"acc": 2,
}
_num_hess_kwargs.update(self.num_hess_kwargs)
fd_hessian = finite_difference_hessian_mp(
atoms,
coords,
self,
prepare_kwargs=prepare_kwargs,
**_num_hess_kwargs,
)
results["hessian"] = fd_hessian
return results
[docs]
def force_num_hessian(self):
"""Always calculate numerical Hessians."""
self.log("Doing numerical Hessians from now on.")
self.get_hessian = self.get_num_hessian
self.hessian_kind = HessKind["NUMERICAL"]
[docs]
def restore_org_hessian(self):
"""Restore original 'get_hessian' method, which may also fallback to numerical
Hessians, if not implemented."""
self.log("Restored original 'get_hessian' method.")
self.get_hessian = self._org_get_hessian
self.hessian_kind = HessKind["ORG"]
[docs]
def get_wavefunction(self, atoms, coords, **prepare_kwargs):
"""Meant to be extended."""
raise Exception("Not implemented!")
[docs]
def get_relaxed_density(self, atoms, coords, root, **prepare_kwargs):
"""Meant to be extended."""
raise Exception("Not implemented!")
[docs]
def load_wavefunction_from_file(self, fn, **kwargs):
return Wavefunction.from_file(fn, **kwargs)
[docs]
def get_stored_wavefunction(self, **kwargs):
raise Exception("Not implemented!")
[docs]
def make_fn(self, name, counter=None, return_str=False):
"""Make a full filename.
Return a full filename including the calculator name and the
current counter given a suffix.
Parameters
----------
name: str
Suffix of the filename.
counter : int, optional
If not given use the current calc_counter.
return_str : int, optional
Return a string instead of a Path when True.
Returns
-------
fn : str
Filename.
"""
if counter is None:
counter = self.calc_counter
fn = self.out_dir / f"{self.name}.{counter:03d}.{name}"
if return_str:
fn = str(fn)
return fn
[docs]
def prepare_path(self, use_in_run=False):
"""Get a temporary directory handle.
Create a temporary directory that can later be used in a calculation.
Parameters
----------
use_in_run : bool, option
Sets the internal variable ``self.path_already_prepared`` that
is later read by ``self.run()``. No new temporary directory will
be created in ``self.run()``.
Returns
-------
path: Path
Prepared directory.
"""
prefix = f"{self.name}_{self.calc_counter:03d}_"
path = Path(tempfile.mkdtemp(prefix=prefix))
if use_in_run:
self.path_already_prepared = path
return path
[docs]
def prepare(self, inp):
"""Prepare a temporary directory and write input.
Similar to prepare_path, but the input is also written into
the prepared directory.
Paramters
---------
inp : str
Input to be written into the file ``self.inp_fn`` in
the prepared directory.
Returns
-------
path: Path
Prepared directory.
"""
if not self.path_already_prepared:
path = self.prepare_path()
else:
path = self.path_already_prepared
# Calculators like Turbomole got no input.
if inp:
inp_path = path / self.inp_fn
with open(inp_path, "w") as handle:
handle.write(inp)
return path
[docs]
def print_out_fn(self, path):
"""Print calculation output.
Prints the output of a calculator after a calculation.
Parameters
----------
path : Path
Temporary directory of the calculation.
"""
with open(path / self.out_fn) as handle:
text = handle.read()
print(text)
[docs]
def prepare_turbo_coords(self, atoms, coords):
"""Get a Turbomole coords string.
Parameters
----------
atoms : iterable
Atom descriptors (element symbols).
coords: np.array, 1d
1D-array holding coordinates in Bohr.
Returns
-------
coords: str
String holding coordinates in Turbomole coords format.
"""
fmt = "{:<20.014f}"
coord_str = "$coord\n"
for atom, coord in zip(atoms, coords.reshape(-1, 3)):
coord_line = (fmt + fmt + fmt).format(*coord) + atom.lower() + "\n"
coord_str += coord_line
coord_str += "$end"
return coord_str
[docs]
def prepare_coords(self, atoms, coords):
"""Get 3d coords in Angstrom.
Reshape internal 1d coords to 3d and convert to Angstrom.
Parameters
----------
atoms : iterable
Atom descriptors (element symbols).
coords: np.array, 1d
1D-array holding coordinates in Bohr.
Returns
-------
coords: np.array, 3d
3D-array holding coordinates in Angstrom.
"""
coords = coords.reshape(-1, 3) * BOHR2ANG
coords = "\n".join(
[
"{} {:10.08f} {:10.08f} {:10.08f}".format(a, *c)
for a, c in zip(atoms, coords)
]
)
return coords
[docs]
def prepare_xyz_string(self, atoms, coords):
"""Returns a xyz string in Angstrom.
Parameters
----------
atoms : iterable
Atom descriptors (element symbols).
coords: np.array, 1d
1D-array holding coordinates in Bohr.
Returns
-------
xyz_str: string
Coordinates in .xyz format.
"""
return f"{len(atoms)}\n\n{self.prepare_coords(atoms, coords)}"
[docs]
def run(
self,
inp,
calc,
add_args=None,
env=None,
shell=False,
hold=False,
keep=True,
cmd=None,
inc_counter=True,
run_after=True,
parser_kwargs=None,
symlink=True,
):
"""Run a calculation.
The bread-and-butter method to actually run an external quantum
chemistry code.
Parameters
----------
inp : str
Input for the external program that is written to the temp-dir.
calc : str, hashable
Key (and more or less type of calculation) to select the right
parsing function from ``self.parser_funcs``.
add_args : iterable, optional
Additional arguments that will be appended to the program call.
env : Environment, optional
A potentially modified environment for the subprocess call.
shell : bool, optional
Use a shell to execute the program call. Need for Turbomole were
we chain program calls like dscf; escf.
hold : bool, optional
Wether to remove the temporary directory after the calculation.
keep : bool, optional
Wether to backup files as specified in ``self.to_keep()``. Usually
you want this.
cmd : str or iterable, optional
Overwrites ``self.base_cmd``.
inc_counter : bool, optional
Wether to increment the counter after a calculation.
Returns
-------
results : dict
Dictionary holding all applicable results of the calculations
like the energy, a forces vector and/or excited state energies
from TDDFT.
"""
self.backup_dir = None
path = self.prepare(inp)
self.log(f"Running in {path} on {platform.node()}")
if cmd is None:
cmd = self.base_cmd
if isinstance(cmd, str):
cmd = [cmd]
act_cmd = cmd[0]
cmd_availble = shutil.which(act_cmd)
if (not cmd_availble) and (not shell):
print(
f"Command '{act_cmd}' could not be found on $PATH! "
"Maybe you forgot to make it available?"
)
return
args = cmd + [self.inp_fn]
if add_args:
args.extend(add_args)
if not env:
env = os.environ.copy()
tmp_out_fn = path / self.out_fn
with open(tmp_out_fn, "w") as handle:
if symlink:
# We can't use resolve here as a previous symlink may already
# exist. Calling resolve would translate this to the original
# out file in some tempdir (that is already deleted ...).
# sym_fn = Path("cur_out").resolve()
sym_fn = self.out_dir / "cur_out"
try:
os.remove(sym_fn)
except FileNotFoundError:
pass
try:
os.symlink(tmp_out_fn, sym_fn)
self.log(f"Created symlink in '{sym_fn}'")
# This may happen if we use a dask scheduler
except FileExistsError:
self.log("Symlink already exists. Skipping generation.")
# Do at least one cycle. When retries are disabled retry_calc == 0
# and range(0+1) will result in one cycle
added_retry_args = False
for retry in range(self.retry_calc + 1):
result = subprocess.Popen(
args,
cwd=path,
stdout=handle,
stderr=subprocess.PIPE,
env=env,
shell=shell,
)
result.communicate()
returncode = result.returncode
nonzero_return = returncode != 0
try:
normal_termination = False
# Calling check_termination may result in an exception and
# normal_termination will stay at False
normal_termination = (
self.retry_calc == 0
) or self.check_termination(tmp_out_fn)
# Method check_termination may not be implemented, so we will always
# do only one try.
except AttributeError:
normal_termination = True
# The out file may not be present
except FileNotFoundError:
self.log(
f"Could not find out-file {str(tmp_out_fn)} for termination status check!"
)
if normal_termination:
break
else:
print("Abnormal termination! Retrying calculation.")
shutil.copy(tmp_out_fn, str(tmp_out_fn) + f".fail_{retry:02d}")
try:
self.clean_tmp(path)
except AttributeError:
self.log("'self.clean_path()' not implemented!")
# Clear tmp_out_fn
handle.seek(0)
handle.truncate()
self.log("Detected abnormal termination! Retrying calculation.")
if not added_retry_args:
try:
args += self.get_retry_args()
added_retry_args = True
except AttributeError:
self.log("'self.get_retry_args()' not implemented!")
# Parse results for desired quantities
try:
# Don't try to parse calculation results when the calculated already
# returned an nonzero error code.
if nonzero_return:
raise CalculationFailedException(
msg=f"Calculation in '{path}' returned with code {returncode}!",
path=path,
)
if run_after:
self.run_after(path)
parser_kwargs = {} if parser_kwargs is None else parser_kwargs
results = self.parser_funcs[calc](path, **parser_kwargs)
if keep:
self.keep(path)
except Exception as err:
print("Crashed input:")
print(inp)
backup_dir = Path(os.getcwd()) / f"crashed_{self.name}"
self.backup_dir = backup_dir
if backup_dir.exists():
shutil.rmtree(backup_dir)
shutil.copytree(path, backup_dir)
print(
f"Copied contents of\n\t'{path}'\nto\n\t'{backup_dir}'.\n"
"Consider checking the log files there.\n"
)
raise RunAfterCalculationFailedException(
"Postprocessing calculation failed!", err
)
finally:
if (not hold) and self.clean_after:
self.clean(path)
if inc_counter:
self.calc_counter += 1
self.path_already_prepared = None
self.last_run_path = path
return results
[docs]
def run_after(self, path):
"""Meant to be extended.
This method is called after a calculation was done, but before
entering ``self.keep()`` and ``self.clean()``. Can be used to call
tools like formchk or ricctools.
"""
[docs]
def popen(self, cmd, cwd=None):
if isinstance(cmd, str):
cmd = cmd.split()
proc = subprocess.Popen(
cmd,
cwd=cwd,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
proc.wait()
return proc
[docs]
def prepare_pattern(self, raw_pat):
"""Prepare globs.
Transforms an entry of ``self.to_keep`` into a glob and a key
suitable for the use in ``self.keep()``.
Parameters
----------
raw_pat : str
Entry of ``self.to_keep``
Returns
-------
pattern : str
Glob that can be used in Path.glob()
multi : bool
Flag if glob may match multiple files.
key : str
A key to be used in the ``kept_fns`` dict.
"""
key_given = None
if ":" in raw_pat:
key_given, raw_pat = raw_pat.split(":")
# Indicates if multiple files are expected
multi = "*" in raw_pat
# Drop '*' as it just indicates if we expect multiple matches
raw_pat = raw_pat.replace("*", "")
# Interpret it as prefix and drop the two underscores
if raw_pat.startswith("__"):
pattern = f"{raw_pat[2:]}*"
pattern_key = f"{raw_pat[2:]}s"
# Use raw_pat as suffix
else:
pattern = f"*{raw_pat}"
pattern_key = f"{raw_pat}"
if key_given:
pattern_key = key_given
pattern_key = pattern_key.lower()
return pattern, multi, pattern_key
[docs]
def build_set_plans(self, _set_plans=None):
if _set_plans is None:
_set_plans = self._set_plans
set_plans = list()
for sp in _set_plans:
if type(sp) == SetPlan:
set_plans.append(sp)
continue
if type(sp) == dict:
set_plans.append(SetPlan(**sp))
continue
if type(sp) == str:
sp = (sp,)
set_plans.append(SetPlan(*sp))
self.set_plans = set_plans
[docs]
def apply_keep_kind(self):
assert self.keep_kind in KeepKind
# Nothing to do when set to ALL
delete_calc_counter = list()
if self.keep_kind == KeepKind["LATEST"]:
delete_calc_counter.extend(
[
self.calc_counter - 1,
]
if self.calc_counter > 0
else []
)
elif self.keep_kind == KeepKind["NONE"]:
delete_calc_counter.extend(list(self.kept_history.keys()))
for cycle in delete_calc_counter:
self.log(f"Deleting kept files from cycle {cycle}.")
for k, f in self.kept_history[cycle].items():
if type(f) is list:
[os.remove(f_) for f_ in f]
else:
os.remove(f)
del self.kept_history[cycle]
[docs]
def keep(self, path):
"""Backup calculation results.
Parameters
----------
path : Path
Temporary directory of the calculation.
Returns
-------
kept_fns : dict
Dictonary holding the filenames that were backed up. The keys
correspond to the type of file.
"""
kept_fns = dict()
for raw_pattern in self.to_keep:
pattern, multi, key = self.prepare_pattern(raw_pattern)
globbed = natsorted(path.glob(pattern))
if not multi:
assert len(globbed) <= 1, (
f"Expected at most one file "
f"matching {pattern} in {path}. Found {len(globbed)} "
f"files instead ({', '.join([g.name for g in globbed])})!"
)
else:
# I wonder if this has to be a list? Probably to support 'multi', when the pattern
# contains a '*' character.
kept_fns[key] = list()
for tmp_fn in globbed:
base = tmp_fn.name
new_fn = self.make_fn(base)
shutil.copy(tmp_fn, new_fn)
if multi:
kept_fns[key].append(new_fn)
else:
kept_fns[key] = new_fn
self.kept_history[self.calc_counter] = kept_fns
self.apply_keep_kind()
try:
kept_fns = self.kept_history[self.calc_counter]
except KeyError:
kept_fns = dict()
self.apply_set_plans(kept_fns)
return kept_fns
[docs]
def apply_set_plans(self, kept_fns, set_plans=None):
if set_plans is None:
set_plans = self.set_plans
for sp in set_plans:
if not sp.condition(self):
continue
try:
setattr(self, sp.name, kept_fns[sp.key])
except KeyError:
if sp.fail is not None:
self.log(sp.fail(sp.key))
[docs]
def clean(self, path):
"""Delete the temporary directory.
Parameters
----------
path : Path
Directory to delete.
"""
shutil.rmtree(path)
self.log(f"Cleaned {path}")
[docs]
def get_restart_info(self):
"""Return a dict containing chkfiles.
Returns
-------
restart_info : dict
Dictionary holding the calculator state. Used for restoring calculaters
in restarted calculations.
"""
try:
# Convert possible Paths to str
chkfiles = {k: str(v) for k, v in self.get_chkfiles().items()}
except AttributeError:
chkfiles = dict()
restart_info = {
"base_name": self.base_name,
"calc_number": self.calc_number,
"calc_counter": self.calc_counter,
"chkfiles": chkfiles,
}
return restart_info
[docs]
def verify_chkfiles(self, chkfiles):
"""Checks if given chkfiles exist and return them as Paths
Parameters
----------
chkfiles : dict
Dictionary holding the chkfiles. The keys correspond to the attribute
names, the values are strs holding the (potentially full) filename (path).
Returns
-------
paths : dict
Dictionary of Paths.
"""
paths = {}
for key, chkfile in chkfiles.items():
chkfile = Path(chkfile)
# If the chkfile exists at the given path we use it as it is.
if not chkfile.exists():
self.log(
f"Given chkfile '{chkfile}' could not be found! Dropping "
"absolute part and trying only its name."
)
# Check if relative path exists. This may happen if the calculation
# has been moved to a different folder.
name = Path(chkfile.name)
if name.exists():
chkfile = name
else:
self.log(f"'{name}' could not be found! Skipping this chkfile.")
continue
paths[key] = chkfile
return paths
[docs]
def set_restart_info(self, restart_info):
"""Sets restart information (chkfiles etc.) on the calculator.
Parameters
-------
restart_info : dict
Dictionary holding the calculator state. Used for restoring calculaters
in restarted calculations.
"""
try:
chkfiles = self.verify_chkfiles(restart_info.pop("chkfiles"))
self.set_chkfiles(chkfiles)
except KeyError:
self.log("No chkfiles preset in restart_info")
except AttributeError:
self.log(
"Found chkfiles on restart_info, but 'set_chkfiles' is not "
"implemented for Calculator."
)
self.log("Setting restart_info")
for key, value in restart_info.items():
setattr(self, key, value)
self.log(f"\t{key}: {value}")
[docs]
def print_capabilities(self):
print(
f" Can retry?: {hasattr(self, 'check_termination')}\n"
f"Can track ES??: {hasattr(self, 'prepare_overlap_data')}\n"
)