rhg_compute_tools package

Submodules

rhg_compute_tools.gcs module

Tools for interacting with GCS infrastructure.

rhg_compute_tools.gcs.authenticated_client(credentials=None, **client_kwargs)[source]

Convenience function to create an authenticated GCS client.

Parameters:
  • credentials (str or None, optional) – Str path to storage credentials authentication file. If None is passed (default) will create a Client object with no args, using the authorization credentials for the current environment. See the [google cloud storage docs]( https://googleapis.dev/python/google-api-core/latest/auth.html) for an overview of the authorization options.
  • client_kwargs (optional) – kwargs to pass to the get_client function
Returns:

Return type:

google.cloud.storage.Client

rhg_compute_tools.gcs.cp(src, dest, flags=[])[source]

Copy a file or recursively copy a directory from local path to GCS or vice versa. Must have already authenticated to use. Notebook servers are automatically authenticated, but workers need to pass the path to the authentication json file to the GOOGLE_APPLICATION_CREDENTIALS env var. This is done automatically for rhg-data.json when using the get_worker wrapper.

Parameters:
  • dest (src,) – The paths to the source and destination file or directory. If on GCS, either the /gcs or gs:/ prefix will work.
  • flags (list of str, optional) – String of flags to add to the gsutil cp command. e.g. flags=[‘r’] will run the command gsutil -m cp -r… (recursive copy)
Returns:

  • str – stdout from gsutil call
  • str – stderr from gsutil call
  • datetime.timedelta – Time it took to copy file(s).

rhg_compute_tools.gcs.create_directories_under_blob(blob, project=None, client=None)[source]
rhg_compute_tools.gcs.create_directory_markers(bucket_name, project=None, client=None, prefix=None)[source]

Add directory markers in-place on a google cloud storage bucket

Parameters:
  • bucket_name (str) – name of the Google Cloud Storage bucket
  • project (str or None, optional) – name of the Google Cloud Platform project. If None, inferred from the default project as determined by the client.
  • client (google.cloud.storage.client.Client or None, optional) – Optionally pass a google.cloud.storage Client object to set auth and settings
  • prefix (str or None, optional) – Prefix (relative to bucket root) below which to create markers
rhg_compute_tools.gcs.get_bucket(credentials=None, bucket_name='rhg-data', return_client=False, **client_kwargs)[source]

Return a bucket object from Rhg’s GCS system.

Parameters:
  • credentials (str or None, optional) – Str path to storage credentials authentication file. If None is passed (default) will create a Client object with no args, using the authorization credentials for the current environment. See the [google cloud storage docs]( https://googleapis.dev/python/google-api-core/latest/auth.html) for an overview of the authorization options.
  • bucket_name (str, optional) – Name of bucket. Typically, we work with rhg_data (default)
  • return_client (bool, optional) – Return the Client object as a second object.
  • client_kwargs (optional) – kwargs to pass to the get_client function
Returns:

bucket

Return type:

google.cloud.storage.bucket.Bucket

rhg_compute_tools.gcs.ls(dir_path)[source]

List a directory quickly using gsutil

rhg_compute_tools.gcs.replicate_directory_structure_on_gcs(src, dst, client)[source]

Replicate a local directory structure on google cloud storage

Parameters:
  • src (str) – Path to the root directory on the source machine. The directory structure within this directory will be reproduced within dst, e.g. /Users/myusername/my/data
  • dst (str) – A url for the root directory of the destination, starting with gs://[bucket_name]/, e.g. gs://my_bucket/path/to/my/data
  • client (google.cloud.storage.client.Client) – An authenticated google.cloud.storage.client.Client object.
rhg_compute_tools.gcs.rm(path, flags=[])[source]

Remove a file or recursively remove a directory from local path to GCS or vice versa. Must have already authenticated to use. Notebook servers are automatically authenticated, but workers need to pass the path to the authentication json file to the GOOGLE_APPLICATION_CREDENTIALS env var. This is done automatically for rhg-data.json when using the get_worker wrapper.

Parameters:
  • path (str or pathlib.Path) – The path to the source and destination file or directory. Either the /gcs or gs:/ prefix will work.
  • flags (list of str, optional) – String of flags to add to the gsutil rm command. e.g. flags=[‘r’] will run the command gsutil -m rm -r… (recursive remove)
Returns:

  • str – stdout from gsutil call
  • str – stderr from gsutil call
  • datetime.timedelta – Time it took to copy file(s).

rhg_compute_tools.gcs.sync(src, dest, flags=['r', 'd'])[source]

Sync a directory from local to GCS or vice versa. Uses gsutil rsync. Must have already authenticated to use. Notebook servers are automatically authenticated, but workers need to pass the path to the authentication json file to the GCLOUD_DEFAULT_TOKEN_FILE env var. This is done automatically for rhg-data.json when using the get_worker wrapper.

Parameters:
  • dest (src,) – The paths to the source and destination file or directory. If on GCS, either the /gcs or gs:/ prefix will work.
  • flags (list of str, optional) – String of flags to add to the gsutil cp command. e.g. flags=[‘r’,’d’] will run the command gsutil -m cp -r -d… (recursive copy, delete any files on dest that are not on src). This is the default set of flags.
Returns:

  • str – stdout from gsutil call
  • str – stderr from gsutil call
  • datetime.timedelta – Time it took to copy file(s).

rhg_compute_tools.kubernetes module

Tools for interacting with kubernetes.

rhg_compute_tools.kubernetes.get_big_cluster(*args, **kwargs)[source]

Start a cluster with 2x the memory and CPU per worker relative to default

All arguments are optional. If not provided, defaults will be used. To view defaults, instantiate a dask_gateway.Gateway object and call gateway.cluster_options().

Parameters:
  • name (str, optional) – Name of worker image to use (e.g. rhodium/worker:latest). If None (default), default to worker specified in template_path.
  • tag (str, optional) – Tag of the worker image to use. Cannot be used in combination with name, which should include a tag. If provided, overrides the tag of the image specified in template_path. If None (default), the full image specified in name or template_path is used.
  • extra_pip_packages (str, optional) – Extra pip packages to install on worker. Packages are installed using pip install extra_pip_packages.
  • profile (One of ["micro", "standard", "big", "giant"]) – Determines size of worker. CPUs assigned are slightly under 1, 2, 4, and 8, respectively. Memory assigned is slightly over 6, 12, 24, and 48 GB, respectively.
  • cpus (float, optional) – Set the CPUs requested for your workers as defined by profile. Will raise error if >7.5, because our 8-CPU nodes need ~.5 vCPU for kubernetes pods. (NOTE 12/15/20: This is currently set to 1 by default to allow for mapping big workflows across inputs, see https://github.com/dask/dask-gateway/issues/364).
  • cred_name (str, optional) – Name of Google Cloud credentials file to use, equivalent to providing cred_path='/opt/gcsfuse_tokens/{}.json'.format(cred_name). May not use if cred_path is specified.
  • cred_path (str, optional) – Path to Google Cloud credentials file to use. May not use if cred_name is specified.
  • env_items (dict, optional) –

    A dictionary of env variable ‘name’-‘value’ pairs to append to the env variables included in template_path, e.g.

    {
        'MY_ENV_VAR': 'some string',
    }
    
  • extra_worker_labels (dict, optional) – Dictionary of kubernetes labels to apply to pods. None (default) results in no additional labels besides those in the template, as well as jupyter_user, which is inferred from the JUPYTERHUB_USER, or, if not set, the server’s hostname.
  • extra_pod_tolerations (list of dict, optional) –

    List of pod toleration dictionaries. For example, to match a node pool NoSchedule toleration, you might provide:

    extra_pod_tolerations=[
        {
            "effect": "NoSchedule",
            "key": "k8s.dask.org_dedicated",
            "operator": "Equal",
            "value": "worker-highcpu"
        },
        {
            "effect": "NoSchedule",
            "key": "k8s.dask.org/dedicated",
            "operator": "Equal",
            "value": "worker-highcpu"
        }
    ]
    
  • keep_default_tolerations (bool, optional) – Whether to append (default) or replace the default tolerations. Ignored if extra_pod_tolerations is None or has length 0.
Returns:

  • client (object) – dask.distributed.Client connected to cluster
  • cluster (object) – Pre-configured dask_gateway.GatewayCluster

See also

get_micro_cluster() :
A cluster with one-CPU workers
get_standard_cluster() :
The default cluster specification
get_big_cluster() :
A cluster with workers twice the size of the default
get_giant_cluster() :
A cluster with workers four times the size of the default
rhg_compute_tools.kubernetes.get_cluster(*args, **kwargs)[source]

All arguments are optional. If not provided, defaults will be used. To view defaults, instantiate a dask_gateway.Gateway object and call gateway.cluster_options().

Parameters:
  • name (str, optional) – Name of worker image to use (e.g. rhodium/worker:latest). If None (default), default to worker specified in template_path.
  • tag (str, optional) – Tag of the worker image to use. Cannot be used in combination with name, which should include a tag. If provided, overrides the tag of the image specified in template_path. If None (default), the full image specified in name or template_path is used.
  • extra_pip_packages (str, optional) – Extra pip packages to install on worker. Packages are installed using pip install extra_pip_packages.
  • profile (One of ["micro", "standard", "big", "giant"]) – Determines size of worker. CPUs assigned are slightly under 1, 2, 4, and 8, respectively. Memory assigned is slightly over 6, 12, 24, and 48 GB, respectively.
  • cpus (float, optional) – Set the CPUs requested for your workers as defined by profile. Will raise error if >7.5, because our 8-CPU nodes need ~.5 vCPU for kubernetes pods. (NOTE 12/15/20: This is currently set to 1 by default to allow for mapping big workflows across inputs, see https://github.com/dask/dask-gateway/issues/364).
  • cred_name (str, optional) – Name of Google Cloud credentials file to use, equivalent to providing cred_path='/opt/gcsfuse_tokens/{}.json'.format(cred_name). May not use if cred_path is specified.
  • cred_path (str, optional) – Path to Google Cloud credentials file to use. May not use if cred_name is specified.
  • env_items (dict, optional) –

    A dictionary of env variable ‘name’-‘value’ pairs to append to the env variables included in template_path, e.g.

    {
        'MY_ENV_VAR': 'some string',
    }
    
  • extra_worker_labels (dict, optional) – Dictionary of kubernetes labels to apply to pods. None (default) results in no additional labels besides those in the template, as well as jupyter_user, which is inferred from the JUPYTERHUB_USER, or, if not set, the server’s hostname.
  • extra_pod_tolerations (list of dict, optional) –

    List of pod toleration dictionaries. For example, to match a node pool NoSchedule toleration, you might provide:

    extra_pod_tolerations=[
        {
            "effect": "NoSchedule",
            "key": "k8s.dask.org_dedicated",
            "operator": "Equal",
            "value": "worker-highcpu"
        },
        {
            "effect": "NoSchedule",
            "key": "k8s.dask.org/dedicated",
            "operator": "Equal",
            "value": "worker-highcpu"
        }
    ]
    
  • keep_default_tolerations (bool, optional) – Whether to append (default) or replace the default tolerations. Ignored if extra_pod_tolerations is None or has length 0.
Returns:

  • client (object) – dask.distributed.Client connected to cluster
  • cluster (object) – Pre-configured dask_gateway.GatewayCluster

See also

get_micro_cluster() :
A cluster with one-CPU workers
get_standard_cluster() :
The default cluster specification
get_big_cluster() :
A cluster with workers twice the size of the default
get_giant_cluster() :
A cluster with workers four times the size of the default
rhg_compute_tools.kubernetes.get_giant_cluster(*args, **kwargs)[source]

Start a cluster with 4x the memory and CPU per worker relative to default

All arguments are optional. If not provided, defaults will be used. To view defaults, instantiate a dask_gateway.Gateway object and call gateway.cluster_options().

Parameters:
  • name (str, optional) – Name of worker image to use (e.g. rhodium/worker:latest). If None (default), default to worker specified in template_path.
  • tag (str, optional) – Tag of the worker image to use. Cannot be used in combination with name, which should include a tag. If provided, overrides the tag of the image specified in template_path. If None (default), the full image specified in name or template_path is used.
  • extra_pip_packages (str, optional) – Extra pip packages to install on worker. Packages are installed using pip install extra_pip_packages.
  • profile (One of ["micro", "standard", "big", "giant"]) – Determines size of worker. CPUs assigned are slightly under 1, 2, 4, and 8, respectively. Memory assigned is slightly over 6, 12, 24, and 48 GB, respectively.
  • cpus (float, optional) – Set the CPUs requested for your workers as defined by profile. Will raise error if >7.5, because our 8-CPU nodes need ~.5 vCPU for kubernetes pods. (NOTE 12/15/20: This is currently set to 1 by default to allow for mapping big workflows across inputs, see https://github.com/dask/dask-gateway/issues/364).
  • cred_name (str, optional) – Name of Google Cloud credentials file to use, equivalent to providing cred_path='/opt/gcsfuse_tokens/{}.json'.format(cred_name). May not use if cred_path is specified.
  • cred_path (str, optional) – Path to Google Cloud credentials file to use. May not use if cred_name is specified.
  • env_items (dict, optional) –

    A dictionary of env variable ‘name’-‘value’ pairs to append to the env variables included in template_path, e.g.

    {
        'MY_ENV_VAR': 'some string',
    }
    
  • extra_worker_labels (dict, optional) – Dictionary of kubernetes labels to apply to pods. None (default) results in no additional labels besides those in the template, as well as jupyter_user, which is inferred from the JUPYTERHUB_USER, or, if not set, the server’s hostname.
  • extra_pod_tolerations (list of dict, optional) –

    List of pod toleration dictionaries. For example, to match a node pool NoSchedule toleration, you might provide:

    extra_pod_tolerations=[
        {
            "effect": "NoSchedule",
            "key": "k8s.dask.org_dedicated",
            "operator": "Equal",
            "value": "worker-highcpu"
        },
        {
            "effect": "NoSchedule",
            "key": "k8s.dask.org/dedicated",
            "operator": "Equal",
            "value": "worker-highcpu"
        }
    ]
    
  • keep_default_tolerations (bool, optional) – Whether to append (default) or replace the default tolerations. Ignored if extra_pod_tolerations is None or has length 0.
Returns:

  • client (object) – dask.distributed.Client connected to cluster
  • cluster (object) – Pre-configured dask_gateway.GatewayCluster

See also

get_micro_cluster() :
A cluster with one-CPU workers
get_standard_cluster() :
The default cluster specification
get_big_cluster() :
A cluster with workers twice the size of the default
get_giant_cluster() :
A cluster with workers four times the size of the default
rhg_compute_tools.kubernetes.get_micro_cluster(*args, **kwargs)[source]

Start a cluster with a single CPU per worker

All arguments are optional. If not provided, defaults will be used. To view defaults, instantiate a dask_gateway.Gateway object and call gateway.cluster_options().

Parameters:
  • name (str, optional) – Name of worker image to use (e.g. rhodium/worker:latest). If None (default), default to worker specified in template_path.
  • tag (str, optional) – Tag of the worker image to use. Cannot be used in combination with name, which should include a tag. If provided, overrides the tag of the image specified in template_path. If None (default), the full image specified in name or template_path is used.
  • extra_pip_packages (str, optional) – Extra pip packages to install on worker. Packages are installed using pip install extra_pip_packages.
  • profile (One of ["micro", "standard", "big", "giant"]) – Determines size of worker. CPUs assigned are slightly under 1, 2, 4, and 8, respectively. Memory assigned is slightly over 6, 12, 24, and 48 GB, respectively.
  • cpus (float, optional) – Set the CPUs requested for your workers as defined by profile. Will raise error if >7.5, because our 8-CPU nodes need ~.5 vCPU for kubernetes pods. (NOTE 12/15/20: This is currently set to 1 by default to allow for mapping big workflows across inputs, see https://github.com/dask/dask-gateway/issues/364).
  • cred_name (str, optional) – Name of Google Cloud credentials file to use, equivalent to providing cred_path='/opt/gcsfuse_tokens/{}.json'.format(cred_name). May not use if cred_path is specified.
  • cred_path (str, optional) – Path to Google Cloud credentials file to use. May not use if cred_name is specified.
  • env_items (dict, optional) –

    A dictionary of env variable ‘name’-‘value’ pairs to append to the env variables included in template_path, e.g.

    {
        'MY_ENV_VAR': 'some string',
    }
    
  • extra_worker_labels (dict, optional) – Dictionary of kubernetes labels to apply to pods. None (default) results in no additional labels besides those in the template, as well as jupyter_user, which is inferred from the JUPYTERHUB_USER, or, if not set, the server’s hostname.
  • extra_pod_tolerations (list of dict, optional) –

    List of pod toleration dictionaries. For example, to match a node pool NoSchedule toleration, you might provide:

    extra_pod_tolerations=[
        {
            "effect": "NoSchedule",
            "key": "k8s.dask.org_dedicated",
            "operator": "Equal",
            "value": "worker-highcpu"
        },
        {
            "effect": "NoSchedule",
            "key": "k8s.dask.org/dedicated",
            "operator": "Equal",
            "value": "worker-highcpu"
        }
    ]
    
  • keep_default_tolerations (bool, optional) – Whether to append (default) or replace the default tolerations. Ignored if extra_pod_tolerations is None or has length 0.
Returns:

  • client (object) – dask.distributed.Client connected to cluster
  • cluster (object) – Pre-configured dask_gateway.GatewayCluster

See also

get_micro_cluster() :
A cluster with one-CPU workers
get_standard_cluster() :
The default cluster specification
get_big_cluster() :
A cluster with workers twice the size of the default
get_giant_cluster() :
A cluster with workers four times the size of the default
rhg_compute_tools.kubernetes.get_standard_cluster(*args, **kwargs)[source]

Start a cluster with 1x the memory and CPU per worker relative to default

All arguments are optional. If not provided, defaults will be used. To view defaults, instantiate a dask_gateway.Gateway object and call gateway.cluster_options().

Parameters:
  • name (str, optional) – Name of worker image to use (e.g. rhodium/worker:latest). If None (default), default to worker specified in template_path.
  • tag (str, optional) – Tag of the worker image to use. Cannot be used in combination with name, which should include a tag. If provided, overrides the tag of the image specified in template_path. If None (default), the full image specified in name or template_path is used.
  • extra_pip_packages (str, optional) – Extra pip packages to install on worker. Packages are installed using pip install extra_pip_packages.
  • profile (One of ["micro", "standard", "big", "giant"]) – Determines size of worker. CPUs assigned are slightly under 1, 2, 4, and 8, respectively. Memory assigned is slightly over 6, 12, 24, and 48 GB, respectively.
  • cpus (float, optional) – Set the CPUs requested for your workers as defined by profile. Will raise error if >7.5, because our 8-CPU nodes need ~.5 vCPU for kubernetes pods. (NOTE 12/15/20: This is currently set to 1 by default to allow for mapping big workflows across inputs, see https://github.com/dask/dask-gateway/issues/364).
  • cred_name (str, optional) – Name of Google Cloud credentials file to use, equivalent to providing cred_path='/opt/gcsfuse_tokens/{}.json'.format(cred_name). May not use if cred_path is specified.
  • cred_path (str, optional) – Path to Google Cloud credentials file to use. May not use if cred_name is specified.
  • env_items (dict, optional) –

    A dictionary of env variable ‘name’-‘value’ pairs to append to the env variables included in template_path, e.g.

    {
        'MY_ENV_VAR': 'some string',
    }
    
  • extra_worker_labels (dict, optional) – Dictionary of kubernetes labels to apply to pods. None (default) results in no additional labels besides those in the template, as well as jupyter_user, which is inferred from the JUPYTERHUB_USER, or, if not set, the server’s hostname.
  • extra_pod_tolerations (list of dict, optional) –

    List of pod toleration dictionaries. For example, to match a node pool NoSchedule toleration, you might provide:

    extra_pod_tolerations=[
        {
            "effect": "NoSchedule",
            "key": "k8s.dask.org_dedicated",
            "operator": "Equal",
            "value": "worker-highcpu"
        },
        {
            "effect": "NoSchedule",
            "key": "k8s.dask.org/dedicated",
            "operator": "Equal",
            "value": "worker-highcpu"
        }
    ]
    
  • keep_default_tolerations (bool, optional) – Whether to append (default) or replace the default tolerations. Ignored if extra_pod_tolerations is None or has length 0.
Returns:

  • client (object) – dask.distributed.Client connected to cluster
  • cluster (object) – Pre-configured dask_gateway.GatewayCluster

See also

get_micro_cluster() :
A cluster with one-CPU workers
get_standard_cluster() :
The default cluster specification
get_big_cluster() :
A cluster with workers twice the size of the default
get_giant_cluster() :
A cluster with workers four times the size of the default
rhg_compute_tools.kubernetes.traceback(ftr)[source]

rhg_compute_tools.utils module

class rhg_compute_tools.utils.NumpyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: json.encoder.JSONEncoder

Helper class for json.dumps to coerce numpy objects to native python

default(obj)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
rhg_compute_tools.utils.block_globals[source]

Decorator to prevent globals and undefined closures in functions and classes

Parameters:
  • obj (function) – Function to decorate. All globals not matching one of the allowed types will raise an AssertionError
  • allowed_types (type or tuple of types, optional) – Types which are allowed as globals. By default, functions and modules are allowed. The full set of allowed types is drawn from the types module, and includes FunctionType, ModuleType, MethodType, ClassType, BuiltinMethodType, and BuiltinFunctionType.
  • include_defaults (bool, optional) – If allowed_types is provided, setting include_defaults to True will append the default list of functions, modules, and methods to the user-passed list of allowed types. Default is True, in which case any user-passed elements will be added to the defaults described above. Setting to False will allow only the types passed in allowed_types.
  • whitelist (list of str, optional) – Optional list of variable names to whitelist. If a list is provided, global variables will be compared to elements of this list based on their string names. Default (None) is no whitelist.

Examples

Wrap a function to block globals:

>>> my_data = 10

>>> @block_globals
... def add_5(data):
...     ''' can you spot the global? '''
...     a_number = 5
...     result = a_number + my_data
...     return result  
Traceback (most recent call last):
...
TypeError: Illegal <class 'int'> global found in add_5: my_data

Wrapping a class will prevent globals from being used in all methods:

>>> @block_globals
... class MyClass:
...
...     @staticmethod
...     def add_5(data):
...         ''' can you spot the global? '''
...         a_number = 5
...         result = a_number + my_data
...         return result  
Traceback (most recent call last):
...
TypeError: Illegal <class 'int'> global found in add_5: my_data

By default, functions and modules are allowed in the list of globals. You can modify this list with the allowed_types argument:

>>> result_formatter = 'my number is {}'
>>> @block_globals(allowed_types=str)
... def add_5(data):
...     ''' only allowed globals here! '''
...     a_number = 5
...     result = a_number + data
...     return result_formatter.format(result)
...
>>> add_5(3)
'my number is 8'

block_globals will also catch undefined references:

>>> @block_globals
... def get_mean(df):
...     return da.mean()  
Traceback (most recent call last):
...
TypeError: Undefined global in get_mean: da
rhg_compute_tools.utils.checkpoint(jobs, futures, job_name, log_dir='.', extra_pending=None, extra_errors=None, extra_others=None)[source]

checkpoint and save a job state to disk

rhg_compute_tools.utils.collapse(*args, **kwargs)[source]

Collapse positional and keyword arguments into an (args, kwargs) tuple

Intended for use with the expand() decorator

Parameters:
  • *args – Variable length argument list.
  • **kwargs – Arbitrary keyword arguments.
Returns:

  • args (tuple) – Positional arguments tuple
  • kwargs (dict) – Keyword argument dictionary

rhg_compute_tools.utils.collapse_product(*args, **kwargs)[source]
Parameters:
  • *args – Variable length list of iterables
  • **kwargs – Keyword arguments, whose values must be iterables
Returns:

Generator with collapsed arguments

Return type:

iterator

See also

Function()
py:func:collapse

Examples

>>> @expand
... def my_func(a, b, exp=1):
...     return (a * b)**exp
...

>>> product_args = list(collapse_product(
...     [0, 1, 2],
...     [0.5, 2],
...     exp=[0, 1]))

>>> product_args  
[((0, 0.5), {'exp': 0}),
 ((0, 0.5), {'exp': 1}),
 ((0, 2), {'exp': 0}),
 ((0, 2), {'exp': 1}),
 ((1, 0.5), {'exp': 0}),
 ((1, 0.5), {'exp': 1}),
 ((1, 2), {'exp': 0}),
 ((1, 2), {'exp': 1}),
 ((2, 0.5), {'exp': 0}),
 ((2, 0.5), {'exp': 1}),
 ((2, 2), {'exp': 0}),
 ((2, 2), {'exp': 1})]

>>> list(map(my_func, product_args))
[1.0, 0.0, 1, 0, 1.0, 0.5, 1, 2, 1.0, 1.0, 1, 4]
rhg_compute_tools.utils.expand(func)[source]

Decorator to expand an (args, kwargs) tuple in function calls

Intended for use with the collapse() function

Parameters:func (function) – Function to have arguments expanded. Func can have any number of positional and keyword arguments.
Returns:wrapped – Wrapped version of func which accepts a single (args, kwargs) tuple.
Return type:function

Examples

>>> @expand
... def my_func(a, b, exp=1):
...     return (a * b)**exp
...

>>> my_func(((2, 3), {}))
6

>>> my_func(((2, 3, 2), {}))
36

>>> my_func((tuple([]), {'b': 4, 'exp': 2, 'a': 1}))
16

This function can be used in combination with the collapse helper function, which allows more natural parameter calls

>>> my_func(collapse(2, 3, exp=2))
36

These can then be paired to enable many parameterized function calls:

>>> func_calls = [collapse(a, a+1, exp=a) for a in range(5)]

>>> list(map(my_func, func_calls))
[1, 2, 36, 1728, 160000]
rhg_compute_tools.utils.get_repo_state(repository_root: [<class 'str'>, None] = None) → dict[source]

Get a dictionary summarizing the current state of a repository.

Parameters:repository_root (str or None) – Path to the root of the repository to document. If None (default), the current directory will be used, and will search parent directories for a git repository. If a string is passed, parent directories will not be searched - the directory must be a repository root which conatins a .git directory.
Returns:repo_state – Dictionary of repository information documenting the current state
Return type:dict
class rhg_compute_tools.utils.html(body)[source]

Bases: object

rhg_compute_tools.utils.recover(job_name, log_dir='.')[source]

recover pending, errored, other jobs from a checkpoint

rhg_compute_tools.utils.retry_with_timeout[source]

Execute func n_tries times, each time only allowing retry_freq seconds for the function to complete. There are two main cases where this could be useful:

  1. You have a function that you know should execute quickly, but you may get occasional errors when running it simultaneously on a large number of workers. An example of this is massively parallelized I/O operations of netcdfs on GCS.
  2. You have a function that may or may not take a long time, but you want to skip it if it takes too long.

There are two possible ways that this timeout function is implemented, each with pros and cons:

  1. Using python’s native threading module. If you are executing func outside of a dask worker, you likely will want this approach. It may be slightly faster and has the benefit of starting the timeout clock when the function starts executing (rather than when the function is submitted to a dask scheduler). Note: This approach will also work if calling func from a dask worker, but only if the cluster was set up such that threads_per_worker=1. Otherwise, this may cause issues if used from a dask worker.
  2. Using dask. If you would like a dask worker to execute this function, you likely will want this approach. It can be executed from a dask worker regardless of the number of threads per worker (see above), but has the downside that the timeout clock begins once func is submitted, rather than when it begins executing.
Parameters:
  • func (callable) – The function you would like to execute with a timeout backoff.
  • retry_freq (float) – The number of seconds to wait between successive retries of func.
  • n_tries (int) – The number of retries to attempt before raising an error if none were successful
  • use_dask (bool) – If true, will try to use the dask-based implementation (see description above). If no Client instance is present, will fall back to use_dask=False.
Returns:

Return type:

The return value of func

Raises:
  • dask.distributed.TimeoutError : – If the function does not execute successfully in the specified retry_freq, after trying n_tries times.
  • ValueError : – If use_dask=True, and a Client instance is present, but this fucntion is executed from the client (rather than as a task submitted to a worker), you will get ValueError("No workers found").

Examples

>>> import time
>>> @retry_with_timeout(retry_freq=.5, n_tries=1)
... def wait_func(timeout):
...     time.sleep(timeout)
>>> wait_func(.1)
>>> wait_func(1)
Traceback (most recent call last):
    ...
asyncio.exceptions.TimeoutError: Func did not complete successfully in allowed time/number of retries.

rhg_compute_tools.xarray module

rhg_compute_tools.xarray.choose_along_axis(arr, axis=-1, replace=True, nchoices=1, p=None)[source]

Wrapper on np.random.choice, but along a single dimension within a larger array

Parameters:
  • arr (np.array) – Array with more than one dimension. Choices will be drawn from along the axis dimension.
  • axis (integer, optional) – Dimension along which to draw samples
  • replace (bool, optional) – Whether to sample with replacement. Passed to np.random.choice(). Default 1.
  • nchoices (int, optional) – Number of samples to draw. Must be less than or equal to the number of valid options if replace is False. Default 1.
  • p (np.array) – Array with the same shape as arr with weights for each choice. Each dimension is sampled independently, so weights will be normalized to 1 along the axis dimension.
Returns:

sampled – Array with the same shape as arr but with length nchoices along axis axis and with values chosen from the values of arr along dimension axis with weights p.

Return type:

np.array

Examples

Let’s say we have an array with NaNs in it:

>>> arr = np.arange(40).reshape(4, 2, 5).astype(float)
>>> for i in range(4):
...     arr[i, :, i+1:] = np.nan
>>> arr  
array([[[ 0., nan, nan, nan, nan],
        [ 5., nan, nan, nan, nan]],
       [[10., 11., nan, nan, nan],
        [15., 16., nan, nan, nan]],
       [[20., 21., 22., nan, nan],
        [25., 26., 27., nan, nan]],
       [[30., 31., 32., 33., nan],
        [35., 36., 37., 38., nan]]])

We can set weights such that we only select from non-nan values

>>> p = (~np.isnan(arr))
>>> p = p / p.sum(axis=2).reshape(4, 2, 1)

Now, sampling from this along the second dimension will draw from these values:

>>> np.random.seed(1)
>>> choose_along_axis(arr, 2, p=p, nchoices=10)  
array([[[ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
        [ 5.,  5.,  5.,  5.,  5.,  5.,  5.,  5.,  5.,  5.]],
       [[11., 11., 10., 11., 11., 11., 10., 10., 10., 11.],
        [15., 15., 16., 16., 16., 15., 16., 16., 15., 16.]],
       [[22., 22., 20., 22., 20., 21., 22., 20., 20., 20.],
        [25., 27., 25., 25., 26., 25., 26., 25., 26., 27.]],
       [[30., 31., 32., 31., 30., 32., 32., 32., 33., 32.],
        [38., 35., 35., 38., 36., 35., 38., 36., 38., 37.]]])

See also

np.random.choice() : 1-d version of this function

rhg_compute_tools.xarray.choose_along_dim(da, dim, samples=1, expand=None, new_dim_name=None)[source]

Sample values from a DataArray along a dimension

Wraps np.random.choice() to sample a different random index (or set of indices) from along dimension dim for each combination of elements along the other dimensions. This is very different from block resampling - to block resample along a dimension simply choose a set of indices and draw these from the array using xr.DataArray.sel().

Parameters:
  • da (xr.DataArray) – DataArray from which to sample values.
  • dim (str) – Dimension along which to sample. Sampling will draw from elements along this dimension for all combinations of other dimensions.
  • samples (int, optional) – Number of samples to take from the dimension dim. If greater than 1, expand is ignored (and set to True).
  • expand (bool, optional) – Whether to expand the array along the sampled dimension.
  • new_dim_name (str, optoinal) – Name for the new dimension. If not provided, will use dim.
Returns:

sampled – DataArray with sampled values chosen along dimension dim

Return type:

xr.DataArray

Examples

>>> da = xr.DataArray(
...     np.arange(40).reshape(4, 2, 5),
...     dims=['x', 'y', 'z'],
...     coords=[np.arange(4), np.arange(2), np.arange(5)],
... )

>>> da 
<xarray.DataArray (x: 4, y: 2, z: 5)>
array([[[ 0,  1,  2,  3,  4],
        [ 5,  6,  7,  8,  9]],
       [[10, 11, 12, 13, 14],
        [15, 16, 17, 18, 19]],
       [[20, 21, 22, 23, 24],
        [25, 26, 27, 28, 29]],
       [[30, 31, 32, 33, 34],
        [35, 36, 37, 38, 39]]])
Coordinates:
  * x        (x) int64 0 1 2 3
  * y        (y) int64 0 1
  * z        (z) int64 0 1 2 3 4

We can take a random value along the 'z' dimension:

>>> np.random.seed(1)
>>> choose_along_dim(da, 'z')
<xarray.DataArray (x: 4, y: 2)>
array([[ 2,  8],
       [10, 16],
       [20, 25],
       [30, 36]])
Coordinates:
  * x        (x) int64 0 1 2 3
  * y        (y) int64 0 1

If you provide a sample argument greater than one (or set expand=True) the array will be expanded to a new dimension:

>>> np.random.seed(1)
>>> choose_along_dim(da, 'z', samples=3) 
<xarray.DataArray (x: 4, y: 2, z: 3)>
array([[[ 2,  3,  0],
        [ 6,  5,  5]],
       [[10, 11, 11],
        [17, 17, 18]],
       [[21, 24, 20],
        [28, 27, 27]],
       [[30, 30, 34],
        [39, 36, 38]]])
Coordinates:
  * x        (x) int64 0 1 2 3
  * y        (y) int64 0 1
  * z        (z) int64 0 1 2
rhg_compute_tools.xarray.dataarray_from_delayed(futures, dim=None, client=None, **client_kwargs)[source]

Returns a DataArray from a list of futures

Parameters:
  • futures (list) – list of dask.delayed.Future objects holding xarray.DataArray objects.
  • dim (str, optional) – dimension along which to concat xarray.DataArray. Inferred by default.
  • client (object, optional) – dask.distributed.Client to use in gathering metadata on futures. If not provided, client is inferred from context.
  • client_kwargs (optional) – kwargs to pass to client.map and client.gather commands (e.g. priority)
Returns:

arrayxarray.DataArray concatenated along dim with a dask.array.Array backend.

Return type:

object

Examples

Given a mapped xarray DataArray, pull the metadata into memory while leaving the data on the workers:

>>> import numpy as np, pandas as pd

>>> def build_arr(multiplier):
...     return multiplier * xr.DataArray(
...         np.arange(2), dims=['x'], coords=[['a', 'b']])
...

>>> client = dd.Client()
>>> fut = client.map(build_arr, range(3))
>>> da = dataarray_from_delayed(
...     fut,
...     dim=pd.Index(range(3), name='simulation'),
...     priority=1
... )
...

>>> da  
<xarray.DataArray ...(simulation: 3, x: 2)>
dask.array<...shape=(3, 2), dtype=int64, chunksize=(1, 2), chunktype=numpy.ndarray>
Coordinates:
  * x           (x) <U1 'a' 'b'
  * simulation  (simulation) int64 0 1 2

>>> client.close()
rhg_compute_tools.xarray.dataarrays_from_delayed(futures, client=None, **client_kwargs)[source]

Returns a list of xarray dataarrays from a list of futures of dataarrays

Parameters:
  • futures (list) – list of dask.delayed.Future objects holding xarray.DataArray objects.
  • client (object, optional) – dask.distributed.Client to use in gathering metadata on futures. If not provided, client is inferred from context.
  • client_kwargs (optional) – kwargs to pass to client.map and client.gather commands (e.g. priority)
Returns:

arrays – list of xarray.DataArray objects with dask.array.Array backends.

Return type:

list

Examples

Given a mapped xarray DataArray, pull the metadata into memory while leaving the data on the workers:

>>> import numpy as np

>>> def build_arr(multiplier):
...     return multiplier * xr.DataArray(
...         np.arange(2), dims=['x'], coords=[['a', 'b']])
...

>>> client = dd.Client()
>>> fut = client.map(build_arr, range(3))
>>> arrs = dataarrays_from_delayed(fut, priority=1)
>>> arrs[-1]  
<xarray.DataArray ...(x: 2)>
dask.array<...shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>
Coordinates:
  * x        (x) <U1 'a' 'b'

This list of arrays can now be manipulated using normal xarray tools:

>>> xr.concat(arrs, dim='simulation') 
<xarray.DataArray ...(simulation: 3, x: 2)>
dask.array<...shape=(3, 2), dtype=int64, chunksize=(1, 2), chunktype=numpy.ndarray>
Coordinates:
  * x        (x) <U1 'a' 'b'
Dimensions without coordinates: simulation

>>> client.close()
rhg_compute_tools.xarray.dataset_from_delayed(futures, dim=None, client=None, **client_kwargs)[source]

Returns an xarray.Dataset from a list of futures

Parameters:
  • futures (list) – list of dask.delayed.Future objects holding xarray.Dataset objects.
  • dim (str, optional) – dimension along which to concat xarray.Dataset. Inferred by default.
  • client (object, optional) – dask.distributed.Client to use in gathering metadata on futures. If not provided, client is inferred from context.
  • client_kwargs (optional) – kwargs to pass to client.map and client.gather commands (e.g. priority)
Returns:

datasetxarray.Dataset concatenated along dim with dask.array.Array backends for each variable.

Return type:

object

Examples

Given a mapped xarray.Dataset, pull the metadata into memory while leaving the data on the workers:

>>> import numpy as np, pandas as pd

>>> def build_ds(multiplier):
...     return multiplier * xr.Dataset({
...         'var1': xr.DataArray(
...             np.arange(2), dims=['x'], coords=[['a', 'b']])})
...

>>> client = dd.Client()
>>> fut = client.map(build_ds, range(3))
>>> ds = dataset_from_delayed(fut, dim=pd.Index(range(3), name='y'), priority=1)
>>> ds 
<xarray.Dataset>
Dimensions:  (x: 2, y: 3)
Coordinates:
  * x        (x) <U1 'a' 'b'
  * y        (y) int64 0 1 2
Data variables:
    var1     (y, x) int64 dask.array<chunksize=(1, 2), meta=np.ndarray>

>>> client.close()
rhg_compute_tools.xarray.datasets_from_delayed(futures, client=None, **client_kwargs)[source]

Returns a list of xarray datasets from a list of futures of datasets

Parameters:
  • futures (list) – list of dask.delayed.Future objects holding xarray.Dataset objects.
  • client (object, optional) – dask.distributed.Client to use in gathering metadata on futures. If not provided, client is inferred from context.
  • client_kwargs (optional) – kwargs to pass to client.map and client.gather commands (e.g. priority)
Returns:

datasets – list of xarray.Dataset objects with dask.array.Array backends for each variable.

Return type:

list

Examples

Given a mapped xarray.Dataset, pull the metadata into memory while leaving the data on the workers:

>>> import numpy as np

>>> def build_ds(multiplier):
...     return multiplier * xr.Dataset({
...         'var1': xr.DataArray(
...             np.arange(2), dims=['x'], coords=[['a', 'b']])})
...

>>> client = dd.Client()
>>> fut = client.map(build_ds, range(3))
>>> arrs = datasets_from_delayed(fut, priority=1)
>>> arrs[-1]  
<xarray.Dataset>
Dimensions:  (x: 2)
Coordinates:
  * x        (x) <U1 'a' 'b'
Data variables:
    var1     (x) int64 dask.array<chunksize=(2,), meta=np.ndarray>

This list of arrays can now be manipulated using normal xarray tools:

>>> xr.concat(arrs, dim='y') 
<xarray.Dataset>
Dimensions:  (x: 2, y: 3)
Coordinates:
  * x        (x) <U1 'a' 'b'
Dimensions without coordinates: y
Data variables:
    var1     (y, x) int64 dask.array<chunksize=(1, 2), meta=np.ndarray>

>>> client.close()
rhg_compute_tools.xarray.document_dataset(ds: xarray.core.dataset.Dataset, repository_root: [<class 'str'>, None] = None, tz: str = 'UTC', inplace: bool = True) → xarray.core.dataset.Dataset[source]

Add repository state and timestamp to dataset attrs

Parameters:
  • ds (xr.Dataset) – Dataset to document
  • repository_root (str or None, optional) – Path to the root of the repository to document. If None (default), the current directory will be used, and will search parent directories for a git repository. If a string is passed, parent directories will not be searched - the directory must be a repository root which conatins a .git directory.
  • tz (str, optional) – time zone string parseable by datetime.datetime (e.g. “US/Pacific”). Default “UTC”.
  • inplace (bool, optional) – Whether to update the dataset’s attributes in place (default) or to return a copy of the dataset.
Returns:

ds – Dataset with updated attribute information. A dataset is returned regardless of arguments - the inplace argument determines whether the returned dataset will be a shallow copy or the original object (default).

Return type:

xr.Dataset

class rhg_compute_tools.xarray.random(xarray_obj)[source]

Bases: object

choice(dim, samples=1, expand=None, new_dim_name=None)[source]

Sample values from a DataArray along a dimension

Wraps np.random.choice() to sample a different random index (or set of indices) from along dimension dim for each combination of elements along the other dimensions. This is very different from block resampling - to block resample along a dimension simply choose a set of indices and draw these from the array using xr.DataArray.sel().

Parameters:
  • da (xr.DataArray) – DataArray from which to sample values.
  • dim (str) – Dimension along which to sample. Sampling will draw from elements along this dimension for all combinations of other dimensions.
  • samples (int, optional) – Number of samples to take from the dimension dim. If greater than 1, expand is ignored (and set to True).
  • expand (bool, optional) – Whether to expand the array along the sampled dimension.
  • new_dim_name (str, optoinal) – Name for the new dimension. If not provided, will use dim.
Returns:

sampled – DataArray with sampled values chosen along dimension dim

Return type:

xr.DataArray

Examples

>>> da = xr.DataArray(
...     np.arange(40).reshape(4, 2, 5),
...     dims=['x', 'y', 'z'],
...     coords=[np.arange(4), np.arange(2), np.arange(5)],
... )

>>> da 
<xarray.DataArray (x: 4, y: 2, z: 5)>
array([[[ 0,  1,  2,  3,  4],
        [ 5,  6,  7,  8,  9]],
       [[10, 11, 12, 13, 14],
        [15, 16, 17, 18, 19]],
       [[20, 21, 22, 23, 24],
        [25, 26, 27, 28, 29]],
       [[30, 31, 32, 33, 34],
        [35, 36, 37, 38, 39]]])
Coordinates:
  * x        (x) int64 0 1 2 3
  * y        (y) int64 0 1
  * z        (z) int64 0 1 2 3 4

We can take a random value along the 'z' dimension:

>>> np.random.seed(1)
>>> choose_along_dim(da, 'z')
<xarray.DataArray (x: 4, y: 2)>
array([[ 2,  8],
       [10, 16],
       [20, 25],
       [30, 36]])
Coordinates:
  * x        (x) int64 0 1 2 3
  * y        (y) int64 0 1

If you provide a sample argument greater than one (or set expand=True) the array will be expanded to a new dimension:

>>> np.random.seed(1)
>>> choose_along_dim(da, 'z', samples=3) 
<xarray.DataArray (x: 4, y: 2, z: 3)>
array([[[ 2,  3,  0],
        [ 6,  5,  5]],
       [[10, 11, 11],
        [17, 17, 18]],
       [[21, 24, 20],
        [28, 27, 27]],
       [[30, 30, 34],
        [39, 36, 38]]])
Coordinates:
  * x        (x) int64 0 1 2 3
  * y        (y) int64 0 1
  * z        (z) int64 0 1 2

Module contents

Top-level package for RHG Compute Tools.