# Source code for storq.vasp.relaxer

import gzip
import os
import subprocess
import shutil
import tarfile
import warnings

import numpy as np

import storq.vasp.readers as readers
from storq.tools import getstatusoutput


def _backup_files(fnames, backup):
    """Backup a given list of files."""
    idir = 1
    while os.path.isdir("run." + str(idir)):
        idir += 1
    new_dir = "run." + str(idir)
    os.mkdir(new_dir)
    if backup == "all":
        for f in os.listdir("."):
            if os.path.isfile(f):
                shutil.copy(f, new_dir)
    else:
        for f in fnames:
            try:
                shutil.copy(f, new_dir)
            except FileNotFoundError as e:
                warnings.warn(
                    "WARNING: {}: cannot backup {}, file not found".format(os.getcwd(), f),
                    UserWarning,
                )


class Relaxer:
    """Drive repeated VASP runs until a relaxation converges.

    The methods use attributes and methods that are not defined in this class
    (``conf``, ``siteconf``, ``parameters``, ``state``, ``directory``,
    ``outcar``, ``kpoints``, ``minimal_backup_base``, ``calculation_required``,
    ``read_results``, ``write_input``, ``write_incar``, ``read_forces``,
    ``has_valid_outcar``, ``has_converged``, ``get_num_ionic_steps``,
    ``get_runcmd``, ``submit``); presumably they are supplied by the
    calculator class this one is combined with.
    """

    # Average-force threshold used for the CG -> quasi-newton switch when the
    # caller supplies neither switch_tol_favg nor switch_tol_fmax.
    _DEFAULT_SWITCH_TOL_FAVG = 0.15

    def relax(
        self,
        max_runs=10,
        backup=None,
        switch_opt=False,
        switch_tol_favg=None,
        switch_tol_fmax=None,
    ):
        """Successively run vasp until convergence.

        Parameters
        ----------
        max_runs : int
            Maximum number of vasp restarts allowed.
        backup : list, str
            Determines which files are backed-up during successive runs.
            Can be all, minimal or a list of files to backup.
        switch_opt : bool
            Toggle automatic switching from conjugate gradient (ibrion=2)
            to quasi-newton (ibrion=1).
        switch_tol_favg : float
            Average force threshold below which quasi-newton is switched on.
        switch_tol_fmax : float
            Maximum force threshold below which quasi-newton is switched on.

        Returns
        -------
        The return code of the last VASP invocation when the calculation is
        run directly, otherwise ``None`` (nothing to do, job already
        running/queued, or job submitted to the queue).

        Raises
        ------
        RuntimeError
            If VASP was run locally and did not leave a valid OUTCAR, or if
            ``_run_and_backup`` reports a failed / non-converged (strict) run.
        """
        conf = self.conf
        site = self.siteconf

        if not self.calculation_required():
            if self.state in [self.RUNNING, self.QUEUED]:
                return None
            if self.parameters.get("isif", None) in range(3, 7):
                if self.get_num_ionic_steps() == 1:
                    if self.state != self.PARSED:
                        self.read_results()
                    return None
                # NOTE(review): nesting reconstructed from a flattened
                # listing; with "vasp_restart_on" configured we fall through
                # and rerun - confirm against the original file.
                if conf.get("vasp_restart_on", None) is None:
                    warnings.warn(
                        "WARNING: {0}: Cell relaxation finished with more than one ionic step".format(
                            self.directory
                        ),
                        UserWarning,
                    )
                    if self.state != self.PARSED:
                        self.read_results()
                    return None
            else:
                if self.state != self.PARSED:
                    self.read_results()
                return None

        # if we get here we are preparing for running
        self.write_input()

        # if we are on a node or using run mode we should just run
        if conf["vasp_mode"] == "run" or (
            site and site["env"]["submitdir"] in os.environ
        ):
            # Fall back to the default name when the key is missing or empty
            # so that a file is always opened and retcode is always bound.
            stdout_name = conf.get("vasp_stdout") or "vasp.out"
            cwd = os.getcwd()
            os.chdir(self.directory)
            try:
                with open(stdout_name, "wb") as fout:
                    retcode = self._run_and_backup(
                        max_runs=max_runs,
                        backup=backup,
                        file_out=fout,
                        switch_opt=switch_opt,
                        switch_tol_favg=switch_tol_favg,
                        switch_tol_fmax=switch_tol_fmax,
                    )
            finally:
                # Always move back up, also when the run raised.
                os.chdir(cwd)

            # if we are on a node we should return here
            if site and "env" in site and site["env"]["nodelist"] in os.environ:
                return retcode

            # we are in cmd on a local computer and should parse
            if self.has_valid_outcar() is True:
                self.read_results()
                return retcode
            raise RuntimeError("Vasp was aborted or encountered an error")

        elif conf["vasp_mode"] == "queue":
            # submit the job
            cwd = os.getcwd()
            vaspdir = self.directory

            # Send argument to the python relaxer script using argsparse
            file_args = "{0} {1} {2}".format(cwd, vaspdir, max_runs)
            if switch_opt:
                file_args += " --switch_opt"
            if switch_tol_favg is not None:
                file_args += " --switch_tol_favg {}".format(switch_tol_favg)
            if switch_tol_fmax is not None:
                file_args += " --switch_tol_fmax {}".format(switch_tol_fmax)
            if backup is not None:
                if isinstance(backup, list):
                    file_args += " --backup {0} ".format(" ".join(backup))
                else:
                    file_args += " --backup {0}".format(backup)

            # Do the actual submission.
            self.submit("relax.py", file_args)

        return None

    def _run_and_backup(
        self,
        max_runs=None,
        backup=None,
        file_out=None,
        switch_opt=False,
        switch_tol_favg=None,
        switch_tol_fmax=None,
    ):
        """Support function for relax().

        Runs VASP once if no OUTCAR exists yet, then keeps restarting from
        CONTCAR (optionally backing up files first) until the run is judged
        finished or ``max_runs`` is exhausted.

        Parameters
        ----------
        max_runs : int
            Maximum number of vasp restarts allowed. ``None`` is treated as
            10, the default used by ``relax``.
        backup : list, str
            Determines which files are backed-up during successive runs.
            Can be all, minimal or a list of files to backup.
        file_out : File
            File to which VASP will write its stdout and stderr.
        switch_opt : bool
            Toggle automatic switching from conjugate gradient (ibrion=2)
            to quasi-newton (ibrion=1).
        switch_tol_favg : float
            Average force threshold below which quasi-newton is switched on.
        switch_tol_fmax : float
            Maximum force threshold below which quasi-newton is switched on.

        Returns
        -------
        The value returned by the last ``getstatusoutput`` call, or ``None``
        when VASP was never started by this call.

        Raises
        ------
        RuntimeError
            If a run leaves no valid OUTCAR, or does not converge while
            ``vasp_convergence`` is ``"strict"``.
        """
        conf = self.conf
        vasp_runcmd = self.get_runcmd()
        isif = self.parameters.get("isif", None)
        if max_runs is None:
            max_runs = 10

        # figure out which files to backup
        minimal_backup = self.minimal_backup_base.copy()
        minimal_backup.append(conf.get("vasp_stdout") or "vasp.out")
        if os.path.exists(self.kpoints):
            minimal_backup.append("KPOINTS")
        if backup == "minimal":
            fnames = minimal_backup
        elif isinstance(backup, list):
            fnames = backup
        else:
            fnames = []

        # Stays None if an OUTCAR already exists and no restart is performed.
        retcode = None

        # run one time first unless OUTCAR already exists
        if not os.path.exists(self.outcar):
            retcode = getstatusoutput(vasp_runcmd, stdout=file_out, stderr=file_out)
            if self._relaxation_finished(isif):
                return retcode
            # The initial run counts against the allowed number of runs.
            max_runs -= 1

        # relax until convergence main loop
        for _ in range(max_runs):
            # do the backup
            if backup:
                _backup_files(fnames, backup)

            # update positions
            os.rename("CONTCAR", "POSCAR")

            # optional: switch optimization algorithm to QN (ibrion=1) if
            # using CG (ibrion=2)
            if switch_opt and self.parameters.get("ibrion", None) == 2:
                forces = self.read_forces()
                if self._should_switch_to_qn(forces, switch_tol_favg, switch_tol_fmax):
                    self.parameters["ibrion"] = 1
                    self.write_incar()

            # rerun, wipe output files beforehand
            file_out.seek(0)
            file_out.truncate(0)
            retcode = getstatusoutput(vasp_runcmd, stdout=file_out, stderr=file_out)

            # check if vasp finished and converged
            if self._relaxation_finished(isif):
                return retcode

        # if we get here vasp finished but didn't converge so we warn
        warnings.warn(
            "WARNING: {0}: Calculation is not converged".format(self.directory),
            UserWarning,
        )
        if isif in range(3, 7):
            warnings.warn(
                "WARNING: {0}: Cell relaxation finished with more than one ionic step".format(
                    self.directory
                )
            )
        return retcode

    def _relaxation_finished(self, isif):
        """Return True when the latest VASP run completed the relaxation.

        For ``isif`` in 3..6 the run counts as finished only when it converged
        in exactly one ionic step. Otherwise convergence alone is enough; a
        non-converged run raises in strict mode and warns in any other mode.

        Raises
        ------
        RuntimeError
            If there is no valid OUTCAR, or the run did not converge while
            ``conf["vasp_convergence"] == "strict"``.
        """
        if not self.has_valid_outcar():
            raise RuntimeError("Vasp was aborted or encountered an error")

        converged = self.has_converged()
        if isif in range(3, 7):
            # NOTE(review): called without arguments here while relax() uses
            # self.get_num_ionic_steps() - confirm the reader's signature.
            nstep = readers.read_num_ionic_steps()
            return bool(nstep == 1 and converged)

        if converged:
            return True
        if self.conf.get("vasp_convergence", None) == "strict":
            raise RuntimeError("Vasp did not converge")
        warnings.warn(
            "WARNING: {0}: Calculation is not converged".format(self.directory),
            UserWarning,
        )
        return False

    @staticmethod
    def _should_switch_to_qn(forces, switch_tol_favg, switch_tol_fmax):
        """Decide whether to switch from conjugate gradient to quasi-newton.

        The norm is taken along axis 1 of ``forces``; its mean and maximum are
        compared against the thresholds. With no threshold given, the mean is
        compared against 0.15. With one threshold given, only that one is
        checked. With both given, both must be satisfied.
        """
        norms = np.linalg.norm(forces, axis=1)
        favg = np.mean(norms)
        fmax = np.amax(norms)

        if switch_tol_favg is None and switch_tol_fmax is None:
            return bool(favg < Relaxer._DEFAULT_SWITCH_TOL_FAVG)
        if switch_tol_favg is None:
            return bool(fmax < switch_tol_fmax)
        if switch_tol_fmax is None:
            return bool(favg < switch_tol_favg)
        return bool(fmax < switch_tol_fmax and favg < switch_tol_favg)