136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
class CDSDownloader(ThreadedDownloader):
    """Threaded downloader retrieving variables from the Copernicus Climate Data Store.

    Each request is issued through a ``cds.Client``. The retrieved file is
    reduced to the single requested variable (renamed to the variable config
    name, time coordinate renamed to ``time``), saved with ``xr_save_netcdf``
    and the temporary download file(s) are removed.
    """

    # Derived ERA5 datasets that receive ``daily_statistic``/``time_zone``/
    # ``frequency`` request keys and no ``time`` key.
    _DAILY_STATS_DATASETS = (
        "derived-era5-pressure-levels-daily-statistics",
        "derived-era5-single-levels-daily-statistics",
    )

    # Auxiliary variables the CDS API may return next to the requested one;
    # ignored when identifying which data variable to keep.
    _OMIT_VARS = frozenset({"number", "expver", "time", "date", "valid_time",
                            "latitude", "longitude", "time_counter_bnds"})

    def __init__(self,
                 dataset: CDSDatasetConfig,
                 *args,
                 show_progress: bool = False,
                 start_date: object,
                 dataset_name: Union[str, None] = None,
                 product_type: Union[list, None] = None,
                 time: Union[list, None] = None,
                 daily_statistic: str = "daily_mean",
                 time_zone: str = "utc+00:00",
                 derived_frequency: str = "1_hourly",
                 compress: Union[int, None] = None,
                 request_args: Union[dict, None] = None,
                 zipped: bool = False,
                 **kwargs):
        """Configure the CDS client and request options.

        Args:
            dataset: dataset configuration (location, variable map, paths).
            *args: forwarded to ``ThreadedDownloader.__init__``.
            show_progress: passed as ``progress`` to ``cds.Client``.
            start_date: forwarded to ``ThreadedDownloader.__init__``.
            dataset_name: CDS dataset identifier. Selects the derived
                daily-statistics request keys when it names one of those
                datasets; used as the retrieved dataset only when
                ``product_type`` is also given, otherwise a legacy
                ``reanalysis-era5-*`` name is built.
            product_type: CDS ``product_type`` values; when empty a legacy
                default depending on the request frequency is used.
            time: list of ``"HH:MM"`` strings, or a list whose first element
                is ``"all"`` to request every hour. Defaults to ``["12:00"]``.
            daily_statistic: ``daily_statistic`` key for daily-statistics datasets.
            time_zone: ``time_zone`` key for daily-statistics datasets.
            derived_frequency: ``frequency`` key for daily-statistics datasets.
            compress: ``complevel`` passed to ``xr_save_netcdf``.
            request_args: extra keys merged last into (overriding) the request.
            zipped: when True the netcdf/grid/area/download_format keys are
                not sent and the retrieved file is read as a zip of ``.nc``
                files.
            **kwargs: forwarded to ``ThreadedDownloader.__init__``.
        """
        self._client = cds.Client(progress=show_progress)
        self._dataset_name = dataset_name
        self._product_type = product_type
        self._time = time
        # Variables for derived daily statistics
        self._daily_statistic = daily_statistic
        self._time_zone = time_zone
        self._derived_frequency = derived_frequency
        self._compress = compress
        self._request_args = request_args
        self._zipped = zipped

        super().__init__(dataset,
                         *args,
                         source_min_frequency=Frequency.YEAR,
                         # TODO: validate handling of hourly data, but it is
                         #  possible as a temporal resolution
                         source_max_frequency=Frequency.HOUR,
                         start_date=start_date,
                         **kwargs)

        self.download_method = self._single_api_download

        if self.max_threads > 10:
            # Size the HTTP connection pool so every worker thread can hold a connection.
            logging.info("Upping connection limit for max_threads > 10")
            adapter = requests.adapters.HTTPAdapter(
                pool_connections=self.max_threads,
                pool_maxsize=self.max_threads
            )
            self._client.session.mount("https://", adapter)

    def _single_api_download(self,
                             var_config: object,
                             req_dates: object) -> list:
        """Implements a single download from CDS API

        Retrieves ``var_config`` for ``req_dates`` into a temporary file,
        writes the normalised single-variable result to the path given by the
        dataset's ``var_filepath`` and removes the temporary NetCDF file(s).

        Args:
            var_config: variable configuration; ``name``, ``prefix``,
                ``level`` and ``root_path`` are read.
            req_dates: the requested dates (objects with ``year``/``month``).

        Returns:
            ``[download_path]`` on success or when the output already exists;
            ``[]`` when the CDS retrieval failed, in which case ``req_dates``
            are appended to ``self.missing_dates``.

        Raises:
            DownloaderError: if a non-zipped temporary file already exists.
            ValueError: if the retrieved data does not contain exactly one
                candidate variable.
        """
        logging.debug("Processing {} dates for {}".format(len(req_dates), var_config))
        monthly_request = self.dataset.frequency < Frequency.DAY
        retrieve_ext = "zip" if self._zipped else "nc"
        output_dir = os.path.join(var_config.root_path, self.dataset.location.name)

        temp_download_path = os.path.join(
            output_dir,
            "temp.{}".format(os.path.basename(
                self.dataset.var_filepath(var_config, req_dates,
                                          file_extension=retrieve_ext))))
        download_path = os.path.join(
            output_dir,
            os.path.basename(self.dataset.var_filepath(var_config, req_dates)))
        os.makedirs(os.path.dirname(download_path), exist_ok=True)

        if os.path.exists(download_path):
            logging.warning(f"We have a downloaded file available, skipping: {download_path}")
            return [download_path]

        dataset, retrieve_dict = self._build_request(var_config, req_dates, monthly_request)

        if os.path.exists(temp_download_path):
            # TODO: we need a better mechanism for keeping temp files and reprocessing
            if self._zipped:
                logging.warning("{} already exists, all we can do is assume to try and reprocess".format(temp_download_path))
            else:
                raise DownloaderError("{} already exists, this shouldn't be the case".format(temp_download_path))
        else:
            try:
                logging.info("Downloading data for {}...".format(var_config.name))
                logging.debug("Request dataset {} with:\n{}".format(dataset, pformat(retrieve_dict)))
                self._client.retrieve(
                    dataset,
                    retrieve_dict,
                    temp_download_path)
                logging.info("Download completed: {}".format(temp_download_path))
            # cdsapi uses raise Exception in many places, so having a catch-all is appropriate
            except Exception:
                logging.exception("{} not downloaded, look at the problem".format(temp_download_path))
                self.missing_dates.extend(req_dates)
                return []

        ds, temp_paths = self._open_download(temp_download_path, output_dir)
        try:
            da = self._standardise_dataarray(ds, var_config)
            logging.info("Saving corrected CDS file to {}".format(download_path))
            xr_save_netcdf(da, download_path, complevel=self._compress)
            da.close()
        finally:
            # Release the file handles before the temporary files are removed.
            ds.close()

        for tdp in temp_paths:
            if os.path.exists(tdp):
                logging.info("Removing {}".format(tdp))
                os.unlink(tdp)

        return [download_path]

    def _build_request(self,
                       var_config: object,
                       req_dates: object,
                       monthly_request: bool) -> tuple:
        """Build the CDS dataset name and request dictionary.

        Returns:
            ``(dataset, retrieve_dict)`` to pass to ``cds.Client.retrieve``.
        """
        # Default to legacy values if not provided
        if self._product_type:
            product_type = self._product_type
        elif monthly_request:
            product_type = ["monthly_averaged_reanalysis_by_hour_of_day",]
        else:
            product_type = ["reanalysis",]

        retrieve_dict = {
            "product_type": product_type,
            "variable": self.dataset.cdi_map[var_config.prefix],
            # Only the first date's year is requested.
            # NOTE(review): assumes all req_dates share one year — confirm with callers.
            "year": [int(req_dates[0].year),],
            # Sorted so the request is identical from run to run.
            "month": sorted({"{:02d}".format(rd.month) for rd in req_dates}),
        }

        if not self._zipped:
            retrieve_dict.update({
                "format": "netcdf",
                "grid": [0.25, 0.25],
                "area": self.dataset.location.bounds,
                "download_format": "unarchived",
            })

        # Add derived dataset-specific keys
        stats_dataset = self._dataset_name in self._DAILY_STATS_DATASETS
        if stats_dataset:
            retrieve_dict.update({
                "daily_statistic": self._daily_statistic,
                "time_zone": self._time_zone,
                "frequency": self._derived_frequency,
            })

        level_id = "single-levels"
        if var_config.level:
            level_id = "pressure-levels"
            retrieve_dict["pressure_level"] = [var_config.level]

        if not self._product_type or not self._dataset_name:
            # Default to legacy values if not provided
            dataset = "reanalysis-era5-{}{}".format(level_id, "-monthly-means" if monthly_request else "")
        elif var_config.level and "single-levels" in self._dataset_name:
            # TODO: this is a bit of a hack, but it works for now
            # Updating dataset name if multiple pressure levels are requested
            dataset = self._dataset_name.replace("single-levels", "pressure-levels")
        else:
            dataset = self._dataset_name

        # FIXME: restricting req_dates to the range reported by
        #  get_era5_available_date_range(dataset) is too shaky to use at
        #  present; it would also need postprocessing to cope with no
        #  downloaded file.

        if not monthly_request:
            retrieve_dict["day"] = ["{:02d}".format(d) for d in range(1, 32)]

        # No time key required for daily stats dataset, instead uses `time_zone`
        if not stats_dataset:
            if self._time and isinstance(self._time, list):
                if self._time[0] == "all":
                    times = ["{:02d}:00".format(h) for h in range(0, 24)]
                else:
                    times = self._time
            else:
                times = ["12:00",]
            retrieve_dict["time"] = times

        if self._request_args is not None:
            logging.debug("Updating request with dictionary {}".format(self._request_args))
            retrieve_dict.update(self._request_args)

        return dataset, retrieve_dict

    def _open_download(self, temp_download_path: str, output_dir: str) -> tuple:
        """Open the retrieved data, extracting ``.nc`` members first when zipped.

        Returns:
            ``(ds, paths)``: the opened dataset and the temporary NetCDF
            file paths it was read from (to be removed after processing).
        """
        if not self._zipped:
            return xr.open_dataset(temp_download_path), [temp_download_path]

        with zipfile.ZipFile(temp_download_path) as zf:
            nc_members = [name for name in zf.namelist() if name.endswith(".nc")]
            # Extract only members not already present in the output directory
            zf.extractall(path=output_dir,
                          members=[name for name in nc_members
                                   if not os.path.exists(os.path.join(output_dir, name))])
        # For the moment we'll keep the zips
        nc_paths = [os.path.join(output_dir, name) for name in nc_members]
        return xr.open_mfdataset(nc_paths), nc_paths

    def _standardise_dataarray(self, ds, var_config: object):
        """Extract the single requested variable from ``ds`` with consistent naming.

        The variable is renamed to ``var_config.name`` and the time coordinate
        (``date``, ``valid_time`` or ``time_counter``) to ``time``;
        ``pressure_level``, ``number`` and ``expver`` are dropped.

        Raises:
            ValueError: if zero or several candidate variables are found.
        """
        # TODO: there is duplicated / messy code here from CDS API alterations, clean it up
        # New CDSAPI file holds more data_vars than just variable.
        # Omit them when figuring out default CDS variable name.
        var_list = list(set(ds.data_vars).difference(self._OMIT_VARS))
        if not var_list:
            raise ValueError("No variables found in file")
        if len(var_list) > 1:
            raise ValueError("Multiple variables found in data file!\n"
                             "There should only be one variable.\n"
                             f"{var_list}")

        var_name = var_config.name
        # Rename time and variable names for consistency
        rename_vars = {var_list[0]: var_name}
        for time_name in ("date", "valid_time", "time_counter"):
            if time_name in ds:
                rename_vars[time_name] = "time"
                break

        if all(coord in ds for coord in ("nav_lat", "nav_lon")):
            logging.warning("We have nav_lat and nav_lon which suggests a tripolar grid"
                            " which we cannot convert within the downloader yet")
            northing, _westing, southing, _easting = self.dataset.location.bounds
            # FIXME: naively grab the output based on the latitude
            ds = ds.where(((ds.nav_lat <= northing) & (ds.nav_lat >= southing)).compute(), drop=True)

        # Index rather than getattr so a name clashing with a Dataset method still works
        da = ds.rename(rename_vars)[var_name]

        # This data downloader handles different pressure_levels in independent
        # files rather than storing them all in separate dimension of one array/file.
        if "pressure_level" in da.dims:
            da = da.squeeze(dim="pressure_level").drop_vars("pressure_level")

        if "number" in da.coords:
            da = da.drop_vars("number")

        # Updating coord attribute definitions (needs file read in with `decode_cf=False`)
        if "coordinates" in da.attrs:
            omit_attrs = {"number", "expver", "isobaricInhPa"}
            attributes = re.sub(r"valid_time|date", "time", da.attrs["coordinates"]).split()
            da.attrs["coordinates"] = " ".join(attr for attr in attributes if attr not in omit_attrs)

        # Bryn Note:
        # expver = 1: ERA5
        # expver = 5: ERA5T
        # The latest 3 months of data is ERA5T and may be subject to changes.
        # Data prior to this is from ERA5.
        # The new CDSAPI returns combined data when `reanalysis` is requested.
        if 'expver' in da.coords:
            logging.warning("expver in coordinates, new cdsapi returns ERA5 and "
                            "ERA5T combined, this needs further work: expver needs "
                            "storing for later overwriting")
            # Ref: https://confluence.ecmwf.int/pages/viewpage.action?pageId=173385064
            # da = da.sel(expver=1).combine_first(da.sel(expver=5))
            da = da.drop_vars("expver")

        return da

    def _single_download(self,
                         var_config: object,
                         req_dates: object) -> list:
        """Placeholder download method that retrieves nothing.

        ``__init__`` points ``download_method`` at ``_single_api_download``;
        this stub only warns. It returns an empty list, the same value
        ``_single_api_download`` uses for "nothing downloaded".
        """
        logging.warning("You're not going to get data by calling this! "
                        "Set download_method to an actual implementation.")
        return []
|