# Source code for storq.vasp.helpers

import numpy as np
import os
import shutil

class Helpers:
    """File-management helpers for VASP calculation directories.

    The instance methods read ``self.outcar``, ``self.kpoints`` and
    ``self.conf``. They also read ``self.directory``; that attribute name is
    an assumption, see the ``NOTE(review)`` comments in :meth:`archive` and
    :meth:`backup`.
    """

    # File names selected by the "input" target/include keyword.
    _INPUT_FILES = ("INCAR", "POSCAR", "KPOINTS", "POTCAR")
    # File names selected by the "output" target/include keyword.
    _OUTPUT_FILES = (
        "OUTCAR",
        "OSZICAR",
        "vasprun.xml",
        "IBZKPT",
        "EIGENVAL",
        "CHG",
        "CHGCAR",
        "WAVECAR",
        "REPORT",
        "PCDAT",
        "CONTCAR",
        "XDATCAR",
        "LOCPOT",
        "DOSCAR",
        "PROCAR",
        "PROOUT",
        "storq.db",
    )

    def archive(self, tag=None, organize=True, cleanup=False):
        """Archive a run by separately compressing OUTCAR, the xml file and
        the whole calculation directory.

        Three files are written to the current working directory:
        ``OUTCAR.<tag>.gz``, ``<tag>.xml.gz`` and ``<tag>.tar.gz``.

        Parameters
        ----------
        tag : str, optional
            Label used in the archive names. When empty or None the
            basename of the calculation directory is used.
        organize : bool
            If True, move the gzipped OUTCAR and xml file into ``out`` and
            the tarball into ``runs``; both directories are created next to
            the calculation directory when missing.
        cleanup : bool
            If True, delete the original calculation directory afterwards.
        """
        import gzip
        import tarfile

        # NOTE(review): the attribute name was lost from the source this
        # block was recovered from; `self.directory` is assumed -- confirm.
        vaspdir = self.directory
        origin = os.path.dirname(vaspdir)
        if not tag:
            tag = os.path.basename(vaspdir)

        # gzip OUTCAR and the xml file, tar+gzip the calculation directory
        gz_names = {
            "outcar": "OUTCAR.{0}.gz".format(tag),
            "xml": "{0}.xml.gz".format(tag),
            "vaspdir": "{0}.tar.gz".format(tag),
        }
        with open(self.outcar, "rb") as file_in, gzip.open(
            gz_names["outcar"], "wb"
        ) as file_out:
            shutil.copyfileobj(file_in, file_out)

        with open(os.path.join(vaspdir, "vasprun.xml"), "rb") as file_in, gzip.open(
            gz_names["xml"], "wb"
        ) as file_out:
            shutil.copyfileobj(file_in, file_out)

        with tarfile.open(gz_names["vaspdir"], "w:gz") as tar:
            tar.add(vaspdir, arcname=tag)

        # put xml & OUTCAR in 'out' and the tarball in 'runs'
        if organize:
            outdir = os.path.join(origin, "out")
            runsdir = os.path.join(origin, "runs")
            if not os.path.isdir(outdir):
                os.mkdir(outdir)
            shutil.move(gz_names["outcar"], outdir)
            shutil.move(gz_names["xml"], outdir)
            if not os.path.isdir(runsdir):
                os.mkdir(runsdir)
            shutil.move(gz_names["vaspdir"], runsdir)

        # delete the original, uncompressed calculation directory
        if cleanup:
            shutil.rmtree(vaspdir)

    def backup(self, include="minimal", dirname="run."):
        """Copy calculation files into a new numbered backup directory.

        The backup directory is ``<dirname><i>`` inside the calculation
        directory, where ``i`` is the smallest positive integer for which
        no such directory exists yet.

        Parameters
        ----------
        include : str, list
            Can be 'minimal' or 'all' in which case a minimal set of files
            or all files are backed up, respectively. Can also be a list
            (or tuple) of filenames to back up, or None to create an empty
            backup directory.
        dirname : str
            Prefix of the backup directory name.

        Raises
        ------
        ValueError
            If ``include`` is none of the supported values.
        """
        cwd = os.getcwd()
        # NOTE(review): the chdir argument was lost from the source this
        # block was recovered from; `self.directory` is assumed -- confirm.
        os.chdir(self.directory)
        try:
            # figure out which files to back up
            if include == "all":
                files_to_backup = [f for f in os.listdir(".") if os.path.isfile(f)]
            elif include == "minimal":
                files_to_backup = [
                    "INCAR",
                    "POSCAR",
                    "OUTCAR",
                    "POTCAR",
                    "vasprun.xml",
                    "storq.db",
                ]
                if "vasp_stdout" in self.conf and self.conf["vasp_stdout"] is not None:
                    files_to_backup.append(self.conf["vasp_stdout"])
                if os.path.exists(self.kpoints):
                    files_to_backup.append("KPOINTS")
            elif isinstance(include, (list, tuple)):
                files_to_backup = list(include)
            elif include is None:
                files_to_backup = []
            else:
                raise ValueError(
                    "include must be 'minimal', 'all', None or a list of "
                    "filenames, got {!r}".format(include)
                )

            # do the backup; plain concatenation keeps braces in `dirname`
            # from being interpreted as format fields
            idir = 1
            while os.path.isdir(dirname + str(idir)):
                idir += 1
            new_dir = dirname + str(idir)
            os.mkdir(new_dir)
            for f in files_to_backup:
                shutil.copy(f, new_dir)
        finally:
            # always return to where the caller was, even if a copy failed
            os.chdir(cwd)

    @staticmethod
    def _remove_backups(rundir, keep_last):
        """Delete the ``run.<i>`` (i >= 1) backup directories in ``rundir``.

        When ``keep_last`` is true the highest-numbered backup survives and
        is renamed to ``run.1``.
        """
        indices = []
        for name in os.listdir(rundir):
            prefix, _, suffix = name.rpartition(".")
            if prefix != "run" or not suffix.isdecimal():
                continue
            index = int(suffix)
            # skip run.0, zero-padded names such as run.01, and plain files
            if (
                index >= 1
                and name == "run.{}".format(index)
                and os.path.isdir(os.path.join(rundir, name))
            ):
                indices.append(index)
        if not indices:
            return

        newest = max(indices)
        for index in indices:
            if keep_last and index == newest:
                continue
            shutil.rmtree(os.path.join(rundir, "run.{}".format(index)))
        if keep_last and newest != 1:
            os.rename(
                os.path.join(rundir, "run.{}".format(newest)),
                os.path.join(rundir, "run.1"),
            )

    @staticmethod
    def remove(rundir, target, exceptions=None):
        """Flexible function for removing calculation files.

        Note that this is a staticmethod so as to enable cleanup of broken
        calculation directories where a calculator object cannot be
        initialized. The working directory of the process is not changed.

        Parameters
        ----------
        rundir : str
            The calculation directory from which to remove files.
        target : str, list
            Decides which files to remove. Can be 'input', 'output', 'all'
            (removes ``rundir`` itself), 'backup' (removes the ``run.<i>``
            directories), the name of a single file or a list of filenames.
        exceptions : str, list, set, optional
            Filename(s) that must not be removed. The special value 'last'
            only affects ``target='backup'``: the highest-numbered backup
            is kept and renamed to ``run.1``.
        """
        keep_last = isinstance(exceptions, str) and exceptions == "last"
        if exceptions is None or keep_last:
            excluded = set()
        elif isinstance(exceptions, str):
            excluded = {exceptions}
        else:
            excluded = set(exceptions)

        if target == "all":
            shutil.rmtree(rundir)
            return
        if target == "backup":
            Helpers._remove_backups(rundir, keep_last)
            return

        if target == "input":
            names = Helpers._INPUT_FILES
        elif target == "output":
            names = Helpers._OUTPUT_FILES
        elif isinstance(target, (list, tuple, set)):
            # user-supplied collection of files to remove
            names = target
        elif isinstance(target, str):
            # user-supplied name of a single file to remove
            names = [target]
        else:
            return

        for name in set(names).difference(excluded):
            try:
                os.remove(os.path.join(rundir, name))
            except OSError:
                # best effort: files that are missing or cannot be removed
                # are skipped silently
                pass

    @staticmethod
    def copy_files(olddir, newdir, include="all", exceptions=()):
        """Flexible method for copying VASP calculation files.

        Parameters
        ----------
        olddir : str
            Directory to copy from.
        newdir : str
            Directory to copy to; created if it does not exist.
        include : str, list
            'all' copies every regular file in ``olddir``; 'input' and
            'output' copy the standard input or output files. Can also be
            a list of filenames.
        exceptions : str, list
            Filename(s) to skip. Entries may be bare filenames or paths
            already joined with ``olddir``.

        Raises
        ------
        ValueError
            If ``include`` is none of the supported values.
        """
        if isinstance(exceptions, str):
            excluded = {exceptions}
        else:
            excluded = set(exceptions)

        if include == "all":
            names = os.listdir(olddir)
        elif include == "input":
            names = Helpers._INPUT_FILES
        elif include == "output":
            names = Helpers._OUTPUT_FILES
        elif isinstance(include, (list, tuple, set)):
            names = list(include)
        else:
            raise ValueError(
                "include must be 'all', 'input', 'output' or a list of "
                "filenames, got {!r}".format(include)
            )

        os.makedirs(newdir, exist_ok=True)
        for name in names:
            path = os.path.join(olddir, name)
            if name in excluded or path in excluded:
                continue
            # names that are absent from olddir (or are directories) are skipped
            if os.path.isfile(path):
                shutil.copy(path, newdir)