Skip to content

utils


DaskWrapper

Parameters:

Name Type Description Default
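dask_port int Port on localhost where the Dask dashboard is served.
8888
dask_timeouts int Value applied to Dask's distributed connect and TCP comm timeouts.
60
dask_tmp_dir object Directory Dask is configured to use as its temporary directory.
'/tmp'
workers int Number of workers started in the local cluster (one thread each).
8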
Source code in download_toolbox/utils.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class DaskWrapper:
    """Context manager that runs a local Dask cluster with an attached client.

    Entering the context starts a ``LocalCluster`` and connects a ``Client``
    to it; leaving the context closes both again.

    :param dask_port: port on localhost where the Dask dashboard is served
    :param dask_timeouts: value used for the ``distributed.comm.timeouts``
        ``connect`` and ``tcp`` settings while the cluster is created
    :param dask_tmp_dir: value used for Dask's ``temporary_directory`` setting
    :param workers: number of workers to start, each with a single thread
    :param scheduler: stored on the instance but currently unused, because
        the matching config entry in ``__enter__`` is commented out
    """

    def __init__(self,
                 dask_port: int = 8888,
                 dask_timeouts: int = 60,
                 dask_tmp_dir: object = "/tmp",
                 workers: int = 8,
                 scheduler: str = "single-threaded",
                 ):

        self._dashboard_port = dask_port
        self._timeout = dask_timeouts
        self._tmp_dir = dask_tmp_dir
        self._workers = workers
        self._scheduler = scheduler

        # Populated by __enter__ and reset by __exit__.
        self._cluster = None
        self._client = None

    def __enter__(self):
        """Start the local cluster and client.

        :return: this wrapper instance
        """
        dashboard = "localhost:{}".format(self._dashboard_port)

        with dask.config.set({
            "temporary_directory": self._tmp_dir,
            "distributed.comm.timeouts.connect": self._timeout,
            "distributed.comm.timeouts.tcp": self._timeout,
            # "scheduler": self._scheduler, # Fix to "single-threaded" for netCDF4 >=1.6.1 not thread-safe.
        }):
            self._cluster = LocalCluster(
                dashboard_address=dashboard,
                n_workers=self._workers,
                threads_per_worker=1,
                scheduler_port=0,
            )
            logging.info("Dashboard at {}".format(dashboard))

            try:
                self._client = Client(self._cluster)
            except Exception:
                # __exit__ is not called when __enter__ raises, so the
                # cluster that was just started has to be shut down here.
                self._cluster.close()
                self._cluster = None
                raise
            logging.info("Using dask client {}".format(self._client))

        return self

    def __exit__(self, *exc_details):
        """Close the client and the cluster started by ``__enter__``.

        Returns ``None``, so an exception raised inside the ``with`` body
        is propagated to the caller.
        """
        client, cluster = self._client, self._cluster
        self._client = None
        self._cluster = None

        try:
            if client is not None:
                client.close()
        finally:
            # The cluster was created separately from the client, so it is
            # closed explicitly, even when closing the client fails.
            if cluster is not None:
                cluster.close()

__enter__()

Source code in download_toolbox/utils.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def __enter__(self):
    """Start a local Dask cluster, attach a client to it and return ``self``.

    The cluster and client are created while the temporary-directory and
    comm-timeout settings are applied through ``dask.config.set``.
    """
    dashboard_address = "localhost:{}".format(self._dashboard_port)

    config_overrides = {
        "temporary_directory": self._tmp_dir,
        "distributed.comm.timeouts.connect": self._timeout,
        "distributed.comm.timeouts.tcp": self._timeout,
        # A "scheduler" entry (self._scheduler) is intentionally left out;
        # the original note reads: fix to "single-threaded" for
        # netCDF4 >=1.6.1 not thread-safe.
    }

    with dask.config.set(config_overrides):
        cluster = LocalCluster(
            n_workers=self._workers,
            threads_per_worker=1,
            scheduler_port=0,
            dashboard_address=dashboard_address,
        )
        self._cluster = cluster
        logging.info("Dashboard at {}".format(dashboard_address))

        client = Client(cluster)
        self._client = client
        logging.info("Using dask client {}".format(client))

    return self

run_command(command, dry=False)

Run a shell command

A thin wrapper around the shell call, so that any additional handling can be added in one place.

Parameters:

Name Type Description Default
command str Command line to run through the shell.
required
dry bool If true, only log the command and do not run it.
False

Returns:

Type Description
Source code in download_toolbox/utils.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def run_command(command: str, dry: bool = False):
    """Run a shell command

    A thin wrapper around ``subprocess.run`` so that any additional handling
    can be added in one place.

    :param command: command line, passed as a single string to the shell
    :param dry: when true, log the command and return without running it
    :return: ``0`` for a dry run, otherwise the ``subprocess.CompletedProcess``
        describing the finished command

    """
    if dry:
        logging.info("Skipping dry command: {}".format(command))
        return 0

    # shell=True hands the string to the shell unmodified, so callers must
    # not build `command` from untrusted input.
    ret = sp.run(command, shell=True)
    if ret.returncode < 0:
        # A negative return code means the child was killed by that signal.
        logging.warning("Child was terminated by signal: {}".
                        format(-ret.returncode))
    else:
        # Report the exit status as-is; only the signal branch negates it.
        logging.info("Child returned: {}".format(ret.returncode))

    return ret