Source code for storq.vasp.readers

import ase
import numpy as np
import os
import re
from ase import Atoms
from ase.calculators.calculator import Parameters
from ase.db import connect
from storq.tools import tail
from subprocess import Popen, PIPE


[docs]def read_num_ionic_steps(outcar: str=None) -> int: """Returns the number of ionic steps from the OUTCAR file. Parameters ---------- outcar name of input file [default: OUTCAR] """ if not outcar: outcar = os.path.join(os.getcwd(), "OUTCAR") nsteps = None with open(outcar) as f: for line in f.readlines(): if line.find("- Iteration") != -1: nsteps = int(line.split("(")[0].split()[-1].strip())
return nsteps
[docs]def read_outcar_validity(outcar: str=None) -> bool: """Returns True if VASP finished properly. If VASP itself is cancelled (batch_walltime or explicitly by user) or encounters certain errors it won't ouput the typical Voluntary context switch count on the last line of the OUTCAR. Parameters ---------- outcar name of input file [default: OUTCAR] """ if not outcar: outcar = os.path.join(os.getcwd(), "OUTCAR") if os.path.exists(outcar): last_line = tail(outcar) return "Voluntary context switches" in str(last_line)
return False
[docs]def read_convergence(outcar=None): """Checks whether a calculation has converged. Performs a strict convergence checks in the sense that electronic as well as ionic convergence is explicitly chechked for by comparing their magnitude to EDIFF and EDIFFG. Returns ------- list A list of two bools indicating if the electronic SCF cycle and the ionic relaxation converged, respectively. """ if not outcar: outcar = os.path.join(os.getcwd(), "OUTCAR") converged = [False, False] # First check electronic convergence for line in open(outcar, "r"): if 0: # vasp always prints that! if line.rfind("aborting loop") > -1: # scf failed raise RuntimeError(line.strip()) break if line.rfind("EDIFF ") > -1: ediff = float(line.split()[2]) if line.rfind("total energy-change") > -1: # I saw this in an atomic oxygen calculation. it # breaks this code, so I am checking for it here. if "MIXING" in line: continue split = line.split(":") a = float(split[1].split("(")[0]) b = split[1].split("(")[1][0:-2] # sometimes this line looks like (second number wrong format!): # energy-change (2. order) :-0.2141803E-08 ( 0.2737684-111) # we are checking still the first number so # let's "fix" the format for the second one if "e" not in b.lower(): # replace last occurrence of - (assumed exponent) with -e bsplit = b.split("-") bsplit[-1] = "e" + bsplit[-1] b = "-".join(bsplit).replace("-e", "e-") b = float(b) if [abs(a), abs(b)] < [ediff, ediff]: converged[0] = True else: # converged = False continue # check ionic convergence for line in open(outcar, "r"): if line.rfind("reached required accuracy") > -1: converged[1] = True
return converged
[docs]def read_zval(potcar): """Return the ZVAL for a potcar file. parse this line: POMASS = 106.420; ZVAL = 10.000 mass and valenz """ # First check if it is a .Z type file if potcar.endswith(".Z"): cmdlist = ["zcat", potcar] p = Popen(cmdlist, stdin=PIPE, stdout=PIPE, stderr=PIPE) out, err = p.communicate() if out == "" or err != "": raise Exception("Cannot read POTCAR.Z:\n\n{0}".format(err)) lines = out.split("\n") else: with open(potcar, "r") as f: lines = f.readlines() for line in lines: if "ZVAL" in line: m = re.search("ZVAL =\s*([0-9]*\.?[0-9]*)", line) return float(m.group(1))
return
[docs]def read_enmax(potcar): """ Return ENMAX from the potcar file.""" with open(potcar) as f: for line in f: if "ENMAX" in line: m = re.search("ENMAX\s*=\s*(?P<ENMAX>[0-9]+.[0-9]+);", line)
return float(m.groupdict()["ENMAX"])
[docs]def read_enmin(potcar): """ Return ENMIN from the potcar file.""" with open(potcar) as f: for line in f: if "ENMIN" in line: regex = "ENMIN\s*=\s*(?P<ENMIN>[0-9]+.[0-9]+)\s+eV" m = re.search(regex, line)
return float(m.groupdict()["ENMIN"])
[docs]def isfloat(s): """Return True if s is a float. We check if it is not an integer first, then try to cast it as a float. Parameters ---------- s : str input string Returns ------- boolean """ if re.match("^[-+]?\d+$", s): return False try: float(s) return True except:
return False
[docs]class Readers: """Mixin class of reader-type methods for the Vasp calculator. These methods are all dependent on some calculator property or reimplement ASE methods and hence cannot be factored out of the calculator. """
[docs] def read_incar(self, fname=None): """Read fname (defaults to INCAR). This only reads simple INCAR files, e.g. one tag per line, and with no comments in the line. There is no knowledge of any Vasp keywords in this, and the values are converted to Python types by some simple rules. Parameters ---------- fname : str name of input file Returns ------- dict parameters from INCAR file """ if fname is None: fname = self.incar params = Parameters() with open(fname) as f: lines = f.readlines() # The first line is a comment for line in lines[1:]: line = line.strip() if ";" in line: raise Exception("; found. that is not supported.") elif line.startswith("#") or line == "": continue elif "#" in line: line = line.split("#")[0].strip() key, val = line.split("=") key = key.strip().lower() val = val.strip() # now we need some logic if val == ".TRUE.": val = True elif val == ".FALSE.": val = False # Match integers by a regexp that includes signs # val.isdigit() does not get negative integers right. elif re.match("^[-+]?\d+$", val): val = int(val) elif isfloat(val): val = float(val) elif len(val.split(" ")) > 1: # this is some kind of list separated by spaces val = val.split(" ") val = [int(x) if re.match("^[-+]?\d+$", x) else float(x) for x in val] else: # I guess we have a string here. pass # make sure magmom is returned as a list. This is an issue # when there is only one atom. Then it looks like a float. if key == "magmom": if not isinstance(val, list): val = [val] if key == "rwigs": if not isinstance(val, list): val = [val] params[key] = val
return params
[docs] def read_kpoints(self, fname=None): """Read KPOINTS file. Parameters ---------- fname : str name of input file Returns ------- ASE Parameters dictionary k-point tags """ if fname is None: fname = self.kpoints with open(fname) as f: lines = f.readlines() params = Parameters() # read the comment in case we hid some information there # (only relevant for spacing and surface options) comment_split = lines[0].strip().split() # second line is the number of kpoints or 0 for automatic kpoints nkpts = int(lines[1].strip()) # third line you have to specify whether the coordinates are given # in cartesian or reciprocal coordinates if nkpts is greater than # zero. Only the first character of the third line is # significant. The only key characters recognized by VASP are 'C', # 'c', 'K' or 'k' for switching to cartesian coordinates, any # other character will switch to reciprocal coordinates. # # if nkpts = 0 then the third line will start with m or g for # Monkhorst-Pack and Gamma. if it does not start with m or g, an # alternative vasp_mode is entered that we do not support yet. kpts = {} ktype = lines[2].split()[0].lower()[0] if nkpts <= 0: # automatic vasp_mode if ktype == "g": kpts["gamma"] = True elif ktype == "m": kpts["gamma"] = False if ktype == "a": kpts["auto"] = float(lines[3]) else: kpts_shift = [float(lines[4].split()[i]) for i in range(3)] kpts_size = [int(lines[3].split()[i]) for i in range(3)] if "spacing" not in comment_split: kpts["size"] = kpts_size if kpts_shift != self._default_parameters["kpts"]["shift"]: kpts["shift"] = kpts_shift else: kpts["spacing"] = float( comment_split[comment_split.index("spacing") + 1] ) if "surface" in comment_split: kpts["surface"] = True # elif nkpts > 0: # # list of kpts provided. Technically c,k are supported and # # anything else means reciprocal coordinates. # if ktype in ['c', 'k', 'r']: # kpts = [] # for i in range(3, 3 + nkpts): # # the kpts also have a weight attached to them # kpts.append([float(lines[i].split()[j]) # for j in range(4)]) # params['kpts'] = kpts # # you may also be in line-vasp_mode # elif ktype in ['l']: # if lines[3][0].lower() == 'r': # params['reciprocal'] = True # params['kpts_nintersections'] = nkpts # kpts = [] # for i in range(4, len(lines)): # if lines[i] == '': # continue # else: # kpts.append([float(lines[i].split()[j]) # for j in range(3)]) # else: # raise NotImplementedError('ktype = %s' % lines[2]) # if ktype == 'r': # params['reciprocal'] = True kpts_tmp = self._default_parameters["kpts"].copy() kpts_tmp.pop("size", None) # avoid conflicting kw kpts_tmp.update(kpts) params["kpts"] = kpts_tmp
return params
[docs] def read_potcar(self, fname: str = None): """ Read the POTCAR file to get the information regarding PAW/pseudopotential setups. Parameters ---------- fname : str name of input file Returns ------- ASE Parameters dictionary PAW/pseudopotential setups """ if fname is None: fname = self.potcar params = Parameters() potcars = [] with open(fname) as f: lines = f.readlines() # first potcar potcars += [lines[0].strip()] for i, line in enumerate(lines): if "LEXCH = PE" in line: params["pp"] = "PBE" elif "LEXCH = CA" in line: params["pp"] = "LDA" elif "LEXCH = 91" in line: params["pp"] = "GGA" if "End of Dataset" in line and i != len(lines) - 1: potcars += [lines[i + 1].strip()] potcars = [(x[0], x[1], x[2]) for x in [potcar.split() for potcar in potcars]] special_setups = [] for xc, sym, date in potcars: if "_" in sym: # we have a special setup symbol, setup = sym.split("_") special_setups += [[symbol, "_" + setup]] if special_setups: params["setups"] = special_setups
return params
[docs] def read_atoms(self): """ Read the final atoms object. and return the user defined order (unsort) if able. Returns ------- Atoms or None """ atoms = None resort = self.get_db("resort") # for a new or a pre-vasp calculation resort will be None if resort is not None: resort = list(resort) from ase.db import connect with connect(os.path.join(self.directory, "storq.db")) as con: temp_atoms = con.get_atoms(id=1) tags = temp_atoms.get_tags() else: tags = None # outcar = os.path.join(self.directory, 'OUTCAR') vasprun_xml = os.path.join(self.directory, "vasprun.xml") poscar = os.path.join(self.directory, "POSCAR") # contcar = os.path.join(self.directory, 'CONTCAR') if os.path.exists(vasprun_xml): try: atoms = ase.io.read(vasprun_xml) except Exception: if os.path.exists(poscar): atoms = ase.io.read(os.path.join(self.directory, "POSCAR")) else: return None elif os.path.exists(poscar): atoms = ase.io.read(os.path.join(self.directory, "POSCAR")) else: return None if resort is not None: atoms = atoms[resort] atoms.set_tags(tags)
return atoms
[docs] def read_basic(self): """Read the self.parameters and atoms, not the results(!). Overwrites the parent ase calculator method, and is called upon initialization of the calculator. Restart is ignored but part of the signature for ase. Todo ---- complete docstring with description of parameters and return value """ db = None if os.path.isfile(self.db): db = connect(self.db) # Else read a regular calculation. we start with reading stuff # that is independent of the calculation state. self.parameters = Parameters() if db: data = db.get(1).data self.parameters.update(data["parameters"]) else: if os.path.exists(self.incar): self.parameters.update(self.read_incar()) if os.path.exists(self.potcar): self.parameters.update(self.read_potcar()) if os.path.exists(self.kpoints): self.parameters.update(self.read_kpoints()) # We have to figure out the xc that was used based on the # Parameter keys. We sort the possible xc dictionaries so the # ones with the largest number of keys are compared first. This is # to avoid false matches of xc's with smaller number of equal # keys. xc_keys = sorted( self._xc_defaults, key=lambda k: len(self._xc_defaults[k]), reverse=True ) for ex in xc_keys: pd = {k: self.parameters.get(k, None) for k in self._xc_defaults[ex]} if pd == self._xc_defaults[ex]: self.parameters["xc"] = ex break # reconstruct ldau_luj. special setups might break this. if "ldauu" in self.parameters: ldaul = self.parameters["ldaul"] ldauj = self.parameters["ldauj"] ldauu = self.parameters["ldauu"] with open(self.potcar) as f: lines = f.readlines() # symbols are in the first line of each potcar symbols = [lines[0].split()[1]] for i, line in enumerate(lines): if "End of Dataset" in line and i != len(lines) - 1: symbols += [lines[i + 1].split()[1]] ldau_luj = {} for sym, l, j, u in zip(symbols, ldaul, ldauj, ldauu): ldau_luj[sym] = {"L": l, "U": u, "J": j} self.parameters["ldau_luj"] = ldau_luj # Now get the atoms self.atoms = self.read_atoms() if self.atoms is not None: self.sort_atoms() imm = self.parameters.get("magmom", [0 for atom in self.atoms]) self.atoms.set_initial_magnetic_moments(imm) # update configs with options and jobid stored in the database if db: self.conf.update(data["conf"]) self.jobid = data["jobid"] # get the state
self.state = self.get_state()
[docs] def read_results(self): """Read energy, forces, stress, magmom and magmoms from output file. Other quantities will be read by other functions. This depends on the calculator state. Todo ---- complete docstring with description of parameters and return value """ # regular calculation that is finished outcar = os.path.join(self.directory, "OUTCAR") contcar = os.path.join(self.directory, "CONTCAR") xml = os.path.join(self.directory, "vasprun.xml") # read atoms with single-point calculator (but no tags). # if os.path.exists(outcar) and os.path.exists(contcar): # atoms = ase.io.read(outcar) if os.path.exists(xml): atoms = ase.io.read(xml) elif os.path.exists(outcar): atoms = ase.io.read(outcar) else: exc = "No OUTCAR or vasprun.xml in {}".format(self.directory) raise RuntimeError(exc) # Set the basic results using the single-point calculator on atoms energy = atoms.get_potential_energy() try: free_energy = atoms.get_potential_energy(force_consistent=True) except NotImplementedError: # e.g. no vasprun xml exists free_energy = None forces = atoms.get_forces() # needs to be resorted try: stress = atoms.get_stress() except NotImplementedError: # e.g. ISIF=0, stress is not computed stress = None # Update self.atoms and get resort if self.atoms is None: self.sort_atoms(atoms) self.results["energy"] = energy self.results["free_energy"] = free_energy self.results["forces"] = forces[self.resort] self.results["stress"] = stress # this makes sure that the original atoms obj is updated atoms_on_file = self.read_atoms() # if self.atoms is not None: self.atoms.positions = atoms_on_file.positions self.atoms.cell = atoms_on_file.cell # else: # self.atoms = atoms_on_file # attach calc to atoms so we can use atoms.get... methods self.atoms.set_calculator(self) # handle magnetic moments magnetic_moment = 0 magnetic_moments = np.zeros(len(atoms)) if self.parameters.get("ispin", 0) == 2: lines = open(os.path.join(self.directory, "OUTCAR"), "r").readlines() for n, line in enumerate(lines): if line.startswith(" number of electron "): try: magnetic_moment = float(line.split()[-1]) except: print("magmom read error") print(self.directory, line) if line.rfind("magnetization (x)") > -1: for m in range(len(atoms)): val = float(lines[n + m + 4].split()[4]) magnetic_moments[m] = val self.results["magmom"] = magnetic_moment self.results["magmoms"] = np.array(magnetic_moments)[self.resort] # let the state reflect that we have already parsed everything
self.state = self.PARSED
[docs] def read_forces(self, latest=True): """Method that reads forces from OUTCAR file. If 'latest' is switched off, the forces for all ionic steps in the OUTCAR file be returned, in other case only the forces for the last ionic configuration is returned. """ if self.outcar is None: self.outcar = os.path.join(self.directory, "OUTCAR") file = open(self.outcar, 'r') lines = file.readlines() file.close() n = 0 if latest == False: all_forces = [] for line in lines: if line.rfind('TOTAL-FORCE') > -1: forces = [] for i in range(len(self.atoms)): forces.append(np.array([float(f) for f in lines[n + 2 + i].split()[3:6]])) if latest == False: all_forces.append(np.array(forces)[self.resort]) n += 1 if latest == False: return np.array(all_forces) else:
return np.array(forces)[self.resort]