Commit 620fa98c authored by Kenneth Loenbaek's avatar Kenneth Loenbaek
Browse files

Added RST2dict

Started sim2dict
parent d9e74704
import numpy as np
from .util import FortranFile_wskip
import os
def XXD2dict(filename, load_names=None, read_default=True, blocks=None, load_ghost=False, as_matrix=False, is2D=None):
"""
......@@ -89,56 +90,83 @@ def XXD2dict(filename, load_names=None, read_default=True, blocks=None, load_gho
data_out[key] = data.reshape((nblock_out,)+mat_size)
return data_out, bsize, nblock_out
def initialize_data_array(nblocks, bsize, dtype=float, vec=False, is2D=False):
exp = 2 if is2D else 3
if vec:
return np.empty([nblocks, 3, bsize ** exp], dtype=dtype)
else:
return np.empty([nblocks, bsize ** exp], dtype=dtype)
def RST2dict(filename, load_names=None, read_default=True, blocks=None, load_ghost=False, as_matrix=False, is2D=None):
if only_names:
data_out = set()
else:
data_out = {}
def RST2dict(filename, load_names=None, read_default=True, blocks=None, load_ghost=False, as_matrix=False,
is2D=False, vectors=None):
# Updating default settings
if load_names is None:
load_names = set()
if load_p_and_vel:
load_names.update(["p", "u", "v", "w"])
if read_default:
load_names.update(["p", "Vel"])
if vectors is None:
vectors = dict()
always_read_data = {"bsize", "nblock"}
# Default vector for velocity
vectors["Vel"] = ["u", "v", "w"]
# Dict containing field -> vector connection
fields_in_vectors = dict()
for name, fields in vectors.items():
for field in fields:
fields_in_vectors[field] = name
bsize = None
nblock = None
data_out = dict()
with FortranFile_wskip(filename, "r") as file:
# Reading header
restart_version, restart_version_type = read_restart_version(file)
read_restart_version(file)
# Creating ouput data lists
# Reading main file data
read_more = True
while (read_more):
# Reading data header (name, data-type, array_type, size)
name, dtype, action, size = read_data_header(file)
# Reading data
data = read_data(file, dtype, action, size, nblock,
(((name in load_names) and not only_names) or (name in always_read_data)))
# Setting bsize, nblock variables and testing for stop read
if name == "bsize":
bsize = data
bsize = read_data(file, dtype, action, size, nblock, True)
# Inialize arrays
ppbs_wghost = bsize + 3 # +3 -> 2 for ghost 1 for vertex and not cell
if load_ghost: # nppb -> points per block side
ppbs = ppbs_wghost
else:
ppbs = bsize + 1
elif name == "nblock":
nblock = data
nblock = read_data(file, dtype, action, size, nblock, True)
if blocks is None:
blocks = range(1, nblock + 1)
nblock_out = len(blocks)
elif name == "stop":
read_more = False
elif (name in load_names) or ((name in fields_in_vectors) and (fields_in_vectors[name] in load_names)):
dtype_np = float if dtype == "REAL" else int
if name in fields_in_vectors: # Reading vector field
vec_name = fields_in_vectors[name]
if vec_name not in data_out:
data_out[vec_name] = initialize_data_array(nblock_out, ppbs, dtype_np, True, is2D)
data_out[vec_name][:,
vectors[vec_name].index(name), :] = read_data(file, dtype, action, size, nblock,
data_out[vec_name][:, vectors[vec_name].index(name), :],
blocks, load_ghost, ppbs, is2D)
else: # Reading scalar field
data_out[name] = read_data(file, dtype, action, size, nblock, None, blocks, load_ghost, ppbs, is2D)
else:
if is_data_field(dtype, action):
if only_names:
data_out.add(name)
elif (data is not None) and (name in load_names):
data_out[name] = data
return data_out
read_data(file, dtype, action, size, nblock, skip=True)
if as_matrix:
for key, data in data_out.items():
mat_size = (ppbs, ppbs) if is2D else (ppbs, ppbs, ppbs)
if len(data.shape) == 3:
data_out[key] = data.reshape((nblock_out, 3)+mat_size)
else:
data_out[key] = data.reshape((nblock_out,)+mat_size)
return data_out, bsize, nblock
def RST2names(filename, name_out="fields"):
""" Reads names in .RST file
......@@ -164,7 +192,7 @@ def RST2names(filename, name_out="fields"):
name, dtype, action, size = read_data_header(file)
# Reading data (only reading nblock)
data = read_data(file, dtype, action, size, nblock, name == "nblock")
data = read_data(file, dtype, action, size, nblock, skip=not name == "nblock")
# Setting bsize, nblock variables and testing for stop read
if name == "stop":
......@@ -185,6 +213,30 @@ def RST2names(filename, name_out="fields"):
data_out.add(name)
return data_out
def sim2dict(filename_RST, filename_XXD=None, load_names=None, read_default=True, blocks=None, load_ghost=False,
as_matrix=False, is2D=None, vectors=None):
# If no grid filename is set
if filename_XXD is None:
proj_name = filename_RST[:filename_XXD.index(".RST")]
if proj_name+".X3D" in os.listdir("."):
filename_XXD = proj_name + ".X3D"
elif proj_name+".X2D" in os.listdir("."):
filename_XXD = proj_name + ".X2D"
else:
raise ValueError("Could not find grid a suitble .XXD file in current dir with project name: %s"%proj_name)
if is2D is None:
if ".X2D" in filename_XXD:
is2D = True
else:
is2D = False
def initialize_data_array(nblocks, bsize, dtype=float, vec=False, is2D=False):
exp = 2 if is2D else 3
if vec:
return np.zeros([nblocks, 3, bsize ** exp], dtype=dtype)
else:
return np.zeros([nblocks, bsize ** exp], dtype=dtype)
def read_restart_version(file):
restart_version = file.read_record("S20")[0].decode().strip()
......@@ -200,64 +252,72 @@ def read_data_header(file):
return name, dtype, action, size
def read_data(file, dtype, action, size, nblock, return_data=True):
data = None
def read_data(file, dtype, action, size, nblock, data=None, blocks=None, load_ghost=False, bsize=0, is2D=False,
skip=False):
bsize_wghost = bsize if load_ghost else bsize+2
# Integer of size 1
if ((dtype == "INTEGER") or (dtype == "LOGICAL")) and (size == 1) and (
action == "BCAST"):
if return_data:
data = file.read_record("I")[0]
else:
if skip:
file.skip_record("I")
else:
data = file.read_record("I")[0]
# Integers with size large than 1 and BCST (common to all processes)
elif ((dtype == "INTEGER") or (dtype == "LOGICAL")) and (action == "BCAST"):
if return_data:
data = file.read_record("i4")
else:
if skip:
file.skip_record("i4")
else:
data = file.read_record("i4")
# Integers with size larger than 1 and SCATTER (from each block)
elif ((dtype == "INTEGER") or (dtype == "LOGICAL")) and (action == "SCATTER"):
if return_data:
data = np.zeros([nblock, size], dtype=int)
if skip:
for iblock in range(nblock):
data[iblock, :] = file.read_record("i4")
file.skip_record("i4")
else:
if data is None:
data = initialize_data_array(len(blocks), bsize, int, False, is2D)
for iblock in range(nblock):
file.skip_record("i4")
if iblock in blocks:
data[blocks.index(iblock), :] = file.read_block("i4", load_ghost, bsize_wghost, is2D)
else:
file.skip_record("i4")
# Float of size 1
elif (dtype == "REAL") and (size == 1) and (action == "BCAST"):
if return_data:
data = file.read_record("f")[0]
else:
if skip:
file.skip_record("f")
else:
data = file.read_record("f")[0]
# Float with size large than 1 and BCST (common to all processes)
elif (dtype == "REAL") and (action == "BCAST"):
if return_data:
data = file.read_record("f8")
else:
if skip:
file.skip_record("f8")
else:
data = file.read_record("f8")
# Float with size larger than 1 and SCATTER (from each block)
elif (dtype == "REAL") and (action == "SCATTER"):
if return_data:
data = np.zeros([nblock, size])
if skip:
for iblock in range(nblock):
data[iblock, :] = file.read_record("f8")
file.skip_record("f8")
else:
if data is None:
data = initialize_data_array(len(blocks), bsize, float, False, is2D)
for iblock in range(nblock):
file.skip_record("f8")
if iblock in blocks:
data[blocks.index(iblock), :] = file.read_block("f8", load_ghost, bsize_wghost, is2D)
else:
file.skip_record("f8")
# Char with set size
elif (dtype == "CHARACTER") and (action == "BCAST"):
if return_data:
data = file.read_record("S%i" % size)[0].decode().strip()
else:
if skip:
file.skip_record("S%i" % size)
else:
data = file.read_record("S%i" % size)[0].decode().strip()
return data
......
......@@ -190,7 +190,136 @@ class TestRST2names(unittest.TestCase):
for name in fields:
self.assertTrue(name in fields_info)
class TestRST2dict(unittest.TestCase):
def test_default(self):
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("ADsim.RST.01")
self.assertTrue("Vel" in RST)
self.assertTrue("p" in RST)
self.assertTrue(RST["p"].dtype == float)
self.assertTrue(RST["Vel"].dtype == float)
np.testing.assert_equal(RST["Vel"].shape, (nblock, 3, (bsize+1)**3))
np.testing.assert_equal(RST["p"].shape, (nblock, (bsize+1)**3))
def test_load_ghost(self):
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("ADsim.RST.01", load_ghost=True)
self.assertTrue("Vel" in RST)
self.assertTrue("p" in RST)
self.assertTrue(RST["p"].dtype == float)
self.assertTrue(RST["Vel"].dtype == float)
np.testing.assert_equal(RST["Vel"].shape, (nblock, 3, (bsize+3)**3))
np.testing.assert_equal(RST["p"].shape, (nblock, (bsize+3)**3))
def test_blocks(self):
blocks = [1, 2]+ list(range(5, 8))
nblock_out = len(blocks)
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("ADsim.RST.01", blocks=blocks)
self.assertTrue("Vel" in RST)
self.assertTrue("p" in RST)
self.assertTrue(RST["p"].dtype == float)
self.assertTrue(RST["Vel"].dtype == float)
np.testing.assert_equal(RST["Vel"].shape, (nblock_out, 3, (bsize+1)**3))
np.testing.assert_equal(RST["p"].shape, (nblock_out, (bsize+1)**3))
def test_read_non_default_fields(self):
names = {"fa_PJix", "fa_PJiy", "fa_PJiz"}
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("ADsim.RST.01", names,
read_default=False)
self.assertFalse("Vel" in RST)
self.assertFalse("p" in RST)
for name in names:
self.assertTrue(name in RST)
self.assertTrue(RST[name].dtype == float)
np.testing.assert_equal(RST[name].shape, (nblock, (bsize + 1) ** 3))
def test_read_non_default_fields_as_vec(self):
vector = dict(fa_PJi=["fa_PJix", "fa_PJiy", "fa_PJiz"])
names = {"fa_PJi"}
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("ADsim.RST.01", names,
read_default=False, vectors=vector)
self.assertFalse("Vel" in RST)
self.assertFalse("p" in RST)
for name in names:
self.assertTrue(name in RST)
self.assertTrue(RST[name].dtype == float)
np.testing.assert_equal(RST[name].shape, (nblock, 3, (bsize + 1) ** 3))
def test_as_matrix(self):
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("ADsim.RST.01", as_matrix=True)
self.assertTrue("Vel" in RST)
self.assertTrue("p" in RST)
self.assertTrue(RST["p"].dtype == float)
self.assertTrue(RST["Vel"].dtype == float)
np.testing.assert_equal(RST["Vel"].shape, (nblock, 3, bsize + 1, bsize + 1, bsize + 1))
np.testing.assert_equal(RST["p"].shape, (nblock, bsize + 1, bsize + 1, bsize + 1))
def test_default_2D(self):
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("cyl.RST.01", is2D=True)
self.assertTrue("Vel" in RST)
self.assertTrue("p" in RST)
self.assertTrue(RST["p"].dtype == float)
self.assertTrue(RST["Vel"].dtype == float)
np.testing.assert_equal(RST["Vel"].shape, (nblock, 3, (bsize+1)**2))
np.testing.assert_equal(RST["p"].shape, (nblock, (bsize+1)**2))
def test_load_ghost_2D(self):
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("cyl.RST.01", is2D=True, load_ghost=True)
self.assertTrue("Vel" in RST)
self.assertTrue("p" in RST)
self.assertTrue(RST["p"].dtype == float)
self.assertTrue(RST["Vel"].dtype == float)
np.testing.assert_equal(RST["Vel"].shape, (nblock, 3, (bsize+3)**2))
np.testing.assert_equal(RST["p"].shape, (nblock, (bsize+3)**2))
def test_blocks_2D(self):
blocks = [1, 2]+ list(range(5, 8))
nblock_out = len(blocks)
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("cyl.RST.01", blocks=blocks, is2D=True)
self.assertTrue("Vel" in RST)
self.assertTrue("p" in RST)
self.assertTrue(RST["p"].dtype == float)
self.assertTrue(RST["Vel"].dtype == float)
np.testing.assert_equal(RST["Vel"].shape, (nblock_out, 3, (bsize+1)**2))
np.testing.assert_equal(RST["p"].shape, (nblock_out, (bsize+1)**2))
def test_read_non_default_fields_2D(self):
names = {"flowksi", "flowksi1"}
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("cyl.RST.01", names,
read_default=False, is2D=True)
self.assertFalse("Vel" in RST)
self.assertFalse("p" in RST)
for name in names:
self.assertTrue(name in RST)
self.assertTrue(RST[name].dtype == float)
np.testing.assert_equal(RST[name].shape, (nblock, (bsize + 1) ** 2))
def test_read_non_default_fields_as_vec_2D(self):
vector = dict(flowksi=["flowksi", "flowksi1"])
names = {"flowksi"}
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("cyl.RST.01", names,
read_default=False, vectors=vector, is2D=True)
self.assertFalse("Vel" in RST)
self.assertFalse("p" in RST)
for name in names:
self.assertTrue(name in RST)
self.assertTrue(RST[name].dtype == float)
np.testing.assert_equal(RST[name].shape, (nblock, 3, (bsize + 1) ** 2))
def test_as_matrix_2D(self):
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("cyl.RST.01", as_matrix=True, is2D=True)
self.assertTrue("Vel" in RST)
self.assertTrue("p" in RST)
self.assertTrue(RST["p"].dtype == float)
self.assertTrue(RST["Vel"].dtype == float)
np.testing.assert_equal(RST["Vel"].shape, (nblock, 3, bsize + 1, bsize + 1))
np.testing.assert_equal(RST["p"].shape, (nblock, bsize + 1, bsize + 1))
def test_fail_read_dim(self):
with self.assertRaises(ValueError): # 2D without flag
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("cyl.RST.01")
with self.assertRaises(ValueError): # 3D with 2D flag
RST, bsize, nblock = ellipsys2vtk.ellipsys2dict.RST2dict("ADsim.RST.01", is2D=True)
if __name__ == "__main__":
out = TestX2D2dict()
out.test_read_only_attr()
\ No newline at end of file
out = TestRST2dict()
out.test_default()
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment