import functools
import typing
import dask.array
import numpy as np
import pandas as pd
import xarray as xr
from dask import distributed as dd
import rhg_compute_tools.utils


def dataarrays_from_delayed(futures, client=None, **client_kwargs):
"""
Returns a list of xarray dataarrays from a list of futures of dataarrays
Parameters
----------
futures : list
        list of :py:class:`dask.distributed.Future` objects holding
:py:class:`xarray.DataArray` objects.
client : object, optional
:py:class:`dask.distributed.Client` to use in gathering
metadata on futures. If not provided, client is inferred
from context.
client_kwargs : optional
kwargs to pass to ``client.map`` and ``client.gather`` commands (e.g.
``priority``)
Returns
-------
arrays : list
list of :py:class:`xarray.DataArray` objects with
:py:class:`dask.array.Array` backends.
Examples
--------
Given a mapped xarray DataArray, pull the metadata into memory while
leaving the data on the workers:
.. code-block:: python
>>> import numpy as np
>>> def build_arr(multiplier):
... return multiplier * xr.DataArray(
... np.arange(2), dims=['x'], coords=[['a', 'b']])
...
>>> client = dd.Client()
>>> fut = client.map(build_arr, range(3))
>>> arrs = dataarrays_from_delayed(fut, priority=1)
>>> arrs[-1] # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
<xarray.DataArray ...(x: 2)>
dask.array<...shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>
Coordinates:
* x (x) <U1 'a' 'b'
This list of arrays can now be manipulated using normal xarray tools:
.. code-block:: python
>>> xr.concat(arrs, dim='simulation') # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
<xarray.DataArray ...(simulation: 3, x: 2)>
dask.array<...shape=(3, 2), dtype=int64, chunksize=(1, 2), chunktype=numpy.ndarray>
Coordinates:
* x (x) <U1 'a' 'b'
Dimensions without coordinates: simulation
>>> client.close()
"""
if client is None:
client = dd.get_client()
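    # the array payloads stay on the cluster as futures; only the lightweight
    # (shape, dtype) metadata is gathered locally to build lazy dask arrays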
delayed_arrays = client.map(lambda x: x.data, futures, **client_kwargs)
dask_array_metadata = client.gather(
client.map(lambda x: (x.data.shape, x.data.dtype), futures, **client_kwargs)
)
dask_arrays = [
dask.array.from_delayed(delayed_arrays[i], *dask_array_metadata[i])
for i in range(len(futures))
]
# using dict(x.coords) b/c gathering coords can blow up memory for some reason
array_metadata = client.gather(
client.map(
lambda x: {
"dims": x.dims,
"coords": dict(x.coords),
"attrs": x.attrs,
"name": x.name,
},
futures,
**client_kwargs
)
)
data_arrays = [
xr.DataArray(dask_arrays[i], **array_metadata[i]) for i in range(len(futures))
]
return data_arrays


def dataarray_from_delayed(futures, dim=None, client=None, **client_kwargs):
"""
Returns a DataArray from a list of futures
Parameters
----------
futures : list
        list of :py:class:`dask.distributed.Future` objects holding
:py:class:`xarray.DataArray` objects.
    dim : str or pandas.Index, optional
dimension along which to concat :py:class:`xarray.DataArray`.
Inferred by default.
client : object, optional
:py:class:`dask.distributed.Client` to use in gathering
metadata on futures. If not provided, client is inferred
from context.
client_kwargs : optional
kwargs to pass to ``client.map`` and ``client.gather`` commands (e.g.
``priority``)
Returns
-------
array : object
:py:class:`xarray.DataArray` concatenated along ``dim`` with
a :py:class:`dask.array.Array` backend.
Examples
--------
Given a mapped xarray DataArray, pull the metadata into memory while
leaving the data on the workers:
.. code-block:: python
>>> import numpy as np, pandas as pd
>>> def build_arr(multiplier):
... return multiplier * xr.DataArray(
... np.arange(2), dims=['x'], coords=[['a', 'b']])
...
>>> client = dd.Client()
>>> fut = client.map(build_arr, range(3))
>>> da = dataarray_from_delayed(
... fut,
... dim=pd.Index(range(3), name='simulation'),
... priority=1
... )
...
>>> da # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
<xarray.DataArray ...(simulation: 3, x: 2)>
dask.array<...shape=(3, 2), dtype=int64, chunksize=(1, 2), chunktype=numpy.ndarray>
Coordinates:
* x (x) <U1 'a' 'b'
* simulation (simulation) int64 0 1 2
>>> client.close()
"""
data_arrays = dataarrays_from_delayed(futures, client=client, **client_kwargs)
da = xr.concat(data_arrays, dim=dim)
return da


def datasets_from_delayed(futures, client=None, **client_kwargs):
"""
Returns a list of xarray datasets from a list of futures of datasets
Parameters
----------
futures : list
        list of :py:class:`dask.distributed.Future` objects holding
:py:class:`xarray.Dataset` objects.
client : object, optional
:py:class:`dask.distributed.Client` to use in gathering
metadata on futures. If not provided, client is inferred
from context.
client_kwargs : optional
kwargs to pass to ``client.map`` and ``client.gather`` commands (e.g.
``priority``)
Returns
-------
datasets : list
list of :py:class:`xarray.Dataset` objects with
:py:class:`dask.array.Array` backends for each variable.
Examples
--------
Given a mapped :py:class:`xarray.Dataset`, pull the metadata into memory
while leaving the data on the workers:
.. code-block:: python
>>> import numpy as np
>>> def build_ds(multiplier):
... return multiplier * xr.Dataset({
... 'var1': xr.DataArray(
... np.arange(2), dims=['x'], coords=[['a', 'b']])})
...
>>> client = dd.Client()
>>> fut = client.map(build_ds, range(3))
>>> arrs = datasets_from_delayed(fut, priority=1)
>>> arrs[-1] # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
<xarray.Dataset>
Dimensions: (x: 2)
Coordinates:
* x (x) <U1 'a' 'b'
Data variables:
var1 (x) int64 dask.array<chunksize=(2,), meta=np.ndarray>
    This list of datasets can now be manipulated using normal xarray tools:
.. code-block:: python
>>> xr.concat(arrs, dim='y') # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
<xarray.Dataset>
Dimensions: (x: 2, y: 3)
Coordinates:
* x (x) <U1 'a' 'b'
Dimensions without coordinates: y
Data variables:
var1 (y, x) int64 dask.array<chunksize=(1, 2), meta=np.ndarray>
>>> client.close()
"""
if client is None:
client = dd.get_client()
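    # gather each dataset's data variable names; these drive the per-variable
    # submits below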
data_var_keys = client.gather(
client.map(lambda x: list(x.data_vars.keys()), futures, **client_kwargs)
)
delayed_arrays = [
{
k: client.submit(lambda x: x[k].data, futures[i], **client_kwargs)
for k in data_var_keys[i]
}
for i in range(len(futures))
]
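    # gather (shape, dtype) for every variable so dask.array.from_delayed can
    # build a lazy array for each one without moving the data itself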
dask_array_metadata = client.gather(
[
{
k: (
client.submit(
lambda x: (x[k].data.shape, x[k].data.dtype),
futures[i],
**client_kwargs
)
)
for k in data_var_keys[i]
}
for i in range(len(futures))
]
)
dask_data_arrays = [
{
k: (
dask.array.from_delayed(
delayed_arrays[i][k], *dask_array_metadata[i][k]
)
)
for k in data_var_keys[i]
}
for i in range(len(futures))
]
# using dict(x.coords) b/c gathering coords can blow up memory for some reason
array_metadata = client.gather(
[
{
k: client.submit(
lambda x: {
"dims": x[k].dims,
"coords": dict(x[k].coords),
"attrs": x[k].attrs,
},
futures[i],
**client_kwargs
)
for k in data_var_keys[i]
}
for i in range(len(futures))
]
)
data_arrays = [
{
k: (xr.DataArray(dask_data_arrays[i][k], **array_metadata[i][k]))
for k in data_var_keys[i]
}
for i in range(len(futures))
]
datasets = [xr.Dataset(arr) for arr in data_arrays]
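    # dataset-level attrs are fetched separately and re-attached to each Dataset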
dataset_metadata = client.gather(
client.map(lambda x: x.attrs, futures, **client_kwargs)
)
for i in range(len(futures)):
datasets[i].attrs.update(dataset_metadata[i])
return datasets


def dataset_from_delayed(futures, dim=None, client=None, **client_kwargs):
"""
Returns an :py:class:`xarray.Dataset` from a list of futures
Parameters
----------
futures : list
        list of :py:class:`dask.distributed.Future` objects holding
:py:class:`xarray.Dataset` objects.
    dim : str or pandas.Index, optional
dimension along which to concat :py:class:`xarray.Dataset`.
Inferred by default.
client : object, optional
:py:class:`dask.distributed.Client` to use in gathering
metadata on futures. If not provided, client is inferred
from context.
client_kwargs : optional
kwargs to pass to ``client.map`` and ``client.gather`` commands (e.g.
``priority``)
Returns
-------
dataset : object
:py:class:`xarray.Dataset` concatenated along ``dim`` with
:py:class:`dask.array.Array` backends for each variable.
Examples
--------
Given a mapped :py:class:`xarray.Dataset`, pull the metadata into memory
while leaving the data on the workers:
.. code-block:: python
>>> import numpy as np, pandas as pd
>>> def build_ds(multiplier):
... return multiplier * xr.Dataset({
... 'var1': xr.DataArray(
... np.arange(2), dims=['x'], coords=[['a', 'b']])})
...
>>> client = dd.Client()
>>> fut = client.map(build_ds, range(3))
>>> ds = dataset_from_delayed(fut, dim=pd.Index(range(3), name='y'), priority=1)
>>> ds # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
<xarray.Dataset>
Dimensions: (x: 2, y: 3)
Coordinates:
* x (x) <U1 'a' 'b'
* y (y) int64 0 1 2
Data variables:
var1 (y, x) int64 dask.array<chunksize=(1, 2), meta=np.ndarray>
>>> client.close()
"""
datasets = datasets_from_delayed(futures, client=client, **client_kwargs)
ds = xr.concat(datasets, dim=dim)
return ds


def choose_along_axis(arr, axis=-1, replace=True, nchoices=1, p=None):
"""
    Wrapper around :py:func:`np.random.choice`, applied along a single dimension
    within a larger array

    Parameters
    ----------
    arr : np.array
        Array with more than one dimension. Choices are drawn along the
        ``axis`` dimension.
    axis : integer, optional
        Dimension along which to draw samples. Default -1.
    replace : bool, optional
        Whether to sample with replacement. Passed to :py:func:`np.random.choice`.
        Default True.
    nchoices : int, optional
        Number of samples to draw. Must be less than or equal to the number of
        valid options if ``replace`` is False. Default 1.
    p : np.array, optional
        Array with the same shape as ``arr`` with weights for each choice. Each
        slice along ``axis`` is sampled independently, so the weights must sum
        to 1 along the ``axis`` dimension (see the example below). Uniform
        weights are used by default.
Returns
-------
    sampled : np.array
        Array with the same shape as ``arr`` except that the ``axis`` dimension
        has length ``nchoices``; values are drawn from ``arr`` along ``axis``
        with weights ``p``.
Examples
--------
Let's say we have an array with NaNs in it:
.. code-block:: python
>>> arr = np.arange(40).reshape(4, 2, 5).astype(float)
>>> for i in range(4):
... arr[i, :, i+1:] = np.nan
>>> arr # doctest: +NORMALIZE_WHITESPACE
array([[[ 0., nan, nan, nan, nan],
[ 5., nan, nan, nan, nan]],
[[10., 11., nan, nan, nan],
[15., 16., nan, nan, nan]],
[[20., 21., 22., nan, nan],
[25., 26., 27., nan, nan]],
[[30., 31., 32., 33., nan],
[35., 36., 37., 38., nan]]])
We can set weights such that we only select from non-nan values
.. code-block:: python
>>> p = (~np.isnan(arr))
>>> p = p / p.sum(axis=2).reshape(4, 2, 1)
Now, sampling from this along the second dimension will draw from
these values:
.. code-block:: python
>>> np.random.seed(1)
>>> choose_along_axis(arr, 2, p=p, nchoices=10) # doctest: +NORMALIZE_WHITESPACE
array([[[ 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
[ 5., 5., 5., 5., 5., 5., 5., 5., 5., 5.]],
[[11., 11., 10., 11., 11., 11., 10., 10., 10., 11.],
[15., 15., 16., 16., 16., 15., 16., 16., 15., 16.]],
[[22., 22., 20., 22., 20., 21., 22., 20., 20., 20.],
[25., 27., 25., 25., 26., 25., 26., 25., 26., 27.]],
[[30., 31., 32., 31., 30., 32., 32., 32., 33., 32.],
[38., 35., 35., 38., 36., 35., 38., 36., 38., 37.]]])
See Also
--------
:py:func:`np.random.choice` : 1-d version of this function
"""
if p is None:
p = np.ones_like(arr).astype(float) / arr.shape[axis]
axis = axis % len(arr.shape)
new_shape = tuple(list(arr.shape[:axis]) + [nchoices] + list(arr.shape[axis + 1 :]))
result = np.ndarray(shape=new_shape, dtype=arr.dtype)
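    # iterate over every index combination of the non-sampled dimensions, drawing
    # ``nchoices`` values from the corresponding 1-d slice of ``arr`` along ``axis``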
for ind in np.ndindex(tuple([l for i, l in enumerate(arr.shape) if i != axis])):
indexer = tuple(list(ind[:axis]) + [slice(None)] + list(ind[axis:]))
result[indexer] = np.random.choice(
arr[indexer],
size=nchoices,
replace=replace,
p=p[indexer],
)
return result


def choose_along_dim(da, dim, samples=1, expand=None, new_dim_name=None):
"""
Sample values from a DataArray along a dimension
    Wraps :py:func:`np.random.choice` to sample a different random index
    (or set of indices) along dimension ``dim`` for each combination of
    elements along the other dimensions. This is very different from block
    resampling - to block resample along a dimension, simply choose a set
    of indices and draw them from the array using :py:meth:`xr.DataArray.sel`.
Parameters
----------
da : xr.DataArray
DataArray from which to sample values.
    dim : str
Dimension along which to sample. Sampling will draw from elements along
this dimension for all combinations of other dimensions.
samples : int, optional
Number of samples to take from the dimension ``dim``. If greater than 1,
``expand`` is ignored (and set to True).
expand : bool, optional
Whether to expand the array along the sampled dimension.
    new_dim_name : str, optional
Name for the new dimension. If not provided, will use ``dim``.
Returns
-------
sampled : xr.DataArray
DataArray with sampled values chosen along dimension ``dim``
Examples
--------
.. code-block:: python
>>> da = xr.DataArray(
... np.arange(40).reshape(4, 2, 5),
... dims=['x', 'y', 'z'],
... coords=[np.arange(4), np.arange(2), np.arange(5)],
... )
>>> da # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
<xarray.DataArray (x: 4, y: 2, z: 5)>
array([[[ 0, 1, 2, 3, 4],
[ 5, 6, 7, 8, 9]],
[[10, 11, 12, 13, 14],
[15, 16, 17, 18, 19]],
[[20, 21, 22, 23, 24],
[25, 26, 27, 28, 29]],
[[30, 31, 32, 33, 34],
[35, 36, 37, 38, 39]]])
Coordinates:
* x (x) int64 0 1 2 3
* y (y) int64 0 1
* z (z) int64 0 1 2 3 4
We can take a random value along the ``'z'`` dimension:
.. code-block:: python
>>> np.random.seed(1)
>>> choose_along_dim(da, 'z')
<xarray.DataArray (x: 4, y: 2)>
array([[ 2, 8],
[10, 16],
[20, 25],
[30, 36]])
Coordinates:
* x (x) int64 0 1 2 3
* y (y) int64 0 1
    If you provide a ``samples`` argument greater than one (or set
    ``expand=True``) the array will be expanded to a new
    dimension:
.. code-block:: python
>>> np.random.seed(1)
>>> choose_along_dim(da, 'z', samples=3) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
<xarray.DataArray (x: 4, y: 2, z: 3)>
array([[[ 2, 3, 0],
[ 6, 5, 5]],
[[10, 11, 11],
[17, 17, 18]],
[[21, 24, 20],
[28, 27, 27]],
[[30, 30, 34],
[39, 36, 38]]])
Coordinates:
* x (x) int64 0 1 2 3
* y (y) int64 0 1
* z (z) int64 0 1 2
"""
sampled = choose_along_axis(da.values, axis=da.get_axis_num(dim), nchoices=samples)
if samples > 1:
expand = True
if not expand:
sampled = np.take(sampled, 0, axis=da.get_axis_num(dim))
return xr.DataArray(
sampled,
dims=[d for d in da.dims if d != dim],
coords=[da.coords[d] for d in da.dims if d != dim],
)
else:
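        # keep the sampled axis, relabeling it (``dim`` by default) and assigning
        # it a simple integer coordinate of length ``samples``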
if new_dim_name is None:
new_dim_name = dim
return xr.DataArray(
sampled,
dims=[d if d != dim else new_dim_name for d in da.dims],
coords=[da.coords[d] if d != dim else np.arange(samples) for d in da.dims],
)


@xr.register_dataarray_accessor("random")
class random:
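    """DataArray accessor registered as ``da.random``.

    ``DataArray.random.choice(dim, ...)`` forwards to
    :py:func:`choose_along_dim` with the accessed DataArray as the first
    argument, e.g. ``da.random.choice('z', samples=3)``.
    """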
def __init__(self, xarray_obj):
self._xarray_obj = xarray_obj

    @functools.wraps(choose_along_dim)
def choice(self, *args, **kwargs):
return choose_along_dim(self._xarray_obj, *args, **kwargs)


def document_dataset(
    ds: xr.Dataset,
    repository_root: typing.Optional[str] = None,
    tz: str = "UTC",
    inplace: bool = True,
) -> xr.Dataset:
"""
Add repository state and timestamp to dataset attrs
Parameters
----------
ds : xr.Dataset
Dataset to document
repository_root : str or None, optional
Path to the root of the repository to document. If ``None`` (default),
the current directory will be used, and will search parent directories
for a git repository. If a string is passed, parent directories will
not be searched - the directory must be a repository root which
        contains a ``.git`` directory.
tz : str, optional
        time zone string understood by :py:class:`pandas.Timestamp` (e.g. "US/Pacific").
Default "UTC".
inplace : bool, optional
Whether to update the dataset's attributes in place (default) or to return
a copy of the dataset.
Returns
-------
ds : xr.Dataset
Dataset with updated attribute information. A dataset is returned regardless
of arguments - the inplace argument determines whether the returned dataset
will be a shallow copy or the original object (default).
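
    Examples
    --------
    A minimal sketch - the attributes added by
    :py:func:`rhg_compute_tools.utils.get_repo_state` depend on the state of
    your git repository, so outputs are not checked here:

    .. code-block:: python

        >>> import numpy as np
        >>> ds = xr.Dataset({'var1': xr.DataArray(np.arange(2), dims=['x'])})
        >>> documented = document_dataset(ds, inplace=False)  # doctest: +SKIP
        >>> 'updated' in documented.attrs  # doctest: +SKIP
        True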
"""
if not inplace:
ds = ds.copy(deep=False)
ds.attrs.update(rhg_compute_tools.utils.get_repo_state(repository_root))
ds.attrs["updated"] = pd.Timestamp.now(tz=tz).strftime("%c (%Z)")
return ds