rhg_compute_tools package¶
Subpackages¶
Submodules¶
rhg_compute_tools.gcs module¶
Tools for interacting with GCS infrastructure.
rhg_compute_tools.gcs.authenticated_client(credentials=None, **client_kwargs)¶
Convenience function to create an authenticated GCS client.
Parameters:
- credentials (str or None, optional) – Path to a storage credentials authentication file. If None is passed (default), creates a Client object with no arguments, using the authorization credentials for the current environment. See the [google cloud storage docs](https://googleapis.dev/python/google-api-core/latest/auth.html) for an overview of the authorization options.
- client_kwargs (optional) – kwargs to pass to the get_client function
Return type: google.cloud.storage.Client
rhg_compute_tools.gcs.cp(src, dest, flags=[])¶
Copy a file, or recursively copy a directory, from a local path to GCS or vice versa. Must have already authenticated to use. Notebook servers are automatically authenticated, but workers need the path to the authentication json file passed to the GOOGLE_APPLICATION_CREDENTIALS env var. This is done automatically for rhg-data.json when using the get_worker wrapper.
Parameters:
- src, dest (str) – The paths to the source and destination file or directory. If on GCS, either the /gcs or gs:// prefix will work.
- flags (list of str, optional) – List of flags to add to the gsutil cp command. e.g. flags=['r'] will run the command gsutil -m cp -r ... (recursive copy)
Returns:
- str – stdout from gsutil call
- str – stderr from gsutil call
- datetime.timedelta – Time it took to copy file(s).
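The stdout/stderr/timedelta return signature suggests a thin timing wrapper around a subprocess call. A minimal sketch of that pattern (the helper name is hypothetical, and only the use of gsutil -m cp comes from the text above; everything else is an assumption, not the package's implementation):

```python
import subprocess
from datetime import datetime, timedelta


def timed_run(cmd):
    """Run a command, returning (stdout, stderr, elapsed).

    Illustrative sketch of the cp/rm/sync return convention documented
    above; the real functions build and run gsutil commands.
    """
    start = datetime.now()
    proc = subprocess.run(cmd, capture_output=True, text=True)
    elapsed = datetime.now() - start  # a datetime.timedelta, as documented
    return proc.stdout, proc.stderr, elapsed


# e.g. cp(src, dest, flags=['r']) would correspond to something like:
# timed_run(['gsutil', '-m', 'cp', '-r', src, dest])
```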
rhg_compute_tools.gcs.create_directory_markers(bucket_name, project=None, client=None, prefix=None)¶
Add directory markers in-place on a Google Cloud Storage bucket.
Parameters:
- bucket_name (str) – name of the Google Cloud Storage bucket
- project (str or None, optional) – name of the Google Cloud Platform project. If None, inferred from the default project as determined by the client.
- client (google.cloud.storage.client.Client or None, optional) – Optionally pass a google.cloud.storage Client object to set auth and settings
- prefix (str or None, optional) – Prefix (relative to bucket root) below which to create markers
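Directory markers of this kind are conventionally zero-byte objects whose names end in "/", so that object-store "folders" appear in tools like gcsfuse. A pure-Python sketch of which marker names a set of object names implies (the helper is hypothetical; the real function uploads empty blobs via the client):

```python
def directory_markers(object_names, prefix=None):
    """Return the set of 'directory/' marker names implied by object names.

    Illustrative only: derives the names, does not touch any bucket.
    """
    markers = set()
    for name in object_names:
        if prefix is not None and not name.startswith(prefix):
            continue  # mirror the ``prefix`` restriction documented above
        parts = name.split("/")[:-1]  # all parent "directories" of the object
        for i in range(1, len(parts) + 1):
            markers.add("/".join(parts[:i]) + "/")
    return markers
```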
rhg_compute_tools.gcs.get_bucket(credentials=None, bucket_name='rhg-data', return_client=False, **client_kwargs)¶
Return a bucket object from RHG's GCS system.
Parameters:
- credentials (str or None, optional) – Path to a storage credentials authentication file. If None is passed (default), creates a Client object with no arguments, using the authorization credentials for the current environment. See the [google cloud storage docs](https://googleapis.dev/python/google-api-core/latest/auth.html) for an overview of the authorization options.
- bucket_name (str, optional) – Name of bucket. Typically, we work with rhg-data (default)
- return_client (bool, optional) – Return the Client object as a second object.
- client_kwargs (optional) – kwargs to pass to the get_client function
Returns: bucket
Return type: google.cloud.storage.bucket.Bucket
rhg_compute_tools.gcs.replicate_directory_structure_on_gcs(src, dst, client)¶
Replicate a local directory structure on Google Cloud Storage.
Parameters:
- src (str) – Path to the root directory on the source machine. The directory structure within this directory will be reproduced within dst, e.g. /Users/myusername/my/data
- dst (str) – A url for the root directory of the destination, starting with gs://[bucket_name]/, e.g. gs://my_bucket/path/to/my/data
- client (google.cloud.storage.client.Client) – An authenticated google.cloud.storage.client.Client object.
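The core of such a replication is mapping each local subdirectory under src to a gs:// destination path. A sketch of just that path arithmetic (the helper name is hypothetical; the real function also creates the corresponding objects with the authenticated client):

```python
import os


def gcs_destinations(src, dst):
    """Map each local subdirectory under ``src`` to its gs:// destination.

    Illustrative sketch of the path mapping only.
    """
    if not dst.startswith("gs://"):
        raise ValueError("dst must start with gs://[bucket_name]/")
    mapping = {}
    for root, _dirs, _files in os.walk(src):
        rel = os.path.relpath(root, src)
        if rel == ".":
            target = dst
        else:
            # GCS object names always use forward slashes
            target = dst.rstrip("/") + "/" + rel.replace(os.sep, "/")
        mapping[root] = target
    return mapping
```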
rhg_compute_tools.gcs.rm(path, flags=[])¶
Remove a file, or recursively remove a directory, from a local path or from GCS. Must have already authenticated to use. Notebook servers are automatically authenticated, but workers need the path to the authentication json file passed to the GOOGLE_APPLICATION_CREDENTIALS env var. This is done automatically for rhg-data.json when using the get_worker wrapper.
Parameters:
- path (str or pathlib.Path) – The path to the file or directory to remove. Either the /gcs or gs:// prefix will work.
- flags (list of str, optional) – List of flags to add to the gsutil rm command. e.g. flags=['r'] will run the command gsutil -m rm -r ... (recursive remove)
Returns:
- str – stdout from gsutil call
- str – stderr from gsutil call
- datetime.timedelta – Time it took to remove file(s).
rhg_compute_tools.gcs.sync(src, dest, flags=['r', 'd'])¶
Sync a directory from local to GCS or vice versa. Uses gsutil rsync. Must have already authenticated to use. Notebook servers are automatically authenticated, but workers need the path to the authentication json file passed to the GCLOUD_DEFAULT_TOKEN_FILE env var. This is done automatically for rhg-data.json when using the get_worker wrapper.
Parameters:
- src, dest (str) – The paths to the source and destination file or directory. If on GCS, either the /gcs or gs:// prefix will work.
- flags (list of str, optional) – List of flags to add to the gsutil rsync command. e.g. flags=['r', 'd'] will run the command gsutil -m rsync -r -d ... (recursive sync, deleting any files on dest that are not on src). This is the default set of flags.
Returns:
- str – stdout from gsutil call
- str – stderr from gsutil call
- datetime.timedelta – Time it took to sync file(s).
rhg_compute_tools.kubernetes module¶
Tools for interacting with kubernetes.
rhg_compute_tools.kubernetes.get_big_cluster(*args, **kwargs)¶
Start a cluster with 2x the memory and CPU per worker relative to default.
All arguments are optional. If not provided, defaults will be used. To view defaults, instantiate a dask_gateway.Gateway object and call gateway.cluster_options().
Parameters:
- name (str, optional) – Name of worker image to use (e.g. rhodium/worker:latest). If None (default), default to worker specified in template_path.
- tag (str, optional) – Tag of the worker image to use. Cannot be used in combination with name, which should include a tag. If provided, overrides the tag of the image specified in template_path. If None (default), the full image specified in name or template_path is used.
- extra_pip_packages (str, optional) – Extra pip packages to install on worker. Packages are installed using pip install extra_pip_packages.
- profile (one of ["micro", "standard", "big", "giant"]) – Determines size of worker. CPUs assigned are slightly under 1, 2, 4, and 8, respectively. Memory assigned is slightly over 6, 12, 24, and 48 GB, respectively.
- cpus (float, optional) – Set the CPUs requested for your workers as defined by profile. Will raise an error if >7.5, because our 8-CPU nodes need ~0.5 vCPU for kubernetes pods. (NOTE 12/15/20: This is currently set to 1 by default to allow for mapping big workflows across inputs, see https://github.com/dask/dask-gateway/issues/364).
- cred_name (str, optional) – Name of Google Cloud credentials file to use, equivalent to providing cred_path='/opt/gcsfuse_tokens/{}.json'.format(cred_name). May not be used if cred_path is specified.
- cred_path (str, optional) – Path to Google Cloud credentials file to use. May not be used if cred_name is specified.
- env_items (dict, optional) – A dictionary of env variable 'name'-'value' pairs to append to the env variables included in template_path, e.g. {'MY_ENV_VAR': 'some string'}
- extra_worker_labels (dict, optional) – Dictionary of kubernetes labels to apply to pods. None (default) results in no additional labels besides those in the template, as well as jupyter_user, which is inferred from the JUPYTERHUB_USER environment variable, or, if not set, the server's hostname.
- extra_pod_tolerations (list of dict, optional) – List of pod toleration dictionaries. For example, to match a node pool NoSchedule toleration, you might provide:
  extra_pod_tolerations=[
      {"effect": "NoSchedule", "key": "k8s.dask.org_dedicated", "operator": "Equal", "value": "worker-highcpu"},
      {"effect": "NoSchedule", "key": "k8s.dask.org/dedicated", "operator": "Equal", "value": "worker-highcpu"},
  ]
- keep_default_tolerations (bool, optional) – Whether to append (default) or replace the default tolerations. Ignored if extra_pod_tolerations is None or has length 0.
Returns:
- client (object) – dask.distributed.Client connected to cluster
- cluster (object) – Pre-configured dask_gateway.GatewayCluster
See also
get_micro_cluster() – A cluster with one-CPU workers
get_standard_cluster() – The default cluster specification
get_big_cluster() – A cluster with workers twice the size of the default
get_giant_cluster() – A cluster with workers four times the size of the default
rhg_compute_tools.kubernetes.get_cluster(*args, **kwargs)¶
All arguments are optional. If not provided, defaults will be used. To view defaults, instantiate a dask_gateway.Gateway object and call gateway.cluster_options().
Parameters:
- name (str, optional) – Name of worker image to use (e.g. rhodium/worker:latest). If None (default), default to worker specified in template_path.
- tag (str, optional) – Tag of the worker image to use. Cannot be used in combination with name, which should include a tag. If provided, overrides the tag of the image specified in template_path. If None (default), the full image specified in name or template_path is used.
- extra_pip_packages (str, optional) – Extra pip packages to install on worker. Packages are installed using pip install extra_pip_packages.
- profile (one of ["micro", "standard", "big", "giant"]) – Determines size of worker. CPUs assigned are slightly under 1, 2, 4, and 8, respectively. Memory assigned is slightly over 6, 12, 24, and 48 GB, respectively.
- cpus (float, optional) – Set the CPUs requested for your workers as defined by profile. Will raise an error if >7.5, because our 8-CPU nodes need ~0.5 vCPU for kubernetes pods. (NOTE 12/15/20: This is currently set to 1 by default to allow for mapping big workflows across inputs, see https://github.com/dask/dask-gateway/issues/364).
- cred_name (str, optional) – Name of Google Cloud credentials file to use, equivalent to providing cred_path='/opt/gcsfuse_tokens/{}.json'.format(cred_name). May not be used if cred_path is specified.
- cred_path (str, optional) – Path to Google Cloud credentials file to use. May not be used if cred_name is specified.
- env_items (dict, optional) – A dictionary of env variable 'name'-'value' pairs to append to the env variables included in template_path, e.g. {'MY_ENV_VAR': 'some string'}
- extra_worker_labels (dict, optional) – Dictionary of kubernetes labels to apply to pods. None (default) results in no additional labels besides those in the template, as well as jupyter_user, which is inferred from the JUPYTERHUB_USER environment variable, or, if not set, the server's hostname.
- extra_pod_tolerations (list of dict, optional) – List of pod toleration dictionaries. For example, to match a node pool NoSchedule toleration, you might provide:
  extra_pod_tolerations=[
      {"effect": "NoSchedule", "key": "k8s.dask.org_dedicated", "operator": "Equal", "value": "worker-highcpu"},
      {"effect": "NoSchedule", "key": "k8s.dask.org/dedicated", "operator": "Equal", "value": "worker-highcpu"},
  ]
- keep_default_tolerations (bool, optional) – Whether to append (default) or replace the default tolerations. Ignored if extra_pod_tolerations is None or has length 0.
Returns:
- client (object) – dask.distributed.Client connected to cluster
- cluster (object) – Pre-configured dask_gateway.GatewayCluster
See also
get_micro_cluster() – A cluster with one-CPU workers
get_standard_cluster() – The default cluster specification
get_big_cluster() – A cluster with workers twice the size of the default
get_giant_cluster() – A cluster with workers four times the size of the default
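The cluster options documented above are plain Python data, so a call can be assembled as a kwargs dict. For example (all of the values below are hypothetical illustrations of the expected shapes, not package defaults):

```python
# Hypothetical option values for get_cluster(); these only illustrate
# the data shapes the parameters above expect.
cluster_kwargs = {
    "profile": "standard",
    "extra_pip_packages": "rioxarray",  # installed via pip on each worker
    "cred_name": "rhg-data",  # expands to /opt/gcsfuse_tokens/rhg-data.json
    "env_items": {"MY_ENV_VAR": "some string"},
    "extra_pod_tolerations": [
        {
            "effect": "NoSchedule",
            "key": "k8s.dask.org/dedicated",
            "operator": "Equal",
            "value": "worker-highcpu",
        }
    ],
}

# On a system with dask-gateway configured, this would then be used as:
# client, cluster = rhg_compute_tools.kubernetes.get_cluster(**cluster_kwargs)
```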
rhg_compute_tools.kubernetes.get_giant_cluster(*args, **kwargs)¶
Start a cluster with 4x the memory and CPU per worker relative to default.
All arguments are optional. If not provided, defaults will be used. To view defaults, instantiate a dask_gateway.Gateway object and call gateway.cluster_options().
Parameters:
- name (str, optional) – Name of worker image to use (e.g. rhodium/worker:latest). If None (default), default to worker specified in template_path.
- tag (str, optional) – Tag of the worker image to use. Cannot be used in combination with name, which should include a tag. If provided, overrides the tag of the image specified in template_path. If None (default), the full image specified in name or template_path is used.
- extra_pip_packages (str, optional) – Extra pip packages to install on worker. Packages are installed using pip install extra_pip_packages.
- profile (one of ["micro", "standard", "big", "giant"]) – Determines size of worker. CPUs assigned are slightly under 1, 2, 4, and 8, respectively. Memory assigned is slightly over 6, 12, 24, and 48 GB, respectively.
- cpus (float, optional) – Set the CPUs requested for your workers as defined by profile. Will raise an error if >7.5, because our 8-CPU nodes need ~0.5 vCPU for kubernetes pods. (NOTE 12/15/20: This is currently set to 1 by default to allow for mapping big workflows across inputs, see https://github.com/dask/dask-gateway/issues/364).
- cred_name (str, optional) – Name of Google Cloud credentials file to use, equivalent to providing cred_path='/opt/gcsfuse_tokens/{}.json'.format(cred_name). May not be used if cred_path is specified.
- cred_path (str, optional) – Path to Google Cloud credentials file to use. May not be used if cred_name is specified.
- env_items (dict, optional) – A dictionary of env variable 'name'-'value' pairs to append to the env variables included in template_path, e.g. {'MY_ENV_VAR': 'some string'}
- extra_worker_labels (dict, optional) – Dictionary of kubernetes labels to apply to pods. None (default) results in no additional labels besides those in the template, as well as jupyter_user, which is inferred from the JUPYTERHUB_USER environment variable, or, if not set, the server's hostname.
- extra_pod_tolerations (list of dict, optional) – List of pod toleration dictionaries. For example, to match a node pool NoSchedule toleration, you might provide:
  extra_pod_tolerations=[
      {"effect": "NoSchedule", "key": "k8s.dask.org_dedicated", "operator": "Equal", "value": "worker-highcpu"},
      {"effect": "NoSchedule", "key": "k8s.dask.org/dedicated", "operator": "Equal", "value": "worker-highcpu"},
  ]
- keep_default_tolerations (bool, optional) – Whether to append (default) or replace the default tolerations. Ignored if extra_pod_tolerations is None or has length 0.
Returns:
- client (object) – dask.distributed.Client connected to cluster
- cluster (object) – Pre-configured dask_gateway.GatewayCluster
See also
get_micro_cluster() – A cluster with one-CPU workers
get_standard_cluster() – The default cluster specification
get_big_cluster() – A cluster with workers twice the size of the default
get_giant_cluster() – A cluster with workers four times the size of the default
rhg_compute_tools.kubernetes.get_micro_cluster(*args, **kwargs)¶
Start a cluster with a single CPU per worker.
All arguments are optional. If not provided, defaults will be used. To view defaults, instantiate a dask_gateway.Gateway object and call gateway.cluster_options().
Parameters:
- name (str, optional) – Name of worker image to use (e.g. rhodium/worker:latest). If None (default), default to worker specified in template_path.
- tag (str, optional) – Tag of the worker image to use. Cannot be used in combination with name, which should include a tag. If provided, overrides the tag of the image specified in template_path. If None (default), the full image specified in name or template_path is used.
- extra_pip_packages (str, optional) – Extra pip packages to install on worker. Packages are installed using pip install extra_pip_packages.
- profile (one of ["micro", "standard", "big", "giant"]) – Determines size of worker. CPUs assigned are slightly under 1, 2, 4, and 8, respectively. Memory assigned is slightly over 6, 12, 24, and 48 GB, respectively.
- cpus (float, optional) – Set the CPUs requested for your workers as defined by profile. Will raise an error if >7.5, because our 8-CPU nodes need ~0.5 vCPU for kubernetes pods. (NOTE 12/15/20: This is currently set to 1 by default to allow for mapping big workflows across inputs, see https://github.com/dask/dask-gateway/issues/364).
- cred_name (str, optional) – Name of Google Cloud credentials file to use, equivalent to providing cred_path='/opt/gcsfuse_tokens/{}.json'.format(cred_name). May not be used if cred_path is specified.
- cred_path (str, optional) – Path to Google Cloud credentials file to use. May not be used if cred_name is specified.
- env_items (dict, optional) – A dictionary of env variable 'name'-'value' pairs to append to the env variables included in template_path, e.g. {'MY_ENV_VAR': 'some string'}
- extra_worker_labels (dict, optional) – Dictionary of kubernetes labels to apply to pods. None (default) results in no additional labels besides those in the template, as well as jupyter_user, which is inferred from the JUPYTERHUB_USER environment variable, or, if not set, the server's hostname.
- extra_pod_tolerations (list of dict, optional) – List of pod toleration dictionaries. For example, to match a node pool NoSchedule toleration, you might provide:
  extra_pod_tolerations=[
      {"effect": "NoSchedule", "key": "k8s.dask.org_dedicated", "operator": "Equal", "value": "worker-highcpu"},
      {"effect": "NoSchedule", "key": "k8s.dask.org/dedicated", "operator": "Equal", "value": "worker-highcpu"},
  ]
- keep_default_tolerations (bool, optional) – Whether to append (default) or replace the default tolerations. Ignored if extra_pod_tolerations is None or has length 0.
Returns:
- client (object) – dask.distributed.Client connected to cluster
- cluster (object) – Pre-configured dask_gateway.GatewayCluster
See also
get_micro_cluster() – A cluster with one-CPU workers
get_standard_cluster() – The default cluster specification
get_big_cluster() – A cluster with workers twice the size of the default
get_giant_cluster() – A cluster with workers four times the size of the default
rhg_compute_tools.kubernetes.get_standard_cluster(*args, **kwargs)¶
Start a cluster with 1x the memory and CPU per worker relative to default.
All arguments are optional. If not provided, defaults will be used. To view defaults, instantiate a dask_gateway.Gateway object and call gateway.cluster_options().
Parameters:
- name (str, optional) – Name of worker image to use (e.g. rhodium/worker:latest). If None (default), default to worker specified in template_path.
- tag (str, optional) – Tag of the worker image to use. Cannot be used in combination with name, which should include a tag. If provided, overrides the tag of the image specified in template_path. If None (default), the full image specified in name or template_path is used.
- extra_pip_packages (str, optional) – Extra pip packages to install on worker. Packages are installed using pip install extra_pip_packages.
- profile (one of ["micro", "standard", "big", "giant"]) – Determines size of worker. CPUs assigned are slightly under 1, 2, 4, and 8, respectively. Memory assigned is slightly over 6, 12, 24, and 48 GB, respectively.
- cpus (float, optional) – Set the CPUs requested for your workers as defined by profile. Will raise an error if >7.5, because our 8-CPU nodes need ~0.5 vCPU for kubernetes pods. (NOTE 12/15/20: This is currently set to 1 by default to allow for mapping big workflows across inputs, see https://github.com/dask/dask-gateway/issues/364).
- cred_name (str, optional) – Name of Google Cloud credentials file to use, equivalent to providing cred_path='/opt/gcsfuse_tokens/{}.json'.format(cred_name). May not be used if cred_path is specified.
- cred_path (str, optional) – Path to Google Cloud credentials file to use. May not be used if cred_name is specified.
- env_items (dict, optional) – A dictionary of env variable 'name'-'value' pairs to append to the env variables included in template_path, e.g. {'MY_ENV_VAR': 'some string'}
- extra_worker_labels (dict, optional) – Dictionary of kubernetes labels to apply to pods. None (default) results in no additional labels besides those in the template, as well as jupyter_user, which is inferred from the JUPYTERHUB_USER environment variable, or, if not set, the server's hostname.
- extra_pod_tolerations (list of dict, optional) – List of pod toleration dictionaries. For example, to match a node pool NoSchedule toleration, you might provide:
  extra_pod_tolerations=[
      {"effect": "NoSchedule", "key": "k8s.dask.org_dedicated", "operator": "Equal", "value": "worker-highcpu"},
      {"effect": "NoSchedule", "key": "k8s.dask.org/dedicated", "operator": "Equal", "value": "worker-highcpu"},
  ]
- keep_default_tolerations (bool, optional) – Whether to append (default) or replace the default tolerations. Ignored if extra_pod_tolerations is None or has length 0.
Returns:
- client (object) – dask.distributed.Client connected to cluster
- cluster (object) – Pre-configured dask_gateway.GatewayCluster
See also
get_micro_cluster() – A cluster with one-CPU workers
get_standard_cluster() – The default cluster specification
get_big_cluster() – A cluster with workers twice the size of the default
get_giant_cluster() – A cluster with workers four times the size of the default
rhg_compute_tools.utils module¶
class rhg_compute_tools.utils.NumpyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)¶
Bases: json.encoder.JSONEncoder
Helper class for json.dumps to coerce numpy objects to native python.
default(obj)¶
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
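The same default() pattern is shown here without numpy, as a self-contained sketch of how an encoder like NumpyEncoder coerces otherwise unserializable objects (NumpyEncoder itself presumably handles numpy arrays and scalars; the set-to-list coercion below is just an analogous stand-in):

```python
import json


class SetEncoder(json.JSONEncoder):
    """Coerce sets to lists, analogous to how NumpyEncoder coerces
    numpy objects to native Python types."""

    def default(self, o):
        if isinstance(o, (set, frozenset)):
            return sorted(o)  # any deterministic list form will do
        # Fall through to the base class, which raises TypeError
        return super().default(o)
```

Usage mirrors NumpyEncoder: pass the class via the cls argument, e.g. json.dumps({"ids": {3, 1, 2}}, cls=SetEncoder).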
rhg_compute_tools.utils.block_globals¶
Decorator to prevent globals and undefined closures in functions and classes.
Parameters:
- obj (function) – Function to decorate. All globals not matching one of the allowed types will raise an AssertionError
- allowed_types (type or tuple of types, optional) – Types which are allowed as globals. By default, functions and modules are allowed. The full set of allowed types is drawn from the types module, and includes FunctionType, ModuleType, MethodType, ClassType, BuiltinMethodType, and BuiltinFunctionType.
- include_defaults (bool, optional) – If allowed_types is provided, setting include_defaults to True will append the default list of functions, modules, and methods to the user-passed list of allowed types. Default is True, in which case any user-passed elements will be added to the defaults described above. Setting to False will allow only the types passed in allowed_types.
- whitelist (list of str, optional) – Optional list of variable names to whitelist. If a list is provided, global variables will be compared to elements of this list based on their string names. Default (None) is no whitelist.
Examples
Wrap a function to block globals:
>>> my_data = 10
>>> @block_globals
... def add_5(data):
...     ''' can you spot the global? '''
...     a_number = 5
...     result = a_number + my_data
...     return result
Traceback (most recent call last):
...
TypeError: Illegal <class 'int'> global found in add_5: my_data
Wrapping a class will prevent globals from being used in all methods:
>>> @block_globals
... class MyClass:
...     @staticmethod
...     def add_5(data):
...         ''' can you spot the global? '''
...         a_number = 5
...         result = a_number + my_data
...         return result
Traceback (most recent call last):
...
TypeError: Illegal <class 'int'> global found in add_5: my_data
By default, functions and modules are allowed in the list of globals. You can modify this list with the allowed_types argument:
>>> result_formatter = 'my number is {}'
>>> @block_globals(allowed_types=str)
... def add_5(data):
...     ''' only allowed globals here! '''
...     a_number = 5
...     result = a_number + data
...     return result_formatter.format(result)
...
>>> add_5(3)
'my number is 8'
block_globals will also catch undefined references:
>>> @block_globals
... def get_mean(df):
...     return da.mean()
Traceback (most recent call last):
...
TypeError: Undefined global in get_mean: da
rhg_compute_tools.utils.checkpoint(jobs, futures, job_name, log_dir='.', extra_pending=None, extra_errors=None, extra_others=None)¶
Checkpoint and save a job state to disk.
rhg_compute_tools.utils.collapse(*args, **kwargs)¶
Collapse positional and keyword arguments into an (args, kwargs) tuple.
Intended for use with the expand() decorator.
Parameters:
- *args – Variable length argument list.
- **kwargs – Arbitrary keyword arguments.
Returns:
- args (tuple) – Positional arguments tuple
- kwargs (dict) – Keyword argument dictionary
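Per the descriptions above, collapse and its companion expand are small adapters around a packed (args, kwargs) tuple. A sketch equivalent (the names and behavior match the docstrings; the exact bodies are assumptions):

```python
import functools


def collapse(*args, **kwargs):
    """Pack call arguments into a single (args, kwargs) tuple."""
    return (args, kwargs)


def expand(func):
    """Wrap ``func`` so it accepts one (args, kwargs) tuple instead
    of separate positional and keyword arguments."""

    @functools.wraps(func)
    def wrapped(packed):
        args, kwargs = packed
        return func(*args, **kwargs)

    return wrapped
```

This pairing is what makes the map-over-parameter-tuples examples below work: collapse builds the tuples, expand-decorated functions consume them.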
rhg_compute_tools.utils.collapse_product(*args, **kwargs)¶
Parameters:
- *args – Variable length list of iterables
- **kwargs – Keyword arguments, whose values must be iterables
Returns: Generator with collapsed arguments
Return type: iterator
See also
collapse()
Examples
>>> @expand
... def my_func(a, b, exp=1):
...     return (a * b)**exp
...
>>> product_args = list(collapse_product(
...     [0, 1, 2],
...     [0.5, 2],
...     exp=[0, 1]))
>>> product_args
[((0, 0.5), {'exp': 0}), ((0, 0.5), {'exp': 1}),
 ((0, 2), {'exp': 0}), ((0, 2), {'exp': 1}),
 ((1, 0.5), {'exp': 0}), ((1, 0.5), {'exp': 1}),
 ((1, 2), {'exp': 0}), ((1, 2), {'exp': 1}),
 ((2, 0.5), {'exp': 0}), ((2, 0.5), {'exp': 1}),
 ((2, 2), {'exp': 0}), ((2, 2), {'exp': 1})]
>>> list(map(my_func, product_args))
[1.0, 0.0, 1, 0, 1.0, 0.5, 1, 2, 1.0, 1.0, 1, 4]
rhg_compute_tools.utils.expand(func)¶
Decorator to expand an (args, kwargs) tuple in function calls.
Intended for use with the collapse() function.
Parameters: func (function) – Function to have arguments expanded. Func can have any number of positional and keyword arguments.
Returns: wrapped – Wrapped version of func which accepts a single (args, kwargs) tuple.
Return type: function
Examples
>>> @expand
... def my_func(a, b, exp=1):
...     return (a * b)**exp
...
>>> my_func(((2, 3), {}))
6
>>> my_func(((2, 3, 2), {}))
36
>>> my_func((tuple([]), {'b': 4, 'exp': 2, 'a': 1}))
16
This function can be used in combination with the collapse helper function, which allows more natural parameter calls:
>>> my_func(collapse(2, 3, exp=2))
36
These can then be paired to enable many parameterized function calls:
>>> func_calls = [collapse(a, a+1, exp=a) for a in range(5)]
>>> list(map(my_func, func_calls))
[1, 2, 36, 1728, 160000]
rhg_compute_tools.utils.get_repo_state(repository_root: Optional[str] = None) → dict¶
Get a dictionary summarizing the current state of a repository.
Parameters: repository_root (str or None) – Path to the root of the repository to document. If None (default), the current directory will be used, and parent directories will be searched for a git repository. If a string is passed, parent directories will not be searched - the directory must be a repository root which contains a .git directory.
Returns: repo_state – Dictionary of repository information documenting the current state
Return type: dict
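The docstring does not spell out which fields repo_state contains, but one common way to collect such a summary is by shelling out to git. A hedged sketch (the helper name and the chosen keys are assumptions, not the package's actual fields):

```python
import subprocess


def sketch_repo_state(repository_root="."):
    """Collect a minimal git summary for a repository; illustrative only."""

    def git(*args):
        return subprocess.run(
            ["git", "-C", repository_root, *args],
            capture_output=True, text=True, check=True,
        ).stdout.strip()

    return {
        "branch": git("rev-parse", "--abbrev-ref", "HEAD"),
        "commit": git("rev-parse", "HEAD"),
        # any output from ``status --porcelain`` means uncommitted changes
        "dirty": bool(git("status", "--porcelain")),
    }
```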
rhg_compute_tools.utils.recover(job_name, log_dir='.')¶
Recover pending, errored, and other jobs from a checkpoint.
rhg_compute_tools.utils.retry_with_timeout¶
Execute func n_tries times, each time only allowing retry_freq seconds for the function to complete. There are two main cases where this could be useful:
- You have a function that you know should execute quickly, but you may get occasional errors when running it simultaneously on a large number of workers. An example of this is massively parallelized I/O operations of netcdfs on GCS.
- You have a function that may or may not take a long time, but you want to skip it if it takes too long.
There are two possible ways that this timeout function is implemented, each with pros and cons:
- Using python's native threading module. If you are executing func outside of a dask worker, you likely will want this approach. It may be slightly faster and has the benefit of starting the timeout clock when the function starts executing (rather than when the function is submitted to a dask scheduler). Note: This approach will also work if calling func from a dask worker, but only if the cluster was set up such that threads_per_worker=1. Otherwise, this may cause issues if used from a dask worker.
- Using dask. If you would like a dask worker to execute this function, you likely will want this approach. It can be executed from a dask worker regardless of the number of threads per worker (see above), but has the downside that the timeout clock begins once func is submitted, rather than when it begins executing.
Parameters:
- func (callable) – The function you would like to execute with a timeout backoff.
- retry_freq (float) – The number of seconds to wait between successive retries of func.
- n_tries (int) – The number of retries to attempt before raising an error if none were successful
- use_dask (bool) – If true, will try to use the dask-based implementation (see description above). If no Client instance is present, will fall back to use_dask=False.
Returns: The return value of func
Raises:
- dask.distributed.TimeoutError – If the function does not execute successfully in the specified retry_freq, after trying n_tries times.
- ValueError – If use_dask=True and a Client instance is present, but this function is executed from the client (rather than as a task submitted to a worker), you will get ValueError("No workers found").
Examples
>>> import time
>>> @retry_with_timeout(retry_freq=.5, n_tries=1)
... def wait_func(timeout):
...     time.sleep(timeout)
>>> wait_func(.1)
>>> wait_func(1)
Traceback (most recent call last):
...
asyncio.exceptions.TimeoutError: Func did not complete successfully in allowed time/number of retries.
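A minimal sketch of the threading-based variant described above, using a single-worker thread pool per attempt (the decorator name and semantics follow the docs; the implementation details below are assumptions, not the package's code):

```python
import concurrent.futures
import functools


def retry_with_timeout_sketch(retry_freq=1.0, n_tries=3):
    """Allow func ``retry_freq`` seconds per attempt, up to ``n_tries`` attempts."""

    def decorator(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            for _attempt in range(n_tries):
                ex = concurrent.futures.ThreadPoolExecutor(max_workers=1)
                future = ex.submit(func, *args, **kwargs)
                try:
                    return future.result(timeout=retry_freq)
                except concurrent.futures.TimeoutError:
                    # The timed-out call keeps running in its thread;
                    # we simply stop waiting for it and retry.
                    continue
                finally:
                    ex.shutdown(wait=False)
            raise TimeoutError(
                "Func did not complete successfully in allowed "
                "time/number of retries."
            )

        return wrapped

    return decorator
```

Note the timeout clock here starts when the attempt is submitted to the thread, which is the trade-off the docs describe for the dask variant; a timed-out call is abandoned, not killed.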
rhg_compute_tools.xarray module¶
-
rhg_compute_tools.xarray.
choose_along_axis
(arr, axis=-1, replace=True, nchoices=1, p=None)[source]¶ Wrapper on np.random.choice, but along a single dimension within a larger array
Parameters: - arr (np.array) – Array with more than one dimension. Choices will be drawn from along the
axis
dimension. - axis (integer, optional) – Dimension along which to draw samples
- replace (bool, optional) – Whether to sample with replacement. Passed to
np.random.choice()
. Default 1. - nchoices (int, optional) – Number of samples to draw. Must be less than or equal to the number of valid options if replace is False. Default 1.
- p (np.array) – Array with the same shape as
arr
with weights for each choice. Each dimension is sampled independently, so weights will be normalized to 1 along theaxis
dimension.
Returns: sampled – Array with the same shape as
arr
but with lengthnchoices
along axisaxis
and with values chosen from the values ofarr
along dimensionaxis
with weightsp
.Return type: np.array
Examples
Let’s say we have an array with NaNs in it:
>>> arr = np.arange(40).reshape(4, 2, 5).astype(float) >>> for i in range(4): ... arr[i, :, i+1:] = np.nan >>> arr array([[[ 0., nan, nan, nan, nan], [ 5., nan, nan, nan, nan]], [[10., 11., nan, nan, nan], [15., 16., nan, nan, nan]], [[20., 21., 22., nan, nan], [25., 26., 27., nan, nan]], [[30., 31., 32., 33., nan], [35., 36., 37., 38., nan]]])
We can set weights such that we only select from non-nan values
>>> p = (~np.isnan(arr))
>>> p = p / p.sum(axis=2).reshape(4, 2, 1)
Now, sampling from this along the second dimension will draw from these values:
>>> np.random.seed(1)
>>> choose_along_axis(arr, 2, p=p, nchoices=10)
array([[[ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
        [ 5.,  5.,  5.,  5.,  5.,  5.,  5.,  5.,  5.,  5.]],
       [[11., 11., 10., 11., 11., 11., 10., 10., 10., 11.],
        [15., 15., 16., 16., 16., 15., 16., 16., 15., 16.]],
       [[22., 22., 20., 22., 20., 21., 22., 20., 20., 20.],
        [25., 27., 25., 25., 26., 25., 26., 25., 26., 27.]],
       [[30., 31., 32., 31., 30., 32., 32., 32., 33., 32.],
        [38., 35., 35., 38., 36., 35., 38., 36., 38., 37.]]])
See also
np.random.choice()
: 1-d version of this function
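The sampling scheme described above (independent draws for each 1-d slice along one axis, with per-slice weights) can be illustrated with plain NumPy. This is a rough sketch, not the package's implementation; the function name choose_along_axis_sketch and the loop-based approach are assumptions for illustration:

```python
import numpy as np


def choose_along_axis_sketch(arr, axis=-1, nchoices=1, p=None):
    """Illustrative reimplementation: draw ``nchoices`` samples
    independently for each 1-d slice along ``axis``, optionally weighted
    by ``p`` (same shape as ``arr``, already normalized along ``axis``)."""
    arr = np.moveaxis(np.asarray(arr), axis, -1)   # put sampled axis last
    flat = arr.reshape(-1, arr.shape[-1])          # (n_slices, axis_len)
    if p is None:
        # Uniform weights when no probability array is given.
        pflat = np.full(flat.shape, 1.0 / flat.shape[-1])
    else:
        pflat = np.moveaxis(np.asarray(p, dtype=float), axis, -1).reshape(flat.shape)
    out = np.empty((flat.shape[0], nchoices), dtype=arr.dtype)
    for i in range(flat.shape[0]):
        # Each slice gets its own independent weighted draw.
        idx = np.random.choice(flat.shape[-1], size=nchoices, p=pflat[i])
        out[i] = flat[i, idx]
    return np.moveaxis(out.reshape(arr.shape[:-1] + (nchoices,)), -1, axis)
```

With the NaN-masking weights from the example above, this sketch likewise only ever draws from the non-NaN entries of each slice.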
-
rhg_compute_tools.xarray.
choose_along_dim
(da, dim, samples=1, expand=None, new_dim_name=None)[source]¶ Sample values from a DataArray along a dimension
Wraps
np.random.choice()
to sample a different random index (or set of indices) from along dimension dim for each combination of elements along the other dimensions. This is very different from block resampling - to block resample along a dimension, simply choose a set of indices and draw these from the array using xr.DataArray.sel()
.Parameters: - da (xr.DataArray) – DataArray from which to sample values.
- dim (str) – Dimension along which to sample. Sampling will draw from elements along this dimension for all combinations of other dimensions.
- samples (int, optional) – Number of samples to take from the dimension
dim
. If greater than 1, expand
is ignored (and set to True). - expand (bool, optional) – Whether to expand the array along the sampled dimension.
- new_dim_name (str, optional) – Name for the new dimension. If not provided, will use
dim
.
Returns: sampled – DataArray with sampled values chosen along dimension
dim
Return type: xr.DataArray
Examples
>>> da = xr.DataArray(
...     np.arange(40).reshape(4, 2, 5),
...     dims=['x', 'y', 'z'],
...     coords=[np.arange(4), np.arange(2), np.arange(5)],
... )
>>> da
<xarray.DataArray (x: 4, y: 2, z: 5)>
array([[[ 0,  1,  2,  3,  4],
        [ 5,  6,  7,  8,  9]],
       [[10, 11, 12, 13, 14],
        [15, 16, 17, 18, 19]],
       [[20, 21, 22, 23, 24],
        [25, 26, 27, 28, 29]],
       [[30, 31, 32, 33, 34],
        [35, 36, 37, 38, 39]]])
Coordinates:
  * x        (x) int64 0 1 2 3
  * y        (y) int64 0 1
  * z        (z) int64 0 1 2 3 4
We can take a random value along the 'z' dimension:

>>> np.random.seed(1)
>>> choose_along_dim(da, 'z')
<xarray.DataArray (x: 4, y: 2)>
array([[ 2,  8],
       [10, 16],
       [20, 25],
       [30, 36]])
Coordinates:
  * x        (x) int64 0 1 2 3
  * y        (y) int64 0 1
If you provide a samples argument greater than one (or set expand=True) the array will be expanded to a new dimension:

>>> np.random.seed(1)
>>> choose_along_dim(da, 'z', samples=3)
<xarray.DataArray (x: 4, y: 2, z: 3)>
array([[[ 2,  3,  0],
        [ 6,  5,  5]],
       [[10, 11, 11],
        [17, 17, 18]],
       [[21, 24, 20],
        [28, 27, 27]],
       [[30, 30, 34],
        [39, 36, 38]]])
Coordinates:
  * x        (x) int64 0 1 2 3
  * y        (y) int64 0 1
  * z        (z) int64 0 1 2
-
rhg_compute_tools.xarray.
dataarray_from_delayed
(futures, dim=None, client=None, **client_kwargs)[source]¶ Returns a DataArray from a list of futures
Parameters: - futures (list) – list of
dask.distributed.Future
objects holdingxarray.DataArray
objects. - dim (str, optional) – dimension along which to concat
xarray.DataArray
. Inferred by default. - client (object, optional) –
dask.distributed.Client
to use in gathering metadata on futures. If not provided, client is inferred from context. - client_kwargs (optional) – kwargs to pass to
client.map
andclient.gather
commands (e.g.priority
)
Returns: array –
xarray.DataArray
concatenated along dim
with a dask.array.Array
backend.Return type: xr.DataArray
Examples
Given a mapped xarray DataArray, pull the metadata into memory while leaving the data on the workers:
>>> import numpy as np, pandas as pd
>>> def build_arr(multiplier):
...     return multiplier * xr.DataArray(
...         np.arange(2), dims=['x'], coords=[['a', 'b']])
...
>>> client = dd.Client()
>>> fut = client.map(build_arr, range(3))
>>> da = dataarray_from_delayed(
...     fut,
...     dim=pd.Index(range(3), name='simulation'),
...     priority=1
... )
...
>>> da
<xarray.DataArray ...(simulation: 3, x: 2)>
dask.array<...shape=(3, 2), dtype=int64, chunksize=(1, 2), chunktype=numpy.ndarray>
Coordinates:
  * x           (x) <U1 'a' 'b'
  * simulation  (simulation) int64 0 1 2
>>> client.close()
-
rhg_compute_tools.xarray.
dataarrays_from_delayed
(futures, client=None, **client_kwargs)[source]¶ Returns a list of xarray dataarrays from a list of futures of dataarrays
Parameters: - futures (list) – list of
dask.distributed.Future
objects holdingxarray.DataArray
objects. - client (object, optional) –
dask.distributed.Client
to use in gathering metadata on futures. If not provided, client is inferred from context. - client_kwargs (optional) – kwargs to pass to
client.map
andclient.gather
commands (e.g.priority
)
Returns: arrays – list of
xarray.DataArray
objects with dask.array.Array
backends.Return type: list
Examples
Given a mapped xarray DataArray, pull the metadata into memory while leaving the data on the workers:
>>> import numpy as np
>>> def build_arr(multiplier):
...     return multiplier * xr.DataArray(
...         np.arange(2), dims=['x'], coords=[['a', 'b']])
...
>>> client = dd.Client()
>>> fut = client.map(build_arr, range(3))
>>> arrs = dataarrays_from_delayed(fut, priority=1)
>>> arrs[-1]
<xarray.DataArray ...(x: 2)>
dask.array<...shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>
Coordinates:
  * x        (x) <U1 'a' 'b'
This list of arrays can now be manipulated using normal xarray tools:
>>> xr.concat(arrs, dim='simulation')
<xarray.DataArray ...(simulation: 3, x: 2)>
dask.array<...shape=(3, 2), dtype=int64, chunksize=(1, 2), chunktype=numpy.ndarray>
Coordinates:
  * x        (x) <U1 'a' 'b'
Dimensions without coordinates: simulation
>>> client.close()
-
rhg_compute_tools.xarray.
dataset_from_delayed
(futures, dim=None, client=None, **client_kwargs)[source]¶ Returns an
xarray.Dataset
from a list of futures
Parameters: - futures (list) – list of
dask.distributed.Future
objects holdingxarray.Dataset
objects. - dim (str, optional) – dimension along which to concat
xarray.Dataset
. Inferred by default. - client (object, optional) –
dask.distributed.Client
to use in gathering metadata on futures. If not provided, client is inferred from context. - client_kwargs (optional) – kwargs to pass to
client.map
andclient.gather
commands (e.g.priority
)
Returns: dataset –
xarray.Dataset
concatenated along dim
with dask.array.Array
backends for each variable.Return type: xr.Dataset
Examples
Given a mapped xarray.Dataset, pull the metadata into memory while leaving the data on the workers:

>>> import numpy as np, pandas as pd
>>> def build_ds(multiplier):
...     return multiplier * xr.Dataset({
...         'var1': xr.DataArray(
...             np.arange(2), dims=['x'], coords=[['a', 'b']])})
...
>>> client = dd.Client()
>>> fut = client.map(build_ds, range(3))
>>> ds = dataset_from_delayed(fut, dim=pd.Index(range(3), name='y'), priority=1)
>>> ds
<xarray.Dataset>
Dimensions:  (x: 2, y: 3)
Coordinates:
  * x        (x) <U1 'a' 'b'
  * y        (y) int64 0 1 2
Data variables:
    var1     (y, x) int64 dask.array<chunksize=(1, 2), meta=np.ndarray>
>>> client.close()
-
rhg_compute_tools.xarray.
datasets_from_delayed
(futures, client=None, **client_kwargs)[source]¶ Returns a list of xarray datasets from a list of futures of datasets
Parameters: - futures (list) – list of
dask.distributed.Future
objects holdingxarray.Dataset
objects. - client (object, optional) –
dask.distributed.Client
to use in gathering metadata on futures. If not provided, client is inferred from context. - client_kwargs (optional) – kwargs to pass to
client.map
andclient.gather
commands (e.g.priority
)
Returns: datasets – list of
xarray.Dataset
objects with dask.array.Array
backends for each variable.Return type: list
Examples
Given a mapped xarray.Dataset, pull the metadata into memory while leaving the data on the workers:

>>> import numpy as np
>>> def build_ds(multiplier):
...     return multiplier * xr.Dataset({
...         'var1': xr.DataArray(
...             np.arange(2), dims=['x'], coords=[['a', 'b']])})
...
>>> client = dd.Client()
>>> fut = client.map(build_ds, range(3))
>>> arrs = datasets_from_delayed(fut, priority=1)
>>> arrs[-1]
<xarray.Dataset>
Dimensions:  (x: 2)
Coordinates:
  * x        (x) <U1 'a' 'b'
Data variables:
    var1     (x) int64 dask.array<chunksize=(2,), meta=np.ndarray>
This list of datasets can now be manipulated using normal xarray tools:
>>> xr.concat(arrs, dim='y')
<xarray.Dataset>
Dimensions:  (x: 2, y: 3)
Coordinates:
  * x        (x) <U1 'a' 'b'
Dimensions without coordinates: y
Data variables:
    var1     (y, x) int64 dask.array<chunksize=(1, 2), meta=np.ndarray>
>>> client.close()
-
rhg_compute_tools.xarray.
document_dataset
(ds: xarray.core.dataset.Dataset, repository_root: Optional[str] = None, tz: str = 'UTC', inplace: bool = True) → xarray.core.dataset.Dataset[source]¶ Add repository state and timestamp to dataset attrs
Parameters: - ds (xr.Dataset) – Dataset to document
- repository_root (str or None, optional) – Path to the root of the repository to document. If
None
(default), the current directory will be used, and parent directories will be searched for a git repository. If a string is passed, parent directories will not be searched - the directory must be a repository root which contains a .git
directory. - tz (str, optional) – time zone string parseable by datetime.datetime (e.g. “US/Pacific”). Default “UTC”.
- inplace (bool, optional) – Whether to update the dataset’s attributes in place (default) or to return a copy of the dataset.
Returns: ds – Dataset with updated attribute information. A dataset is returned regardless of arguments - the inplace argument determines whether the returned dataset will be a shallow copy or the original object (default).
Return type: xr.Dataset
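The documentation step described above can be sketched with the standard library alone. This is an illustrative stand-in, not the package's implementation: the function name document_attrs_sketch and the attribute keys "git_commit" and "updated" are hypothetical, it operates on a plain attrs dict rather than ds.attrs, and time-zone handling is simplified to UTC:

```python
import datetime
import subprocess


def document_attrs_sketch(attrs, repository_root="."):
    """Illustrative sketch: record the current git commit and a UTC
    timestamp in a dict of attributes (the real function writes to
    ``ds.attrs`` and uses its own attribute names)."""
    try:
        result = subprocess.run(
            ["git", "-C", repository_root, "rev-parse", "HEAD"],
            capture_output=True, text=True,
        )
        commit = result.stdout.strip() if result.returncode == 0 else "unknown"
    except OSError:
        # git not installed; fall back gracefully.
        commit = "unknown"
    attrs["git_commit"] = commit  # hypothetical key
    attrs["updated"] = datetime.datetime.now(datetime.timezone.utc).isoformat()
    return attrs
```

Recording repository state alongside a timestamp in dataset attrs makes any written output traceable back to the code revision that produced it.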
-
class
rhg_compute_tools.xarray.
random
(xarray_obj)[source]¶ Bases:
object
-
choice
(dim, samples=1, expand=None, new_dim_name=None)[source]¶ Sample values from a DataArray along a dimension
Wraps
np.random.choice()
to sample a different random index (or set of indices) from along dimension dim for each combination of elements along the other dimensions. This is very different from block resampling - to block resample along a dimension, simply choose a set of indices and draw these from the array using xr.DataArray.sel()
.Parameters: - da (xr.DataArray) – DataArray from which to sample values.
- dim (str) – Dimension along which to sample. Sampling will draw from elements along this dimension for all combinations of other dimensions.
- samples (int, optional) – Number of samples to take from the dimension
dim
. If greater than 1, expand
is ignored (and set to True). - expand (bool, optional) – Whether to expand the array along the sampled dimension.
- new_dim_name (str, optional) – Name for the new dimension. If not provided, will use
dim
.
Returns: sampled – DataArray with sampled values chosen along dimension
dim
Return type: xr.DataArray
Examples
>>> da = xr.DataArray(
...     np.arange(40).reshape(4, 2, 5),
...     dims=['x', 'y', 'z'],
...     coords=[np.arange(4), np.arange(2), np.arange(5)],
... )
>>> da
<xarray.DataArray (x: 4, y: 2, z: 5)>
array([[[ 0,  1,  2,  3,  4],
        [ 5,  6,  7,  8,  9]],
       [[10, 11, 12, 13, 14],
        [15, 16, 17, 18, 19]],
       [[20, 21, 22, 23, 24],
        [25, 26, 27, 28, 29]],
       [[30, 31, 32, 33, 34],
        [35, 36, 37, 38, 39]]])
Coordinates:
  * x        (x) int64 0 1 2 3
  * y        (y) int64 0 1
  * z        (z) int64 0 1 2 3 4
We can take a random value along the 'z' dimension:

>>> np.random.seed(1)
>>> choose_along_dim(da, 'z')
<xarray.DataArray (x: 4, y: 2)>
array([[ 2,  8],
       [10, 16],
       [20, 25],
       [30, 36]])
Coordinates:
  * x        (x) int64 0 1 2 3
  * y        (y) int64 0 1
If you provide a samples argument greater than one (or set expand=True) the array will be expanded to a new dimension:

>>> np.random.seed(1)
>>> choose_along_dim(da, 'z', samples=3)
<xarray.DataArray (x: 4, y: 2, z: 3)>
array([[[ 2,  3,  0],
        [ 6,  5,  5]],
       [[10, 11, 11],
        [17, 17, 18]],
       [[21, 24, 20],
        [28, 27, 27]],
       [[30, 30, 34],
        [39, 36, 38]]])
Coordinates:
  * x        (x) int64 0 1 2 3
  * y        (y) int64 0 1
  * z        (z) int64 0 1 2
Module contents¶
Top-level package for RHG Compute Tools.