from collections import namedtuple
import io
import json
import os
import re
import shutil
import textwrap
import numpy as np
from pysisyphus.calculators.Calculator import Calculator
from pysisyphus.calculators.parser import parse_turbo_gradient
from pysisyphus.calculators.ORCA import save_orca_pc_file
from pysisyphus.constants import BOHR2ANG, BOHRPERFS2AU
from pysisyphus.helpers import geom_loader
from pysisyphus.helpers_pure import file_or_str
from pysisyphus.xyzloader import make_xyz_str
OptResult = namedtuple("OptResult", "opt_geom opt_log")
[docs]
class XTB(Calculator):
conf_key = "xtb"
_set_plans = ("charges", "json", "xtbrestart", "molden")
[docs]
def __init__(
self,
gbsa="",
alpb="",
gfn=2,
acc=1.0,
etemp=None,
retry_etemp=None,
restart=False,
topo=None,
topo_update=None,
quiet=False,
wavefunction_dump=False,
**kwargs,
):
"""XTB calculator.
Wrapper for running energy, gradient and Hessian calculations by
XTB.
Parameters
----------
gbsa : str, optional
Solvent for GBSA calculation, by default no solvent model is
used.
alpb : str, optional
Solvent for ALPB calculation, by default no solvent model is
used.
gfn : int or str, must be (0, 1, 2, or "ff")
Hamiltonian for the XTB calculation (GFN0, GFN1, GFN2, or GFNFF).
acc : float, optional
Accuracy control of the calculation, the lower the tighter several
numerical thresholds are chosen.
topo: str, optional
Path the a GFNFF-topolgy file. As setting up the topology may take
some time for sizable systems, it may be desired to reuse the file.
topo_update : int
Integer controlling the update interval of the GFNFF topology update.
If supplied, the topolgy will be recreated every N-th calculation.
mem : int
Mememory per core in MB.
quiet : bool, optional
Suppress creation of log files.
wavefunction_dump : bool
Whether to dump a molden file.
"""
super().__init__(**kwargs)
self.gbsa = gbsa
self.alpb = alpb
self.gfn = gfn
self.acc = acc
self.etemp = etemp
self.retry_etemp = retry_etemp
self.restart = restart
if self.etemp is not None:
assert (
self.retry_etemp is None
), "Using 'etemp' and 'retry_etemp' simultaneously is not possible!"
self.topo = topo
self.topo_update = topo_update
self.quiet = quiet
self.wavefunction_dump = wavefunction_dump
self.topo_used = 0
self.xtbrestart = None
valid_gfns = (0, 1, 2, "ff")
assert (
self.gfn in valid_gfns
), f"Invalid gfn argument. Allowed arguments are: {', '.join(valid_gfns)}!"
self.uhf = self.mult - 1
self.inp_fn = "xtb.xyz"
self.out_fn = "xtb.out"
self.to_keep = (
"out:xtb.out",
"gradient",
"xtbopt.xyz",
"g98.out",
"xtb.trj",
"charges:charges",
"xcontrol",
"molden:molden.input",
)
if self.restart:
self.to_keep += ("xtbrestart",)
if self.quiet:
self.to_keep = ()
self.parser_funcs = {
"grad": self.parse_gradient,
"hess": self.parse_hessian,
"opt": self.parse_opt,
"md": self.parse_md,
"topo": self.parse_topo,
"noparse": lambda path: None,
"calc": self.parse_energy,
}
self.base_cmd = self.get_cmd()
[docs]
def reattach(self, last_calc_cycle):
pass
[docs]
def prepare_coords(self, atoms, coords):
coords = coords * BOHR2ANG
return make_xyz_str(atoms, coords.reshape((-1, 3)))
[docs]
def prepare_add_args(self, xcontrol=None):
add_args = (
f"--input xcontrol --chrg {self.charge} --uhf {self.uhf} "
f"--acc {self.acc}".split()
)
if self.etemp:
etemp = f"--etemp {self.etemp}".split()
add_args = add_args + etemp
# Use solvent model if specified
if self.gbsa:
gbsa = f"--gbsa {self.gbsa}".split()
add_args = add_args + gbsa
elif self.alpb:
alpb = f"--alpb {self.alpb}".split()
add_args = add_args + alpb
if self.wavefunction_dump:
add_args = add_args + ["--molden"]
# Select parametrization
gfn = ["--gfnff"] if self.gfn == "ff" else f"--gfn {self.gfn}".split()
add_args = add_args + gfn
return add_args
[docs]
def get_pal_env(self):
env_copy = os.environ.copy()
env_copy["OMP_NUM_THREADS"] = str(self.pal)
env_copy["MKL_NUM_THREADS"] = str(self.pal)
# Per thread
env_copy["OMP_STACKSIZE"] = f"{self.mem}M"
return env_copy
[docs]
def get_energy(self, atoms, coords, **prepare_kwargs):
results = self.get_forces(atoms, coords, **prepare_kwargs)
del results["forces"]
return results
[docs]
def get_forces(self, atoms, coords, **prepare_kwargs):
self.prepare_input(atoms, coords, "forces", **prepare_kwargs)
inp = self.prepare_coords(atoms, coords)
add_args = self.prepare_add_args() + ["--grad"]
self.log(f"Executing {self.base_cmd} {add_args}")
kwargs = {
"calc": "grad",
"add_args": add_args,
"env": self.get_pal_env(),
}
results = self.run(inp, **kwargs)
return results
[docs]
def get_hessian(self, atoms, coords, **prepare_kwargs):
self.prepare_input(atoms, coords, "hessian", **prepare_kwargs)
inp = self.prepare_coords(atoms, coords)
add_args = self.prepare_add_args() + ["--hess"]
self.log(f"Executing {self.base_cmd} {add_args}")
kwargs = {
"calc": "hess",
"add_args": add_args,
"env": self.get_pal_env(),
}
results = self.run(inp, **kwargs)
return results
[docs]
def get_stored_wavefunction(self, **kwargs):
return self.load_wavefunction_from_file(self.molden, xtb_nuc_charges=True)
[docs]
def run_calculation(self, atoms, coords, **prepare_kwargs):
self.prepare_input(atoms, coords, "calculation", **prepare_kwargs)
inp = self.prepare_coords(atoms, coords)
kwargs = {
"calc": "calc",
"env": self.get_pal_env(),
}
energy = self.run(inp, **kwargs)
results = {"energy": energy}
return results
[docs]
def run_topo(self, atoms, coords):
inp = self.prepare_coords(atoms, coords)
kwargs = {
"calc": "topo",
"cmd": [self.base_cmd, "topo"],
"env": self.get_pal_env(),
}
results = self.run(inp, **kwargs)
return results
[docs]
def parse_topo(self, path):
fn = "gfnff_topo"
topo = path / fn
target = self.make_fn(fn)
shutil.copy(topo, target)
return {
"topo": target,
}
[docs]
def get_mdrestart_str(self, coords, velocities):
"""coords and velocities have to given in au!"""
vals = np.concatenate((coords, velocities), axis=1)
with io.StringIO() as io_stream:
np.savetxt(io_stream, vals, fmt="% .14e")
mdrestart = io_stream.getvalue()
# What does the -1.0 mean?
mdrestart = "-1.0\n" + mdrestart.replace("e", "D")
mdrestart = textwrap.indent(mdrestart, " ")
return mdrestart
[docs]
def write_mdrestart(self, path, mdrestart_str):
with open(path / "mdrestart", "wb") as handle:
handle.write(mdrestart_str.encode("ascii"))
[docs]
def run_md(self, atoms, coords, t, dt, velocities=None, dump=1):
"""Expecting t and dt in fs, even though xtb wants t in ps!"""
restart = "false"
path = self.prepare_path(use_in_run=True)
if velocities is not None:
coords3d = coords.reshape(-1, 3)
velocities3d = velocities.reshape(-1, 3) * BOHRPERFS2AU
assert (
coords3d.shape == velocities3d.shape
), "Shape of coordinates and velocities doesn't match!"
mdrestart_str = self.get_mdrestart_str(coords3d, velocities3d)
self.write_mdrestart(path, mdrestart_str)
restart = "true"
md_str = textwrap.dedent(
"""
$md
hmass=1
dump={dump} # fs
nvt=false
restart={restart}
time={time} # ps
shake=0
step={step} # fs
velo=false
$end"""
)
t_ps = t / 1000
md_str_fmt = md_str.format(restart=restart, time=t_ps, step=dt, dump=dump)
with open(path / "xcontrol", "w") as handle:
handle.write(md_str_fmt)
inp = self.prepare_coords(atoms, coords)
add_args = self.prepare_add_args() + ["--input", "xcontrol", "--md"]
self.log(f"Executing {self.base_cmd} {add_args}")
kwargs = {
"calc": "md",
"add_args": add_args,
"env": self.get_pal_env(),
"keep": True,
}
geoms = self.run(inp, **kwargs)
return geoms
[docs]
def parse_md(self, path):
assert (path / "xtbmdok").exists(), "File xtbmdok does not exist!"
geoms = geom_loader(path / "xtb.trj")
return geoms
[docs]
def run_opt(self, atoms, coords, keep=True, keep_log=False):
inp = self.prepare_coords(atoms, coords)
add_args = self.prepare_add_args() + ["--opt", "tight"]
self.log(f"Executing {self.base_cmd} {add_args}")
kwargs = {
"calc": "opt",
"add_args": add_args,
"env": self.get_pal_env(),
"keep": keep,
"parser_kwargs": {"keep_log": keep_log},
}
opt_result = self.run(inp, **kwargs)
return opt_result
[docs]
def parse_opt(self, path, keep_log=False):
xtbopt = path / "xtbopt.xyz"
if not xtbopt.exists():
self.log(f"{self.calc_number:03d} failed")
return None
opt_geom = geom_loader(xtbopt)
opt_geom.energy = self.parse_energy(path)
opt_log = None
if keep_log:
opt_log = geom_loader(path / "xtbopt.log")
opt_result = OptResult(opt_geom=opt_geom, opt_log=opt_log)
return opt_result
[docs]
def parse_energy(self, path):
with open(path / self.out_fn) as handle:
text = handle.read()
energy_re = r"TOTAL ENERGY\s*([-\d\.]+) Eh"
energy = float(re.search(energy_re, text)[1])
return energy
[docs]
def parse_gradient(self, path):
return parse_turbo_gradient(path)
[docs]
def parse_hessian(self, path):
with open(path / "hessian") as handle:
text = handle.read()
hessian = np.array(text.split()[1:], dtype=float)
coord_num = int(hessian.size**0.5)
hessian = hessian.reshape(coord_num, coord_num)
energy = self.parse_energy(path)
results = {
"energy": energy,
"hessian": hessian,
}
return results
[docs]
def parse_charges(self, fn=None):
if fn is None:
fn = self.charges
charges = np.loadtxt(fn, dtype=float)
return charges
[docs]
def parse_charges_from_json(self, fn=None):
if fn is None:
fn = self.json
with open(fn, "r") as handle:
dump = json.load(handle)
charges = dump["partial charges"]
return charges
@staticmethod
@file_or_str(".out")
def check_termination(text):
term_re = re.compile("finished run on")
mobj = term_re.search(text)
return bool(mobj)
[docs]
def get_retry_args(self):
if self.retry_etemp is None:
return []
self.log(f"Retrying calculation with increased etemp={self.retry_etemp}")
return f"--etemp {self.retry_etemp}".split()
def __str__(self):
return "XTB calculator"