From 66b4113437061066487c917b434b1572d9f8ab10 Mon Sep 17 00:00:00 2001
From: "Mads M. Pedersen" <mmpe@dtu.dk>
Date: Tue, 7 Feb 2017 10:27:19 +0100
Subject: [PATCH] added subset_mean

---
 wetb/hawc2/simulation_resources.py           |   8 +-
 wetb/utils/cluster_tools/cluster_resource.py |  56 +++--
 wetb/utils/cluster_tools/ssh_client.py       |   8 +-
 wetb/utils/subset_mean.py                    | 208 +++++++++++++++++++
 4 files changed, 245 insertions(+), 35 deletions(-)
 create mode 100644 wetb/utils/subset_mean.py

diff --git a/wetb/hawc2/simulation_resources.py b/wetb/hawc2/simulation_resources.py
index 7ee64a4..dfc386a 100644
--- a/wetb/hawc2/simulation_resources.py
+++ b/wetb/hawc2/simulation_resources.py
@@ -202,9 +202,13 @@ class SimulationThread(Thread):
         
 
 class PBSClusterSimulationResource(SSHPBSClusterResource):
-    def __init__(self, host, username, password, port, min_cpu, min_free, init_cmd, wine_cmd, python_cmd):
-        SSHPBSClusterResource.__init__(self, host, username, password, port, min_cpu, min_free, init_cmd, wine_cmd, python_cmd)
+    def __init__(self, sshclient, min_cpu, min_free, init_cmd, wine_cmd, python_cmd):
+        SSHPBSClusterResource.__init__(self, sshclient, min_cpu, min_free)
+        self.init_cmd = init_cmd
+        self.wine_cmd = wine_cmd
+        self.python_cmd = python_cmd
         
+    
     def is_clean(self):
         return self.execute("find .hawc2launcher/ -type f | wc -l")[1] > 0
 
diff --git a/wetb/utils/cluster_tools/cluster_resource.py b/wetb/utils/cluster_tools/cluster_resource.py
index 78ed0f1..d12e3e0 100644
--- a/wetb/utils/cluster_tools/cluster_resource.py
+++ b/wetb/utils/cluster_tools/cluster_resource.py
@@ -11,6 +11,8 @@ import threading
 
 from wetb.utils.cluster_tools import pbswrap
 from wetb.utils.cluster_tools.ssh_client import SSHClient, SharedSSHClient
+from wetb.utils.timing import print_time
+import time
 
 
 def unix_path(path, cwd=None, fail_on_missing=False):
@@ -65,7 +67,7 @@ class Resource(object):
         self.min_cpu = min_cpu
         self.min_free = min_free
         self.acquired = 0
-        self.lock = threading.Lock()
+        self.resource_lock = threading.Lock()
 
     def ok2submit(self):
         """Always ok to have min_cpu cpus and ok to have more if there are min_free free cpus"""
@@ -81,54 +83,54 @@ class Resource(object):
             return False
 
     def acquire(self):
-        with self.lock:
+        with self.resource_lock:
             self.acquired += 1
 
     def release(self):
-        with self.lock:
+        with self.resource_lock:
             self.acquired -= 1
 
 
     def update_status(self):
         try:
-            self.no_cpu, self.cpu_free, self.no_current_process = self.check_resources()
+            self.no_cpu, self.cpu_free, self.used_by_user = self.check_resources()
         except Exception:
             pass
 
 
-class SSHPBSClusterResource(Resource, SSHClient):
+class SSHPBSClusterResource(Resource):
     finished = []
     loglines = {}
     is_executing = []
     
-    def __init__(self, host, username, password, port, min_cpu, min_free, init_cmd, wine_cmd, python_cmd):
+    def __init__(self, sshclient, min_cpu, min_free):
         Resource.__init__(self, min_cpu, min_free)
-        self.init_cmd = init_cmd
-        self.wine_cmd = wine_cmd
-        self.python_cmd = python_cmd
-        self.shared_ssh = SharedSSHClient(host, username, password, port)
-        SSHClient.__init__(self, host, username, password, port=port)
-        self.lock = threading.Lock()
+        self.ssh = sshclient
+        self.resource_lock = threading.Lock()
+
+    @property
+    def host(self):
+        return self.ssh.host
 
 
     def new_ssh_connection(self):
         return SSHClient(self.host, self.username, self.password, self.port)
 
     def check_resources(self):
-        with self.lock:
+        with self.resource_lock:
             try:
-                with self:
-                    _, output, _ = self.execute('pbsnodes -l all')
+                with self.ssh:
+                    _, output, _ = self.ssh.execute('pbsnodes -l all')
                     pbsnodes, nodes = pbswrap.parse_pbsnode_lall(output.split("\n"))
 
-                    _, output, _ = self.execute('qstat -n1')
-                    users, host, nodesload = pbswrap.parse_qstat_n1(output.split("\n"), self.host)
+                    _, output, _ = self.ssh.execute('qstat -n1')
+                    users, host, nodesload = pbswrap.parse_qstat_n1(output.split("\n"), self.ssh.host)
 
 
                 # if the user does not have any jobs, this will not exist
                 try:
-                    cpu_user = users[self.username]['cpus']
-                    cpu_user += users[self.username]['Q']
+                    cpu_user = users[self.ssh.username]['cpus']
+                    cpu_user += users[self.ssh.username]['Q']
                 except KeyError:
                     cpu_user = 0
                 cpu_user = max(cpu_user, self.acquired)
@@ -140,13 +142,13 @@ class SSHPBSClusterResource(Resource, SSHClient):
 
 
     def jobids(self, jobname_prefix):
-            _, output, _ = self.execute('qstat -u %s' % self.username)
+            _, output, _ = self.ssh.execute('qstat -u %s' % self.ssh.username)
             return [l.split()[0].split(".")[0] for l in output.split("\n")[5:] if l.strip() != "" and l.split()[3].startswith("h2l")]
 
     def stop_pbsjobs(self, jobids):
         if not hasattr(jobids, "len"):
             jobids = list(jobids)
-        self.execute("qdel %s" % (" ".join(jobids)))
+        self.ssh.execute("qdel %s" % (" ".join(jobids)))
         
         
    
@@ -159,17 +161,11 @@ class LocalResource(Resource):
         #self.process_name = process_name
         self.host = 'Localhost'
 
+    
+    
     def check_resources(self):
         import psutil
-        def name(i):
-            try:
-                return psutil.Process(i).name()
-            except (psutil.AccessDenied, psutil.NoSuchProcess):
-                return ""
-
         no_cpu = multiprocessing.cpu_count()
-        cpu_free = (1 - psutil.cpu_percent(.5) / 100) * no_cpu
-        #no_current_process = len([i for i in psutil.pids() if name(i) == self.process_name.lower()])
-        #used = max(self.acquired, no_cpu - cpu_free, no_current_process)
+        cpu_free = (1 - psutil.cpu_percent(.1) / 100) * no_cpu
         used = self.acquired
         return no_cpu, cpu_free, used
diff --git a/wetb/utils/cluster_tools/ssh_client.py b/wetb/utils/cluster_tools/ssh_client.py
index e900a49..c089dbc 100644
--- a/wetb/utils/cluster_tools/ssh_client.py
+++ b/wetb/utils/cluster_tools/ssh_client.py
@@ -132,8 +132,9 @@ class SSHClient(object):
         return self.client
 
     def connect(self):
-#        if self.password is None or self.password == "":
-#             raise IOError("Password not set for %s"%self.host)
+        
+        print ("connect", self.host)
+        #print (traceback.print_stack())
         if self.gateway:
             if self.gateway.interactive_auth_handler:
                 self.tunnel = SSHInteractiveAuthTunnelForwarder(self.gateway.interactive_auth_handler,
@@ -156,7 +157,8 @@ class SSHClient(object):
             self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
             self.client.connect("127.0.0.1", 10022, username=self.username, password=self.password)
 
-                 
+        elif self.password is None or self.password == "":
+            raise IOError("Password not set for %s"%self.host)         
         else:
             self.client = paramiko.SSHClient()
             self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
diff --git a/wetb/utils/subset_mean.py b/wetb/utils/subset_mean.py
new file mode 100644
index 0000000..3cf59ae
--- /dev/null
+++ b/wetb/utils/subset_mean.py
@@ -0,0 +1,208 @@
+'''
+Created on 24/06/2016
+
+@author: MMPE
+'''
+
+import numpy as np
+import unittest
+from wetb.utils.geometry import rpm2rads
+from collections import deque
+# NOTE: removed spurious auto-generated import (tables.tests.test_index_backcompat) — unused and fails where PyTables is absent
+
+
+def power_mean(power, trigger_indexes, I, rotor_speed, time, air_density=1.225, rotor_speed_mean_samples=1) :
+    """Calculate the density normalized mean power, taking acceleration of the rotor into account
+
+    Parameters
+    ---------
+    power : array_like
+        Power [W]
+    trigger_indexes : array_like
+        Trigger indexes
+    I : float
+        Rotor inerti [kg m^2]
+    rotor_speed : array_like
+        Rotor speed [rad/s]
+    time : array_like
+        time [s]
+    air_density : int, float or array_like, optional
+        Air density.
+    rotor_speed_mean_samples : int
+        To reduce the effect of noise, the mean of a number of rotor speed samples can be used
+
+    Returns
+    -------
+    mean power including power used to (de)accelerate rotor
+
+    Examples:
+    ---------
+    turbine_power_mean = lambda power, triggers: power_mean(power, triggers, 2.5E7, rot_speed, time, rho)
+    trigger_indexes = time_trigger(time,30)
+    wsp_mean, power_mean = subset_mean([wsp, power],trigger_indexes,mean_func={1:turbine_power_mean})
+    """
+    if rotor_speed_mean_samples == 1:
+        rs1 = rotor_speed[trigger_indexes[:-1]]
+        rs2 = rotor_speed[trigger_indexes[1:] - 1]
+    else:
+        rs = np.array([rotor_speed[max(i - rotor_speed_mean_samples, 0):i - 1 + rotor_speed_mean_samples].mean() for i in trigger_indexes])
+        rs1 = rs[:-1]
+        rs2 = rs[1:]
+
+
+    power = np.array([np.nanmean(power[i1:i2], 0) for i1, i2 in zip(trigger_indexes[:-1].tolist(), trigger_indexes[1:].tolist())])
+    if isinstance(air_density, (int, float)):
+        if air_density != 1.225:
+            power = power / air_density * 1.225
+    else:
+        air_density = np.array([np.nanmean(air_density[i1:i2], 0) for i1, i2 in zip(trigger_indexes[:-1].tolist(), trigger_indexes[1:].tolist())])
+        power = power / air_density * 1.225
+    return power + 1 / 2 * I * (rs2 ** 2 - rs1 ** 2) / (time[trigger_indexes[1:] - 1] - time[trigger_indexes[:-1]])
+
+def power_mean_func_kW(I, rotor_speed, time, air_density=1.225, rotor_speed_mean_samples=1) :
+    """Return a power mean function [kW] used to Calculate the density normalized mean power, taking acceleration of the rotor into account
+
+    Parameters
+    ---------
+    I : float
+        Rotor inertia [kg m^2]
+    rotor_speed : array_like
+        Rotor speed [rad/s]
+    time : array_like
+        time [s]
+    air_density : int, float or array_like, optional
+        Air density.
+    rotor_speed_mean_samples : int
+        To reduce the effect of noise, the mean of a number of rotor speed samples can be used
+
+    Returns
+    -------
+    mean power function
+
+    Examples:
+    ---------
+    turbine_power_mean = power_mean_func_kW(I=2.5E7, rotor_speed=rot_speed, time=time, air_density=rho)
+    trigger_indexes = time_trigger(time,30)
+    wsp_mean, power_mean = subset_mean([wsp, power],trigger_indexes,mean_func={1:turbine_power_mean})
+    """
+    def mean_power(power, trigger_indexes):
+        return power_mean(power * 1000, trigger_indexes, I, rotor_speed, time , air_density, rotor_speed_mean_samples) / 1000
+    return mean_power
+
+
+def subset_mean(data, trigger_indexes, mean_func={}):
+    if isinstance(data, list):
+        data = np.array(data).T
+    if len(data.shape)==1:
+        no_sensors = 1
+    else:
+        no_sensors = data.shape[1]
+    if isinstance(trigger_indexes[0], tuple):
+        triggers = np.array(trigger_indexes)
+        steps = np.diff(triggers[:, 0])
+        lengths = np.diff(triggers)[:, 0]
+        if np.all(steps == steps[0]) and np.all(lengths == lengths[0]):
+            subset_mean = np.mean(np.r_[data.reshape(data.shape[0],no_sensors),np.empty((steps[0],no_sensors))+np.nan][triggers[0][0]:triggers.shape[0] * steps[0] + triggers[0][0]].reshape(triggers.shape[0], steps[0], no_sensors)[:, :lengths[0]], 1)
+        else:
+            subset_mean = np.array([np.mean(data[i1:i2], 0) for i1, i2 in trigger_indexes])
+        for index, func in mean_func.items():
+            att = data[:, index]
+            subset_mean[:, index] = func(att, trigger_indexes)
+    else:
+        steps = np.diff(trigger_indexes)
+
+        if np.all(steps == steps[0]):
+            #equal distance
+            subset_mean = np.mean(data[trigger_indexes[0]:trigger_indexes[-1]].reshape([ len(trigger_indexes) - 1, steps[0], data.shape[1]]), 1)
+        else:
+            subset_mean = np.array([np.mean(data[i1:i2], 0) for i1, i2 in zip(trigger_indexes[:-1].tolist(), trigger_indexes[1:].tolist())])
+        for index, func in mean_func.items():
+            att = data[:, index]
+            subset_mean[:, index] = func(att, trigger_indexes)
+    if len(data.shape)==1 and len(subset_mean.shape)==2:
+        return subset_mean[:,0]
+    else:
+        return subset_mean
+
+
+def cycle_trigger(values, trigger_value=None, step=1, ascending=True, tolerance=0):
+    values = np.array(values)
+    if trigger_value is None:
+        r = values.max() - values.min()
+        values = (values[:] - r / 2) % r
+        trigger_value = r / 2
+    if ascending:
+        return np.where((values[1:] > trigger_value + tolerance) & (values[:-1] <= trigger_value - tolerance))[0][::step]
+    else:
+        return np.where((values[1:] < trigger_value - tolerance) & (values[:-1] >= trigger_value + tolerance))[0][::step]
+
+def revolution_trigger(values, rpm_dt=None, dmin=5, dmax=10, ):
+    """Return indexes where values are > max(values)-dmax and decrease more than dmax
+    If the RPM signal and time step are provided, trigger steps shorter than the time of one revolution are removed
+    
+    Parameters
+    ---------
+    values : array_like
+        Position signal (e.g. rotor position)
+    rpm_dt : tuple(array_like, float), optional
+        - rpm : RPM signal
+        - dt : time step between samples
+    dmin : int or float, optional
+        Minimum normal position increase between samples
+    dmax : float or int, optional
+        Maximum normal position increase between samples 
+            
+    
+    Returns
+    -------
+    trigger indexes
+    
+    """
+    
+    values = np.array(values)
+    indexes = np.where((np.diff(values)<-dmax)&(values[:-1]>values.max()-dmax))[0]
+    
+    if rpm_dt is None:
+        return indexes
+    else:
+        index_pairs = []
+        rpm, dt = rpm_dt
+        d_deg = rpm *360/60*dt
+        cum_d_deg = np.cumsum(d_deg)
+        lbound, ubound = values.max()-dmax, values.max()+dmax
+        index_pairs = [(i1,i2) for i1,i2, deg in zip(indexes[:-1], indexes[1:], cum_d_deg[indexes[1:]-1]-cum_d_deg[indexes[:-1]]) 
+                       if deg > lbound and deg<ubound]
+        return index_pairs
+    
+    
+def time_trigger(time, step, start=None, stop=None):
+    if start is None:
+        start = time[0]
+    decimals = int(np.ceil(np.log10(1 / np.nanmin(np.diff(time)))))
+    time = np.round(time - start, decimals)
+
+
+    steps = np.round(np.diff(time), decimals)
+    if np.sum(steps == steps[0])/len(time)>.99: #np.all(steps == steps[0]):
+        # equal time step
+        time = np.r_[time, time[-1] + max(set(steps), key=list(steps).count)]
+    if stop is None:
+        stop = time[~np.isnan(time)][-1]
+    else:
+        stop -= start
+    epsilon = 10 ** -(decimals + 2)
+    return np.where(((time % step < epsilon) | (time % step > step - epsilon)) & (time >= 0) & (time <= stop))[0]
+
+
+def non_nan_index_trigger(sensor, step):
+    trigger = []
+    i = 0
+    nan_indexes = deque(np.where(np.isnan(sensor))[0].tolist() + [len(sensor)])
+    while i + step <= sensor.shape[0]:
+        if i+step<=nan_indexes[0]:
+            trigger.append((i,i+step))
+            i+=step
+        else:
+            i = nan_indexes.popleft()+1
+    return trigger
+
-- 
GitLab