Source code for storq.vasp.relaxer

import os
import subprocess
import shutil
import tarfile
import warnings

import numpy as np

import storq.vasp.readers as readers
from storq.tools import getstatusoutput
from storq.vasp.files import minimal_backup_base
from storq.vasp.state_engine import StateEngine


def _backup_files(fnames, backup):
    """Backup a given list of files."""
    idir = 1
    while os.path.isdir("run." + str(idir)):
        idir += 1
    new_dir = "run." + str(idir)
    os.mkdir(new_dir)
    if backup == "all":
        for f in os.listdir("."):
            if os.path.isfile(f):
                shutil.copy(f, new_dir)
    else:
        for f in fnames:
            try:
                shutil.copy(f, new_dir)
            except FileNotFoundError as e:
                warnings.warn(
                    "{}: cannot backup {}, file not found".format(os.getcwd(), f),
                    UserWarning,
                )


[docs]class Relaxer:
[docs] def relax( self, max_runs=10, backup=None, switch_opt=False, switch_tol_favg=None, switch_tol_fmax=None, ): """Succesively run vasp until convergence. Parameters ---------- max_runs : int Maximum number of vasp restarts allowed. backup : list, str Determines which files are backed-up during successive runs. Can be all, minimal or a list of files to backup. switch_opt : bool Toggle automatic switching from conjugate gradeint (ibrion=2) to quasi-newton (ibrion=1). switch_tol_favg: float Average force threshold below which quasi-newton is switched on. switch_tol_fmax: float Maximum force threshold below which quasi-newton is switched on. """ with StateEngine( self, run_method="_relax", max_runs=max_runs, backup=backup, switch_opt=switch_opt, switch_tol_favg=switch_tol_favg, switch_tol_fmax=switch_tol_fmax, ):
pass def _relax( self, max_runs=10, backup=None, switch_opt=False, switch_tol_favg=None, switch_tol_fmax=None, ): conf = self.conf site = self.siteconf # If we get here we are preparing for running. self.write_input() self.write_persistence_db(self.atoms) # If we are on a compute node or using run mode we should run. if conf["vasp_mode"] == "run" or ( site and site["env"]["submitdir"] in os.environ ): cwd = os.getcwd() os.chdir(self.directory) if conf.get("vasp_stdout", "vasp.out"): with open(conf["vasp_stdout"], "wb") as fout: retcode = self._run_and_backup( max_runs=max_runs, backup=backup, file_out=fout, switch_opt=switch_opt, switch_tol_favg=switch_tol_favg, switch_tol_fmax=switch_tol_fmax, ) # if we are on a node we should return here if ("env" in site) and (site["env"]["nodelist"] in os.environ): os.chdir(cwd) return retcode else: # we are in cmd on a local computer and should parse # move back up and parse os.chdir(cwd) if self.has_valid_outcar() is True: self.read_results() return retcode else: raise RuntimeError("Vasp was aborted or encountered an error") # If we are on a login node we should submit. elif conf["vasp_mode"] == "queue": cwd = os.getcwd() vaspdir = self.directory # Send argument to the python relaxer script using argsparse. file_args = "{0} {1} {2}".format(cwd, vaspdir, max_runs) if switch_opt == True: file_args += " --switch_opt" if switch_tol_favg is not None: file_args += " --switch_tol_favg {}".format(switch_tol_favg) if switch_tol_fmax is not None: file_args += " --switch_tol_fmax {}".format(switch_tol_fmax) if backup is not None: if type(backup) == list: file_args += " --backup {0} ".format(" ".join(backup)) else: file_args += " --backup {0}".format(backup) # Do the actual submission. self.submit("relax.py", file_args) def _run_and_backup( self, max_runs=None, backup=None, file_out=None, switch_opt=False, switch_tol_favg=None, switch_tol_fmax=None, ): """Support function for relax(). Parameters ---------- max_runs : int Maximum number of vasp restarts allowed. backup : list, str Determines which files are backed-up during successive runs. Can be all, minimal or a list of files to backup. file_out : File File to which VASP will write its stdout and stderr. """ conf = self.conf vasp_runcmd = self._get_runcmd() isif = self.parameters.get("isif", None) # figure out which files to backup minimal_backup = list(minimal_backup_base.copy()) minimal_backup.append(conf.get("vasp_stdout", "vasp.out")) if self.kpoints.is_file(): minimal_backup.append("KPOINTS") if backup == "minimal": fnames = minimal_backup elif type(backup) == list: fnames = backup else: fnames = [] # run one time first unless OUTCAR already exists if not self.outcar.is_file(): retcode = getstatusoutput(vasp_runcmd, stdout=file_out, stderr=file_out) # check exit state and convergence if not self.has_valid_outcar(): raise RuntimeError("Vasp was aborted or encountered an error") else: converged = self.has_converged() if isif in range(3, 7): nstep = readers.read_num_ionic_steps() if nstep == 1 and converged: return retcode elif isif == 2: if converged: return retcode else: if converged: return retcode else: if (conf.get("vasp_convergence", None) == "strict"): raise RuntimeError("Vasp did not converge") else: warnings.warn( "{0}: Calculation is not converged".format( self.directory ), UserWarning, ) max_runs -= 1 # relax until convergence main loop for itrial in range(max_runs): # do the backup if backup: _backup_files(fnames, backup) # update positions os.rename("CONTCAR", "POSCAR") # optional: switch optimization algorithm to QN (ibrion=1) if using CG (ibrion=2) ibrion = self.parameters.get("ibrion", None) if switch_opt and ibrion == 2: try: forces = self.read_forces() except Exception as e: raise e favg = np.mean(np.linalg.norm(forces, axis=1)) fmax = np.amax(np.linalg.norm(forces, axis=1)) switch_tol_favg_default = 0.15 switch = False if [switch_tol_favg, switch_tol_fmax] == [None, None]: if favg < switch_tol_favg_default: switch = True elif switch_tol_favg is None: if fmax < switch_tol_fmax: switch = True elif switch_tol_fmax is None: if favg < switch_tol_favg: switch = True else: if fmax < switch_tol_fmax and favg < switch_tol_favg: switch = True if switch: self.parameters["ibrion"] = 1 self.write_incar() # rerun, wipe output files beforehand file_out.seek(0) file_out.truncate(0) retcode = getstatusoutput(vasp_runcmd, stdout=file_out, stderr=file_out) # check if vasp finished and converged if not self.has_valid_outcar(): raise RuntimeError("Vasp was aborted or encountered an error") else: converged = self.has_converged() if isif in range(3, 7): nstep = readers.read_num_ionic_steps() if nstep == 1 and converged: return retcode elif isif == 2: if converged: return retcode else: if converged: return retcode else: if conf.get("vasp_convergence", None) == "strict": raise RuntimeError("Vasp did not converge") else: warnings.warn( "{0}: Calculation is not converged".format( self.directory ), UserWarning, ) # if we get here vasp finished but didn't converge so we warn warnings.warn( "{0}: Calculation is not converged".format(self.directory), UserWarning ) if isif in range(3, 7): warnings.warn( "{0}: Cell relaxation finished with more than one ionic step".format( self.directory ) )
return retcode