Skip to content

Commit

Permalink
Merge pull request #37 from msrosenberg/MSR-develop
Browse files Browse the repository at this point in the history
permutation tests for correlograms
  • Loading branch information
msrosenberg authored Feb 19, 2021
2 parents a80d010 + 1d2af09 commit fdeb6d0
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 119 deletions.
10 changes: 5 additions & 5 deletions pyssage/anisotropy.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def bearing_analysis(data: numpy.ndarray, distances: numpy.ndarray, angles: nump
for a in range(nbearings):
test_angle = a * angle_width
b_matrix = distances * numpy.square(numpy.cos(angles - test_angle))
r, p_value, _, _, _, rand_p, _ = pyssage.mantel.mantel(data, b_matrix, [], npermutations)
r, p_value, _, _, _, rand_p,_, _ = pyssage.mantel.mantel(data, b_matrix, [], npermutations)
if npermutations > 0:
output.append([a*180/nbearings, r, p_value, rand_p])
else:
Expand All @@ -47,11 +47,11 @@ def bearing_analysis(data: numpy.ndarray, distances: numpy.ndarray, angles: nump
output_text.append("Tested {} vectors".format(nbearings))
output_text.append("")
if npermutations > 0:
col_headers = ("Bearing", "Correlation", "Prob", "RandProb")
col_formats = ("f", "f", "f", "f")
col_headers = ["Bearing", "Correlation", "Prob", "RandProb"]
col_formats = ["f", "f", "f", "f"]
else:
col_headers = ("Bearing", "Correlation", "Prob")
col_formats = ("f", "f", "f")
col_headers = ["Bearing", "Correlation", "Prob"]
col_formats = ["f", "f", "f"]
create_output_table(output_text, output, col_headers, col_formats)
bearing_output = namedtuple("bearing_output", ["output_values", "output_text"])
return bearing_output(output, output_text)
Expand Down
5 changes: 3 additions & 2 deletions pyssage/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from pyssage.classes import Point, Triangle, VoronoiEdge, VoronoiTessellation, VoronoiPolygon, Connections
from pyssage.utils import euclidean_angle, check_for_square_matrix

__all__ = ["delaunay_tessellation", "relative_neighborhood_network", "gabriel_network",
"minimum_spanning_tree", "connect_distance_range", "least_diagonal_network", "nearest_neighbor_connections"]
__all__ = ["connect_distance_range", "delaunay_tessellation", "distance_classes_to_connections",
"gabriel_network", "least_diagonal_network", "minimum_spanning_tree", "nearest_neighbor_connections",
"relative_neighborhood_network"]


def create_point_list(x: numpy.ndarray, y: numpy.ndarray) -> list:
Expand Down
158 changes: 114 additions & 44 deletions pyssage/correlogram.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from math import sqrt, pi, degrees
from typing import Optional, Tuple
from collections import namedtuple
import numpy
import scipy.stats
from pyssage.classes import Connections
from pyssage.utils import create_output_table, check_for_square_matrix
import pyssage.mantel

__all__ = ["bearing_correlogram", "correlogram", "windrose_correlogram"]


def check_variance_assumption(x: Optional[str]) -> None:
"""
Expand All @@ -21,21 +24,35 @@ def check_variance_assumption(x: Optional[str]) -> None:


def morans_i(y: numpy.ndarray, weights: Connections, alt_weights: Optional[numpy.ndarray] = None,
variance: Optional[str] = "random"):
variance: Optional[str] = "random", permutations: int = 0):
check_variance_assumption(variance)
n = len(y)
# mean_y = numpy.average(y)
mean_y = numpy.mean(y)
dev_y = y - mean_y # deviations from mean
w = weights.as_binary()
if alt_weights is not None: # multiply to create non-binary weights, if necessary
w = w * alt_weights
sumyij = numpy.sum(numpy.outer(dev_y, dev_y) * w, dtype=numpy.float64)
sumy2 = numpy.sum(numpy.square(dev_y), dtype=numpy.float64) # sum of squared deviations from mean
sumw = numpy.sum(w, dtype=numpy.float64) # sum of weight matrix
sumw2 = sumw**2
moran = n * sumyij / (sumw * sumy2)
expected = -1 / (n - 1)
sumyij = numpy.sum(numpy.outer(dev_y, dev_y) * w, dtype=numpy.float64)
moran = n * sumyij / (sumw * sumy2)

# permutations
permuted_i_list = [moran]
perm_p = 1
if permutations > 0:
rand_y = numpy.copy(dev_y)
for k in range(permutations-1):
numpy.random.shuffle(rand_y)
perm_sumyij = numpy.sum(numpy.outer(rand_y, rand_y) * w, dtype=numpy.float64)
perm_moran = n * perm_sumyij / (sumw * sumy2)
permuted_i_list.append(perm_moran)
if abs(perm_moran-expected) >= abs(moran-expected):
perm_p += 1
perm_p /= permutations

if variance is None:
sd, z, p = None, None, None
else:
Expand All @@ -51,24 +68,38 @@ def morans_i(y: numpy.ndarray, weights: Connections, alt_weights: Optional[numpy
z = abs(moran - expected) / sd
p = scipy.stats.norm.sf(z)*2 # two-tailed test

return weights.min_scale, weights.max_scale, weights.n_pairs(), expected, moran, sd, z, p
return weights.min_scale, weights.max_scale, weights.n_pairs(), expected, moran, sd, z, p, perm_p, permuted_i_list


def gearys_c(y: numpy.ndarray, weights: Connections, alt_weights: Optional[numpy.ndarray] = None,
variance: Optional[str] = "random"):
variance: Optional[str] = "random", permutations: int = 0):
check_variance_assumption(variance)
n = len(y)
# mean_y = numpy.average(y)
mean_y = numpy.mean(y)
dev_y = y - mean_y # deviations from mean
w = weights.as_binary()
if alt_weights is not None: # multiply to create non-binary weights, if necessary
w *= alt_weights
sumdif2 = numpy.sum(numpy.square(w * (dev_y[:, numpy.newaxis] - dev_y)), dtype=numpy.float64)
sumy2 = numpy.sum(numpy.square(dev_y), dtype=numpy.float64) # sum of squared deviations from mean
sumw = numpy.sum(w, dtype=numpy.float64) # sum of weight matrix
sumw2 = sumw**2
sumdif2 = numpy.sum(numpy.square(w * (dev_y[:, numpy.newaxis] - dev_y)), dtype=numpy.float64)
geary = (n - 1) * sumdif2 / (2 * sumw * sumy2)

# permutations
permuted_c_list = [geary]
perm_p = 1
if permutations > 0:
rand_y = numpy.copy(dev_y)
for k in range(permutations-1):
numpy.random.shuffle(rand_y)
perm_sumdif2 = numpy.sum(numpy.square(w * (rand_y[:, numpy.newaxis] - rand_y)), dtype=numpy.float64)
perm_geary = (n - 1) * perm_sumdif2 / (2 * sumw * sumy2)
permuted_c_list.append(perm_geary)
if abs(perm_geary - 1) >= abs(geary - 1):
perm_p += 1
perm_p /= permutations

if variance is None:
sd, z, p = None, None, None
else:
Expand All @@ -86,11 +117,11 @@ def gearys_c(y: numpy.ndarray, weights: Connections, alt_weights: Optional[numpy
z = abs(geary - 1) / sd
p = scipy.stats.norm.sf(z)*2 # two-tailed test

return weights.min_scale, weights.max_scale, weights.n_pairs(), 1, geary, sd, z, p
return weights.min_scale, weights.max_scale, weights.n_pairs(), 1, geary, sd, z, p, perm_p, permuted_c_list


def mantel_correl(y: numpy.ndarray, weights: Connections, alt_weights: Optional[numpy.ndarray] = None,
variance: Optional[str] = "random"):
variance=None, permutations: int = 0):
"""
in order to get the bearing version to work right, we have to use normal binary weights, then reverse the sign
of the resulting Mantel correlation. if we use reverse binary weighting we end up multiplying the 'out of
Expand All @@ -99,12 +130,12 @@ def mantel_correl(y: numpy.ndarray, weights: Connections, alt_weights: Optional[
w = weights.as_binary()
if alt_weights is not None: # multiply to create non-binary weights, if necessary
w *= alt_weights
r, p_value, _, _, _, _, z = pyssage.mantel.mantel(y, w, [])
return weights.min_scale, weights.max_scale, weights.n_pairs(), 0, -r, -z, p_value
r, p_value, _, _, _, permuted_two_p, permuted_rs, z = pyssage.mantel.mantel(y, w, [], permutations=permutations)
return weights.min_scale, weights.max_scale, weights.n_pairs(), 0, -r, -z, p_value, permuted_two_p, permuted_rs


def correlogram(data: numpy.ndarray, dist_class_connections: list, metric: morans_i,
variance: Optional[str] = "random"):
variance: Optional[str] = "random", permutations: int = 0) -> Tuple[list, list, list]:
if metric == morans_i:
metric_title = "Moran's I"
exp_format = "f"
Expand All @@ -118,8 +149,14 @@ def correlogram(data: numpy.ndarray, dist_class_connections: list, metric: moran
metric_title = ""
exp_format = ""
output = []
all_permuted_values = []
for dc in dist_class_connections:
output.append(metric(data, dc, variance=variance))
*tmp_out, permuted_values = metric(data, dc, variance=variance, permutations=permutations)
if permutations > 0:
output.append(tmp_out)
all_permuted_values.append(permuted_values)
else:
output.append(tmp_out[:len(tmp_out)-1])

# create basic output text
output_text = list()
Expand All @@ -128,20 +165,27 @@ def correlogram(data: numpy.ndarray, dist_class_connections: list, metric: moran
output_text.append("# of data points = {}".format(len(data)))
if variance is not None:
output_text.append("Distribution assumption = {}".format(variance))
if permutations > 0:
output_text.append("Permutation probability calculated from {} permutations".format(permutations))
output_text.append("")
if metric == mantel_correl:
col_headers = ("Min dist", "Max dist", "# pairs", "Expected", metric_title, "Z", "Prob")
col_formats = ("f", "f", "d", exp_format, "f", "f", "f")
else:
col_headers = ("Min dist", "Max dist", "# pairs", "Expected", metric_title, "SD", "Z", "Prob")
col_formats = ("f", "f", "d", exp_format, "f", "f", "f", "f")
col_headers = ["Min dist", "Max dist", "# pairs", "Expected", metric_title, "Z", "Prob"]
col_formats = ["f", "f", "d", exp_format, "f", "f", "f"]
if metric != mantel_correl:
col_headers.insert(5, "SD")
col_formats.insert(5, "f")
if permutations > 0:
col_headers.append("PermProb")
col_formats.append("f")

create_output_table(output_text, output, col_headers, col_formats)

return output, output_text
correlogram_output = namedtuple("correlogram_output", ["output_values", "output_text", "permuted_values"])
return correlogram_output(output, output_text, all_permuted_values)


def bearing_correlogram(data: numpy.ndarray, dist_class_connections: list, angles: numpy.ndarray, n_bearings: int = 18,
metric=morans_i, variance: Optional[str] = "random"):
metric=morans_i, variance: Optional[str] = "random",
permutations: int = 0) -> Tuple[list, list, list]:
if metric == morans_i:
metric_title = "Moran's I"
exp_format = "f"
Expand All @@ -164,11 +208,17 @@ def bearing_correlogram(data: numpy.ndarray, dist_class_connections: list, angle
bearing_weights.append(numpy.square(numpy.cos(angles - a)))

output = []
all_permuted_values = []
for i, b in enumerate(bearing_weights):
for dc in dist_class_connections:
tmp_out = list(metric(data, dc, alt_weights=b, variance=variance))
*tmp_out, permuted_values = metric(data, dc, alt_weights=b, variance=variance, permutations=permutations)
tmp_out = list(tmp_out)
tmp_out.insert(2, degrees(bearings[i]))
output.append(tmp_out)
if permutations > 0:
output.append(tmp_out)
all_permuted_values.append(permuted_values)
else:
output.append(tmp_out[:len(tmp_out)-1])

# create basic output text
output_text = list()
Expand All @@ -177,16 +227,21 @@ def bearing_correlogram(data: numpy.ndarray, dist_class_connections: list, angle
output_text.append("# of data points = {}".format(len(data)))
if variance is not None:
output_text.append("Distribution assumption = {}".format(variance))
if permutations > 0:
output_text.append("Permutation probability calculated from {} permutations".format(permutations))
output_text.append("")
if metric == mantel_correl:
col_headers = ("Min dist", "Max dist", "Bearing", "# pairs", "Expected", metric_title, "Z", "Prob")
col_formats = ("f", "f", "f", "d", exp_format, "f", "f", "f")
else:
col_headers = ("Min dist", "Max dist", "Bearing", "# pairs", "Expected", metric_title, "SD", "Z", "Prob")
col_formats = ("f", "f", "f", "d", exp_format, "f", "f", "f", "f")
col_headers = ["Min dist", "Max dist", "Bearing", "# pairs", "Expected", metric_title, "Z", "Prob"]
col_formats = ["f", "f", "f", "d", exp_format, "f", "f", "f"]
if metric != mantel_correl:
col_headers.insert(6, "SD")
col_formats.insert(6, "f")
if permutations > 0:
col_headers.append("PermProb")
col_formats.append("f")
create_output_table(output_text, output, col_headers, col_formats)

return output, output_text
correlogram_output = namedtuple("correlogram_output", ["output_values", "output_text", "permuted_values"])
return correlogram_output(output, output_text, all_permuted_values)


def windrose_sectors_per_annulus(segment_param: int, annulus: int) -> int:
Expand All @@ -211,7 +266,8 @@ def create_windrose_connections(distances: numpy.ndarray, angles: numpy.ndarray,

def windrose_correlogram(data: numpy.ndarray, distances: numpy.ndarray, angles: numpy.ndarray,
radius_c: float, radius_d: float, radius_e: float, segment_param: int = 4,
min_pairs: int = 21, metric=morans_i, variance: Optional[str] = "random"):
min_pairs: int = 21, metric=morans_i, variance: Optional[str] = "random",
permutations: int = 0) -> Tuple[list, list, list, list]:
if metric == morans_i:
metric_title = "Moran's I"
exp_format = "f"
Expand All @@ -237,18 +293,25 @@ def windrose_correlogram(data: numpy.ndarray, distances: numpy.ndarray, angles:
all_output = []
# all_output is needed for graphing the output *if* we want to include those sectors with too few pairs, but
# still more than zero
all_permuted_values = []
for annulus in range(n_annuli):
for sector in range(windrose_sectors_per_annulus(segment_param, annulus)):
connection, min_ang, max_ang = create_windrose_connections(distances, angles, annulus, sector,
segment_param, radius_c, radius_d, radius_e)
np = connection.n_pairs()
if np >= min_pairs:
tmp_out = list(metric(data, connection, variance=variance))
*tmp_out, permuted_values = metric(data, connection, variance=variance, permutations=permutations)
tmp_out = list(tmp_out)
# add sector angles to output
tmp_out.insert(2, degrees(min_ang))
tmp_out.insert(3, degrees(max_ang))
output.append(tmp_out)
all_output.append(tmp_out)
if permutations > 0:
output.append(tmp_out)
all_output.append(tmp_out)
all_permuted_values.append(permuted_values)
else:
output.append(tmp_out[:len(tmp_out) - 1])
all_output.append(tmp_out[:len(tmp_out) - 1])
else:
# using -1 for the probability as an indicator that nothing was calculated
if metric == mantel_correl:
Expand All @@ -257,6 +320,8 @@ def windrose_correlogram(data: numpy.ndarray, distances: numpy.ndarray, angles:
else:
tmp_out = [connection.min_scale, connection.max_scale, degrees(min_ang), degrees(max_ang),
np, 0, 0, 0, 0, -1]
if permutations > 0:
tmp_out.append(-1)
all_output.append(tmp_out)

# create basic output text
Expand All @@ -270,16 +335,21 @@ def windrose_correlogram(data: numpy.ndarray, distances: numpy.ndarray, angles:
output_text.append("")
if variance is not None:
output_text.append("Distribution assumption = {}".format(variance))
if permutations > 0:
output_text.append("Permutation probability calculated from {} permutations".format(permutations))

output_text.append("")
if metric == mantel_correl:
col_headers = ("Min dist", "Max dist", "Min angle", "Max angle", "# pairs", "Expected", metric_title,
"Z", "Prob")
col_formats = ("f", "f", "f", "f", "d", exp_format, "f", "f", "f")
else:
col_headers = ("Min dist", "Max dist", "Min angle", "Max angle", "# pairs", "Expected", metric_title, "SD",
"Z", "Prob")
col_formats = ("f", "f", "f", "f", "d", exp_format, "f", "f", "f", "f")
col_headers = ["Min dist", "Max dist", "Min angle", "Max angle", "# pairs", "Expected", metric_title,
"Z", "Prob"]
col_formats = ["f", "f", "f", "f", "d", exp_format, "f", "f", "f"]
if metric != mantel_correl:
col_headers.insert(7, "SD")
col_formats.insert(7, "f")
if permutations > 0:
col_headers.append("PermProb")
col_formats.append("f")
create_output_table(output_text, output, col_headers, col_formats)

return output, output_text, all_output
windrose_correlogram_output = namedtuple("windrose_correlogram_output", ["output_values", "output_text",
"all_output", "permuted_values"])
return windrose_correlogram_output(output, output_text, all_output, all_permuted_values)
Loading

0 comments on commit fdeb6d0

Please sign in to comment.