import os
import subprocess
import warnings
from storq.tools import getstatusoutput
[docs]class ResourceManager:
[docs] def submit(self, run_file, file_args=""):
""" Submit a job through the queue.
"""
conf = self.conf
site = self.siteconf
file_args = file_args.strip()
script = "#!/bin/bash\n"
prerun = conf.get("batch_prerun", None)
if prerun:
script += prerun + "\n"
script += "{0} {1}".format(run_file, file_args)
jobname = self.directory.name
# Build a list of options to pass to sbatch
# these options must be present, all are long options
cmdlist = ["{0}".format(site["submit"]["cmd"])]
cmdlist.append(site["submit"]["workdir"].format(self.directory))
cmdlist.append(site["submit"]["jobname"].format(jobname))
cmdlist.append(site["submit"]["account"].format(conf["batch_account"]))
cmdlist.append(site["submit"]["walltime"].format(conf["batch_walltime"]))
# short options that may not be set
cmdshort = ""
stdout = conf.get("batch_stdout", None)
stderr = conf.get("batch_stderr", None)
if stdout:
cmdshort += " " + site["submit"]["stdout"].format(stdout)
if stderr:
cmdshort += " " + site["submit"]["stderr"].format(stderr)
# Here we handle the node/core, if only a node count was given we use
# the max number of cores/node. Note that cores is a short option.
nodes = conf.get("batch_nodes", None)
cores = conf.get("batch_cores", None)
if nodes:
cmdlist.append(site["submit"]["nodes"].format(nodes))
if cores:
cmdshort += " " + site["submit"]["cores"].format(cores)
# add short options to cmdlist
cmdlist += cmdshort.split()
# parse any remaining options from space separated options string
if conf.get("batch_options", None):
cmdlist += [option for option in conf["batch_options"].split()]
# This is the block that acutally runs sbatch.
p = subprocess.Popen(
cmdlist,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
out, err = p.communicate(script.encode())
if err != b"":
raise Exception(
"something went wrong in sbatch:\n\n{0}".format(err.decode())
)
# Write jobid and finish, note that the state changes.
self.jobid = out.strip().split()[3].decode()
self.write_persistence_db(data={"jobid": self.jobid})
self.state = self.QUEUED
print("Submitted batch job {0}: {1}".format(self.jobid, jobname))
print("with command: ",*cmdlist,sep=' ')
[docs] def get_job_info(self):
"""Return job info for the calculation by
polling the queue using the appropriate command
and the jobid """
site = self.siteconf
if self.jobid is None:
return None
try:
status, output, err = getstatusoutput(
[site["queue"]["cmd"], site["queue"]["jobid"].format(self.jobid)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except Exception:
return None
err = err.decode()
if err != "" and "Invalid job id specified" not in err:
print(err)
return None
output = output.decode().split("\n")
output = list(filter(None, output)) # filter out empty strings
if len(output) > 1:
keys = [key.strip().lower() for key in output[0].split()]
# ind = [str(self.jobid) in row for row in output].index(True)
row = output[1]
vals = [val.strip() for val in row.split()]
else: # output contains only the header
return None
return dict(zip(keys, vals))
[docs] def in_queue(self):
"""Return True or False if the directory has a job in the queue."""
site = self.siteconf
if self.jobid is None:
return False
else:
try: # try to see if jobid is in queue
status, output, err = getstatusoutput(
[site["queue"]["cmd"], site["queue"]["jobid"].format(self.jobid)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except Exception:
print(err.decode())
return False
if str(self.jobid) in output.decode().split():
return True
else:
return False
[docs] def cancel(self, options):
"""Delete job from the queue.
options is a space separated string of options passed to scancel.
"""
if self.in_queue():
cmd = [site["cancel"]["cmd"]] + options.split() + [str(self.jobid)]
status, output, err = getstatusoutput(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
if status != 0:
print(output + err)
return status, output
return "{} not in queue.".format(self.directory.name)
[docs] def get_ntasks(self):
# Find ntasks from SLURM environment using SLURM_TASKS_PER_NODE
# Note that SLURM_NTASKS is not always defined
site = self.siteconf
if site["resource_manager"] == "slurm":
string = os.environ["SLURM_TASKS_PER_NODE"]
tasks_list = string.split(",")
ntasks = "0"
for tasks in tasks_list:
task1 = tasks.split("(", 1)[0]
try:
task2 = tasks.split("(", 1)[1].split(")")[0].split("x")[1]
task1 = str(int(task2) * int(task1))
except:
pass
ntasks = int(ntasks) + int(task1)
# if hyper threading is on ntasks gives
# the number of logical CPUS
# ntasks /= site['threads_per_core']
return int(ntasks)
else:
raise RuntimeError(
"get_ntasks not implemented for current resource manager"
)