# cluster_resource.py
'''
Created on 04/04/2016

@author: MMPE
'''
import multiprocessing
import threading

from wetb.utils.cluster_tools import pbswrap
from wetb.utils.cluster_tools.ssh_client import SSHClient, SharedSSHClient



class Resource(object):
    """Base class deciding whether more jobs may be submitted to a resource.

    Subclasses are expected to provide ``check_resources()``, which
    ``ok2submit`` unpacks as a ``(total, free, user)`` tuple.
    """

    def __init__(self, min_cpu, min_free):
        """Store the submit thresholds and reset the acquired counter.

        :param min_cpu: submitting is always allowed while the user occupies
            fewer than this many cpus
        :param min_free: submitting beyond ``min_cpu`` is allowed only while
            more than this many cpus are free
        """
        self.min_cpu = min_cpu
        self.min_free = min_free
        self.acquired = 0
        # Guards updates of ``acquired`` in acquire()/release().
        self.lock = threading.Lock()

    def ok2submit(self):
        """Always ok to have min_cpu cpus and ok to have more if there are min_free free cpus

        :return: True if another job may be submitted, False otherwise. False
            is also returned when ``check_resources()`` fails.
        """
        try:
            _, free, user = self.check_resources()
        except Exception:
            # A failed resource check means "do not submit"; unlike a bare
            # ``except`` this lets KeyboardInterrupt/SystemExit propagate.
            return False
        return user < self.min_cpu or free > self.min_free

    def acquire(self):
        """Increment the acquired counter while holding the lock."""
        with self.lock:
            self.acquired += 1

    def release(self):
        """Decrement the acquired counter while holding the lock."""
        with self.lock:
            self.acquired -= 1


class SSHPBSClusterResource(Resource, SSHClient):
    """Resource whose cpu usage is obtained by running PBS commands over SSH."""

    def __init__(self, host, username, password, port, min_cpu, min_free):
        """Initialise the thresholds and the SSH connections.

        :param host: host passed to the SSH clients
        :param username: user name passed to the SSH clients; also the key
            used to look up this user's cpus in the ``qstat`` result
        :param password: password passed to the SSH clients
        :param port: port passed to the SSH clients
        :param min_cpu: see ``Resource``
        :param min_free: see ``Resource``
        """
        Resource.__init__(self, min_cpu, min_free)
        self.shared_ssh = SharedSSHClient(host, username, password, port)
        SSHClient.__init__(self, host, username, password, port=port)
        # Assigned after both base initialisers have run; this lock serialises
        # check_resources() and guards the ``acquired`` counter.
        self.lock = threading.Lock()

    def new_ssh_connection(self):
        """Return a new ``SSHClient`` built from this object's connection settings."""
        return SSHClient(self.host, self.username, self.password, self.port)

    def check_resources(self):
        """Query the cluster and return ``(total, free, user)`` cpu counts.

        :return: tuple ``(used_cpu + cpu_free, cpu_free, cpu_user)`` where
            ``cpu_user`` is at least ``self.acquired``
        :raises EnvironmentError: if running or parsing the commands fails
        """
        with self.lock:
            try:
                with self:
                    _, output, _ = self.execute('pbsnodes -l all')
                    pbsnodes, _ = pbswrap.parse_pbsnode_lall(output.split("\n"))

                    _, output, _ = self.execute('qstat -n1')
                    users, host, _ = pbswrap.parse_qstat_n1(output.split("\n"))

                # if the user does not have any jobs, this will not exist
                try:
                    cpu_user = users[self.username]['cpus']
                    cpu_user += users[self.username]['Q']
                except KeyError:
                    cpu_user = 0
                # Jobs acquired locally may not be visible in qstat yet.
                cpu_user = max(cpu_user, self.acquired)
                cpu_free, nodeSum = pbswrap.count_cpus(users, host, pbsnodes)
                return nodeSum['used_cpu'] + cpu_free, cpu_free, cpu_user
            except Exception as e:
                # Keep the underlying cause in the message so that the failure
                # can be diagnosed (works on both Python 2 and 3).
                raise EnvironmentError("check resources failed: %s" % (e,))

    def jobids(self, jobname_prefix):
        """Return the ids of this user's jobs whose name starts with ``jobname_prefix``.

        :param jobname_prefix: prefix matched against the fourth column of
            each ``qstat -u`` line
        :return: list of job id strings (the part of the first column before
            the first ".")
        """
        _, output, _ = self.execute('qstat -u %s' % self.username)
        ids = []
        # The first five lines are skipped (presumably the qstat header).
        for line in output.split("\n")[5:]:
            fields = line.split()
            # Blank or short lines have no job-name column; skip them.
            if len(fields) > 3 and fields[3].startswith(jobname_prefix):
                ids.append(fields[0].split(".")[0])
        return ids

    def stop_pbsjobs(self, jobids):
        """Delete the given jobs with ``qdel``.

        :param jobids: a single job id string or an iterable of job ids.
            Nothing is executed when it is empty.
        """
        if isinstance(jobids, (str, type(u""))):
            # A single id must not be split into its characters.
            jobids = [jobids]
        else:
            jobids = [str(jobid) for jobid in jobids]
        if not jobids:
            return
        self.execute("qdel %s" % (" ".join(jobids)))


class LocalResource(Resource):
    """Resource describing the cpus of the local machine."""

    def __init__(self, process_name):
        """Set the thresholds from the local cpu count.

        ``min_cpu`` is half of the cpus (at least 1) and ``min_free`` is the
        total number of cpus.

        :param process_name: name prefix (case-insensitive) of the processes
            counted as current jobs in ``check_resources``
        """
        cpu_count = multiprocessing.cpu_count()
        # Floor division keeps min_cpu an int (true division gives a float on Python 3).
        Resource.__init__(self, max(1, cpu_count // 2), cpu_count)
        self.process_name = process_name
        self.host = 'Localhost'

    def check_resources(self):
        """Return ``(no_cpu, cpu_free, used)`` for the local machine.

        ``used`` is the largest of the acquired counter, the number of busy
        cpus and the number of running processes whose name starts with
        ``process_name``.
        """
        import psutil

        def name(i):
            """Return the name of process ``i`` or "" if it cannot be read."""
            try:
                return psutil.Process(i).name()
            except (psutil.AccessDenied, psutil.NoSuchProcess):
                # The process vanished or is not accessible: it cannot match.
                return ""

        no_cpu = multiprocessing.cpu_count()
        # cpu_percent is called with 0.5 and its result is treated as a percentage.
        cpu_free = (1 - psutil.cpu_percent(.5) / 100) * no_cpu
        prefix = self.process_name.lower()
        no_current_process = sum(1 for i in psutil.pids() if name(i).lower().startswith(prefix))
        used = max(self.acquired, no_cpu - cpu_free, no_current_process)
        return no_cpu, cpu_free, used