Source code for storq.vasp.helpers

import gzip
import os
import shutil
import tarfile
import warnings
from pathlib import Path

import numpy as np

from storq.vasp.readers import read_enmax
from storq.vasp.files import input_files, minimal_backup_base, output_files


def _remove_files(directory, files, exceptions=None):
    directory = Path(directory)
    if exceptions:
        files = files - exceptions
    for f in files:
        try:
            directory.joinpath(f).unlink()
        except OSError:
            pass


[docs]def remove_files(directory, files, exceptions=None): """ Flexible function for removing calculation files. Note that this is a staticmethod so as to enable cleanup of broken calculation directories where a calculator object cannot be intialized. Parameters ---------- directory : str The calculation directory from which to remove files. files : Union[str, list] Decides which files to remove. Can be 'input', 'output', 'all', 'backup', 'dir', the name of a single file, or a list of filenames. exceptions : Union[str, list, set] Exceptions that will not be removed. """ directory = Path(directory) if exceptions and exceptions != "last": exceptions = set(list(exceptions)) if files == "input": _remove_files(directory, input_files, exceptions) elif files == "output": _remove_files(directory, output_files, exceptions) elif files == "dir": shutil.rmtree(directory) elif files == "all": _remove_files(directory, output_files | input_files, exceptions) elif files == "backup": imax = -1 for f in directory.joinpath("run").iterdir(): i = int(f.split(".")[-1]) if i > imax: imax = i if exceptions == "last": iend = imax else: iend = imax + 1 if iend > 1: for i in range(1, iend): if os.path.exists("run.{}".format(i)): shutil.rmtree("run.{}".format(i)) else: continue if exceptions == "last": os.rename("run.{}".format(imax), "run.1") else: # files is str, list, or set files = set(list(files))
_remove_files(directory, files, exceptions)
[docs]def copy_files(dir_old, dir_new, files="all", exceptions=None): """Flexible method for copying files. """ dir_old = Path(dir_old) dir_new = Path(dir_new) dir_new.mkdir(exist_ok=True) if files == "all": files = set(dir_old.iterdir()) elif files == "input": files = input_files elif files == "output": files = output_files else: # handles str, list, set files = set(list(files)) if exceptions: exceptions = set(list(exceptions)) files = files - exceptions for f in files: f = dir_old.joinpath(f) if f.is_file():
shutil.copy(f, dir_new)
[docs]class Helpers:
[docs] def archive(self, tag=None, organize=True, cleanup=False): """ Archive a run by (separately) gzipping entire vaspdir, vasp xml file and OUTCAR. """ parent_dir = self.directory.parent if not tag: tag = self.directory.name # gunzip OUTCAR, xml and self.directory gz_names = { f"outcar": f"OUTCAR.{tag}.gz", f"xml": f"{tag}.xml.gz", f"{self.directory}": f"{tag}.tar.gz", } with open(self.outcar, "rb") as file_in, gzip.open( gz_names["outcar"], "wb" ) as file_out: shutil.copyfileobj(file_in, file_out) with open(self.xml, "rb") as file_in, gzip.open( gz_names["xml"], "wb" ) as file_out: shutil.copyfileobj(file_in, file_out) with tarfile.open(gz_names[f"{self.directory}"], "w:gz") as tar: tar.add(self.directory, arcname=tag) # puts xml & outcar in 'out'and self.directorys in 'runs' if organize: out_dir = parent_dir.joinpath("out") runs_dir = parent_dir.joinpath("runs") out_dir.mkdir(exist_ok=True) runs_dir.mkdir(exist_ok=True) shutil.move(gz_names["outcar"], out_dir) shutil.move(gz_names["xml"], out_dir) shutil.move(gz_names[f"{self.directory}"], runs_dir) # deletes the original unzipped self.directory if cleanup:
shutil.rmtree(self.directory)
[docs] def backup(self, files="minimal", backup_dir="run."): """Flexible function for backing up calcualtion files. Parameters ---------- files : str, list Can be 'minimal' or 'all' in which case a minimal set of files or all files are backed up, respectively. Can also be a list of filenames to back up. """ minimal_backup = list(minimal_backup_base.copy()) if "vasp_stdout" in self.conf and self.conf["vasp_stdout"] is not None: minimal_backup.append(self.conf["vasp_stdout"]) if self.kpoints.is_file(): minimal_backup.append("KPOINTS") if files == "minimal": files_to_backup = minimal_backup elif isinstance(files, list): files_to_backup = files elif files is None: files_to_backup = [] backup_dir = backup_dir + "{}" ind_unused = 1 while self.directory.joinpath(backup_dir.format(ind_unused)).is_dir(): ind_unused += 1 backup_dir = self.directory.joinpath(backup_dir.format(ind_unused)) backup_dir.mkdir() if files == "all": for f in self.directory.iterdir(): if f.is_file(): shutil.copy(f, backup_dir) else: for f in files_to_backup: try: shutil.copy(f, backup_dir) except FileNotFoundError: warnings.warn( "{}: cannot backup {}, file not found".format( self.directory, f ), UserWarning,
)
[docs] def remove_files(self, files, exceptions=None):
remove_files(self.directory, files, exceptions)
[docs] def copy_files(self, dir_new, files="all", exceptions=None):
copy_files(self.directory, dir_new, files=files, exceptions=exceptions)
[docs] def suggest_num_bands(self, factor=1.0): """Convenience function to compute and set the number of bands. Uses the same rule as VASP does internally, which looks roughly like nbands = int(nelectrons/2 + factor*nions/2). The difference is that this method allows the specification of a scaling factor for the term proportional to the number of ions. This can be useful for e.g., transition metals where more bands need to be added sometimes (factor=2 can be required). Parameters ---------- factor : float Multiplicative scaling factor for the number of bands. Returns ------- int The new number of bands. """ nelect = self.get_num_valence() natoms = len(self.atoms) nbands = int( max(round(nelect + 2) / 2 + factor * max(natoms // 2, 3), int(0.6 * nelect)) ) npar = self.parameters.get("npar", 1) nbands = ((nbands + npar - 1) // npar) * npar self.set(nbands=nbands)
return nbands
[docs] def suggest_encut(self, factor=1.3): """Heuristic for setting the PW cutoff. Uses the formula factor*max(ENMAX) where the maximum is taken over all POTCARs involved. Parameters ---------- factor : float Multiplicative scaling factor for the eneryg cutoff. """ enmax = 0 for _, potcar in self.get_potcars(): enmax = max(read_enmax(potcar), enmax) enmax *= factor
return encut