import gzip
import os
import shutil
import tarfile
import warnings
from pathlib import Path
import numpy as np
from storq.vasp.readers import read_enmax
from storq.vasp.files import input_files, minimal_backup_base, output_files
def _remove_files(directory, files, exceptions=None):
directory = Path(directory)
if exceptions:
files = files - exceptions
for f in files:
try:
directory.joinpath(f).unlink()
except OSError:
pass
# [docs]
def remove_files(directory, files, exceptions=None):
    """Flexible function for removing calculation files.

    This is a module-level function rather than a calculator method so that
    broken calculation directories, for which a calculator object cannot be
    initialized, can still be cleaned up.

    Parameters
    ----------
    directory : str or Path
        The calculation directory from which to remove files.
    files : Union[str, list, set]
        Decides which files to remove. Can be 'input', 'output', 'all',
        'backup', 'dir', the name of a single file, or a collection of
        filenames. 'dir' deletes the whole directory tree; 'backup' deletes
        the numbered ``run.<N>`` backup directories inside ``directory``.
    exceptions : Union[str, list, set], optional
        File names that will not be removed. For ``files='backup'`` the
        special value 'last' keeps the highest-numbered backup and renames
        it to ``run.1``; any other value is ignored in that mode. Not used
        for ``files='dir'``.
    """
    directory = Path(directory)

    if files == "dir":
        shutil.rmtree(directory)
        return

    if files == "backup":
        # Collect the backups first so nothing is deleted while iterating.
        backups = []
        for entry in directory.iterdir():
            stem, _, suffix = entry.name.rpartition(".")
            if stem == "run" and suffix.isdecimal() and entry.is_dir():
                backups.append((int(suffix), entry))
        if not backups:
            return

        keep_newest = exceptions == "last"
        newest_path = max(backups, key=lambda item: item[0])[1]
        for _, path in backups:
            if keep_newest and path == newest_path:
                continue
            shutil.rmtree(path)
        # The surviving backup always ends up as number 1.
        if keep_newest and newest_path.name != "run.1":
            newest_path.rename(directory / "run.1")
        return

    # A bare string is one file name, not an iterable of characters.
    if isinstance(exceptions, str):
        exceptions = {exceptions}
    elif exceptions:
        exceptions = set(exceptions)

    if files == "input":
        targets = input_files
    elif files == "output":
        targets = output_files
    elif files == "all":
        targets = output_files | input_files
    elif isinstance(files, str):
        targets = {files}
    else:  # list, set or other iterable of names
        targets = set(files)
    _remove_files(directory, targets, exceptions)
# [docs]
def copy_files(dir_old, dir_new, files="all", exceptions=None):
    """Flexible function for copying files between two directories.

    Only regular files are copied; sub-directories and names that do not
    exist in ``dir_old`` are skipped silently.

    Parameters
    ----------
    dir_old : str or Path
        Directory to copy from.
    dir_new : str or Path
        Directory to copy to. It is created if missing (its parent must
        already exist).
    files : Union[str, list, set]
        Can be 'all' (every file directly inside ``dir_old``), 'input',
        'output', the name of a single file, or a collection of filenames.
    exceptions : Union[str, list, set], optional
        File names that will not be copied.
    """
    dir_old = Path(dir_old)
    dir_new = Path(dir_new)
    dir_new.mkdir(exist_ok=True)

    if isinstance(files, str):
        if files == "all":
            # Work with bare names so that exceptions (given as names) match
            # and re-joining onto dir_old does not double the prefix.
            names = {entry.name for entry in dir_old.iterdir()}
        elif files == "input":
            names = set(input_files)
        elif files == "output":
            names = set(output_files)
        else:
            # A bare string is one file name, not an iterable of characters.
            names = {files}
    else:
        names = set(files)

    if exceptions:
        if isinstance(exceptions, str):
            names.discard(exceptions)
        else:
            names -= set(exceptions)

    for name in names:
        source = dir_old.joinpath(name)
        if source.is_file():
            shutil.copy(source, dir_new)
# [docs]
class Helpers:
    """File-management and parameter-suggestion helpers for a calculator.

    The methods rely on attributes and methods that are not defined in this
    class (``directory``, ``outcar``, ``xml``, ``conf``, ``kpoints``,
    ``atoms``, ``parameters``, ``set``, ``get_num_valence``,
    ``get_potcars``), so it is presumably meant to be mixed into a
    calculator class that provides them.
    """

    def archive(self, tag=None, organize=True, cleanup=False):
        """Archive a run by (separately) gzipping the OUTCAR, the VASP xml
        file and the entire calculation directory.

        The three archives are first written to the current working
        directory as ``OUTCAR.<tag>.gz``, ``<tag>.xml.gz`` and
        ``<tag>.tar.gz``.

        Parameters
        ----------
        tag : str, optional
            Label used in the archive names; defaults to the name of the
            calculation directory.
        organize : bool
            If True, move the OUTCAR and xml archives into ``out`` and the
            directory tarball into ``runs``, both located next to the
            calculation directory (created if missing).
        cleanup : bool
            If True, delete the original calculation directory afterwards.
        """
        parent_dir = self.directory.parent
        if not tag:
            tag = self.directory.name

        outcar_gz = f"OUTCAR.{tag}.gz"
        xml_gz = f"{tag}.xml.gz"
        run_tgz = f"{tag}.tar.gz"

        # gzip OUTCAR and xml individually, then tar+gzip the directory
        for source, target in ((self.outcar, outcar_gz), (self.xml, xml_gz)):
            with open(source, "rb") as file_in, gzip.open(target, "wb") as file_out:
                shutil.copyfileobj(file_in, file_out)
        with tarfile.open(run_tgz, "w:gz") as tar:
            tar.add(self.directory, arcname=tag)

        # puts xml & OUTCAR archives in 'out' and directory tarballs in 'runs'
        if organize:
            out_dir = parent_dir.joinpath("out")
            runs_dir = parent_dir.joinpath("runs")
            out_dir.mkdir(exist_ok=True)
            runs_dir.mkdir(exist_ok=True)
            shutil.move(outcar_gz, out_dir)
            shutil.move(xml_gz, out_dir)
            shutil.move(run_tgz, runs_dir)

        # deletes the original, uncompressed calculation directory
        if cleanup:
            shutil.rmtree(self.directory)

    def backup(self, files="minimal", backup_dir="run."):
        """Flexible function for backing up calculation files.

        The files are copied into a new sub-directory of the calculation
        directory named ``<backup_dir><N>``, where N is the lowest positive
        integer for which no such entry exists yet.

        Parameters
        ----------
        files : str, list
            Can be 'minimal' or 'all' in which case a minimal set
            of files or all files are backed up, respectively. Can also
            be a single filename or a list of filenames to back up; relative
            names are resolved against the calculation directory. ``None``
            creates an empty backup directory.
        backup_dir : str
            Prefix of the backup directory name.
        """
        copy_all = isinstance(files, str) and files == "all"
        if copy_all or files is None:
            files_to_backup = []
        elif isinstance(files, str) and files == "minimal":
            files_to_backup = list(minimal_backup_base)
            if "vasp_stdout" in self.conf and self.conf["vasp_stdout"] is not None:
                files_to_backup.append(self.conf["vasp_stdout"])
            if self.kpoints.is_file():
                files_to_backup.append("KPOINTS")
        elif isinstance(files, str):
            # A bare string is one file name.
            files_to_backup = [files]
        else:
            files_to_backup = list(files)

        # Find the first unused backup index.
        ind_unused = 1
        while self.directory.joinpath(f"{backup_dir}{ind_unused}").exists():
            ind_unused += 1
        target_dir = self.directory.joinpath(f"{backup_dir}{ind_unused}")
        target_dir.mkdir()

        if copy_all:
            for f in self.directory.iterdir():
                if f.is_file():
                    shutil.copy(f, target_dir)
            return

        for f in files_to_backup:
            try:
                # joinpath leaves absolute paths untouched and anchors
                # relative names in the calculation directory.
                shutil.copy(self.directory.joinpath(f), target_dir)
            except FileNotFoundError:
                warnings.warn(
                    "{}: cannot backup {}, file not found".format(
                        self.directory, f
                    ),
                    UserWarning,
                )

    def remove_files(self, files, exceptions=None):
        """Remove files from the calculation directory.

        Thin wrapper around the module-level ``remove_files`` function with
        ``self.directory`` as the target directory.
        """
        remove_files(self.directory, files, exceptions)

    def copy_files(self, dir_new, files="all", exceptions=None):
        """Copy files from the calculation directory to ``dir_new``.

        Thin wrapper around the module-level ``copy_files`` function with
        ``self.directory`` as the source directory.
        """
        copy_files(self.directory, dir_new, files=files, exceptions=exceptions)

    def suggest_num_bands(self, factor=1.0):
        """Convenience function to compute and set the number of bands.

        Uses the same rule as VASP does internally, which looks roughly like
        nbands = int(nelectrons/2 + factor*nions/2).
        The difference is that this method allows the specification of a
        scaling factor for the term proportional to the number of ions. This
        can be useful for e.g., transition metals where more bands need to be
        added sometimes (factor=2 can be required).

        The result is rounded up to a multiple of the ``npar`` parameter
        (1 if unset) and stored on the calculator via ``self.set``.

        Parameters
        ----------
        factor : float
            Multiplicative scaling factor for the ion-dependent term.

        Returns
        -------
        int
            The new number of bands.
        """
        nelect = self.get_num_valence()
        natoms = len(self.atoms)

        occupied_term = round(nelect + 2) / 2
        ion_term = factor * max(natoms // 2, 3)
        nbands = int(max(occupied_term + ion_term, int(0.6 * nelect)))

        # Round up to the next multiple of npar.
        npar = self.parameters.get("npar", 1)
        nbands = ((nbands + npar - 1) // npar) * npar

        self.set(nbands=nbands)
        return nbands

    def suggest_encut(self, factor=1.3):
        """Heuristic for choosing the PW cutoff.

        Uses the formula factor*max(ENMAX) where the maximum
        is taken over all POTCARs involved.

        Parameters
        ----------
        factor : float
            Multiplicative scaling factor for the energy cutoff.

        Returns
        -------
        float
            The suggested cutoff (0 if there are no POTCARs). The value is
            only returned, not set on the calculator.
        """
        enmax = 0
        for _, potcar in self.get_potcars():
            enmax = max(read_enmax(potcar), enmax)
        return factor * enmax