Skip to content

cds


CDSDownloadArgParser

Bases: DownloadArgParser

Source code in download_toolbox/data/cds.py (lines 24–79)
class CDSDownloadArgParser(DownloadArgParser):
    """Command-line parser for CDS downloads.

    Extends ``DownloadArgParser`` with an ``--identifier`` option plus opt-in
    groups of CDS-specific arguments. Every ``add_*`` method returns the
    parser itself so the calls can be chained.
    """

    def __init__(self,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)

        self.add_argument("-i", "--identifier",
                          type=str,
                          default="cds",
                          help="Name of the output dataset where it's stored, overriding default")

    def add_cds_specs(self):
        """Register dataset, product type, time and compression options.

        Returns:
            The parser itself, to allow chained calls.
        """
        add = self.add_argument
        add("-ds", "--dataset",
            type=str,
            help="Dataset to download")
        add("-pt", "--product-type",
            type=csv_arg,
            default=None,
            help="Product type for the dataset")
        add("--time",
            type=csv_arg,
            default=[],
            help="Comma separated list of times for the dataset ('00:00,01:00'...), or 'all' for all 24 hours")

        # TODO: Pull this to constructor and update other downloaders
        add("--compress",
            type=int,
            default=None,
            help="Provide an integer from 1-9 (low to high) on how much to compress the output netCDF")
        return self

    def add_derived_specs(self):
        """Register the options used by the derived (daily statistics) datasets.

        Returns:
            The parser itself, to allow chained calls.
        """
        derived_options = (
            ("--daily-statistic", "Daily statistic for derived datasets", "daily_mean"),
            ("--time-zone", "Time zone for derived datasets", "utc+00:00"),
            ("--derived-frequency", "Frequency for derived datasets", "1_hourly"),
        )
        for flag, help_text, fallback in derived_options:
            self.add_argument(flag, type=str, default=fallback, help=help_text)
        return self

    def add_var_specs(self):
        """Register the base variable options plus the CDS ``--long-names`` override.

        Returns:
            The parser itself, to allow chained calls.
        """
        super().add_var_specs()
        # TODO: long_names is experimental for CDS downloads only, but maybe should be moved to DatasetConfig
        self.add_argument("-l", "--long-names",
                          type=csv_arg,
                          default=None,
                          help="If provided, this will override name mappings for the given variable prefixes")
        return self

add_cds_specs()

Arguments for dataset and product_type

Source code in download_toolbox/data/cds.py (lines 35–54)
def add_cds_specs(self):
    """Register dataset, product type, time and compression options.

    Returns:
        The parser itself, to allow chained calls.
    """
    add = self.add_argument
    add("-ds", "--dataset",
        type=str,
        help="Dataset to download")
    add("-pt", "--product-type",
        type=csv_arg,
        default=None,
        help="Product type for the dataset")
    add("--time",
        type=csv_arg,
        default=[],
        help="Comma separated list of times for the dataset ('00:00,01:00'...), or 'all' for all 24 hours")

    # TODO: Pull this to constructor and update other downloaders
    add("--compress",
        type=int,
        default=None,
        help="Provide an integer from 1-9 (low to high) on how much to compress the output netCDF")
    return self

add_derived_specs()

Arguments for derived datasets

Source code in download_toolbox/data/cds.py (lines 56–70)
def add_derived_specs(self):
    """Register the options used by the derived (daily statistics) datasets.

    Returns:
        The parser itself, to allow chained calls.
    """
    derived_options = (
        ("--daily-statistic", "Daily statistic for derived datasets", "daily_mean"),
        ("--time-zone", "Time zone for derived datasets", "utc+00:00"),
        ("--derived-frequency", "Frequency for derived datasets", "1_hourly"),
    )
    for flag, help_text, fallback in derived_options:
        self.add_argument(flag, type=str, default=fallback, help=help_text)
    return self

CDSDownloader

Bases: ThreadedDownloader

Source code in download_toolbox/data/cds.py (lines 136–408)
class CDSDownloader(ThreadedDownloader):
    """Threaded downloader that retrieves variables from the Copernicus Climate Data Store.

    Each request is issued through the CDS API client (``cds.Client``) and written
    to a temporary file. The result is then normalised with xarray: the variable and
    time coordinate are renamed, and auxiliary coordinates are dropped. Finally it is
    saved as a single-variable netCDF file.
    """

    def __init__(self,
                 dataset: CDSDatasetConfig,
                 *args,
                 show_progress: bool = False,
                 start_date: object,
                 dataset_name: Union[str, None] = None,
                 product_type: Union[list, None] = None,
                 time: Union[list, None] = None,
                 daily_statistic: str = "daily_mean",
                 time_zone: str = "utc+00:00",
                 derived_frequency: str = "1_hourly",
                 compress: Union[int, None] = None,
                 request_args: Union[dict, None] = None,
                 zipped: bool = False,
                 **kwargs):
        """
        Args:
            dataset: Dataset configuration (location, frequency, variable mapping).
            show_progress: Passed to the CDS client as ``progress``.
            start_date: Forwarded to ``ThreadedDownloader``.
            dataset_name: CDS dataset identifier. It is only used when ``product_type``
                is given; otherwise a legacy ERA5 dataset name is derived.
            product_type: CDS product types. When empty, legacy ERA5 defaults are used.
            time: Hours to request, e.g. ``["00:00", "12:00"]``, or ``["all"]`` for all
                24 hours. When empty, ``["12:00"]`` is requested for daily requests.
            daily_statistic: Only sent for the derived daily-statistics datasets.
            time_zone: Only sent for the derived daily-statistics datasets.
            derived_frequency: Only sent for the derived daily-statistics datasets.
            compress: Passed to ``xr_save_netcdf`` as ``complevel``.
            request_args: Extra keys merged into (and overriding) the request dictionary.
            zipped: Retrieve a zip archive of netCDF files instead of a single
                unarchived netCDF file.
        """
        self._client = cds.Client(progress=show_progress)
        self._dataset_name = dataset_name
        self._product_type = product_type
        self._time = time
        # Variables for derived daily statistics
        self._daily_statistic = daily_statistic
        self._time_zone = time_zone
        self._derived_frequency = derived_frequency
        self._compress = compress
        self._request_args = request_args
        self._zipped = zipped

        super().__init__(dataset,
                         *args,
                         source_min_frequency=Frequency.YEAR,
                         # TODO: validate handling of hourly data, but it is
                         #  possible as a temporal resolution
                         source_max_frequency=Frequency.HOUR,
                         start_date=start_date,
                         **kwargs)

        self.download_method = self._single_api_download

        if self.max_threads > 10:
            # The client session's default connection pool is too small for many threads
            logging.info("Upping connection limit for max_threads > 10")
            adapter = requests.adapters.HTTPAdapter(
                pool_connections=self.max_threads,
                pool_maxsize=self.max_threads
            )
            self._client.session.mount("https://", adapter)

    def _single_api_download(self,
                             var_config: object,
                             req_dates: object) -> list:
        """Implements a single download from CDS API

        Builds a CDS request for ``var_config`` covering ``req_dates`` and retrieves
        it to a temporary file. The result is then normalised and saved to the
        dataset's path for this variable, after which the temporary files are removed.

        Args:
            var_config: Variable configuration (``root_path``, ``prefix``, ``name``
                and ``level`` are read).
            req_dates: The requested dates. The request year is taken from the
                first date.

        Returns:
            A one-element list with the output path. If the retrieval failed, an
            empty list is returned and the dates are added to ``self.missing_dates``.

        Raises:
            DownloaderError: If a non-zipped temporary file already exists.
            ValueError: If the downloaded data does not hold exactly one data variable.
        """

        logging.debug("Processing {} dates for {}".format(len(req_dates), var_config))
        monthly_request = self.dataset.frequency < Frequency.DAY

        retrieve_ext = "zip" if self._zipped else "nc"
        temp_download_path = os.path.join(var_config.root_path,
                                          self.dataset.location.name,
                                          "temp.{}".format(os.path.basename(
                                              self.dataset.var_filepath(var_config, req_dates,
                                                                        file_extension=retrieve_ext))))
        download_path = os.path.join(var_config.root_path,
                                     self.dataset.location.name,
                                     os.path.basename(self.dataset.var_filepath(var_config, req_dates)))
        os.makedirs(os.path.dirname(download_path), exist_ok=True)

        if os.path.exists(download_path):
            logging.warning(f"We have a downloaded file available, skipping: {download_path}")
            return [download_path]

        # Default to legacy values if not provided
        if not self._product_type:
            product_type = ["reanalysis",] if not monthly_request else ["monthly_averaged_reanalysis_by_hour_of_day",]
        else:
            product_type = self._product_type

        retrieve_dict = {
            "product_type": product_type,
            "variable": self.dataset.cdi_map[var_config.prefix],
            "year": [int(req_dates[0].year),],
            # Sorted so the request (and its logged form) is deterministic
            "month": sorted({"{:02d}".format(rd.month) for rd in req_dates}),
        }

        if not self._zipped:
            retrieve_dict.update({
                "format": "netcdf",
                "grid": [0.25, 0.25],
                "area": self.dataset.location.bounds,
                "download_format": "unarchived"
            })

        # Add derived dataset-specific keys
        stats_dataset = False
        if self._dataset_name in [
            "derived-era5-pressure-levels-daily-statistics",
            "derived-era5-single-levels-daily-statistics"
        ]:
            stats_dataset = True
            retrieve_dict.update({
                "daily_statistic": self._daily_statistic,
                "time_zone": self._time_zone,
                "frequency": self._derived_frequency
            })

        level_id = "single-levels"
        if var_config.level:
            level_id = "pressure-levels"
            retrieve_dict["pressure_level"] = [var_config.level]

        # Default to legacy values if not provided
        if not self._product_type:
            dataset = "reanalysis-era5-{}{}".format(level_id, "-monthly-means" if monthly_request else "")
        else:
            # TODO: this is a bit of a hack, but it works for now
            # Updating dataset name if multiple pressure levels are requested
            if var_config.level and "single-levels" in self._dataset_name:
                dataset = self._dataset_name.replace("single-levels", "pressure-levels")
            else:
                dataset = self._dataset_name

        # FIXME: this is quite shaky, not using at present
        #    # _, date_end = get_era5_available_date_range(dataset)

        #    # TODO: This updates to dates available for download, prevents
        #    #       redundant downloads but, requires work to prevent
        #    #       postprocess method from running if no downloaded file.
        #    # req_dates = [date for date in req_dates if date <= date_end]
        # END

        if not monthly_request:
            retrieve_dict["day"] = ["{:02d}".format(d) for d in range(1, 32)]

            # No time key required for daily stats dataset, instead uses `time_zone`
            if not stats_dataset:
                if self._time and isinstance(self._time, list):
                    if self._time[0] == "all":
                        time = ["{:02d}:00".format(h) for h in range(0, 24)]
                    else:
                        time = self._time
                else:
                    time = ["12:00",]
                retrieve_dict["time"] = time

        if self._request_args is not None:
            logging.debug("Updating request with dictionary {}".format(self._request_args))
            retrieve_dict.update(self._request_args)

        if os.path.exists(temp_download_path):
            # TODO: we need a better mechanism for keeping temp files and reprocessing
            if self._zipped:
                logging.warning("{} already exists, all we can do is assume to try and reprocess".format(temp_download_path))
            else:
                raise DownloaderError("{} already exists, this shouldn't be the case".format(temp_download_path))
        else:
            try:
                logging.info("Downloading data for {}...".format(var_config.name))
                logging.debug("Request dataset {} with:\n{}".format(dataset, pformat(retrieve_dict)))
                self._client.retrieve(
                    dataset,
                    retrieve_dict,
                    temp_download_path)
                logging.info("Download completed: {}".format(temp_download_path))
            # cdsapi uses raise Exception in many places, so having a catch-all is appropriate
            except Exception:
                logging.exception("{} not downloaded, look at the problem".format(temp_download_path))
                self.missing_dates.extend(req_dates)
                return []

        if self._zipped:
            zip_output_path = os.path.join(var_config.root_path,
                                           self.dataset.location.name)
            # Context manager so the archive handle is released even if extraction fails
            with zipfile.ZipFile(temp_download_path) as zf:
                nc_members = [df_name for df_name in zf.namelist()
                              if df_name.endswith(".nc")]
                # Only extract members not already present from an earlier run
                zf.extractall(path=zip_output_path,
                              members=[df_name for df_name in nc_members
                                       if not os.path.exists(os.path.join(zip_output_path, df_name))])

            # For the moment we'll keep the zips; the extracted files are the temporaries
            temp_download_path = [os.path.join(zip_output_path, df_name)
                                  for df_name in nc_members]
            ds = xr.open_mfdataset(temp_download_path)
        else:
            ds = xr.open_dataset(temp_download_path)

        try:
            da = self._cds_to_dataarray(ds, var_config)
            logging.info("Saving corrected CDS file to {}".format(download_path))
            xr_save_netcdf(da, download_path, complevel=self._compress)
            da.close()
        finally:
            # Release the source file handle(s) before the temporaries are deleted
            ds.close()

        if not isinstance(temp_download_path, list):
            temp_download_path = [temp_download_path,]
        for tdp in temp_download_path:
            if os.path.exists(tdp):
                logging.info("Removing {}".format(tdp))
                os.unlink(tdp)

        return [download_path]

    def _cds_to_dataarray(self, ds, var_config):
        """Extract the single data variable from a CDS download and normalise it.

        The source variable is renamed to ``var_config.name``. The time coordinate
        (``date``, ``valid_time`` or ``time_counter``) is renamed to ``time``.
        ``pressure_level`` is squeezed out, and ``number`` and ``expver`` are
        dropped when present.

        Args:
            ds: Dataset opened from the downloaded file(s).
            var_config: Variable configuration; ``name`` is used as the output name.

        Returns:
            The normalised DataArray.

        Raises:
            ValueError: If ``ds`` holds no data variable, or more than one, once the
                known auxiliary variables are ignored.
        """
        # TODO: there is duplicated / messy code here from CDS API alterations, clean it up
        # New CDSAPI file holds more data_vars than just variable.
        # Omit them when figuring out default CDS variable name.
        omit_vars = {"number", "expver", "time", "date", "valid_time", "latitude", "longitude", "time_counter_bnds"}
        var_list = list(set(ds.data_vars).difference(omit_vars))
        if not var_list:
            raise ValueError("No variables found in file")
        elif len(var_list) > 1:
            raise ValueError(f"""Multiple variables found in data file!
                                 There should only be one variable.
                                 {var_list}""")
        src_var_name = var_list[0]
        var_name = var_config.name

        # Rename time and variable names for consistency
        rename_vars = {
                       src_var_name: var_name,
                       }
        if "date" in ds:
            rename_vars.update({"date": "time"})
        elif "valid_time" in ds:
            rename_vars.update({"valid_time": "time"})
        elif "time_counter" in ds:
            rename_vars.update({"time_counter": "time"})

        if all([f in ds for f in ["nav_lat", "nav_lon"]]):
            logging.warning("We have nav_lat and nav_lon which suggests a tripolar grid"
                            " which we cannot convert within the downloader yet")
            northing, westing, southing, easting = self.dataset.location.bounds
            # FIXME: naively grab the output based on the latitude
            ds = ds.where(((ds.nav_lat <= northing) & (ds.nav_lat >= southing)).compute(), drop=True)

        da = getattr(ds.rename(rename_vars), var_name)

        # This data downloader handles different pressure_levels in independent
        # files rather than storing them all in separate dimension of one array/file.
        if "pressure_level" in da.dims:
            da = da.squeeze(dim="pressure_level").drop_vars("pressure_level")

        if "number" in da.coords:
            da = da.drop_vars("number")

        # Updating coord attribute definitions (needs file read in with `decode_cf=False`)
        if "coordinates" in da.attrs:
            omit_attrs = ["number", "expver", "isobaricInhPa"]
            attributes = re.sub(r"valid_time|date", "time", da.attrs["coordinates"]).split()
            attributes = [attr for attr in attributes if attr not in omit_attrs]
            da.attrs["coordinates"] = " ".join(attributes)

        # Bryn Note:
        # expver = 1: ERA5
        # expver = 5: ERA5T
        # The latest 3 months of data is ERA5T and may be subject to changes.
        # Data prior to this is from ERA5.
        # The new CDSAPI returns combined data when `reanalysis` is requested.
        if 'expver' in da.coords:
            logging.warning("expver in coordinates, new cdsapi returns ERA5 and "
                            "ERA5T combined, this needs further work: expver needs "
                            "storing for later overwriting")
            # Ref: https://confluence.ecmwf.int/pages/viewpage.action?pageId=173385064
            # da = da.sel(expver=1).combine_first(da.sel(expver=5))
            da = da.drop_vars("expver")
        return da

    def _single_download(self,
                         var_config: object,
                         req_dates: object) -> list:
        """Placeholder download method; only logs a warning and downloads nothing.

        ``__init__`` sets ``download_method`` to ``_single_api_download`` instead.
        """
        logging.warning("You're not going to get data by calling this! "
                        "Set download_method to an actual implementation.")

ERA5DatasetConfig

Bases: CDSDatasetConfig

ERA5DatasetConfig – now replaced by CDSDatasetConfig.

Provided for backwards compatibility only

Source code in download_toolbox/data/cds.py (lines 127–133)
class ERA5DatasetConfig(CDSDatasetConfig):
    """Deprecated alias of ``CDSDatasetConfig``.

    Adds no behaviour of its own; it is kept only so that existing code
    importing ``ERA5DatasetConfig`` continues to work.
    """

get_era5_available_date_range(dataset='reanalysis-era5-single-levels')

Returns the time range for which ERA5(T) data is available.

Args:
    dataset: Dataset for which the available time range should be returned.

Returns:
    date_start: Earliest date from which data is available.
    date_end: Latest date for which data is available.

Source code in download_toolbox/data/cds.py (lines 411–427)
def get_era5_available_date_range(dataset: str = "reanalysis-era5-single-levels",
                                  timeout: float = 60.0):
    """Returns the time range for which ERA5(T) data is available.

    Queries the CDS catalogue entry for ``dataset`` and reads the first
    temporal interval of its extent.

    Args:
        dataset: Dataset for which available time range should be returned.
        timeout: Seconds to wait for the catalogue server (passed to
            ``requests.get``). This stops the call from hanging indefinitely.

    Returns:
        date_start: Earliest time data is available from, truncated to a
            timezone-naive midnight ``pd.Timestamp``. It is ``None`` if the
            catalogue leaves the start open.
        date_end: Latest time data available, in the same form. It is ``None``
            if the catalogue leaves the end open.

    Raises:
        requests.HTTPError: If the catalogue request returns an error status.
    """
    location = f"https://cds.climate.copernicus.eu/api/catalogue/v1/collections/{dataset}"
    res = requests.get(location, timeout=timeout)
    # Surface HTTP failures directly instead of as a KeyError on the JSON body
    res.raise_for_status()

    time_start, time_end = res.json()["extent"]["temporal"]["interval"][0]

    def _to_day(value):
        # STAC-style temporal extents may use null for an open-ended bound
        if value is None:
            return None
        return pd.Timestamp(pd.to_datetime(value).date())

    return _to_day(time_start), _to_day(time_end)