Source code for storq.vasp.resource_manager

import os
import subprocess
import warnings

from storq.tools import getstatusoutput


class ResourceManager:
    def submit(self, run_file, file_args=""):
        """Submit a job through the queue."""
        conf = self.conf
        site = self.siteconf
        file_args = file_args.strip()

        script = "#!/bin/bash\n"
        prerun = conf.get("batch_prerun", None)
        if prerun:
            script += prerun + "\n"
        script += "{0} {1}".format(run_file, file_args)

        jobname = self.directory.name

        # Build a list of options to pass to sbatch;
        # these options must be present, and all are long options.
        cmdlist = ["{0}".format(site["submit"]["cmd"])]
        cmdlist.append(site["submit"]["workdir"].format(self.directory))
        cmdlist.append(site["submit"]["jobname"].format(jobname))
        cmdlist.append(site["submit"]["account"].format(conf["batch_account"]))
        cmdlist.append(site["submit"]["walltime"].format(conf["batch_walltime"]))

        # Short options that may not be set.
        cmdshort = ""
        stdout = conf.get("batch_stdout", None)
        stderr = conf.get("batch_stderr", None)
        if stdout:
            cmdshort += " " + site["submit"]["stdout"].format(stdout)
        if stderr:
            cmdshort += " " + site["submit"]["stderr"].format(stderr)

        # Handle nodes/cores. If only a node count was given, the maximum
        # number of cores per node is used. Note that cores is a short option.
        nodes = conf.get("batch_nodes", None)
        cores = conf.get("batch_cores", None)
        if nodes:
            cmdlist.append(site["submit"]["nodes"].format(nodes))
        if cores:
            cmdshort += " " + site["submit"]["cores"].format(cores)

        # Add the short options to cmdlist.
        cmdlist += cmdshort.split()

        # Parse any remaining options from the space-separated options string.
        if conf.get("batch_options", None):
            cmdlist += conf["batch_options"].split()

        # This is the block that actually runs sbatch.
        p = subprocess.Popen(
            cmdlist,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        out, err = p.communicate(script.encode())
        if err != b"":
            raise Exception(
                "something went wrong in sbatch:\n\n{0}".format(err.decode())
            )

        # Write the jobid and finish; note that the state changes.
        self.jobid = out.strip().split()[3].decode()
        self.write_persistence_db(data={"jobid": self.jobid})
        self.state = self.QUEUED
        print("Submitted batch job {0}: {1}".format(self.jobid, jobname))
        print("with command:", *cmdlist)

    def get_job_info(self):
        """Return job info for the calculation by polling the queue
        using the appropriate command and the jobid.
        """
        site = self.siteconf
        if self.jobid is None:
            return None

        try:
            status, output, err = getstatusoutput(
                [site["queue"]["cmd"], site["queue"]["jobid"].format(self.jobid)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except Exception:
            return None

        err = err.decode()
        if err != "" and "Invalid job id specified" not in err:
            print(err)
            return None

        output = output.decode().split("\n")
        output = list(filter(None, output))  # filter out empty strings
        if len(output) > 1:
            keys = [key.strip().lower() for key in output[0].split()]
            # ind = [str(self.jobid) in row for row in output].index(True)
            row = output[1]
            vals = [val.strip() for val in row.split()]
        else:
            # output contains only the header
            return None

        return dict(zip(keys, vals))

    def in_queue(self):
        """Return True or False if the directory has a job in the queue."""
        site = self.siteconf
        if self.jobid is None:
            return False

        try:
            # Try to see if the jobid is in the queue.
            status, output, err = getstatusoutput(
                [site["queue"]["cmd"], site["queue"]["jobid"].format(self.jobid)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except Exception as exc:
            # err is not defined when getstatusoutput itself raises,
            # so report the exception instead.
            print(exc)
            return False

        return str(self.jobid) in output.decode().split()

    def cancel(self, options):
        """Delete the job from the queue.

        options is a space-separated string of options passed to scancel.
        """
        if self.in_queue():
            site = self.siteconf
            cmd = [site["cancel"]["cmd"]] + options.split() + [str(self.jobid)]
            status, output, err = getstatusoutput(
                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            if status != 0:
                print(output + err)
            return status, output
        return "{} not in queue.".format(self.directory.name)

    def get_ntasks(self):
        """Find ntasks from the SLURM environment using SLURM_TASKS_PER_NODE.

        Note that SLURM_NTASKS is not always defined.
        """
        site = self.siteconf
        if site["resource_manager"] == "slurm":
            string = os.environ["SLURM_TASKS_PER_NODE"]
            tasks_list = string.split(",")
            ntasks = 0
            for tasks in tasks_list:
                # Entries look like "4" or "4(x3)", i.e. a per-node task
                # count optionally followed by a node repeat count.
                task1 = tasks.split("(", 1)[0]
                try:
                    task2 = tasks.split("(", 1)[1].split(")")[0].split("x")[1]
                    task1 = str(int(task2) * int(task1))
                except (IndexError, ValueError):
                    pass
                ntasks = int(ntasks) + int(task1)
            # If hyperthreading is on, ntasks gives the number of logical CPUs.
            # ntasks /= site['threads_per_core']
            return int(ntasks)
        else:
            raise RuntimeError(
                "get_ntasks not implemented for current resource manager"
            )
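
For reference, SLURM compresses repeated per-node task counts into entries such as 2(x3). The standalone sketch below performs the same expansion that get_ntasks does; the helper name expand_tasks_per_node is illustrative and not part of storq.

def expand_tasks_per_node(spec):
    """Expand a SLURM_TASKS_PER_NODE string such as '10,2(x3)' into a
    total task count: 10 + 2 * 3 = 16."""
    total = 0
    for entry in spec.split(","):
        count = int(entry.split("(", 1)[0])
        if "(" in entry:
            # "(x3)" means the count repeats over three nodes.
            repeat = int(entry.split("(x", 1)[1].rstrip(")"))
            count *= repeat
        total += count
    return total

assert expand_tasks_per_node("10,2(x3)") == 16
assert expand_tasks_per_node("4") == 4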
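
The exact sbatch arguments that submit assembles come from the site configuration templates. As a rough illustration only, the following sketch builds an equivalent command list from a made-up SLURM site dictionary; the key names mirror those read by submit, but the template strings, paths, and configuration values are assumptions, not actual storq site files.

# Illustrative only: a made-up site dictionary with the same keys that
# submit() reads. The option templates are assumptions.
site = {
    "submit": {
        "cmd": "sbatch",
        "workdir": "--chdir={0}",
        "jobname": "--job-name={0}",
        "account": "--account={0}",
        "walltime": "--time={0}",
        "nodes": "--nodes={0}",
        "cores": "-n {0}",
    }
}
conf = {
    "batch_account": "proj123",
    "batch_walltime": "01:00:00",
    "batch_nodes": 2,
    "batch_cores": 48,
}

cmdlist = [
    site["submit"]["cmd"],
    site["submit"]["workdir"].format("/scratch/calc"),
    site["submit"]["jobname"].format("calc"),
    site["submit"]["account"].format(conf["batch_account"]),
    site["submit"]["walltime"].format(conf["batch_walltime"]),
    site["submit"]["nodes"].format(conf["batch_nodes"]),
]
# Short options are collected as strings and split, as in submit().
cmdlist += site["submit"]["cores"].format(conf["batch_cores"]).split()
print(cmdlist)
# ['sbatch', '--chdir=/scratch/calc', '--job-name=calc', '--account=proj123',
#  '--time=01:00:00', '--nodes=2', '-n', '48']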