# Source code for storq.vasp.writers

import numpy as np
import os
from ase.db import connect
from ase.io.vasp import write_vasp
from collections import OrderedDict
from storq.tools import find_db_rows


class Writers:
    """Writers for the VASP input files and the storq persistence database.

    The class defines no state of its own. Its methods read the following
    attributes from the object they are called on: ``directory``,
    ``parameters``, ``conf``, ``atoms``, ``atoms_sorted``, ``symbol_count``,
    ``ppp_list``, ``version``, ``resort``, ``jobid`` and ``db``, plus the
    method ``get_atoms()``.
    NOTE(review): presumably this is mixed into the storq VASP calculator,
    which provides those attributes -- confirm against the calculator class.
    """

    def write_input(self):
        """Write all input files required for a calculation.

        Creates ``self.directory`` if needed, then writes POSCAR, INCAR,
        KPOINTS (skipped when ``"kspacing"`` is in ``self.parameters``),
        POTCAR and the persistence database. If ``self.parameters["luse_vdw"]``
        is truthy and ``self.conf["vasp_vdw_kernel"]`` is set, the kernel file
        is symlinked into the directory as ``vdw_kernel.bindat``.
        """
        # exist_ok avoids the check-then-create race of isdir() + makedirs().
        if self.directory != os.curdir:
            os.makedirs(self.directory, exist_ok=True)

        self.write_poscar()
        self.write_incar()
        if "kspacing" not in self.parameters:
            self.write_kpoints()
        self.write_potcar()
        self.write_persistence_db()

        # finally symlink the vdW kernel if needed
        if self.parameters.get("luse_vdw", False):
            kernel_source = self.conf.get("vasp_vdw_kernel", None)
            if kernel_source is not None:
                kernel = os.path.join(self.directory, "vdw_kernel.bindat")
                # os.path.exists() is False for a dangling symlink, which
                # would make os.symlink() fail with FileExistsError; drop the
                # stale link so it can be recreated.
                if os.path.islink(kernel) and not os.path.exists(kernel):
                    os.unlink(kernel)
                if not os.path.lexists(kernel):
                    os.symlink(kernel_source, kernel)

    def write_db(self, db_name, data=None, append=True, **kwargs):
        """Write the calculation's atoms as a new row of a database file.

        The row is named after the basename of ``self.directory``; that name
        must not already be present in the database.

        Parameters
        ----------
        db_name : str
            Name of a db file (existing or new) to write to. It is resolved
            relative to the current working directory.
        data : dict
            Additional data to include in the database row.
        append : bool
            Passed to ``connect`` as its ``append`` argument.
        **kwargs
            Forwarded unchanged to ``db.write`` together with ``name``.

        Raises
        ------
        RuntimeError
            If ``find_db_rows`` reports an existing row with the same name.
        """
        db = connect(os.path.join(os.getcwd(), db_name), append=append)

        # check for possible duplicates
        name = os.path.basename(self.directory)
        rows = find_db_rows(db, name=name)
        if len(rows) > 0:
            raise RuntimeError(
                "Row #{0} has the same calculation name (must be unique).".format(
                    rows[0]
                )
            )

        # Progress message kept as-is for backward compatibility.
        print("writing")
        db.write(self.atoms, name=name, data=data, **kwargs)

    @staticmethod
    def _parse_path_value(value):
        """Convert a string taken from a directory name to a Python value.

        Strings containing ``"."`` become ``float`` when they parse as one,
        all-digit strings become ``int``, ``"True"``/``"False"`` become
        ``bool``; anything else is returned unchanged as ``str``.
        """
        if "." in value:
            try:
                return float(value)
            except ValueError:
                # e.g. "v1.2": contains a dot but is not a number.
                return value
        if value.isdigit():
            return int(value)
        if value == "False":
            return False
        if value == "True":
            return True
        return value

    def write_persistence_db(
        self,
        atoms=None,
        parser=None,
        overwrite=True,
        keys=None,
        data=None,
        del_info=None,
        **kwargs
    ):
        """Write the database holding persistence information for restarts.

        The row written contains the atoms together with the path, version,
        resort, parameters, ppp_list, jobid and conf of the calculator. The
        file is always ``<self.directory>/storq.db`` and ``self.db`` is set
        to that path afterwards.

        Parameters
        ----------
        atoms : ASE Atoms object
            Structure to write to the database. If None, ``self.get_atoms()``
            is used.
        parser : str
            Separator used to extract key-value-pairs from the calculator's
            directory path. Every "/"-separated path component containing
            ``parser`` is split on it; the first part is the key and the
            second the value. If None, no key-value-pairs are collected from
            the path.
        overwrite : bool
            If True and the file ``self.db`` exists, it is removed before the
            new database is written.
        keys : dict
            Additional key-value-pairs to include in the database. The
            argument is copied, never modified in place.
        data : dict
            Additional data to include in the database. The argument is
            copied, never modified in place.
        del_info : list
            Keys to be removed from both the data and the key-value-pairs
            before writing.
        **kwargs
            Accepted for interface compatibility; not used.
        """
        # Work on private copies: the previous mutable defaults ({} / [])
        # were updated in place and therefore leaked between calls.
        keys = dict(keys) if keys else {}
        data = dict(data) if data else {}
        del_info = list(del_info) if del_info else []

        # Get the atoms object from the calculator
        if atoms is None:
            atoms = self.get_atoms()

        # Get key-value-pairs from the directory name, looking only at path
        # components that contain 'parser'.
        if parser is not None:
            for component in self.directory.split("/"):
                if parser not in component:
                    continue
                parts = component.split(parser)
                keys[parts[0]] = self._parse_path_value(parts[1])

        data.update(
            {
                "path": self.directory,
                "version": self.version,
                "resort": self.resort,
                "parameters": self.parameters,
                "ppp_list": self.ppp_list,
                "jobid": self.jobid,
                "conf": self.conf,
            }
        )

        # Only relevant for writing single entry DB file.
        if overwrite and os.path.exists(self.db):
            # Get any current data and keywords.
            # NOTE(review): this reads .data/.key_value_pairs from the object
            # returned by db.get_atoms(); if that object does not carry these
            # attributes the AttributeError below silently skips the merge.
            # Also note that merged values would take precedence over the
            # fresh ones set above. Behaviour is left unchanged -- confirm
            # the intent before altering it.
            with connect(self.db) as db:
                try:
                    dbatoms = db.get_atoms(id=1)
                    data.update(dbatoms.data)
                    keys.update(dbatoms.key_value_pairs)
                except AttributeError:
                    pass
            os.unlink(self.db)

        # Remove keys and data in del_info.
        for name in del_info:
            keys.pop(name, None)
            data.pop(name, None)

        # Generate the db file
        db_name = os.path.join(self.directory, "storq.db")
        with connect(db_name) as db:
            db.write(atoms, key_value_pairs=keys, data=data)
        self.db = db_name

    def write_poscar(self):
        """Write ``self.atoms_sorted`` to ``<self.directory>/POSCAR``.

        The file is produced by ``ase.io.vasp.write_vasp`` with
        ``symbol_count=self.symbol_count``.
        """
        poscar = os.path.join(self.directory, "POSCAR")
        write_vasp(poscar, self.atoms_sorted, symbol_count=self.symbol_count)

    def write_incar(self):
        """Write ``<self.directory>/INCAR`` from ``self.parameters``.

        Tags are written in alphabetical order as upper-case names. Boolean
        values (including ``numpy.bool_``) are written as .TRUE./.FALSE.;
        integers, floats and strings are written out as is; lists, tuples and
        arrays are written out as space separated values. Parameters whose
        value is None are skipped, which is how a tag is deleted.

        The keys ``xc``, ``pp``, ``setups``, ``kpts`` and ``ldau_luj`` are
        never written to the INCAR file.
        """
        incar = os.path.join(self.directory, "INCAR")
        # Parameters that are not INCAR tags ("ldau_luj" is the DFT+U dict).
        special_kwargs = {"xc", "pp", "setups", "kpts", "ldau_luj"}

        lines = ["INCAR created by Storq\n"]
        for key in sorted(self.parameters):
            if key in special_kwargs:
                continue
            val = self.parameters[key]
            if val is None:
                # Do not write out None values; it is how we delete tags.
                continue
            # np.bool_ is not a bool subclass, so it is checked explicitly.
            if isinstance(val, (bool, np.bool_)):
                text = ".TRUE." if val else ".FALSE."
            elif isinstance(val, (list, tuple, np.ndarray)):
                text = " ".join(str(x) for x in val)
            else:
                text = val
            lines.append(" {} = {}\n".format(key.upper(), text))

        with open(incar, "w") as f:
            f.writelines(lines)

    def write_kpoints(self):
        """Write ``<self.directory>/KPOINTS`` from ``self.parameters["kpts"]``.

        The KPOINTS file format is as follows::

            line 1: a comment
            line 2: number of kpoints
                n <= 0   Automatic kpoint generation
                n > 0    explicit number of kpoints
            line 3: kpt format
                if n > 0:
                    C,c,K,k = cartesian coordinates
                    anything else = reciprocal coordinates
                if n <= 0
                    M,m,G,g for Monkhorst-Pack or Gamma grid
                    anything else is a special case
            line 4: if n <= 0, the Monkhorst-Pack grid
                    if n > 0, then one line per kpoint
            line 5: if n <= 0 it is the gamma shift

        After the kpts may follow information regarding the tetrahedra, but
        we do not support that for now.

        The ``kpts`` dictionary selects the mode by the first of these keys
        that is present: ``size``/``spacing`` (grid; ``gamma``, ``shift`` and
        ``surface`` are honoured), ``auto``, ``list`` (explicit points, four
        values per point; ``cartesian`` selects the coordinate type) or
        ``lines`` (line mode; points are taken in consecutive pairs,
        ``intersections`` defaults to 10 and ``cartesian`` selects the
        coordinate type).

        Raises
        ------
        ValueError
            If ``kpts`` matches none of the modes above, or if line mode is
            given an odd number of points.
        """
        fname = os.path.join(self.directory, "KPOINTS")
        kpts = self.parameters["kpts"]  # guaranteed to be set

        num_kpts = 0
        num_lines = 0
        # we are using automatic or semi-automatic mode
        if "size" in kpts or "spacing" in kpts:
            mode = "Gamma" if kpts.get("gamma", False) else "Monkhorst-Pack"
        elif "auto" in kpts:
            mode = "Auto"
        # we are in an explicit listing mode
        elif kpts.get("list", None):
            mode = "Cartesian" if kpts.get("cartesian", False) else "Reciprocal"
            num_kpts = len(kpts["list"])
        # we are in line mode
        elif kpts.get("lines", None):
            mode = "Line-mode"
            num_lines = len(kpts["lines"])
            if num_lines % 2:
                raise ValueError(
                    "kpts['lines'] must hold an even number of points "
                    "(start/end pairs), got {}".format(num_lines)
                )
        else:
            # Previously this fell through and failed later with an unbound
            # 'mode' after the file had already been truncated.
            raise ValueError(
                "kpts must define one of 'size', 'spacing', 'auto', "
                "'list' or 'lines'"
            )

        surface = kpts.get("surface", False)

        # line 1: comment
        # hide some info here to restore kpts to right state on restart
        comment = "KPOINTS created by Atomic Simulation Environment"
        if "spacing" in kpts:
            comment = "spacing {0:.4f}".format(kpts["spacing"])
            if surface is True:
                comment += " surface"
        lines = [comment + "\n"]

        # line 2: number of kpts
        if mode == "Line-mode":
            lines.append("{}\n".format(kpts.get("intersections", 10)))
        else:
            lines.append("{}\n".format(num_kpts))

        # line 3: the mode
        lines.append(mode + "\n")

        # line 4+: k-point specification
        if mode in ("Monkhorst-Pack", "Gamma"):
            if "size" in kpts:
                kpts_size = kpts["size"]
            else:
                # Derive the grid from the requested spacing: one division
                # per 'spacing' along each reciprocal vector, at least 1.
                rec_cell = self.atoms_sorted.get_reciprocal_cell()
                kpts_size = [
                    int(
                        max(
                            1,
                            np.ceil(
                                np.linalg.norm(rec_cell[axis])
                                * 2.0
                                * np.pi
                                / kpts["spacing"]
                            ),
                        )
                    )
                    for axis in range(3)
                ]
            # A missing/None 'surface' entry now means a bulk grid, in line
            # with the comment line above; only a truthy value forces a
            # single k-point along the third axis.
            if surface:
                lines.append("{0} {1} 1\n".format(*kpts_size))
            else:
                lines.append("{0} {1} {2}\n".format(*kpts_size))
            # line 5: the shift of the (semi-)automatic grid
            lines.append("{0} {1} {2}\n".format(*kpts.get("shift", (0, 0, 0))))
        elif mode == "Auto":
            lines.append("{0}\n".format(kpts["auto"]))
        elif mode == "Line-mode":
            # The key used to be misspelled "caretesian", so Cartesian
            # line mode could never be selected.
            if kpts.get("cartesian", False):
                lines.append("Cartesian\n")
            else:
                lines.append("Reciprocal\n")
            for n in range(0, num_lines, 2):
                lines.append("{0} {1} {2}\n".format(*kpts["lines"][n]))
                lines.append("{0} {1} {2}\n".format(*kpts["lines"][n + 1]))
                lines.append("\n")
        else:  # "Cartesian" or "Reciprocal" explicit list
            for kpt in kpts["list"]:
                lines.append("{0} {1} {2} {3}\n".format(*kpt))

        # Write only once everything has been assembled, so an invalid kpts
        # entry cannot leave a half-written file behind.
        with open(fname, "w") as f:
            f.writelines(lines)

    def write_potcar(self):
        """Write ``<self.directory>/POTCAR``.

        The file is the byte-wise concatenation, in ``self.ppp_list`` order,
        of the potential files named by the second element of each
        ``ppp_list`` entry. POTCARs are expected to reside in
        ``self.conf['vasp_potentials']``.
        """
        fname = os.path.join(self.directory, "POTCAR")
        pot_root = self.conf["vasp_potentials"]
        with open(fname, "wb") as potfile:
            for _, rel_path, _ in self.ppp_list:
                with open(os.path.join(pot_root, rel_path), "rb") as source:
                    potfile.write(source.read())