import os
import re
from pathlib import Path
from subprocess import PIPE, Popen
import numpy as np
import ase
from ase import Atoms
from ase.calculators.calculator import Parameters
from ase.db import connect
from storq.tools import tail
[docs]
def read_num_ionic_steps(outcar: str = None) -> int:
"""Returns the number of ionic steps from the OUTCAR file.
Parameters
----------
outcar
name of input file [default: OUTCAR]
"""
if not outcar:
outcar = Path("OUTCAR")
nsteps = None
with open(outcar) as f:
for line in f.readlines():
if line.find("- Iteration") != -1:
nsteps = int(line.split("(")[0].split()[-1].strip())
return nsteps
[docs]
def read_outcar_validity(outcar: str = None) -> bool:
"""Returns True if VASP finished properly.
If VASP itself is cancelled (batch_walltime or explicitly by user)
or encounters certain errors it won't ouput the typical
Voluntary context switch count on the last line of the OUTCAR.
Parameters
----------
outcar
name of input file [default: OUTCAR]
"""
if not outcar:
outcar = Path("OUTCAR")
if outcar.is_file():
last_line = tail(outcar)
return "Voluntary context switches" in str(last_line)
return False
[docs]
def read_convergence(outcar=None):
"""Checks whether a calculation has converged.
Performs a strict convergence checks in the sense that electronic
as well as ionic convergence is explicitly chechked for by comparing
their magnitude to EDIFF and EDIFFG.
Returns
-------
list
A list of two bools indicating if the electronic SCF cycle
and the ionic relaxation converged, respectively.
"""
if not outcar:
outcar = Path("OUTCAR")
converged = [False, False]
# First check electronic convergence
for line in open(outcar, "r"):
if 0: # vasp always prints that!
if line.rfind("aborting loop") > -1: # scf failed
raise RuntimeError(line.strip())
break
if line.rfind("EDIFF ") > -1:
ediff = float(line.split()[2])
if line.rfind("total energy-change") > -1:
# I saw this in an atomic oxygen calculation. it
# breaks this code, so I am checking for it here.
if "MIXING" in line:
continue
split = line.split(":")
a = float(split[1].split("(")[0])
b = split[1].split("(")[1][0:-2]
# sometimes this line looks like (second number wrong format!):
# energy-change (2. order) :-0.2141803E-08 ( 0.2737684-111)
# we are checking still the first number so
# let's "fix" the format for the second one
if "e" not in b.lower():
# replace last occurrence of - (assumed exponent) with -e
bsplit = b.split("-")
bsplit[-1] = "e" + bsplit[-1]
b = "-".join(bsplit).replace("-e", "e-")
b = float(b)
if [abs(a), abs(b)] < [ediff, ediff]:
converged[0] = True
else:
# converged = False
continue
# check ionic convergence
for line in open(outcar, "r"):
if line.rfind("reached required accuracy") > -1:
converged[1] = True
return converged
[docs]
def read_zval(potcar):
"""Return the ZVAL for a potcar file.
parse this line:
POMASS = 106.420; ZVAL = 10.000 mass and valenz
"""
# First check if it is a .Z type file
if potcar.endswith(".Z"):
cmdlist = ["zcat", potcar]
p = Popen(cmdlist, stdin=PIPE, stdout=PIPE, stderr=PIPE)
out, err = p.communicate()
if out == "" or err != "":
raise Exception("Cannot read POTCAR.Z:\n\n{0}".format(err))
lines = out.split("\n")
else:
with open(potcar, "r") as f:
lines = f.readlines()
for line in lines:
if "ZVAL" in line:
m = re.search("ZVAL =\s*([0-9]*\.?[0-9]*)", line)
return float(m.group(1))
[docs]
def read_enmax(potcar):
""" Return ENMAX from the potcar file."""
with open(potcar) as f:
for line in f:
if "ENMAX" in line:
m = re.search("ENMAX\s*=\s*(?P<ENMAX>[0-9]+.[0-9]+);", line)
return float(m.groupdict()["ENMAX"])
[docs]
def read_enmin(potcar):
""" Return ENMIN from the potcar file."""
with open(potcar) as f:
for line in f:
if "ENMIN" in line:
regex = "ENMIN\s*=\s*(?P<ENMIN>[0-9]+.[0-9]+)\s+eV"
m = re.search(regex, line)
return float(m.groupdict()["ENMIN"])
[docs]
def isfloat(s):
"""Return True if s is a float.
We check if it is not an integer first, then try to cast it as a float.
Parameters
----------
s : str
input string
Returns
-------
boolean
"""
if re.match("^[-+]?\d+$", s):
return False
try:
float(s)
return True
except:
return False
[docs]
class Readers:
"""Mixin class of reader-type methods for the Vasp calculator.
These methods are all dependent on some calculator property or reimplement
ASE methods and hence cannot be factored out of the calculator.
"""
[docs]
def read_incar(self, fname=None):
"""Read fname (defaults to INCAR).
This only reads simple INCAR files, e.g. one tag per line, and
with no comments in the line. There is no knowledge of any Vasp
keywords in this, and the values are converted to Python types by
some simple rules.
Parameters
----------
fname : str
name of input file
Returns
-------
dict
parameters from INCAR file
"""
if fname is None:
fname = self.incar
params = Parameters()
with open(fname) as f:
lines = f.readlines()
for line in lines:
line = line.strip()
if ";" in line:
raise Exception("; found. that is not supported.")
elif line.startswith("#") or "=" not in line:
continue
elif "#" in line:
line = line.split("#")[0].strip()
key, val = line.split("=")
key = key.strip().lower()
val = val.strip()
# now we need some logic
if re.match(r"^\.?[Tt][Rr][Uu][Ee]\.?$", val):
val = True
elif re.match(r"(?:^|^\.?)[Ff][Aa][Ll][Ss][Ee](?:$|\.?$)", val):
val = False
# Match integers by a regexp that includes signs
# val.isdigit() does not get negative integers right.
elif re.match("^[-+]?\d+$", val):
val = int(val)
elif isfloat(val):
val = float(val)
elif len(val.split(" ")) > 1:
# this is some kind of list separated by spaces
val = val.split(" ")
val = sum([int(x.split('*')[0]) * [float(x.split('*')[1])]
if re.match("\d+\*.*", x) else [int(x)] if
re.match("^[-+]?\d+$", x) else [float(x)] for x in
val], [])
else:
# I guess we have a string here.
pass
# make sure magmom is returned as a list. This is an issue
# when there is only one atom. Then it looks like a float.
if key == "magmom":
if not isinstance(val, list):
val = int(val.split('*')[0]) * [float(val.split('*')[1])]\
if re.match("\d+\*.*", val) else [float(val)]
if key == "rwigs":
if not isinstance(val, list):
val = [val]
# assemble kpoints parameters in dict
if key in ["kspacing", "kgamma"]:
if "kpts" in params.keys():
params["kpts"][key[1:]] = val
else:
params["kpts"] = {key[1:]: val}
# all other parameters goes into params as is
else:
params[key] = val
return params
[docs]
def read_kpoints(self, fname=None):
"""Read KPOINTS file.
Parameters
----------
fname : str
name of input file
Returns
-------
ASE Parameters dictionary
k-point tags
"""
if fname is None:
fname = self.kpoints
with open(fname) as f:
lines = f.readlines()
# params = Parameters()
# read the comment in case we hid some information there
# (only relevant for spacing and surface options)
comment_split = lines[0].strip().split()
# second line is the number of kpoints or 0 for automatic kpoints
nkpts = int(lines[1].strip())
# third line you have to specify whether the coordinates are given
# in cartesian or reciprocal coordinates if nkpts is greater than
# zero. Only the first character of the third line is
# significant. The only key characters recognized by VASP are 'C',
# 'c', 'K' or 'k' for switching to cartesian coordinates, any
# other character will switch to reciprocal coordinates.
#
# if nkpts = 0 then the third line will start with m or g for
# Monkhorst-Pack and Gamma. if it does not start with m or g, an
# alternative vasp_mode is entered that we do not support yet.
kpts = {}
ktype = lines[2].split()[0].lower()[0]
if nkpts <= 0:
# automatic vasp_mode
if ktype == "g":
kpts["gamma"] = True
elif ktype == "m":
kpts["gamma"] = False
if ktype == "a":
kpts["auto"] = float(lines[3])
else:
try:
kpts_shift = [float(lines[4].split()[i]) for i in range(3)]
except Exception:
kpts_shift = [0.0, 0.0, 0.0]
kpts_size = [int(lines[3].split()[i]) for i in range(3)]
if "spacing" not in comment_split:
kpts["size"] = kpts_size
if not np.all(np.isclose(kpts_shift, [0.0, 0.0, 0.0])):
kpts["shift"] = kpts_shift
else:
kpts["spacing"] = float(
comment_split[comment_split.index("spacing") + 1]
)
if "surface" in comment_split:
kpts["surface"] = True
# TODO: add support for other k point specifications
return {"kpts": kpts}
[docs]
def read_potcar(self, fname: str = None):
"""
Read the POTCAR file to get the information regarding
PAW/pseudopotential setups.
Parameters
----------
fname : str
name of input file
Returns
-------
ASE Parameters dictionary
PAW/pseudopotential setups
"""
if fname is None:
fname = self.potcar
params = Parameters()
potcars = []
with open(fname) as f:
lines = f.readlines()
# first potcar
potcars += [lines[0].strip()]
for i, line in enumerate(lines):
if "LEXCH = PE" in line:
params["pp"] = "PBE"
elif "LEXCH = CA" in line:
params["pp"] = "LDA"
elif "LEXCH = 91" in line:
params["pp"] = "GGA"
if "End of Dataset" in line and i != len(lines) - 1:
potcars += [lines[i + 1].strip()]
potcars = [(x[0], x[1], x[2]) for x in [potcar.split() for potcar in potcars]]
special_setups = {}
for xc, sym, date in potcars:
if "_" in sym: # we have a special setup
symbol, setup = sym.split("_")
special_setups[symbol] = "_" + setup
if len(special_setups) > 0:
params["setups"] = special_setups
return params
[docs]
def read_atoms(self):
"""
Read the first and final atoms objects.
Returns
-------
Atoms or None
"""
atoms_on_file = {}
contcar = self.directory.joinpath("CONTCAR")
# Parsing order
# 1. First try persistence db for initial config
if self.db.is_file():
with connect(self.db) as db:
atoms_on_file[0] = db.get_atoms(1)
# 2. Try CONTCAR+POSCAR
if not atoms_on_file.get(0, None) and self.poscar.is_file():
atoms_on_file[0] = ase.io.read(self.poscar)
if contcar.is_file():
try:
atoms_on_file[-1] = ase.io.read(contcar)
except IndexError as e:
pass
# 3. Try XML then OUTCAR
if None in [atoms_on_file.get(0, None), atoms_on_file.get(-1, None)]:
for f in [self.xml, self.outcar]:
try:
atoms_on_file[0] = ase.io.read(f, 0)
atoms_on_file[-1] = ase.io.read(f, -1)
break
except: #FileNotFoundError as e:
pass
return atoms_on_file
[docs]
def read_parameters(self):
# Reading stuff that is independent of the calculation state.
if self.db.is_file():
with connect(self.db) as db:
data = db.get(1).data
self.parameters.update(data["parameters"])
self.conf.update(data["conf"])
self.jobid = data["jobid"]
else:
if self.incar.is_file():
self.parameters.update(self.read_incar())
if self.potcar.is_file():
self.parameters.update(self.read_potcar())
if self.kpoints.is_file():
self.parameters.update(self.read_kpoints())
# We have to figure out the xc that was used based on the
# Parameter keys. We sort the possible xc dictionaries so the
# ones with the largest number of keys are compared first. This is
# to avoid false matches of xc's with smaller number of equal
# keys.
xc_keys = sorted(
self.xc_defaults, key=lambda k: len(self.xc_defaults[k]), reverse=True
)
for ex in xc_keys:
pd = {k: self.parameters.get(k, None) for k in self.xc_defaults[ex]}
if pd == self.xc_defaults[ex]:
self.parameters["xc"] = ex
break
# reconstruct ldau_luj. special setups might break this.
if "ldauu" in self.parameters:
ldaul = self.parameters["ldaul"]
ldauj = self.parameters["ldauj"]
ldauu = self.parameters["ldauu"]
with open(self.potcar) as f:
lines = f.readlines()
# symbols are in the first line of each potcar
symbols = [lines[0].split()[1]]
for i, line in enumerate(lines):
if "End of Dataset" in line and i != len(lines) - 1:
symbols += [lines[i + 1].split()[1]]
ldau_luj = {}
for sym, l, j, u in zip(symbols, ldaul, ldauj, ldauu):
ldau_luj[sym] = {"L": l, "U": u, "J": j}
self.parameters["ldau_luj"] = ldau_luj
# Store the params read from file for later use.
self.parameters_on_file = self.parameters.copy()
[docs]
def read_results(self):
"""Read energy, forces, stress, magmom and magmoms from output file.
Other quantities will be read by other functions. This depends on
the calculator state.
Todo
----
complete docstring with description of parameters and return value
"""
# regular calculation that is finished
# read atoms with single-point calculator (but no tags).
for f in [self.outcar, self.xml]:
try:
atoms_final = ase.io.read(f)
break
except FileNotFoundError:
pass
else:
msg = "No OUTCAR or vasprun.xml in {}".format(self.directory)
raise RuntimeError(msg)
# Set the basic results using the single-point calculator on atoms
energy = atoms_final.get_potential_energy()
try:
free_energy = atoms_final.get_potential_energy(force_consistent=True)
except NotImplementedError: # e.g. no vasprun xml exists
free_energy = None
forces = atoms_final.get_forces() # needs to be resorted
try:
stress = atoms_final.get_stress()
except NotImplementedError: # e.g. ISIF=0, stress is not computed
stress = None
# Sort the newly read Atoms and use them to update
# self.atoms and self.atoms_sorted. To do this we need
# to map the final read Atoms indices to the original
# unsorted indices
isort_final = self.sort_atoms(atoms_final)
iunsort = self.isort.argsort()
imap = isort_final[iunsort]
self.atoms.cell = atoms_final.cell
self.atoms.positions = atoms_final.positions[imap]
# set the results
self.results["energy"] = energy
self.results["free_energy"] = free_energy
self.results["forces"] = forces[imap]
self.results["stress"] = stress
# attach calc to atoms so we can use atoms.get... methods
self.atoms.set_calculator(self)
# handle magnetic moments
magnetic_moment = 0
magnetic_moments = np.zeros(len(atoms_final))
if self.parameters.get("ispin", 0) == 2:
lines = open(self.outcar).readlines()
for n, line in enumerate(lines):
if line.startswith(" number of electron "):
try:
magnetic_moment = float(line.split()[-1])
except:
print("magmom read error")
print(self.directory, line)
if line.rfind("magnetization (x)") > -1:
for m in range(len(atoms_final)):
val = float(lines[n + m + 4].split()[4])
magnetic_moments[m] = val
self.results["magmom"] = magnetic_moment
self.results["magmoms"] = np.array(magnetic_moments)[imap]
# let the state reflect that we have already parsed everything
self.state = self.PARSED
[docs]
def read_forces(self, latest=True):
"""Method that reads forces from OUTCAR file.
If 'latest' is switched off, the forces for all ionic steps
in the OUTCAR file be returned, in other case only the
forces for the last ionic configuration is returned.
"""
resort = self.isort
file = open(self.outcar, "r")
lines = file.readlines()
file.close()
n = 0
if latest == False:
all_forces = []
for line in lines:
if line.rfind("TOTAL-FORCE") > -1:
forces = []
for i in range(len(self.atoms)):
forces.append(
np.array([float(f) for f in lines[n + 2 + i].split()[3:6]])
)
if latest == False:
all_forces.append(np.array(forces)[resort])
n += 1
if latest == False:
return np.array(all_forces)
else:
return np.array(forces)[resort]