# climate_ref_ilamb.standard

## ILAMBStandard

Bases: Diagnostic

Apply the standard ILAMB analysis with respect to a given reference dataset.

Source code in packages/climate-ref-ilamb/src/climate_ref_ilamb/standard.py

```python
class ILAMBStandard(Diagnostic):
    """
    Apply the standard ILAMB analysis with respect to a given reference dataset.
    """

    def __init__(
        self,
        registry_file: str,
        metric_name: str,
        sources: dict[str, str],
        **ilamb_kwargs: Any,
    ):
        # Setup the diagnostic
        if len(sources) != 1:
            raise ValueError("Only single source ILAMB diagnostics have been implemented.")
        self.variable_id = next(iter(sources.keys()))
        if "sources" not in ilamb_kwargs:  # pragma: no cover
            ilamb_kwargs["sources"] = sources
        if "relationships" not in ilamb_kwargs:
            ilamb_kwargs["relationships"] = {}
        self.ilamb_kwargs = ilamb_kwargs

        # REF diagnostic metadata
        self.name = metric_name
        self.slug = self.name.lower().replace(" ", "-")
        self.data_requirements = (
            DataRequirement(
                source_type=SourceDatasetType.CMIP6,
                filters=(
                    FacetFilter(
                        facets={
                            "variable_id": (
                                self.variable_id,
                                *ilamb_kwargs.get("alternate_vars", []),
                                *ilamb_kwargs.get("related_vars", []),
                                *ilamb_kwargs.get("relationships", {}).keys(),
                            ),
                            "frequency": "mon",
                            "experiment_id": ("historical", "land-hist"),
                            "table_id": (
                                "AERmonZ",
                                "Amon",
                                "CFmon",
                                "Emon",
                                "EmonZ",
                                "LImon",
                                "Lmon",
                                "Omon",
                                "SImon",
                            ),
                        }
                    ),
                ),
                constraints=(
                    RequireFacets(
                        "variable_id",
                        (
                            self.variable_id,
                            *ilamb_kwargs.get("alternate_vars", []),
                            *ilamb_kwargs.get("related_vars", []),
                        ),
                        operator="any",
                    ),
                    *(
                        [
                            RequireFacets(
                                "variable_id",
                                required_facets=tuple(ilamb_kwargs.get("relationships", {}).keys()),
                            )
                        ]
                        if "relationships" in ilamb_kwargs
                        else []
                    ),
                    *(
                        (
                            AddSupplementaryDataset.from_defaults("areacella", SourceDatasetType.CMIP6),
                            AddSupplementaryDataset.from_defaults("sftlf", SourceDatasetType.CMIP6),
                        )
                        if registry_file == "ilamb"
                        else (
                            AddSupplementaryDataset.from_defaults("volcello", SourceDatasetType.CMIP6),
                            AddSupplementaryDataset.from_defaults("areacello", SourceDatasetType.CMIP6),
                            AddSupplementaryDataset.from_defaults("sftof", SourceDatasetType.CMIP6),
                        )
                    ),
                ),
                group_by=("experiment_id", "source_id", "member_id", "grid_label"),
            ),
        )

        self.facets = (
            "experiment_id",
            "source_id",
            "member_id",
            "grid_label",
            "region",
            "metric",
            "statistic",
        )

        # Setup ILAMB data and options
        self.registry_file = registry_file
        self.registry = dataset_registry_manager[self.registry_file]
        self.ilamb_data = registry_to_collection(
            dataset_registry_manager[self.registry_file],
        )

    def execute(self, definition: ExecutionDefinition) -> None:
        """
        Run the ILAMB standard analysis.
        """
        _set_ilamb3_options(self.registry, self.registry_file)
        ref_datasets = self.ilamb_data.datasets.set_index(self.ilamb_data.slug_column)

        # Run ILAMB in a single-threaded mode to avoid issues with multithreading (#394)
        with dask.config.set(scheduler="synchronous"):
            run.run_single_block(
                self.slug,
                ref_datasets,
                definition.datasets[SourceDatasetType.CMIP6].datasets,
                definition.output_directory,
                **self.ilamb_kwargs,
            )

    def build_execution_result(self, definition: ExecutionDefinition) -> ExecutionResult:
        """
        Build the diagnostic result after running ILAMB.

        Parameters
        ----------
        definition
            The definition of the diagnostic execution

        Returns
        -------
            An execution result object
        """
        _set_ilamb3_options(self.registry, self.registry_file)
        # ILAMB saves scalars as CSV files in the output directory. To be
        # compatible with the REF system, we need to add the metadata
        # associated with the execution group (the selector).
        df = _load_csv_and_merge(definition.output_directory)
        selectors = definition.datasets[SourceDatasetType.CMIP6].selector_dict()

        # TODO: Fix reference data once we are using the obs4MIPs dataset
        dataset_source = self.name.split("-")[1] if "-" in self.name else "None"
        common_dimensions = {**selectors, "reference_source_id": dataset_source}
        for key, value in common_dimensions.items():
            df[key] = value
        metric_bundle = CMECMetric.model_validate(_build_cmec_bundle(df))

        # Add each png file plot to the output
        output_bundle = CMECOutput.create_template()
        for plotfile in definition.output_directory.glob("*.png"):
            relative_path = str(definition.as_relative_path(plotfile))
            caption, figure_dimensions = _caption_from_filename(plotfile, common_dimensions)

            output_bundle[OutputCV.PLOTS.value][relative_path] = {
                OutputCV.FILENAME.value: relative_path,
                OutputCV.LONG_NAME.value: caption,
                OutputCV.DESCRIPTION.value: "",
                OutputCV.DIMENSIONS.value: figure_dimensions,
            }

        # Add the html page to the output
        index_html = definition.to_output_path("index.html")
        if index_html.exists():
            relative_path = str(definition.as_relative_path(index_html))
            output_bundle[OutputCV.HTML.value][relative_path] = {
                OutputCV.FILENAME.value: relative_path,
                OutputCV.LONG_NAME.value: "Results page",
                OutputCV.DESCRIPTION.value: "Page displaying scalars and plots from the ILAMB execution.",
                OutputCV.DIMENSIONS.value: common_dimensions,
            }
            output_bundle[OutputCV.INDEX.value] = relative_path

        # Add series to the output based on the time traces we find in the
        # output files
        series = []
        for ncfile in definition.output_directory.glob("*.nc"):
            ds = xr.open_dataset(ncfile, use_cftime=True)
            for name, da in ds.items():
                # Only create series for 1-D DataArrays over a "time" or "month" dimension
                if not (da.ndim == 1 and set(da.dims).intersection(["time", "month"])):
                    continue
                # Collect attributes and convert the index to serializable values
                attrs = {
                    "units": da.attrs.get("units", ""),
                    "long_name": da.attrs.get("long_name", str(name)),
                    "standard_name": da.attrs.get("standard_name", ""),
                }
                str_name = str(name)
                index_name = str(da.dims[0])
                index = ds[index_name].values.tolist()
                if hasattr(index[0], "isoformat"):
                    index = [v.isoformat() for v in index]
                if hasattr(index[0], "calendar"):
                    attrs["calendar"] = index[0].calendar

                # Parse out some dimensions
                if ncfile.stem == "Reference":
                    dimensions = {
                        "source_id": "Reference",
                        "metric": str_name,
                    }
                else:
                    dimensions = {"metric": str_name, **common_dimensions}

                # Split the metric into metric and region if possible
                if "_" in str_name:
                    dimensions["metric"] = str_name.split("_")[0]
                    dimensions["region"] = str_name.split("_")[1]
                else:
                    dimensions["region"] = "None"

                series.append(
                    SeriesMetricValue(
                        dimensions=dimensions,
                        values=da.values.tolist(),
                        index=index,
                        index_name=index_name,
                        attributes=attrs,
                    )
                )

        return ExecutionResult.build_from_output_bundle(
            definition, cmec_output_bundle=output_bundle, cmec_metric_bundle=metric_bundle, series=series
        )
```
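
For orientation, a minimal instantiation sketch. The registry key `"ilamb"` appears in the constraint logic above, but the metric name and the variable-to-reference mapping below are hypothetical placeholders, not values shipped with the package:

```python
from climate_ref_ilamb.standard import ILAMBStandard

# Hypothetical example: the metric name and the variable_id -> reference
# dataset key are illustrative only.
diagnostic = ILAMBStandard(
    registry_file="ilamb",           # registry branch checked in the constraints
    metric_name="gpp-FLUXNET2015",   # slug becomes "gpp-fluxnet2015"
    sources={"gpp": "FLUXNET2015"},  # exactly one variable_id is required
)
```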

### build_execution_result(definition)

Build the diagnostic result after running ILAMB.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `definition` | `ExecutionDefinition` | The definition of the diagnostic execution | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ExecutionResult` | An execution result object |

Source code in packages/climate-ref-ilamb/src/climate_ref_ilamb/standard.py

```python
def build_execution_result(self, definition: ExecutionDefinition) -> ExecutionResult:
    """
    Build the diagnostic result after running ILAMB.

    Parameters
    ----------
    definition
        The definition of the diagnostic execution

    Returns
    -------
        An execution result object
    """
    _set_ilamb3_options(self.registry, self.registry_file)
    # ILAMB saves scalars as CSV files in the output directory. To be
    # compatible with the REF system, we need to add the metadata
    # associated with the execution group (the selector).
    df = _load_csv_and_merge(definition.output_directory)
    selectors = definition.datasets[SourceDatasetType.CMIP6].selector_dict()

    # TODO: Fix reference data once we are using the obs4MIPs dataset
    dataset_source = self.name.split("-")[1] if "-" in self.name else "None"
    common_dimensions = {**selectors, "reference_source_id": dataset_source}
    for key, value in common_dimensions.items():
        df[key] = value
    metric_bundle = CMECMetric.model_validate(_build_cmec_bundle(df))

    # Add each png file plot to the output
    output_bundle = CMECOutput.create_template()
    for plotfile in definition.output_directory.glob("*.png"):
        relative_path = str(definition.as_relative_path(plotfile))
        caption, figure_dimensions = _caption_from_filename(plotfile, common_dimensions)

        output_bundle[OutputCV.PLOTS.value][relative_path] = {
            OutputCV.FILENAME.value: relative_path,
            OutputCV.LONG_NAME.value: caption,
            OutputCV.DESCRIPTION.value: "",
            OutputCV.DIMENSIONS.value: figure_dimensions,
        }

    # Add the html page to the output
    index_html = definition.to_output_path("index.html")
    if index_html.exists():
        relative_path = str(definition.as_relative_path(index_html))
        output_bundle[OutputCV.HTML.value][relative_path] = {
            OutputCV.FILENAME.value: relative_path,
            OutputCV.LONG_NAME.value: "Results page",
            OutputCV.DESCRIPTION.value: "Page displaying scalars and plots from the ILAMB execution.",
            OutputCV.DIMENSIONS.value: common_dimensions,
        }
        output_bundle[OutputCV.INDEX.value] = relative_path

    # Add series to the output based on the time traces we find in the
    # output files
    series = []
    for ncfile in definition.output_directory.glob("*.nc"):
        ds = xr.open_dataset(ncfile, use_cftime=True)
        for name, da in ds.items():
            # Only create series for 1-D DataArrays over a "time" or "month" dimension
            if not (da.ndim == 1 and set(da.dims).intersection(["time", "month"])):
                continue
            # Collect attributes and convert the index to serializable values
            attrs = {
                "units": da.attrs.get("units", ""),
                "long_name": da.attrs.get("long_name", str(name)),
                "standard_name": da.attrs.get("standard_name", ""),
            }
            str_name = str(name)
            index_name = str(da.dims[0])
            index = ds[index_name].values.tolist()
            if hasattr(index[0], "isoformat"):
                index = [v.isoformat() for v in index]
            if hasattr(index[0], "calendar"):
                attrs["calendar"] = index[0].calendar

            # Parse out some dimensions
            if ncfile.stem == "Reference":
                dimensions = {
                    "source_id": "Reference",
                    "metric": str_name,
                }
            else:
                dimensions = {"metric": str_name, **common_dimensions}

            # Split the metric into metric and region if possible
            if "_" in str_name:
                dimensions["metric"] = str_name.split("_")[0]
                dimensions["region"] = str_name.split("_")[1]
            else:
                dimensions["region"] = "None"

            series.append(
                SeriesMetricValue(
                    dimensions=dimensions,
                    values=da.values.tolist(),
                    index=index,
                    index_name=index_name,
                    attributes=attrs,
                )
            )

    return ExecutionResult.build_from_output_bundle(
        definition, cmec_output_bundle=output_bundle, cmec_metric_bundle=metric_bundle, series=series
    )
```
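
The series extraction above keeps only one-dimensional arrays indexed by `time` or `month`. A self-contained sketch of that filter on synthetic data (all variable names are made up):

```python
import numpy as np
import xarray as xr

# Synthetic dataset: only 1-D variables over "time" or "month" qualify.
ds = xr.Dataset(
    {
        "bias_global": ("month", np.zeros(12)),         # kept: 1-D over "month"
        "tas_map": (("lat", "lon"), np.zeros((2, 3))),  # skipped: 2-D
        "profile": ("depth", np.zeros(5)),              # skipped: wrong dimension
    }
)
for name, da in ds.items():
    keep = da.ndim == 1 and set(da.dims).intersection(["time", "month"])
    print(f"{name}: {'series' if keep else 'skipped'}")
```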

### execute(definition)

Run the ILAMB standard analysis.

Source code in packages/climate-ref-ilamb/src/climate_ref_ilamb/standard.py

```python
def execute(self, definition: ExecutionDefinition) -> None:
    """
    Run the ILAMB standard analysis.
    """
    _set_ilamb3_options(self.registry, self.registry_file)
    ref_datasets = self.ilamb_data.datasets.set_index(self.ilamb_data.slug_column)

    # Run ILAMB in a single-threaded mode to avoid issues with multithreading (#394)
    with dask.config.set(scheduler="synchronous"):
        run.run_single_block(
            self.slug,
            ref_datasets,
            definition.datasets[SourceDatasetType.CMIP6].datasets,
            definition.output_directory,
            **self.ilamb_kwargs,
        )
```
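
The `dask.config.set(scheduler="synchronous")` context manager is standard dask API: any computation triggered inside the block runs in the calling thread rather than the default thread pool. A minimal standalone demonstration:

```python
import dask
import dask.array as da

# Within this context every .compute() uses the single-threaded
# "synchronous" scheduler, the same isolation execute() relies on.
with dask.config.set(scheduler="synchronous"):
    x = da.ones((1_000, 1_000), chunks=(100, 100))
    print(x.mean().compute())  # 1.0, computed entirely in the calling thread
```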

## format_cmec_output_bundle(dataset, dimensions, metadata_columns, value_column='value')

Create a CMEC output bundle for the dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset` | `DataFrame` | Processed dataset | *required* |
| `dimensions` | `list[str]` | The dimensions of the dataset (e.g., `["source_id", "member_id", "region"]`) | *required* |
| `metadata_columns` | `list[str]` | The columns to be used as metadata (e.g., `["Description", "LongName"]`) | *required* |
| `value_column` | `str` | The column containing the values | `'value'` |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | A CMEC output bundle ready to be written to disk |

Source code in packages/climate-ref-ilamb/src/climate_ref_ilamb/standard.py

```python
def format_cmec_output_bundle(
    dataset: pd.DataFrame,
    dimensions: list[str],
    metadata_columns: list[str],
    value_column: str = "value",
) -> dict[str, Any]:
    """
    Create a CMEC output bundle for the dataset.

    Parameters
    ----------
    dataset
        Processed dataset
    dimensions
        The dimensions of the dataset (e.g., ["source_id", "member_id", "region"])
    metadata_columns
        The columns to be used as metadata (e.g., ["Description", "LongName"])
    value_column
        The column containing the values

    Returns
    -------
        A CMEC output bundle ready to be written to disk
    """
    # Validate that all required columns exist
    required_columns = set(dimensions) | {value_column} | set(metadata_columns)
    missing_columns = required_columns - set(dataset.columns)
    if missing_columns:
        raise ValueError(f"Missing required columns: {missing_columns}")

    # Build the dimensions section
    dimensions_dict: dict[str, dict[str, dict[str, str]]] = {}

    # For each dimension, create a dictionary of unique values and their metadata
    for dim in dimensions:
        unique_values = dataset[dim].unique()
        dim_dict: dict[str, dict[str, str]] = {}

        for val in unique_values:
            # Default to empty metadata for this dimension value
            dim_dict[str(val)] = {}

            if dim == dimensions[-1]:
                # For the last dimension, attach the metadata columns taken
                # from the first matching row
                dim_dict[str(val)] = dataset[dataset[dim] == val].iloc[0][metadata_columns].to_dict()

        dimensions_dict[dim] = dim_dict

    # Build the results section - create nested structure based on dimensions
    def nest_results(df: pd.DataFrame, dims: list[str]) -> dict[str, Any] | float:
        if not dims:
            return float(df[value_column].iloc[0].item())

        current_dim = dims[0]
        remaining_dims = dims[1:]

        return {
            str(group_name): nest_results(group_df, remaining_dims)
            for group_name, group_df in df.groupby(current_dim)
        }

    results = nest_results(dataset, list(dimensions))

    return {"DIMENSIONS": {"json_structure": list(dimensions), **dimensions_dict}, "RESULTS": results}