WeatherBench-X Quickstart

Open In Colab

This notebook goes through the basic components of WeatherBench-X.

Imports

# Note that pip might complain about some versions but the notebook should still work as expected.
!pip install git+https://github.com/google-research/weatherbenchX.git
import apache_beam as beam
import numpy as np
import xarray as xr
import weatherbenchX
from weatherbenchX.data_loaders import xarray_loaders
from weatherbenchX.metrics import deterministic
from weatherbenchX.metrics import base as metrics_base
from weatherbenchX import aggregation
from weatherbenchX import weighting
from weatherbenchX import binning
from weatherbenchX import time_chunks
from weatherbenchX import beam_pipeline

IMPORTANT: If you are running this on Colab, uncomment the cell below to access the cloud datasets.

# from google.colab import auth
# auth.authenticate_user()

Data Loaders

First, we define the data loaders for the data we would like to use. Data loaders can be implemented to read from any source. The only requirement is that they return data as an Xarray Dataset (or a dictionary of DataArrays).

It is the data loaders’ job to make sure that the returned target and prediction datasets are aligned, i.e. have the same variable names and coordinates that can be broadcast against each other. If this is not the case (e.g. for sparse observations), interpolators can be used to align the data (see How To).

In this example, we will evaluate gridded HRES predictions against ERA5 targets. We will load the public Zarr datasets on the WeatherBench cloud bucket.

prediction_path = 'gs://weatherbench2/datasets/hres/2016-2022-0012-64x32_equiangular_conservative.zarr'
target_path = 'gs://weatherbench2/datasets/era5/1959-2022-6h-64x32_equiangular_conservative.zarr'
variables = ['2m_temperature', 'geopotential']
target_data_loader = xarray_loaders.TargetsFromXarray(
    path=target_path,
    variables=variables,
)
prediction_data_loader = xarray_loaders.PredictionsFromXarray(
    path=prediction_path,
    variables=variables,
)

Now we define the initialization and lead times we would like to load data for. In the beam pipeline, this would be the job of the TimeChunks instance. For now, let’s load two init and three lead times, defined as numpy datetime64/timedelta64 objects.

init_times = np.array(['2020-01-01T00', '2020-01-01T12'], dtype='datetime64[ns]')
lead_times = np.array([6, 12, 18], dtype='timedelta64[h]').astype('timedelta64[ns]')   # To silence xr warnings.
target_chunk = target_data_loader.load_chunk(init_times, lead_times)
prediction_chunk = prediction_data_loader.load_chunk(init_times, lead_times)
target_chunk
<xarray.Dataset> Size: 689kB
Dimensions:         (latitude: 32, longitude: 64, init_time: 2, lead_time: 3,
                     level: 13)
Coordinates:
  * latitude        (latitude) float64 256B -87.19 -81.56 -75.94 ... 81.56 87.19
  * longitude       (longitude) float64 512B 0.0 5.625 11.25 ... 348.8 354.4
    valid_time      (init_time, lead_time) datetime64[ns] 48B 2020-01-01T06:0...
  * init_time       (init_time) datetime64[ns] 16B 2020-01-01 2020-01-01T12:0...
  * lead_time       (lead_time) timedelta64[ns] 24B 06:00:00 12:00:00 18:00:00
  * level           (level) int64 104B 50 100 150 200 250 ... 700 850 925 1000
Data variables:
    2m_temperature  (init_time, lead_time, longitude, latitude) float32 49kB ...
    geopotential    (init_time, lead_time, level, longitude, latitude) float32 639kB ...
Attributes:
    long_name:   2 metre temperature
    short_name:  t2m
    units:       K
prediction_chunk
<xarray.Dataset> Size: 689kB
Dimensions:         (latitude: 32, longitude: 64, lead_time: 3, init_time: 2,
                     level: 13)
Coordinates:
  * latitude        (latitude) float64 256B -87.19 -81.56 -75.94 ... 81.56 87.19
  * longitude       (longitude) float64 512B 0.0 5.625 11.25 ... 348.8 354.4
  * lead_time       (lead_time) timedelta64[ns] 24B 06:00:00 12:00:00 18:00:00
  * init_time       (init_time) datetime64[ns] 16B 2020-01-01 2020-01-01T12:0...
  * level           (level) int32 52B 50 100 150 200 250 ... 700 850 925 1000
Data variables:
    2m_temperature  (init_time, lead_time, longitude, latitude) float32 49kB ...
    geopotential    (init_time, lead_time, level, longitude, latitude) float32 639kB ...
Attributes:
    long_name:      2 metre temperature
    short_name:     t2m
    standard_name:  unknown
    units:          K

Here we can see that the data loader took care of aligning the datasets, i.e. the target data (ERA5) has already been assigned an init and lead time coordinate.

Metrics

Next, we define the metrics to compute.

metrics = {
  'rmse': deterministic.RMSE(),
  'mae': deterministic.MAE(),
}

Computing metrics happens in two steps. First, each metric defines one or several statistics that are required for computing the metric. A statistic is defined for each element of the prediction and target arrays, so e.g. for every init time, lead time, latitude and longitude.

In the case of RMSE, the statistic would be the Squared Error.

metrics['rmse'].statistics
{'SquaredError': <weatherbenchX.metrics.deterministic.SquaredError at 0x15fe93890>}

The helper function below computes all the statistics for a dictionary of metrics. If several metrics use the same underlying statistic (e.g. RMSE and MSE), the statistic is only computed once. This requires all statistics to have unique names, which they define themselves (more on that later).

statistics = metrics_base.compute_unique_statistics_for_all_metrics(
  metrics, prediction_chunk, target_chunk
)
statistics.keys()
dict_keys(['SquaredError', 'AbsoluteError'])

Aggregation

Then we average the statistics over the desired dimensions. In this simple case, we could just call stat.mean(dims). However, eventually the aggegation will have to happen over many chunks in the beam pipeline.

To allow for multi-step aggregation, we first define an aggregator to reduce over a set of dimensions reduce_dims.

aggregator = aggregation.Aggregator(
  reduce_dims=['init_time', 'latitude', 'longitude'],
)
aggregation_state = aggregator.aggregate_statistics(statistics)

The aggregator then aggregates the statistics and produces an aggregation state. An aggregation state contains the sum of the aggregated statistics and the sum of the aggregated weights (without any additional weighting, this will just be 1 for each element in the original statistic arrays). These two can later be summed over many beam chunks.

To get the final averaged statistics, we then divide the aggregated statistics over the aggregated weights. We can simple use the .mean_statistics() method for this.

aggregation_state.mean_statistics()
{'AbsoluteError': <xarray.Dataset> Size: 244B
 Dimensions:         (lead_time: 3, level: 13)
 Coordinates:
   * lead_time       (lead_time) timedelta64[ns] 24B 06:00:00 12:00:00 18:00:00
   * level           (level) int32 52B 50 100 150 200 250 ... 700 850 925 1000
 Data variables:
     2m_temperature  (lead_time) float32 12B 0.4815 0.5126 0.5184
     geopotential    (lead_time, level) float32 156B 63.8 29.22 ... 27.43 29.05,
 'SquaredError': <xarray.Dataset> Size: 244B
 Dimensions:         (lead_time: 3, level: 13)
 Coordinates:
   * lead_time       (lead_time) timedelta64[ns] 24B 06:00:00 12:00:00 18:00:00
   * level           (level) int32 52B 50 100 150 200 250 ... 700 850 925 1000
 Data variables:
     2m_temperature  (lead_time) float32 12B 0.6213 0.7352 0.7081
     geopotential    (lead_time, level) float32 156B 5.699e+03 ... 1.586e+03}

The final step in computing the metrics is to now call the .value_from_mean_statistics() method for each metric, that takes the averaged statistics and converts it to the final metric. In the case of the RMSE, this would be taking the square root of the averaged squared error.

The aggregation state has a handy shortcut for this that also packs up all metrics into a single Dataset with naming convention: <metric>.<variable>

aggregation_state.metric_values(metrics)
<xarray.Dataset> Size: 412B
Dimensions:              (lead_time: 3, level: 13)
Coordinates:
  * lead_time            (lead_time) timedelta64[ns] 24B 06:00:00 ... 18:00:00
  * level                (level) int32 52B 50 100 150 200 ... 700 850 925 1000
Data variables:
    rmse.geopotential    (lead_time, level) float32 156B 75.49 37.59 ... 39.83
    rmse.2m_temperature  (lead_time) float32 12B 0.7882 0.8575 0.8415
    mae.geopotential     (lead_time, level) float32 156B 63.8 29.22 ... 29.05
    mae.2m_temperature   (lead_time) float32 12B 0.4815 0.5126 0.5184

This may seem like a lot of separate steps to get to the final result. This is necessary because, in many use cases, the computation will be parallelized over many chunks. There is a shortcut function for a single chunks that includes the steps above:

aggregation.compute_metric_values_for_single_chunk(
    metrics,
    aggregator,
    prediction_chunk,
    target_chunk
)
<xarray.Dataset> Size: 412B
Dimensions:              (lead_time: 3, level: 13)
Coordinates:
  * lead_time            (lead_time) timedelta64[ns] 24B 06:00:00 ... 18:00:00
  * level                (level) int32 52B 50 100 150 200 ... 700 850 925 1000
Data variables:
    rmse.geopotential    (lead_time, level) float32 156B 75.49 37.59 ... 39.83
    rmse.2m_temperature  (lead_time) float32 12B 0.7882 0.8575 0.8415
    mae.geopotential     (lead_time, level) float32 156B 63.8 29.22 ... 29.05
    mae.2m_temperature   (lead_time) float32 12B 0.4815 0.5126 0.5184

Weighting and Binning

This is already it for the simplest example. However, in many cases, we might want more fine-grained aggregation.

One common case is weighting each element differently in the aggregation. For lat-lon datasets, for example, it is common to weigh each grid point by area. This can be done using a GridAreaWeighting object.

weigh_by = [weighting.GridAreaWeighting()]

Another common case is futher subdividing the aggregation, e.g. computing metrics for several regions. This is done using binning instances.

Important: Make sure the longitude conventions (-180 to 180 or 0 to 360) match between the data and the regions.

regions = {
    # ((lat_min, lat_max), (lon_min, lon_max))
    'global': ((-90, 90), (0, 360)),
    'na': ((24.08, 50), (360 - 126, 360 - 65)),
    'europe': ((35, 71), (360 - 10, 36)),
}
bin_by = [binning.Regions(regions)]
aggregator = aggregation.Aggregator(
  reduce_dims=['init_time', 'latitude', 'longitude'],
  bin_by=bin_by,
  weigh_by=weigh_by,
)
aggregation.compute_metric_values_for_single_chunk(
    metrics,
    aggregator,
    prediction_chunk,
    target_chunk
)
<xarray.Dataset> Size: 2kB
Dimensions:              (region: 3, lead_time: 3, level: 13)
Coordinates:
  * region               (region) <U6 72B 'global' 'na' 'europe'
  * lead_time            (lead_time) timedelta64[ns] 24B 06:00:00 ... 18:00:00
  * level                (level) int32 52B 50 100 150 200 ... 700 850 925 1000
Data variables:
    rmse.geopotential    (region, lead_time, level) float64 936B 79.77 ... 31.93
    rmse.2m_temperature  (region, lead_time) float64 72B 0.6428 ... 0.6352
    mae.geopotential     (region, lead_time, level) float64 936B 68.75 ... 24.72
    mae.2m_temperature   (region, lead_time) float64 72B 0.3706 0.397 ... 0.4744

The results will now have an additional dimension for the region bins.

Beam pipeline

Now let’s put this same example into a beam pipeline that could be scaled to much larger datasets.

The first step in defining a beam pipeline is to define the time chunks. The beam computation will be split into chunks according to init/lead time chunks. Currently, only chunking over the two time dimensions is supported (i.e. not over other coordinates like latitude or longitude).

To define these, there is a TimeChunks class that handles the chunking.

Let’s define a range of 4 init and 3 lead times.

init_times = np.arange('2020-01-01T00', '2020-01-03T00', np.timedelta64(12, 'h'), dtype='datetime64[ns]')
lead_times = np.arange(0, 18, 6, dtype='timedelta64[h]').astype('timedelta64[ns]')
init_times, lead_times
(array(['2020-01-01T00:00:00.000000000', '2020-01-01T12:00:00.000000000',
        '2020-01-02T00:00:00.000000000', '2020-01-02T12:00:00.000000000'],
       dtype='datetime64[ns]'),
 array([             0, 21600000000000, 43200000000000],
       dtype='timedelta64[ns]'))

Now we need to tell the time chunker what chunk sizes to use in init/lead time.

The time chunker is an iterator that returns the appropriate init/lead time chunks for the chosen chunk sizes.

times = time_chunks.TimeChunks(
  init_times,
  lead_times,
  init_time_chunk_size=2,
  lead_time_chunk_size=1
)
next(iter(times))
(array(['2020-01-01T00:00:00.000000000', '2020-01-01T12:00:00.000000000'],
       dtype='datetime64[ns]'),
 array([0], dtype='timedelta64[ns]'))

Finally we can pass all these arguments to define_pipeline(). This will set up the beam pipline. The metric results will be saved as a NetCDF file.

root = beam.Pipeline(runner='DirectRunner')
beam_pipeline.define_pipeline(
    root=root,
    times=times,
    predictions_loader=prediction_data_loader,
    targets_loader=target_data_loader,
    metrics=metrics,
    aggregator=aggregator,
    out_path='./out.nc',
)
root.run()
xr.open_dataset('./out.nc').compute()
<xarray.Dataset> Size: 2kB
Dimensions:              (level: 13, region: 3, lead_time: 3)
Coordinates:
  * level                (level) int32 52B 50 100 150 200 ... 700 850 925 1000
  * region               (region) object 24B 'global' 'na' 'europe'
  * lead_time            (lead_time) timedelta64[ns] 24B 00:00:00 ... 12:00:00
Data variables:
    rmse.geopotential    (region, lead_time, level) float64 936B 82.88 ... 29.48
    mae.geopotential     (region, lead_time, level) float64 936B 74.44 ... 22.35
    rmse.2m_temperature  (region, lead_time) float64 72B 0.6832 ... 0.6497
    mae.2m_temperature   (region, lead_time) float64 72B 0.3887 ... 0.4813

Voila! To see an example of a full pipeline, see run_example_evaluation.py

This was it for the simple example. For more advanced topics see the HOW TO guides.