Edit on GitHub

utils.math.statistics

Mathematical statistics.

  1"""
  2Mathematical statistics.
  3"""
  4
  5import numpy
  6import pandas
  7
  8from typing import List, Hashable
  9
 10def findOutliers(
 11    srs: pandas.Series,
 12    sigma: int = 3,
 13    joinNeighbouringOutliers: bool = True,
 14    howManyCountAsNeighbours: int = 3
 15) -> pandas.Series:
 16    """
 17    Find outliers in the array and return the mask where outliers
 18    are marked with `True`. NaNs and INFs, if any, do not count
 19    as outliers.
 20
 21    If `joinNeighbouringOutliers` is `True`, then non-outlier values
 22    between neighbouring outliers will be marked as outliers too. The same
 23    goes for starting/ending elements, if they are close enough to an outlier.
 24
 25    Example with `joinNeighbouringOutliers` set to `True`:
 26
 27    ``` py
 28    import pandas
 29    import numpy
 30
 31    from phab.utils.math import statistics
 32
 33    srs = pandas.Series(
 34        [4, 111, 4, 4, 5, 6, numpy.inf, 2, 4, 4, numpy.nan, 1, 1e15, 4, 3, 3, 101, 2, 4, 3]
 35    )
 36
 37    markedOutliers = statistics.findOutliers(
 38        srs,
 39        sigma=3,
 40        joinNeighbouringOutliers=True,
 41        howManyCountAsNeighbours=3
 42    )
 43
 44    for i, v in srs.items():
 45        if markedOutliers[i] == True:
 46            print(f"{v} - outlier")
 47        else:
 48            print(f"{v} - regular")
 49    # 4.0 - outlier
 50    # 111.0 - outlier
 51    # 4.0 - regular
 52    # 4.0 - regular
 53    # 5.0 - regular
 54    # 6.0 - regular
 55    # inf - regular
 56    # 2.0 - regular
 57    # 4.0 - regular
 58    # 4.0 - regular
 59    # nan - regular
 60    # 1.0 - regular
 61    # 1000000000000000.0 - outlier
 62    # 4.0 - outlier
 63    # 3.0 - outlier
 64    # 3.0 - outlier
 65    # 101.0 - outlier
 66    # 2.0 - outlier
 67    # 4.0 - outlier
 68    # 3.0 - outlier
 69    ```
 70
 71    Example with `joinNeighbouringOutliers` set to `False`:
 72
 73    ``` py
 74    # ...
 75
 76    markedOutliers = statistics.findOutliers(
 77        srs,
 78        sigma=3,
 79        joinNeighbouringOutliers=False,
 80        howManyCountAsNeighbours=3
 81    )
 82
 83    for i, v in srs.items():
 84        if markedOutliers[i] == True:
 85            print(f"{v} - outlier")
 86        else:
 87            print(f"{v} - regular")
 88    # 4.0 - regular
 89    # 111.0 - outlier
 90    # 4.0 - regular
 91    # 4.0 - regular
 92    # 5.0 - regular
 93    # 6.0 - regular
 94    # inf - regular
 95    # 2.0 - regular
 96    # 4.0 - regular
 97    # 4.0 - regular
 98    # nan - regular
 99    # 1.0 - regular
100    # 1000000000000000.0 - outlier
101    # 4.0 - regular
102    # 3.0 - regular
103    # 3.0 - regular
104    # 101.0 - outlier
105    # 2.0 - regular
106    # 4.0 - regular
107    # 3.0 - regular
108    ```
109    """
110    # tbl = pandas.DataFrame({"vls": srs})
111    tbl = pandas.DataFrame(srs)
112
113    # find NaNs/INFs
114    tbl["finite"] = numpy.isfinite(srs)
115
116    # every element is an outlier by default
117    tbl["outliers"] = numpy.ones(len(tbl.index), dtype=bool)
118
119    # NaNs and INFs are not outliers
120    # print(tbl["finite"].loc[lambda x: x == False].index)
121    tbl.loc[
122        tbl["finite"] == False,  # noqa: E712
123        "outliers"
124    ] = False
125
126    tableFinite = tbl[
127        tbl["finite"] == True  # noqa: E712
128    ].iloc[:, 0]
129
130    med = numpy.median(tableFinite.values)  # type:ignore[arg-type] # ya hz
131    sig = 1.48 * numpy.median(numpy.abs(tableFinite - med))  # type:ignore[operator] # ya hz
132
133    for i, v in tableFinite.items():
134        if v > med - sigma * sig:
135            tbl.at[i, "outliers"] = False
136
137    for i, v in tableFinite.items():
138        if v >= med + sigma * sig:
139            tbl.at[i, "outliers"] = True
140
141    if not joinNeighbouringOutliers:
142        # the entire table with all columns (the original and two new ones,
143        # "finite" and "ouliers")
144        # return tbl
145        #
146        # or just two columns: first one with the values and "outliers"
147        # return tbl.iloc[:,0:3:2]
148        #
149        # or just the outliers
150        return tbl["outliers"]
151        #
152        # or just the "outliers" that are True, because it's a waste to pass
153        # around all the values, let alone the entire table
154        # return tbl[tbl["outliers"] == True]["outliers"]
155    else:
156        outlrs: pandas.Series = tbl["outliers"].copy()
157        elementsSincePreviousOutlier: int = 0
158        #
159        # later we might want to have the number of neighbours to scale
160        # with the list length, but for now it is passed as a fixed
161        # number in the `howManyCountAsNeighbours` argument of the function
162        # howManyCountAsNeighbours = round(len(outlrs.index) / 10)
163        #
164        # intuitively it should be `False`, but we need to account
165        # for possible neighbours from the very start
166        countingSincePreviousOutlier: bool = True
167        potentialNeighbourOutliers: List[Hashable] = []
168        for i, v in outlrs.items():
169            if v == True:  # noqa: E712
170                if countingSincePreviousOutlier:
171                    # make all the previous elements to be outliers too
172                    for pno in potentialNeighbourOutliers:
173                        outlrs[pno] = True
174                    potentialNeighbourOutliers = []
175                    elementsSincePreviousOutlier = 0
176                else:
177                    countingSincePreviousOutlier = True
178            else:
179                if countingSincePreviousOutlier:
180                    elementsSincePreviousOutlier += 1
181                    if elementsSincePreviousOutlier > howManyCountAsNeighbours:
182                        elementsSincePreviousOutlier = 0
183                        countingSincePreviousOutlier = False
184                        potentialNeighbourOutliers = []
185                    else:
186                        potentialNeighbourOutliers.append(i)
187        # if there are some pending potential neighbour outliers
188        # after we finished iterating the list, make them outliers
189        if len(potentialNeighbourOutliers) > 0:
190            for pno in potentialNeighbourOutliers:
191                outlrs[pno] = True
192            potentialNeighbourOutliers = []
193        return outlrs
def findOutliers( srs: pandas.core.series.Series, sigma: int = 3, joinNeighbouringOutliers: bool = True, howManyCountAsNeighbours: int = 3) -> pandas.core.series.Series:
 11def findOutliers(
 12    srs: pandas.Series,
 13    sigma: int = 3,
 14    joinNeighbouringOutliers: bool = True,
 15    howManyCountAsNeighbours: int = 3
 16) -> pandas.Series:
 17    """
 18    Find outliers in the array and return the mask where outliers
 19    are marked with `True`. NaNs and INFs, if any, do not count
 20    as outliers.
 21
 22    If `joinNeighbouringOutliers` is `True`, then non-outlier values
 23    between neighbouring outliers will be marked as outliers too. The same
 24    goes for starting/ending elements, if they are close enough to an outlier.
 25
 26    Example with `joinNeighbouringOutliers` set to `True`:
 27
 28    ``` py
 29    import pandas
 30    import numpy
 31
 32    from phab.utils.math import statistics
 33
 34    srs = pandas.Series(
 35        [4, 111, 4, 4, 5, 6, numpy.inf, 2, 4, 4, numpy.nan, 1, 1e15, 4, 3, 3, 101, 2, 4, 3]
 36    )
 37
 38    markedOutliers = statistics.findOutliers(
 39        srs,
 40        sigma=3,
 41        joinNeighbouringOutliers=True,
 42        howManyCountAsNeighbours=3
 43    )
 44
 45    for i, v in srs.items():
 46        if markedOutliers[i] == True:
 47            print(f"{v} - outlier")
 48        else:
 49            print(f"{v} - regular")
 50    # 4.0 - outlier
 51    # 111.0 - outlier
 52    # 4.0 - regular
 53    # 4.0 - regular
 54    # 5.0 - regular
 55    # 6.0 - regular
 56    # inf - regular
 57    # 2.0 - regular
 58    # 4.0 - regular
 59    # 4.0 - regular
 60    # nan - regular
 61    # 1.0 - regular
 62    # 1000000000000000.0 - outlier
 63    # 4.0 - outlier
 64    # 3.0 - outlier
 65    # 3.0 - outlier
 66    # 101.0 - outlier
 67    # 2.0 - outlier
 68    # 4.0 - outlier
 69    # 3.0 - outlier
 70    ```
 71
 72    Example with `joinNeighbouringOutliers` set to `False`:
 73
 74    ``` py
 75    # ...
 76
 77    markedOutliers = statistics.findOutliers(
 78        srs,
 79        sigma=3,
 80        joinNeighbouringOutliers=False,
 81        howManyCountAsNeighbours=3
 82    )
 83
 84    for i, v in srs.items():
 85        if markedOutliers[i] == True:
 86            print(f"{v} - outlier")
 87        else:
 88            print(f"{v} - regular")
 89    # 4.0 - regular
 90    # 111.0 - outlier
 91    # 4.0 - regular
 92    # 4.0 - regular
 93    # 5.0 - regular
 94    # 6.0 - regular
 95    # inf - regular
 96    # 2.0 - regular
 97    # 4.0 - regular
 98    # 4.0 - regular
 99    # nan - regular
100    # 1.0 - regular
101    # 1000000000000000.0 - outlier
102    # 4.0 - regular
103    # 3.0 - regular
104    # 3.0 - regular
105    # 101.0 - outlier
106    # 2.0 - regular
107    # 4.0 - regular
108    # 3.0 - regular
109    ```
110    """
111    # tbl = pandas.DataFrame({"vls": srs})
112    tbl = pandas.DataFrame(srs)
113
114    # find NaNs/INFs
115    tbl["finite"] = numpy.isfinite(srs)
116
117    # every element is an outlier by default
118    tbl["outliers"] = numpy.ones(len(tbl.index), dtype=bool)
119
120    # NaNs and INFs are not outliers
121    # print(tbl["finite"].loc[lambda x: x == False].index)
122    tbl.loc[
123        tbl["finite"] == False,  # noqa: E712
124        "outliers"
125    ] = False
126
127    tableFinite = tbl[
128        tbl["finite"] == True  # noqa: E712
129    ].iloc[:, 0]
130
131    med = numpy.median(tableFinite.values)  # type:ignore[arg-type] # ya hz
132    sig = 1.48 * numpy.median(numpy.abs(tableFinite - med))  # type:ignore[operator] # ya hz
133
134    for i, v in tableFinite.items():
135        if v > med - sigma * sig:
136            tbl.at[i, "outliers"] = False
137
138    for i, v in tableFinite.items():
139        if v >= med + sigma * sig:
140            tbl.at[i, "outliers"] = True
141
142    if not joinNeighbouringOutliers:
143        # the entire table with all columns (the original and two new ones,
144        # "finite" and "ouliers")
145        # return tbl
146        #
147        # or just two columns: first one with the values and "outliers"
148        # return tbl.iloc[:,0:3:2]
149        #
150        # or just the outliers
151        return tbl["outliers"]
152        #
153        # or just the "outliers" that are True, because it's a waste to pass
154        # around all the values, let alone the entire table
155        # return tbl[tbl["outliers"] == True]["outliers"]
156    else:
157        outlrs: pandas.Series = tbl["outliers"].copy()
158        elementsSincePreviousOutlier: int = 0
159        #
160        # later we might want to have the number of neighbours to scale
161        # with the list length, but for now it is passed as a fixed
162        # number in the `howManyCountAsNeighbours` argument of the function
163        # howManyCountAsNeighbours = round(len(outlrs.index) / 10)
164        #
165        # intuitively it should be `False`, but we need to account
166        # for possible neighbours from the very start
167        countingSincePreviousOutlier: bool = True
168        potentialNeighbourOutliers: List[Hashable] = []
169        for i, v in outlrs.items():
170            if v == True:  # noqa: E712
171                if countingSincePreviousOutlier:
172                    # make all the previous elements to be outliers too
173                    for pno in potentialNeighbourOutliers:
174                        outlrs[pno] = True
175                    potentialNeighbourOutliers = []
176                    elementsSincePreviousOutlier = 0
177                else:
178                    countingSincePreviousOutlier = True
179            else:
180                if countingSincePreviousOutlier:
181                    elementsSincePreviousOutlier += 1
182                    if elementsSincePreviousOutlier > howManyCountAsNeighbours:
183                        elementsSincePreviousOutlier = 0
184                        countingSincePreviousOutlier = False
185                        potentialNeighbourOutliers = []
186                    else:
187                        potentialNeighbourOutliers.append(i)
188        # if there are some pending potential neighbour outliers
189        # after we finished iterating the list, make them outliers
190        if len(potentialNeighbourOutliers) > 0:
191            for pno in potentialNeighbourOutliers:
192                outlrs[pno] = True
193            potentialNeighbourOutliers = []
194        return outlrs

Find outliers in the series and return a boolean mask (a series with the same index) where outliers are marked with True. NaNs and INFs, if any, do not count as outliers.

If joinNeighbouringOutliers is True, then non-outlier values between neighbouring outliers will be marked as outliers too. The same goes for starting/ending elements, if they are close enough to an outlier.

Example with joinNeighbouringOutliers set to True:

import pandas
import numpy

from phab.utils.math import statistics

srs = pandas.Series(
    [4, 111, 4, 4, 5, 6, numpy.inf, 2, 4, 4, numpy.nan, 1, 1e15, 4, 3, 3, 101, 2, 4, 3]
)

markedOutliers = statistics.findOutliers(
    srs,
    sigma=3,
    joinNeighbouringOutliers=True,
    howManyCountAsNeighbours=3
)

for i, v in srs.items():
    if markedOutliers[i] == True:
        print(f"{v} - outlier")
    else:
        print(f"{v} - regular")
# 4.0 - outlier
# 111.0 - outlier
# 4.0 - regular
# 4.0 - regular
# 5.0 - regular
# 6.0 - regular
# inf - regular
# 2.0 - regular
# 4.0 - regular
# 4.0 - regular
# nan - regular
# 1.0 - regular
# 1000000000000000.0 - outlier
# 4.0 - outlier
# 3.0 - outlier
# 3.0 - outlier
# 101.0 - outlier
# 2.0 - outlier
# 4.0 - outlier
# 3.0 - outlier

Example with joinNeighbouringOutliers set to False:

# ...

markedOutliers = statistics.findOutliers(
    srs,
    sigma=3,
    joinNeighbouringOutliers=False,
    howManyCountAsNeighbours=3
)

for i, v in srs.items():
    if markedOutliers[i] == True:
        print(f"{v} - outlier")
    else:
        print(f"{v} - regular")
# 4.0 - regular
# 111.0 - outlier
# 4.0 - regular
# 4.0 - regular
# 5.0 - regular
# 6.0 - regular
# inf - regular
# 2.0 - regular
# 4.0 - regular
# 4.0 - regular
# nan - regular
# 1.0 - regular
# 1000000000000000.0 - outlier
# 4.0 - regular
# 3.0 - regular
# 3.0 - regular
# 101.0 - outlier
# 2.0 - regular
# 4.0 - regular
# 3.0 - regular