Source code for storq.vasp.relaxer

import gzip
import os
import subprocess
import shutil
import tarfile
import warnings
import storq.vasp.readers as readers
from storq.tools import getstatusoutput


def _backup_files(fnames, backup):
    """Backup a given list of files."""
    idir = 1
    while os.path.isdir("run." + str(idir)):
        idir += 1
    new_dir = "run." + str(idir)
    os.mkdir(new_dir)
    if backup == "full":
        for f in os.listdir("."):
            if os.path.isfile(f):
                shutil.copy(f, new_dir)
    else:
        for f in fnames:
            shutil.copy(f, new_dir)


[docs]class Relaxer:
[docs] def relax(self, max_runs=10, backup=None): """Succesively run vasp until convergence. Parameters ---------- max_runs : int Maximum number of vasp restarts allowed. backup : list, str Determines which files are backed-up during successive runs. Can be all, minimal or a list of files to backup. """ conf = self.conf site = self.siteconf if not self.calculation_required(): if self.state in [self.RUNNING, self.QUEUED]: return None if self.parameters.get("isif", None) in range(3, 7): if self.get_number_of_ionic_steps() == 1: if self.state != self.PARSED: self.read_results() return else: if conf.get("vasp_restart_on", None) is None: warnings.warn( "WARNING: {0}: Cell relaxation finished with more than one ionic step".format( self.directory ), UserWarning, ) if self.state != self.PARSED: self.read_results() return else: if self.state != self.PARSED: self.read_results() return # if we get here we are preparing for running self.write_input() # if we are on a node or using run mode we should just run if conf["vasp_mode"] == "run" or ( site and site["env"]["submitdir"] in os.environ ): cwd = os.getcwd() os.chdir(self.directory) if conf.get("vasp_stdout", "vasp.out"): with open(conf["vasp_stdout"], "wb") as fout: retcode = self._run_and_backup(max_runs=max_runs, backup=backup, file_out=fout) # if we are on a node we should return here if ("env" in site) and (site["env"]["nodelist"] in os.environ): os.chdir(cwd) return retcode else: # we are in cmd on a local computer and should parse # move back up and parse os.chdir(cwd) if self.check_outcar() is True: self.read_results() return retcode else: raise RuntimeError("Vasp was aborted or encountered an error") elif conf["vasp_mode"] == "queue": # submit the job cwd = os.getcwd() vaspdir = self.directory # handle the args to python relaxer script file_args = "{0} {1} {2}".format(cwd, vaspdir, max_runs) if backup is not None: if type(backup) == list: file_args += " --backup {0} ".format(" ".join(backup)) else: file_args += " --backup {0}".format(backup) # do the actual submission
self.submit("relax.py", file_args) def _run_and_backup(self, max_runs=None, backup=None, file_out=None): """Support function for relax(). Parameters ---------- max_runs : int Maximum number of vasp restarts allowed. backup : list, str Determines which files are backed-up during successive runs. Can be all, minimal or a list of files to backup. file_out : File File to which VASP will write its stdout and stderr. """ conf = self.conf vasp_runcmd = self.get_runcmd() isif = self.parameters.get("isif", None) # figure out which files to backup minimal_backup = [ "INCAR", "POSCAR", "CONTCAR", "OUTCAR", "POTCAR", "vasprun.xml", "storq.db", ] minimal_backup.append(conf.get("vasp_stdout", "vasp.out")) if os.path.exists(self.kpoints): minimal_backup.append("KPOINTS") if backup == "minimal": fnames = minimal_backup elif type(backup) == list: fnames = backup elif backup is None: fnames = [] # run one time first unless OUTCAR already exists if not os.path.exists(self.outcar): retcode = getstatusoutput(vasp_runcmd, stdout=file_out, stderr=file_out) # check exit state and convergence if not self.check_outcar(): raise RuntimeError("Vasp was aborted or encountered an error") else: nstep = readers.get_number_of_ionic_steps() if conf.get("convergence", "basic"): convergence = self.check_convergence() if nstep == 1 and convergence: return retcode else: if nstep == 1: return retcode max_runs -= 1 # relax until convergence main loop for itrial in range(max_runs): # do the backup if backup: _backup_files(fnames, backup) # update positions os.rename("CONTCAR", "POSCAR") # optional: switch optimization algorithm if conf.get("vasp_relaxer_smart", False): ibrion = self.parameters.get(ibrion, None) try: forces = self.read_forces() except Exception as exc: print(exc) continue fmax_tol = conf.get("vasp_relaxer_smart_fmax", None) favg_tol = conf.get("vasp_relaxer_smart_favg", None) favg = np.mean(np.linalg.norm(forces, axis=1)) fmax = np.amax(np.linalg.norm(forces, axis=1)) if ibrion == 2: if fmax_tol and fmax > fmax_tol: continue if favg_tol and favg > favg_tol: continue self.parameters["ibrion"] = 1 self.write_input() # rerun, wipe output files beforehand file_out.seek(0) file_out.truncate(0) retcode = getstatusoutput(vasp_runcmd, stdout=file_out, stderr=file_out) # check if vasp finished and converged if not self.check_outcar(): raise RuntimeError("Vasp was aborted or encountered an error") else: convergence = self.check_convergence() if isif in range(3, 7): nstep = readers.get_number_of_ionic_steps() if nstep == 1 and convergence: return retcode else: if convergence: return retcode # if we get here vasp finished but didn't converge so we warn warnings.warn( "WARNING: {0}: Calculation is not converged".format(self.directory), UserWarning, ) if isif in range(3, 7): warnings.warn( "WARNING: {0}: Cell relaxation finished with more than one ionic step".format( self.directory ) )
return retcode