"""
Slope Covariance Matrix Generation
----------------------------------
Slope covariance matrix routines for AO systems observing through Von Karmon turbulence. Such matrices have a variety
of uses, though they are especially useful for creating 'tomographic reconstructors' that can reconstruct some 'psuedo'
WFS measurements in a required direction (where there might be an interesting science target but no guide stars),
given some actual measurements in other directions (where the some suitable guide stars are).
.. warning::
This code has been tested qualitatively and seems OK, but needs more rigorous testing.
.. codeauthor::
Andrew Reeves <a.p.reeves@durham.ac.uk>
"""
import multiprocessing
import numpy
import scipy.special
[docs]class CovarianceMatrix(object):
"""
A creator of slope covariance matrices in Von Karmon turbulence, based on the paper by Martin et al, SPIE, 2012.
Given a list of paramters describing an AO WFS system and the atmosphere above the telescope, this class can
compute the covariance matrix between all the WFS measurements. This can support LGS sources that exist at a
finite altitude. When computing the covariance matrix, Python's multiprocessing module is used to spread the work
between different processes and processing cores.
On initialisation, this class performs some initial calculations and parameter sorting. To create the
covariance matrix run the ``make_covariace_matrix`` method. This may take some time depending on your paramters...
Parameters:
n_wfs (int): Number of wavefront sensors present.
pupil_masks (ndarray): A map of the pupil for each WFS which is nx_subaps by ny_subaps. 1 if subap active, 0 if not.
telescope_diameter (float): Diameter of the telescope
subap_diameters (ndarray): The diameter of the sub-apertures for each WFS in metres
gs_altitudes (ndarray): Reciprocal (1/metres) of the Guide star alitude for each WFS
gs_positions (ndarray): X,Y position of each WFS in arcsecs. Array shape (Wfs, 2)
wfs_wavelengths (ndarray): Wavelength each WFS observes
n_layers (int): The number of atmospheric turbulence layers
layer_altitudes (ndarray): The altitude of each turbulence layer in meters
layer_r0s (ndarray): The Fried parameter of each turbulence layer
layer_L0s (ndarray): The outer-scale of each layer in metres
threads (int, optional): Number of processes to use for calculation. default is 1
"""
def __init__(
self, n_wfs, pupil_masks, telescope_diameter, subap_diameters, gs_altitudes, gs_positions, wfs_wavelengths,
n_layers, layer_altitudes, layer_r0s, layer_L0s, threads=1):
self.threads = threads
self.n_wfs = n_wfs
self.subap_diameters = subap_diameters
self.wfs_wavelengths = wfs_wavelengths
self.telescope_diameter = telescope_diameter
self.pupil_masks = pupil_masks
self.gs_altitudes = gs_altitudes
self.gs_positions = gs_positions
self.n_layers = n_layers
self.layer_altitudes = layer_altitudes
self.layer_r0s = layer_r0s
self.layer_L0s = layer_L0s
# print("n_layers: {}".format(n_layers))
self.n_subaps = []
self.total_subaps = 0
for wfs_n in range(n_wfs):
self.n_subaps.append(pupil_masks[wfs_n].sum())
self.total_subaps += self.n_subaps[wfs_n]
self.n_subaps = numpy.array(self.n_subaps, dtype="int")
self.total_subaps = int(self.total_subaps)
[docs] def make_covariance_matrix(self):
"""
Calculate and build the covariance matrix
Returns:
ndarray: Covariance Matrix
"""
# make a list of the positions of the centre of every sub-aperture
# for each WFS un units of metres from the centre of the pupil
self.subap_positions = []
for wfs_n in range(self.n_wfs):
wfs_subap_pos = numpy.array(numpy.where(self.pupil_masks[wfs_n] == 1)).T * self.subap_diameters[wfs_n]
wfs_subap_pos -= self.telescope_diameter/2.
wfs_subap_pos -= self.subap_diameters[wfs_n]/2.
self.subap_positions.append(wfs_subap_pos)
# print("WFS {}, max position: {}m".format(wfs_n, abs(wfs_subap_pos).max()))
# Create a list with n_layers elements, each of which is a list of the WFS meta-subap positions at that altitude
self.subap_layer_positions = []
self.subap_layer_diameters = []
for layer_n, layer_altitude in enumerate(self.layer_altitudes):
subap_n = 0
wfs_pos = []
wfs_subap_diameters = []
for wfs_n in range(self.n_wfs):
# Scale for LGS
if self.gs_altitudes[wfs_n] != 0:
# print("Its an LGS!")
scale_factor = (1 - layer_altitude/self.gs_altitudes[wfs_n])
positions = scale_factor * self.subap_positions[wfs_n].copy()
else:
scale_factor = 1
positions = self.subap_positions[wfs_n].copy()
# translate due to GS position
gs_pos_rad = numpy.array(self.gs_positions[wfs_n]) * numpy.pi/180/3600
# print("GS Positions: {} rad".format(gs_pos_rad))
translation = gs_pos_rad * layer_altitude
# print("Translation: {} m".format(translation))
# print("Max position before translation: {} m".format(abs(positions).max()))
positions += translation
# print("Max position: {} m".format(abs(positions).max()))
wfs_pos.append(positions)
wfs_subap_diameters.append(self.subap_diameters[wfs_n] * scale_factor)
self.subap_layer_diameters.append(wfs_subap_diameters)
self.subap_layer_positions.append(wfs_pos)
if self.threads == 1:
self._make_covariance_matrix()
else:
self._make_covariance_matrix_mp(self.threads)
self.covariance_matrix = mirror_covariance_matrix(self.covariance_matrix)
return self.covariance_matrix
def _make_covariance_matrix(self):
# Now compile the covariance matrix
self.covariance_matrix = numpy.zeros((2 * self.total_subaps, 2 * self.total_subaps)).astype("float32")
for layer_n in range(self.n_layers):
# print("Compute Layer {}".format(layer_n))
subap_ni = 0
for wfs_i in range(self.n_wfs):
subap_nj = 0
# Only loop over upper diagonal of covariance matrix as its symmetrical
for wfs_j in range(wfs_i+1):
cov_xx, cov_yy, cov_xy = wfs_covariance(
self.n_subaps[wfs_i], self.n_subaps[wfs_j],
self.subap_layer_positions[layer_n][wfs_i], self.subap_layer_positions[layer_n][wfs_j],
self.subap_layer_diameters[layer_n][wfs_i], self.subap_layer_diameters[layer_n][wfs_j],
self.layer_r0s[layer_n], self.layer_L0s[layer_n])
subap_ni = self.n_subaps[:wfs_i].sum()
subap_nj = self.n_subaps[:wfs_j].sum()
# Coordinates of the XX covariance
cov_mat_coord_x1 = subap_ni * 2
cov_mat_coord_x2 = subap_ni * 2 + self.n_subaps[wfs_i]
cov_mat_coord_y1 = subap_nj * 2
cov_mat_coord_y2 = subap_nj * 2 + self.n_subaps[wfs_j]
# print("covmat coords: ({}: {}, {}: {})".format(cov_mat_coord_x1, cov_mat_coord_x2, cov_mat_coord_y1, cov_mat_coord_y2))
r0_scale = ((self.wfs_wavelengths[wfs_i] * self.wfs_wavelengths[wfs_j])
/ (8 * numpy.pi**2 * self.subap_layer_diameters[layer_n][wfs_i] * self.subap_layer_diameters[layer_n][wfs_j])
)
self.covariance_matrix[
cov_mat_coord_x1: cov_mat_coord_x2, cov_mat_coord_y1: cov_mat_coord_y2
] += cov_xx * r0_scale
self.covariance_matrix[
cov_mat_coord_x1 + self.n_subaps[wfs_i]: cov_mat_coord_x2 + self.n_subaps[wfs_i],
cov_mat_coord_y1: cov_mat_coord_y2] += cov_xy * r0_scale
self.covariance_matrix[
cov_mat_coord_x1: cov_mat_coord_x2,
cov_mat_coord_y1 + self.n_subaps[wfs_j]: cov_mat_coord_y2 + self.n_subaps[wfs_j]
] += numpy.fliplr(numpy.flipud(cov_xy)) * r0_scale
self.covariance_matrix[
cov_mat_coord_x1 + self.n_subaps[wfs_i]: cov_mat_coord_x2 + self.n_subaps[wfs_i],
cov_mat_coord_y1 + self.n_subaps[wfs_j]: cov_mat_coord_y2 + self.n_subaps[wfs_j]
] += cov_yy * r0_scale
def _make_covariance_matrix_mp(self, threads):
pool = multiprocessing.Pool(threads)
# Now compile the covariance matrix
self.covariance_matrix = numpy.zeros((2 * self.total_subaps, 2 * self.total_subaps), dtype="float32")
for layer_n in range(self.n_layers):
# print("Compute Layer {}".format(layer_n))
args = []
for wfs_i in range(self.n_wfs):
# Only loop over upper diagonal of covariance matrix as its symmetrical
for wfs_j in range(wfs_i+1):
args.append((
self.n_subaps[wfs_i], self.n_subaps[wfs_j],
self.subap_layer_positions[layer_n][wfs_i], self.subap_layer_positions[layer_n][wfs_j],
self.subap_layer_diameters[layer_n][wfs_i], self.subap_layer_diameters[layer_n][wfs_j],
self.layer_r0s[layer_n], self.layer_L0s[layer_n]))
self.cov_mats = pool.map(wfs_covariance_mpwrap, args)
thread_n = 0
for wfs_i in range(self.n_wfs):
for wfs_j in range(wfs_i+1):
cov_xx, cov_yy, cov_xy = self.cov_mats[thread_n]
subap_ni = self.n_subaps[:wfs_i].sum()
subap_nj = self.n_subaps[:wfs_j].sum()
# Coordinates of the XX covariance
cov_mat_coord_x1 = subap_ni * 2
cov_mat_coord_x2 = subap_ni * 2 + self.n_subaps[wfs_i]
cov_mat_coord_y1 = subap_nj * 2
cov_mat_coord_y2 = subap_nj * 2 + self.n_subaps[wfs_j]
# print("covmat coords: ({}: {}, {}: {})".format(cov_mat_coord_x1, cov_mat_coord_x2, cov_mat_coord_y1, cov_mat_coord_y2))
r0_scale = ((self.wfs_wavelengths[wfs_i] * self.wfs_wavelengths[wfs_j])
/ (8 * numpy.pi**2 * self.subap_layer_diameters[layer_n][wfs_i] * self.subap_layer_diameters[layer_n][wfs_j])
)
self.covariance_matrix[
cov_mat_coord_x1: cov_mat_coord_x2, cov_mat_coord_y1: cov_mat_coord_y2
] += cov_xx * r0_scale
self.covariance_matrix[
cov_mat_coord_x1 + self.n_subaps[wfs_i]: cov_mat_coord_x2 + self.n_subaps[wfs_i],
cov_mat_coord_y1: cov_mat_coord_y2] += cov_xy * r0_scale
self.covariance_matrix[
cov_mat_coord_x1: cov_mat_coord_x2,
cov_mat_coord_y1 + self.n_subaps[wfs_j]: cov_mat_coord_y2 + self.n_subaps[wfs_j]
] += numpy.fliplr(numpy.flipud(cov_xy)) * r0_scale
self.covariance_matrix[
cov_mat_coord_x1 + self.n_subaps[wfs_i]: cov_mat_coord_x2 + self.n_subaps[wfs_i],
cov_mat_coord_y1 + self.n_subaps[wfs_j]: cov_mat_coord_y2 + self.n_subaps[wfs_j]
] += cov_yy * r0_scale
thread_n += 1
[docs] def make_tomographic_reconstructor(self, svd_conditioning=0):
"""
Creats a tomohraphic reconstructor from the covariance matrix as in Vidal, 2010.
See the documentation for the function `create_tomographic_covariance_reconstructor` in this module.
Assumes the 1st WFS given is the one for which reconstruction is required to.
Parameters:
svd_conditioning (float): Conditioning for the SVD used in inversion.
Returns:
ndarray: A tomohraphic reconstructor.
"""
self.tomographic_reconstructor = create_tomographic_covariance_reconstructor(
self.covariance_matrix, self.n_subaps[0], svd_conditioning)
return self.tomographic_reconstructor
[docs]def wfs_covariance_mpwrap(args):
return wfs_covariance(*args)
[docs]def wfs_covariance(n_subaps1, n_subaps2, wfs1_positions, wfs2_positions, wfs1_diam, wfs2_diam, r0, L0):
"""
Calculates the covariance between 2 WFSs
Parameters:
n_subaps1 (int): number of sub-apertures in WFS 1
n_subaps2 (int): number of sub-apertures in WFS 1
wfs1_positions (ndarray): Central position of each sub-apeture from telescope centre for WFS 1
wfs2_positions (ndarray): Central position of each sub-apeture from telescope centre for WFS 2
wfs1_diam: Diameter of WFS 1 sub-apertures
wfs2_diam: Diameter of WFS 2 sub-apertures
r0: Fried parameter of turbulence
L0: Outer scale of turbulence
Returns:
slope covariance of X with X , slope covariance of Y with Y, slope covariance of X with Y
"""
xy_seperations = calculate_wfs_seperations(n_subaps1, n_subaps2, wfs1_positions, wfs2_positions)
# print("Min seperation: {}".format(abs(xy_seperations).min()))
cov_xx = compute_covariance_xx(xy_seperations, wfs1_diam, wfs2_diam, r0, L0)
cov_yy = compute_covariance_yy(xy_seperations, wfs1_diam, wfs2_diam, r0, L0)
cov_xy = compute_covariance_xy(xy_seperations, wfs1_diam, wfs2_diam, r0, L0)
return cov_xx, cov_yy, cov_xy
[docs]def calculate_wfs_seperations(n_subaps1, n_subaps2, wfs1_positions, wfs2_positions):
"""
Calculates the seperation between all sub-apertures in two WFSs
Parameters:
n_subaps1 (int): Number of sub-apertures in WFS 1
n_subaps2 (int): Number of sub-apertures in WFS 2
wfs1_positions (ndarray): Array of the X, Y positions of the centre of each sub-aperture with respect to the centre of the telescope pupil
wfs2_positions (ndarray): Array of the X, Y positions of the centre of each sub-aperture with respect to the centre of the telescope pupil
Returns:
ndarray: 2-D Array of sub-aperture seperations
"""
xy_separations = numpy.zeros((n_subaps1, n_subaps2, 2), dtype='float64')
for i, (x1, y1) in enumerate(wfs1_positions):
for j, (x2, y2) in enumerate(wfs2_positions):
xy_separations[i, j] = (x2-x1), (y2-y1)
xy_separations += 1e-20
return xy_separations
# def get_wfs_wfs_covariance(self, ):
def compute_covariance_xx(seperation, subap1_diam, subap2_diam, r0, L0):
x1 = seperation[..., 0] + (subap2_diam - subap1_diam) * 0.5
r1 = numpy.sqrt(x1**2 + seperation[..., 1]**2)
x2 = seperation[..., 0] - (subap2_diam + subap1_diam) * 0.5
r2 = numpy.sqrt(x2**2 + seperation[..., 1]**2)
x3 = seperation[..., 0] + (subap2_diam + subap1_diam) * 0.5
r3 = numpy.sqrt(x3**2 + seperation[..., 1]**2)
Cxx = (-2 * structure_function_vk(r1, r0, L0)
+ structure_function_vk(r2, r0, L0)
+ structure_function_vk(r3, r0, L0)
)
return Cxx
def compute_covariance_yy(seperation, subap1_diam, subap2_diam, r0, L0):
y1 = seperation[..., 1] + (subap2_diam - subap1_diam) * 0.5
r1 = numpy.sqrt(seperation[..., 0]**2 + y1**2)
y2 = seperation[..., 1] - (subap2_diam + subap1_diam) * 0.5
r2 = numpy.sqrt(seperation[..., 0]**2 + y2**2)
y3 = seperation[..., 1] + (subap2_diam + subap1_diam) * 0.5
r3 = numpy.sqrt(seperation[..., 0]**2 + y3**2)
Cyy = (-2 * structure_function_vk(r1, r0, L0)
+ structure_function_vk(r2, r0, L0)
+ structure_function_vk(r3, r0, L0)
)
return Cyy
def compute_covariance_xy(seperation, subap1_diam, subap2_diam, r0, L0):
x1 = seperation[..., 0] + subap1_diam * 0.5
y1 = seperation[..., 1] - subap2_diam * 0.5
r1 = numpy.sqrt(x1**2 + y1**2)
x2 = seperation[..., 0] - subap1_diam * 0.5
y2 = seperation[..., 1] + subap2_diam * 0.5
r2 = numpy.sqrt(x2**2 + y2**2)
x3 = seperation[..., 0] + subap1_diam * 0.5
y3 = seperation[..., 1] + subap2_diam * 0.5
r3 = numpy.sqrt(x3**2 + y3**2)
x4 = seperation[..., 0] - subap1_diam * 0.5
y4 = seperation[..., 1] - subap2_diam * 0.5
r4 = numpy.sqrt(x4**2 + y4**2)
# print("\n",r1, r2, r3, r4)
Cxy = (- structure_function_vk(r1, r0, L0)
- structure_function_vk(r2, r0, L0)
+ structure_function_vk(r3, r0, L0)
+ structure_function_vk(r4, r0, L0)
)
return Cxy
[docs]def structure_function_vk(seperation, r0, L0):
"""
Computes the Von Karmon structure function of atmospheric turbulence
Parameters:
seperation (ndarray, float): float or array of data representing seperations between points
r0 (float): Fried parameter for atmosphere
L0 (float): Outer scale of turbulence
Returns:
ndarray, float: Structure function for seperation(s)
"""
## theoretical structure function
D_vk = ( 0.17253 * (L0 / (r0)) ** (5. / 3.)
* (1 - 2 * numpy.pi ** (5. / 6.) * ((seperation) / L0) ** (5. / 6.)
/ scipy.special.gamma(5. / 6.)
* scipy.special.kv(5. / 6., (2 * numpy.pi * seperation) / L0))
)
return D_vk
[docs]def structure_function_kolmogorov(separation, r0):
'''
Compute the Kolmogorov phase structure function
Parameters:
separation (ndarray, float): float or array of data representing
separations between points
r0 (float): Fried parameter for atmosphere
Returns:
ndarray, float: Structure function for separation(s)
'''
return 6.88 * (separation / r0)**(5. / 3.)
[docs]def calculate_structure_function(phase, nbOfPoint=None, step=None):
'''
Compute the structure function of an 2D array, along the first dimension.
SF defined as sf[j]= < (phase - phase_shifted_by_j)^2 >
Translated from a YAO function.
Parameters:
phase (ndarray, 2d): 2d-array
nbOfPoint (int): final size of the structure function vector. Default is phase.shape[1] / 4
step (int): step in pixel when computing the sf. (step * sampling_phase) gives the sf sampling in meters. Default is 1
Returns:
ndarray, float: values for the structure function of the data.
'''
if nbOfPoint is None:
nbOfPoint = phase.shape[1] / 4
if step is None:
step = 1
step = int(step)
xm = int(numpy.min([nbOfPoint, phase.shape[1] / step - 1]))
sf_x = numpy.empty(xm)
for i in range(step, xm * step, step):
sf_x[int(i / step)] = numpy.mean((phase[0:-i, :] - phase[i:, :])**2)
return sf_x
[docs]def mirror_covariance_matrix(cov_mat):
"""
Mirrors a covariance matrix around the axis of the diagonal.
Parameters:
cov_mat (ndarray): The covariance matrix to mirror
n_subaps (ndarray): Number of sub-aperture in each WFS
"""
return numpy.bitwise_or(cov_mat.view("int32"), cov_mat.T.view("int32")).view("float32")
[docs]def create_tomographic_covariance_reconstructor(covariance_matrix, n_onaxis_subaps, svd_conditioning=0):
"""
Calculates a tomographic reconstructor using the method of Vidal, JOSA A, 2010.
Uses a slope covariance matrix to generate a reconstruction matrix that will convert a collection of measurements
from WFSs observing in a variety of directions (the "off-axis" directions) to the measurements that would have been
observed by a WFS observing in a different direction (the "on-axis" direction.
The given covariance matrix must include the covariance between all WFS measurements,
including the "psuedo" WFS pointing in the on-axis direction.
It is assumed that the covariance of measurements from this on-axis direction are first in the covariance matrix.
To create the tomographic reconstructor it is neccessary to invert the covariance matrix between off-axis WFS
measurements. An SVD based psueod inversion is used for this (`numpy.linalg.pinv`). A conditioning to this SVD may
be required to filter potentially unwanted modes.
Args:
covariance_matrix (ndarray): A covariance matrix between WFSs
n_onaxis_subaps (int): Number of sub-aperture in on-axis WFS
svd_conditioning (float, optional): Conditioning for SVD inversion (default is 0)
Returns:
"""
cov_onoff = covariance_matrix[:2 * n_onaxis_subaps, 2 * n_onaxis_subaps:]
cov_offoff = covariance_matrix[2 * n_onaxis_subaps:, 2 * n_onaxis_subaps:]
icov_offoff = numpy.linalg.pinv(cov_offoff, rcond=svd_conditioning)
tomo_recon = cov_onoff.dot(icov_offoff)
return tomo_recon