[Git][debian-gis-team/flox][upstream] New upstream version 0.9.6

Antonio Valentino (@antonio.valentino) gitlab at salsa.debian.org
Thu Mar 28 08:19:41 GMT 2024



Antonio Valentino pushed to branch upstream at Debian GIS Project / flox


Commits:
2cc87746 by Antonio Valentino at 2024-03-28T08:05:13+00:00
New upstream version 0.9.6
- - - - -


14 changed files:

- .github/workflows/ci-additional.yaml
- .github/workflows/ci.yaml
- .github/workflows/upstream-dev-ci.yaml
- asv_bench/benchmarks/cohorts.py
- ci/benchmark.yml
- docs/source/implementation.md
- flox/aggregate_flox.py
- flox/aggregate_npg.py
- flox/aggregate_numbagg.py
- flox/core.py
- flox/xarray.py
- tests/__init__.py
- tests/test_core.py
- tests/test_xarray.py


Changes:

=====================================
.github/workflows/ci-additional.yaml
=====================================
@@ -77,7 +77,7 @@ jobs:
           --ignore flox/tests \
           --cov=./ --cov-report=xml
       - name: Upload code coverage to Codecov
-        uses: codecov/codecov-action at v4.0.0
+        uses: codecov/codecov-action at v4.1.0
         with:
           file: ./coverage.xml
           flags: unittests
@@ -131,7 +131,7 @@ jobs:
           python -m mypy --install-types --non-interactive --cobertura-xml-report mypy_report
 
       - name: Upload mypy coverage to Codecov
-        uses: codecov/codecov-action at v4.0.0
+        uses: codecov/codecov-action at v4.1.0
         with:
           file: mypy_report/cobertura.xml
           flags: mypy


=====================================
.github/workflows/ci.yaml
=====================================
@@ -49,7 +49,7 @@ jobs:
         run: |
           pytest -n auto --cov=./ --cov-report=xml
       - name: Upload code coverage to Codecov
-        uses: codecov/codecov-action at v4.0.0
+        uses: codecov/codecov-action at v4.1.0
         with:
           file: ./coverage.xml
           flags: unittests
@@ -93,7 +93,7 @@ jobs:
         run: |
           python -m pytest -n auto --cov=./ --cov-report=xml
       - name: Upload code coverage to Codecov
-        uses: codecov/codecov-action at v4.0.0
+        uses: codecov/codecov-action at v4.1.0
         with:
           file: ./coverage.xml
           flags: unittests


=====================================
.github/workflows/upstream-dev-ci.yaml
=====================================
@@ -10,6 +10,7 @@ on:
     paths:
       - ".github/workflows/upstream-dev-ci.yaml"
       - "ci/upstream-dev-env.yml"
+      - "flox/*"
   schedule:
     - cron: "0 0 * * *" # Daily “At 00:00” UTC
   workflow_dispatch: # allows you to trigger the workflow run manually


=====================================
asv_bench/benchmarks/cohorts.py
=====================================
@@ -95,7 +95,7 @@ class ERA5Dataset:
     """ERA5"""
 
     def __init__(self, *args, **kwargs):
-        self.time = pd.Series(pd.date_range("2016-01-01", "2018-12-31 23:59", freq="H"))
+        self.time = pd.Series(pd.date_range("2016-01-01", "2018-12-31 23:59", freq="h"))
         self.axis = (-1,)
         self.array = dask.array.random.random((721, 1440, len(self.time)), chunks=(-1, -1, 48))
 
@@ -143,7 +143,7 @@ class PerfectMonthly(Cohorts):
     """Perfectly chunked for a "cohorts" monthly mean climatology"""
 
     def setup(self, *args, **kwargs):
-        self.time = pd.Series(pd.date_range("1961-01-01", "2018-12-31 23:59", freq="M"))
+        self.time = pd.Series(pd.date_range("1961-01-01", "2018-12-31 23:59", freq="ME"))
         self.axis = (-1,)
         self.array = dask.array.random.random((721, 1440, len(self.time)), chunks=(-1, -1, 4))
         self.by = self.time.dt.month.values - 1
@@ -164,7 +164,7 @@ class PerfectMonthly(Cohorts):
 class ERA5Google(Cohorts):
     def setup(self, *args, **kwargs):
         TIME = 900  # 92044 in Google ARCO ERA5
-        self.time = pd.Series(pd.date_range("1959-01-01", freq="6H", periods=TIME))
+        self.time = pd.Series(pd.date_range("1959-01-01", freq="6h", periods=TIME))
         self.axis = (2,)
         self.array = dask.array.ones((721, 1440, TIME), chunks=(-1, -1, 1))
         self.by = self.time.dt.day.values - 1
@@ -201,3 +201,12 @@ class OISST(Cohorts):
         self.time = pd.Series(index)
         self.by = self.time.dt.dayofyear.values - 1
         self.expected = pd.RangeIndex(self.by.max() + 1)
+
+
+class RandomBigArray(Cohorts):
+    def setup(self, *args, **kwargs):
+        M, N = 100_000, 20_000
+        self.array = dask.array.random.normal(size=(M, N), chunks=(10_000, N // 5)).T
+        self.by = np.random.choice(5_000, size=M)
+        self.expected = pd.RangeIndex(5000)
+        self.axis = (1,)


=====================================
ci/benchmark.yml
=====================================
@@ -3,6 +3,7 @@ channels:
   - conda-forge
 dependencies:
   - asv
+  - build
   - cachey
   - dask-core
   - numpy>=1.22


=====================================
docs/source/implementation.md
=====================================
@@ -300,8 +300,10 @@ label overlaps with all other labels. The algorithm is as follows.
    ![cohorts-schematic](/../diagrams/containment.png)
 
 1. To choose between `"map-reduce"` and `"cohorts"`, we need a summary measure of the degree to which the labels overlap with
-   each other. We use _sparsity_ --- the number of non-zero elements in `C` divided by the number of elements in `C`, `C.nnz/C.size`.
-   When sparsity > 0.6, we choose `"map-reduce"` since there is decent overlap between (any) cohorts. Otherwise we use `"cohorts"`.
+   each other. We can use _sparsity_ --- the number of non-zero elements in `C` divided by the number of elements in `C`, `C.nnz/C.size`.
+   We use sparsity(`S`) as an approximation for the sparsity(`C`) to avoid a potentially expensive sparse matrix dot product when `S`
+   isn't particularly sparse. When sparsity(`S`) > 0.4 (arbitrary), we choose `"map-reduce"` since there is decent overlap between
+   (any) cohorts. Otherwise we use `"cohorts"`.
 
 Cool, isn't it?!
 


=====================================
flox/aggregate_flox.py
=====================================
@@ -37,7 +37,8 @@ def _lerp(a, b, *, t, dtype, out=None):
     """
     if out is None:
         out = np.empty_like(a, dtype=dtype)
-    diff_b_a = np.subtract(b, a)
+    with np.errstate(invalid="ignore"):
+        diff_b_a = np.subtract(b, a)
     # asanyarray is a stop-gap until gh-13105
     np.add(a, diff_b_a * t, out=out)
     np.subtract(b, diff_b_a * (1 - t), out=out, where=t >= 0.5)
@@ -95,7 +96,8 @@ def quantile_(array, inv_idx, *, q, axis, skipna, group_idx, dtype=None, out=Non
 
     # partition the complex array in-place
     labels_broadcast = np.broadcast_to(group_idx, array.shape)
-    cmplx = labels_broadcast + 1j * array
+    with np.errstate(invalid="ignore"):
+        cmplx = labels_broadcast + 1j * array
     cmplx.partition(kth=kth, axis=-1)
     if is_scalar_q:
         a_ = cmplx.imag


=====================================
flox/aggregate_npg.py
=====================================
@@ -88,6 +88,8 @@ def nanprod(group_idx, array, engine, *, axis=-1, size=None, fill_value=None, dt
 
 
 def _len(group_idx, array, engine, *, func, axis=-1, size=None, fill_value=None, dtype=None):
+    if array.dtype.kind in "US":
+        array = np.broadcast_to(np.array([1]), array.shape)
     result = _get_aggregate(engine).aggregate(
         group_idx,
         array,


=====================================
flox/aggregate_numbagg.py
=====================================
@@ -105,11 +105,24 @@ def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None,
     )
 
 
+def nanlen(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None):
+    if array.dtype.kind in "US":
+        array = np.broadcast_to(np.array([1]), array.shape)
+    return _numbagg_wrapper(
+        group_idx,
+        array,
+        axis=axis,
+        size=size,
+        func="nancount",
+        # fill_value=fill_value,
+        # dtype=dtype,
+    )
+
+
 nansum = partial(_numbagg_wrapper, func="nansum")
 nanmean = partial(_numbagg_wrapper, func="nanmean")
 nanprod = partial(_numbagg_wrapper, func="nanprod")
 nansum_of_squares = partial(_numbagg_wrapper, func="nansum_of_squares")
-nanlen = partial(_numbagg_wrapper, func="nancount")
 nanprod = partial(_numbagg_wrapper, func="nanprod")
 nanfirst = partial(_numbagg_wrapper, func="nanfirst")
 nanlast = partial(_numbagg_wrapper, func="nanlast")


=====================================
flox/core.py
=====================================
@@ -363,37 +363,55 @@ def find_group_cohorts(
         logger.info("find_group_cohorts: cohorts is preferred, chunking is perfect.")
         return "cohorts", chunks_cohorts
 
-    # Containment = |Q & S| / |Q|
+    # We'll use containment to measure degree of overlap between labels.
+    # Containment C = |Q & S| / |Q|
     #  - |X| is the cardinality of set X
     #  - Q is the query set being tested
     #  - S is the existing set
-    # We'll use containment to measure degree of overlap between labels. The bitmask
-    # matrix allows us to calculate this pretty efficiently.
-    asfloat = bitmask.astype(float)
-    # Note: While A.T @ A is a symmetric matrix, the division by chunks_per_label
-    # makes it non-symmetric.
-    containment = csr_array((asfloat.T @ asfloat) / chunks_per_label)
-
-    # The containment matrix is a measure of how much the labels overlap
-    # with each other. We treat the sparsity = (nnz/size) as a summary measure of the net overlap.
+    # The bitmask matrix S allows us to calculate this pretty efficiently using a dot product.
+    #       S.T @ S / chunks_per_label
+    #
+    # We treat the sparsity(C) = (nnz/size) as a summary measure of the net overlap.
     # 1. For high enough sparsity, there is a lot of overlap and we should use "map-reduce".
     # 2. When labels are uniformly distributed amongst all chunks
     #    (and number of labels < chunk size), sparsity is 1.
     # 3. Time grouping cohorts (e.g. dayofyear) appear as lines in this matrix.
     # 4. When there are no overlaps at all between labels, containment is a block diagonal matrix
     #    (approximately).
-    MAX_SPARSITY_FOR_COHORTS = 0.6  # arbitrary
-    sparsity = containment.nnz / math.prod(containment.shape)
+    #
+    # However computing S.T @ S can still be the slowest step, especially if S
+    # is not particularly sparse. Empirically the sparsity( S.T @ S ) > min(1, 2 x sparsity(S)).
+    # So we use sparsity(S) as a shortcut.
+    MAX_SPARSITY_FOR_COHORTS = 0.4  # arbitrary
+    sparsity = bitmask.nnz / math.prod(bitmask.shape)
     preferred_method: Literal["map-reduce"] | Literal["cohorts"]
+    logger.debug(
+        "sparsity of bitmask is {}, threshold is {}".format(  # noqa
+            sparsity, MAX_SPARSITY_FOR_COHORTS
+        )
+    )
     if sparsity > MAX_SPARSITY_FOR_COHORTS:
-        logger.info("sparsity is {}".format(sparsity))  # noqa
         if not merge:
-            logger.info("find_group_cohorts: merge=False, choosing 'map-reduce'")
+            logger.info(
+                "find_group_cohorts: bitmask sparsity={}, merge=False, choosing 'map-reduce'".format(  # noqa
+                    sparsity
+                )
+            )
             return "map-reduce", {}
         preferred_method = "map-reduce"
     else:
         preferred_method = "cohorts"
 
+    # Note: While A.T @ A is a symmetric matrix, the division by chunks_per_label
+    #       makes it non-symmetric.
+    asfloat = bitmask.astype(float)
+    containment = csr_array(asfloat.T @ asfloat / chunks_per_label)
+
+    logger.debug(
+        "sparsity of containment matrix is {}".format(  # noqa
+            containment.nnz / math.prod(containment.shape)
+        )
+    )
     # Use a threshold to force some merging. We do not use the filtered
     # containment matrix for estimating "sparsity" because it is a bit
     # hard to reason about.


=====================================
flox/xarray.py
=====================================
@@ -201,10 +201,10 @@ def xarray_reduce(
     >>> da = da = xr.ones_like(labels)
     >>> # Sum all values in da that matches the elements in the group index:
     >>> xarray_reduce(da, labels, func="sum")
-    <xarray.DataArray 'label' (label: 4)>
+    <xarray.DataArray 'label' (label: 4)> Size: 32B
     array([3, 2, 2, 2])
     Coordinates:
-      * label    (label) int64 0 1 2 3
+      * label    (label) int64 32B 0 1 2 3
     """
 
     if skipna is not None and isinstance(func, Aggregation):
@@ -259,7 +259,7 @@ def xarray_reduce(
     for var in maybe_drop:
         maybe_midx = ds._indexes.get(var, None)
         if isinstance(maybe_midx, PandasMultiIndex):
-            idx_coord_names = set(maybe_midx.index.names + [maybe_midx.dim])
+            idx_coord_names = set(tuple(maybe_midx.index.names) + (maybe_midx.dim,))
             idx_other_names = idx_coord_names - set(maybe_drop)
             more_drop.update(idx_other_names)
     maybe_drop.update(more_drop)
@@ -303,14 +303,17 @@ def xarray_reduce(
         # reducing along a dimension along which groups do not vary
         # This is really just a normal reduction.
         # This is not right when binning so we exclude.
-        if isinstance(func, str):
-            dsfunc = func[3:] if skipna else func
-        else:
+        if isinstance(func, str) and func.startswith("nan"):
+            raise ValueError(f"Specify func={func[3:]}, skipna=True instead of func={func}")
+        elif isinstance(func, Aggregation):
             raise NotImplementedError(
                 "func must be a string when reducing along a dimension not present in `by`"
             )
-        # TODO: skipna needs test
-        result = getattr(ds_broad, dsfunc)(dim=dim_tuple, skipna=skipna)
+        # skipna is not supported for all reductions
+        # https://github.com/pydata/xarray/issues/8819
+        kwargs = {"skipna": skipna} if skipna is not None else {}
+        kwargs.update(finalize_kwargs)
+        result = getattr(ds_broad, func)(dim=dim_tuple, **kwargs)
         if isinstance(obj, xr.DataArray):
             return obj._from_temp_dataset(result)
         else:


=====================================
tests/__init__.py
=====================================
@@ -124,3 +124,35 @@ def assert_equal_tuple(a, b):
             np.testing.assert_array_equal(a_, b_)
         else:
             assert a_ == b_
+
+
+SCIPY_STATS_FUNCS = ("mode", "nanmode")
+BLOCKWISE_FUNCS = ("median", "nanmedian", "quantile", "nanquantile") + SCIPY_STATS_FUNCS
+ALL_FUNCS = (
+    "sum",
+    "nansum",
+    "argmax",
+    "nanfirst",
+    "nanargmax",
+    "prod",
+    "nanprod",
+    "mean",
+    "nanmean",
+    "var",
+    "nanvar",
+    "std",
+    "nanstd",
+    "max",
+    "nanmax",
+    "min",
+    "nanmin",
+    "argmin",
+    "nanargmin",
+    "any",
+    "all",
+    "nanlast",
+    "median",
+    "nanmedian",
+    "quantile",
+    "nanquantile",
+) + tuple(pytest.param(func, marks=requires_scipy) for func in SCIPY_STATS_FUNCS)


=====================================
tests/test_core.py
=====================================
@@ -31,12 +31,14 @@ from flox.core import (
 )
 
 from . import (
+    ALL_FUNCS,
+    BLOCKWISE_FUNCS,
+    SCIPY_STATS_FUNCS,
     assert_equal,
     assert_equal_tuple,
     has_dask,
     raise_if_dask_computes,
     requires_dask,
-    requires_scipy,
 )
 
 logger = logging.getLogger("flox")
@@ -60,36 +62,6 @@ else:
 
 
 DEFAULT_QUANTILE = 0.9
-SCIPY_STATS_FUNCS = ("mode", "nanmode")
-BLOCKWISE_FUNCS = ("median", "nanmedian", "quantile", "nanquantile") + SCIPY_STATS_FUNCS
-ALL_FUNCS = (
-    "sum",
-    "nansum",
-    "argmax",
-    "nanfirst",
-    "nanargmax",
-    "prod",
-    "nanprod",
-    "mean",
-    "nanmean",
-    "var",
-    "nanvar",
-    "std",
-    "nanstd",
-    "max",
-    "nanmax",
-    "min",
-    "nanmin",
-    "argmin",
-    "nanargmin",
-    "any",
-    "all",
-    "nanlast",
-    "median",
-    "nanmedian",
-    "quantile",
-    "nanquantile",
-) + tuple(pytest.param(func, marks=requires_scipy) for func in SCIPY_STATS_FUNCS)
 
 if TYPE_CHECKING:
     from flox.core import T_Agg, T_Engine, T_ExpectedGroupsOpt, T_Method
@@ -908,7 +880,7 @@ def test_find_cohorts_missing_groups():
 
 @pytest.mark.parametrize("chunksize", [12, 13, 14, 24, 36, 48, 72, 71])
 def test_verify_complex_cohorts(chunksize: int) -> None:
-    time = pd.Series(pd.date_range("2016-01-01", "2018-12-31 23:59", freq="H"))
+    time = pd.Series(pd.date_range("2016-01-01", "2018-12-31 23:59", freq="h"))
     chunks = (chunksize,) * (len(time) // chunksize)
     by = np.array(time.dt.dayofyear.values)
 
@@ -1091,7 +1063,7 @@ def test_empty_bins(func, engine):
 
 
 def test_datetime_binning():
-    time_bins = pd.date_range(start="2010-08-01", end="2010-08-15", freq="24H")
+    time_bins = pd.date_range(start="2010-08-01", end="2010-08-15", freq="24h")
     by = pd.date_range("2010-08-01", "2010-08-15", freq="15min")
 
     (actual,) = _convert_expected_groups_to_index((time_bins,), isbin=(True,), sort=False)
@@ -1153,9 +1125,9 @@ def test_group_by_datetime(engine, method):
     if method == "blockwise":
         return None
 
-    edges = pd.date_range("1999-12-31", "2000-12-31", freq="M").to_series().to_numpy()
+    edges = pd.date_range("1999-12-31", "2000-12-31", freq="ME").to_series().to_numpy()
     actual, _ = groupby_reduce(daskarray, t.to_numpy(), isbin=True, expected_groups=edges, **kwargs)
-    expected = data.resample("M").mean().to_numpy()
+    expected = data.resample("ME").mean().to_numpy()
     assert_equal(expected, actual)
 
     actual, _ = groupby_reduce(
@@ -1548,7 +1520,7 @@ def test_validate_reindex() -> None:
 @requires_dask
 def test_1d_blockwise_sort_optimization():
     # Make sure for resampling problems sorting isn't done.
-    time = pd.Series(pd.date_range("2020-09-01", "2020-12-31 23:59", freq="3H"))
+    time = pd.Series(pd.date_range("2020-09-01", "2020-12-31 23:59", freq="3h"))
     array = dask.array.ones((len(time),), chunks=(224,))
 
     actual, _ = groupby_reduce(array, time.dt.dayofyear.values, method="blockwise", func="count")
@@ -1710,7 +1682,18 @@ def test_multiple_quantiles(q, chunk, func, by_ndim):
     actual, _ = groupby_reduce(array, labels, func=func, finalize_kwargs=dict(q=q), axis=axis)
     sorted_array = array[..., [0, 1, 2, 4, 3, 5, 6]]
     f = partial(getattr(np, func), q=q, axis=axis, keepdims=True)
+    if chunk:
+        sorted_array = sorted_array.compute()
     expected = np.concatenate((f(sorted_array[..., :4]), f(sorted_array[..., 4:])), axis=-1)
     if by_ndim == 2:
         expected = expected.squeeze(axis=-2)
     assert_equal(expected, actual, tolerance=1e-14)
+
+
+ at pytest.mark.parametrize("dtype", ["U3", "S3"])
+def test_nanlen_string(dtype, engine):
+    array = np.array(["ABC", "DEF", "GHI", "JKL", "MNO", "PQR"], dtype=dtype)
+    by = np.array([0, 0, 1, 2, 1, 0])
+    expected = np.array([3, 2, 1], dtype=np.intp)
+    actual, *_ = groupby_reduce(array, by, func="count", engine=engine)
+    assert_equal(expected, actual)


=====================================
tests/test_xarray.py
=====================================
@@ -9,6 +9,7 @@ xr = pytest.importorskip("xarray")
 from flox.xarray import rechunk_for_blockwise, xarray_reduce
 
 from . import (
+    ALL_FUNCS,
     assert_equal,
     has_dask,
     raise_if_dask_computes,
@@ -710,3 +711,32 @@ def test_multiple_quantiles(q, chunk, by_ndim, skipna):
     with xr.set_options(use_flox=False):
         expected = da.groupby(by).quantile(q, skipna=skipna)
     xr.testing.assert_allclose(expected, actual)
+
+
+ at pytest.mark.parametrize("func", ALL_FUNCS)
+def test_direct_reduction(func):
+    if "arg" in func or "mode" in func:
+        pytest.skip()
+    # regression test for https://github.com/pydata/xarray/issues/8819
+    rand = np.random.choice([True, False], size=(2, 3))
+    if func not in ["any", "all"]:
+        rand = rand.astype(float)
+
+    if "nan" in func:
+        func = func[3:]
+        kwargs = {"skipna": True}
+    else:
+        kwargs = {}
+
+    if "first" not in func and "last" not in func:
+        kwargs["dim"] = "y"
+
+    if "quantile" in func:
+        kwargs["q"] = 0.9
+
+    data = xr.DataArray(rand, dims=("x", "y"), coords={"x": [10, 20], "y": [0, 1, 2]})
+    with xr.set_options(use_flox=True):
+        actual = xarray_reduce(data, "x", func=func, **kwargs)
+    with xr.set_options(use_flox=False):
+        expected = getattr(data.groupby("x", squeeze=False), func)(**kwargs)
+    xr.testing.assert_identical(expected, actual)



View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/commit/2cc87746821d9aeff2bb389981cd152e905f68ce

-- 
View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/commit/2cc87746821d9aeff2bb389981cd152e905f68ce
You're receiving this email because of your account on salsa.debian.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-grass-devel/attachments/20240328/337bc7ba/attachment-0001.htm>


More information about the Pkg-grass-devel mailing list