Skip to content

climate_ref_pmp.diagnostics #

PMP diagnostics.

AnnualCycle #

Bases: CommandLineDiagnostic

Calculate the annual cycle for a dataset

Source code in packages/climate-ref-pmp/src/climate_ref_pmp/diagnostics/annual_cycle.py
class AnnualCycle(CommandLineDiagnostic):
    """
    Calculate the annual cycle for a dataset

    Two PMP drivers are invoked in sequence: ``pcmdi_compute_climatologies.py``
    computes the model climatology, then ``mean_climate_driver.py`` compares it
    against a reference climatology and writes CMEC-formatted metrics.
    """

    name = "Annual Cycle"
    slug = "annual-cycle"
    # Facets used to dimension the metric values in the output bundle.
    facets = (
        "source_id",
        "member_id",
        "experiment_id",
        "variable_id",
        "reference_source_id",
        "region",
        "statistic",
        "season",
    )

    # One data requirement per (model variable, reference dataset) pair.
    data_requirements = (
        # ERA-5 as reference dataset, spatial 2-D variables
        make_data_requirement("ts", "ERA-5"),
        make_data_requirement("uas", "ERA-5"),
        make_data_requirement("vas", "ERA-5"),
        make_data_requirement("psl", "ERA-5"),
        # ERA-5 as reference dataset, spatial 3-D variables
        make_data_requirement("ta", "ERA-5"),
        make_data_requirement("ua", "ERA-5"),
        make_data_requirement("va", "ERA-5"),
        make_data_requirement("zg", "ERA-5"),
        # Other reference datasets, spatial 2-D variables
        make_data_requirement("pr", "GPCP-Monthly-3-2"),
        make_data_requirement("rlds", "CERES-EBAF-4-2"),
        make_data_requirement("rlus", "CERES-EBAF-4-2"),
        make_data_requirement("rlut", "CERES-EBAF-4-2"),
        make_data_requirement("rsds", "CERES-EBAF-4-2"),
        make_data_requirement("rsdt", "CERES-EBAF-4-2"),
        make_data_requirement("rsus", "CERES-EBAF-4-2"),
        make_data_requirement("rsut", "CERES-EBAF-4-2"),
    )

    def __init__(self) -> None:
        """Store the names of the two PMP parameter files used by this diagnostic."""
        # Step 1 computes the climatologies, step 2 calculates the metrics.
        self.parameter_file_1 = "pmp_param_annualcycle_1-clims.py"
        self.parameter_file_2 = "pmp_param_annualcycle_2-metrics.py"

    def build_cmds(self, definition: ExecutionDefinition) -> list[list[str]]:
        """
        Build the commands to run the diagnostic

        Parameters
        ----------
        definition
            Definition of the diagnostic execution

        Returns
        -------
            Command arguments to execute in the PMP environment
        """
        input_datasets = definition.datasets[SourceDatasetType.CMIP6]
        reference_datasets = definition.datasets[SourceDatasetType.PMPClimatology]

        # NOTE(review): assumes each of these facets has exactly one unique
        # value within this execution group — confirm the data requirements
        # guarantee that.
        source_id = input_datasets["source_id"].unique()[0]
        experiment_id = input_datasets["experiment_id"].unique()[0]
        member_id = input_datasets["member_id"].unique()[0]
        variable_id = input_datasets["variable_id"].unique()[0]

        # The climatology driver accepts either a single file path or a glob
        # pattern that covers all input files.
        model_files_raw = input_datasets.path.to_list()
        if len(model_files_raw) == 1:
            model_files = model_files_raw[0]  # If only one file, use it directly
        elif len(model_files_raw) > 1:
            model_files = build_glob_pattern(model_files_raw)  # If multiple files, build a glob pattern
        else:
            raise ValueError("No model files found")

        logger.debug("build_cmd start")

        logger.debug(f"input_datasets: {input_datasets}")
        logger.debug(f"input_datasets.keys(): {input_datasets.keys()}")

        # Only the first reference dataset entry provides the template path.
        reference_dataset_name = reference_datasets["source_id"].unique()[0]
        reference_dataset_path = reference_datasets.datasets.iloc[0]["path"]

        logger.debug(f"reference_dataset.datasets: {reference_datasets.datasets}")
        logger.debug(f"reference_dataset_name: {reference_dataset_name}")
        logger.debug(f"reference_dataset_path: {reference_dataset_path}")

        output_directory_path = str(definition.output_directory)

        cmds = []

        # ----------------------------------------------
        # PART 1: Build the command to get climatologies
        # ----------------------------------------------
        # Model
        data_name = f"{source_id}_{experiment_id}_{member_id}"
        data_path = model_files
        params = {
            "vars": variable_id,
            "infile": data_path,
            "outfile": f"{output_directory_path}/{variable_id}_{data_name}_clims.nc",
        }

        cmds.append(
            build_pmp_command(
                driver_file="pcmdi_compute_climatologies.py",
                parameter_file=self.parameter_file_1,
                **params,
            )
        )

        # --------------------------------------------------
        # PART 2: Build the command to calculate diagnostics
        # --------------------------------------------------
        # Reference
        # Mapping consumed by the mean-climate driver's custom_observations
        # option: variable -> reference dataset template.
        obs_dict = {
            variable_id: {
                reference_dataset_name: {
                    "template": reference_dataset_path,
                },
                "default": reference_dataset_name,
            }
        }

        # Generate a JSON file based on the obs_dict
        with open(f"{output_directory_path}/obs_dict.json", "w") as f:
            json.dump(obs_dict, f)

        date = datetime.datetime.now().strftime("%Y%m%d")

        # Pressure-level variables are evaluated on selected levels (hPa).
        if variable_id in ["ua", "va", "ta"]:
            levels = ["200", "850"]
        elif variable_id in ["zg"]:
            levels = ["500"]
        else:
            levels = None

        variables = []
        if levels is not None:
            for level in levels:
                variable_id_with_level = f"{variable_id}-{level}"
                variables.append(variable_id_with_level)
        else:
            variables = [variable_id]

        logger.debug(f"variables: {variables}")
        logger.debug(f"levels: {levels}")

        # Build the command for each level
        # NOTE(review): the climatology period "198101-200512" is hard-coded in
        # the template — confirm it matches the filenames produced by PART 1.
        params = {
            "vars": variables,
            "custom_observations": f"{output_directory_path}/obs_dict.json",
            "test_data_path": output_directory_path,
            "test_data_set": source_id,
            "realization": member_id,
            "filename_template": f"%(variable)_{data_name}_clims.198101-200512.AC.v{date}.nc",
            "metrics_output_path": output_directory_path,
            "cmec": "",
        }

        cmds.append(
            build_pmp_command(
                driver_file="mean_climate_driver.py",
                parameter_file=self.parameter_file_2,
                **params,
            )
        )

        logger.debug("build_cmd end")
        logger.debug(f"cmds: {cmds}")

        return cmds

    def build_execution_result(self, definition: ExecutionDefinition) -> ExecutionResult:
        """
        Build a diagnostic result from the output of the PMP driver

        Parameters
        ----------
        definition
            Definition of the diagnostic execution

        Returns
        -------
            Result of the diagnostic execution
        """
        input_datasets = definition.datasets[SourceDatasetType.CMIP6]
        variable_id = input_datasets["variable_id"].unique()[0]

        # Pressure-level variables are expected in per-level subdirectories
        # (e.g. "ua-850"); each "?" wildcard matches one character of the
        # level suffix.
        if variable_id in ["ua", "va", "ta"]:
            variable_dir_pattern = f"{variable_id}-???"
        else:
            variable_dir_pattern = variable_id

        results_directory = definition.output_directory
        png_directory = results_directory / variable_dir_pattern
        data_directory = results_directory / variable_dir_pattern

        logger.debug(f"results_directory: {results_directory}")
        logger.debug(f"png_directory: {png_directory}")
        logger.debug(f"data_directory: {data_directory}")

        # Find the CMEC JSON file(s)
        results_files = transform_results_files(list(results_directory.glob("*_cmec.json")))

        if len(results_files) == 1:
            # If only one file, use it directly
            results_file = results_files[0]
            logger.debug(f"results_file: {results_file}")
        elif len(results_files) > 1:
            # Multiple files (e.g. one per level) are merged into one bundle.
            logger.info(f"More than one cmec file found: {results_files}")
            results_file = combine_results_files(results_files, definition.output_directory)
        else:
            logger.error("Unexpected case: no cmec file found")
            return ExecutionResult.build_from_failure(definition)

        # Find the other outputs: PNG and NetCDF files
        png_files = list(png_directory.glob("*.png"))
        data_files = list(data_directory.glob("*.nc"))

        # Prepare the output bundles
        cmec_output_bundle, cmec_metric_bundle = process_json_result(results_file, png_files, data_files)

        # Add missing dimensions to the output
        input_selectors = input_datasets.selector_dict()
        reference_selectors = definition.datasets[SourceDatasetType.PMPClimatology].selector_dict()
        cmec_metric_bundle = cmec_metric_bundle.prepend_dimensions(
            {
                "source_id": input_selectors["source_id"],
                "member_id": input_selectors["member_id"],
                "experiment_id": input_selectors["experiment_id"],
                "variable_id": input_selectors["variable_id"],
                "reference_source_id": reference_selectors["source_id"],
            }
        )

        return ExecutionResult.build_from_output_bundle(
            definition,
            cmec_output_bundle=cmec_output_bundle,
            cmec_metric_bundle=cmec_metric_bundle,
        )

    def execute(self, definition: ExecutionDefinition) -> None:
        """
        Run the diagnostic on the given configuration.

        Parameters
        ----------
        definition : ExecutionDefinition
            The configuration to run the diagnostic on.

        Returns
        -------
        None
            The commands are executed for their side effects; the outputs are
            collected by ``build_execution_result``.
        """
        cmds = self.build_cmds(definition)

        # NOTE(review): run outcomes are only logged here — confirm failures
        # are detected elsewhere by the framework.
        runs = [self.provider.run(cmd) for cmd in cmds]
        logger.debug(f"runs: {runs}")

build_cmds(definition) #

Build the commands to run the diagnostic

Parameters:

Name Type Description Default
definition ExecutionDefinition

Definition of the diagnostic execution

required

Returns:

Type Description
Command arguments to execute in the PMP environment
Source code in packages/climate-ref-pmp/src/climate_ref_pmp/diagnostics/annual_cycle.py
def build_cmds(self, definition: ExecutionDefinition) -> list[list[str]]:
    """
    Assemble the PMP commands for this execution.

    Parameters
    ----------
    definition
        Definition of the diagnostic execution

    Returns
    -------
        Command arguments to execute in the PMP environment
    """
    input_datasets = definition.datasets[SourceDatasetType.CMIP6]
    reference_datasets = definition.datasets[SourceDatasetType.PMPClimatology]

    def _first(facet: str) -> Any:
        # Single unique value of a facet within this execution group.
        return input_datasets[facet].unique()[0]

    source_id = _first("source_id")
    experiment_id = _first("experiment_id")
    member_id = _first("member_id")
    variable_id = _first("variable_id")

    raw_paths = input_datasets.path.to_list()
    if not raw_paths:
        raise ValueError("No model files found")
    # A single file is passed verbatim; several files become a glob pattern.
    model_files = raw_paths[0] if len(raw_paths) == 1 else build_glob_pattern(raw_paths)

    logger.debug("build_cmd start")
    logger.debug(f"input_datasets: {input_datasets}")
    logger.debug(f"input_datasets.keys(): {input_datasets.keys()}")

    reference_dataset_name = reference_datasets["source_id"].unique()[0]
    reference_dataset_path = reference_datasets.datasets.iloc[0]["path"]

    logger.debug(f"reference_dataset.datasets: {reference_datasets.datasets}")
    logger.debug(f"reference_dataset_name: {reference_dataset_name}")
    logger.debug(f"reference_dataset_path: {reference_dataset_path}")

    output_directory_path = str(definition.output_directory)
    data_name = f"{source_id}_{experiment_id}_{member_id}"

    # Step 1: compute the model climatology for the variable.
    clim_params = {
        "vars": variable_id,
        "infile": model_files,
        "outfile": f"{output_directory_path}/{variable_id}_{data_name}_clims.nc",
    }
    clim_cmd = build_pmp_command(
        driver_file="pcmdi_compute_climatologies.py",
        parameter_file=self.parameter_file_1,
        **clim_params,
    )

    # Step 2: compare against the reference climatology and write metrics.
    obs_dict = {
        variable_id: {
            reference_dataset_name: {
                "template": reference_dataset_path,
            },
            "default": reference_dataset_name,
        }
    }

    # Persist the observation mapping so the driver can read it.
    with open(f"{output_directory_path}/obs_dict.json", "w") as f:
        json.dump(obs_dict, f)

    date = datetime.datetime.now().strftime("%Y%m%d")

    # Pressure levels (hPa) evaluated for 3-D variables; None for 2-D ones.
    level_lookup = {
        "ua": ["200", "850"],
        "va": ["200", "850"],
        "ta": ["200", "850"],
        "zg": ["500"],
    }
    levels = level_lookup.get(variable_id)
    variables = [variable_id] if levels is None else [f"{variable_id}-{level}" for level in levels]

    logger.debug(f"variables: {variables}")
    logger.debug(f"levels: {levels}")

    metrics_params = {
        "vars": variables,
        "custom_observations": f"{output_directory_path}/obs_dict.json",
        "test_data_path": output_directory_path,
        "test_data_set": source_id,
        "realization": member_id,
        "filename_template": f"%(variable)_{data_name}_clims.198101-200512.AC.v{date}.nc",
        "metrics_output_path": output_directory_path,
        "cmec": "",
    }
    metrics_cmd = build_pmp_command(
        driver_file="mean_climate_driver.py",
        parameter_file=self.parameter_file_2,
        **metrics_params,
    )

    cmds = [clim_cmd, metrics_cmd]
    logger.debug("build_cmd end")
    logger.debug(f"cmds: {cmds}")

    return cmds

build_execution_result(definition) #

Build a diagnostic result from the output of the PMP driver

Parameters:

Name Type Description Default
definition ExecutionDefinition

Definition of the diagnostic execution

required

Returns:

Type Description
Result of the diagnostic execution
Source code in packages/climate-ref-pmp/src/climate_ref_pmp/diagnostics/annual_cycle.py
def build_execution_result(self, definition: ExecutionDefinition) -> ExecutionResult:
    """
    Collect the PMP driver outputs into an execution result.

    Parameters
    ----------
    definition
        Definition of the diagnostic execution

    Returns
    -------
        Result of the diagnostic execution
    """
    input_datasets = definition.datasets[SourceDatasetType.CMIP6]
    variable_id = input_datasets["variable_id"].unique()[0]

    # 3-D variables live in per-level directories (e.g. "ua-850"); the "?"
    # wildcards match the numeric level suffix.
    has_levels = variable_id in ("ua", "va", "ta")
    variable_dir_pattern = f"{variable_id}-???" if has_levels else variable_id

    results_directory = definition.output_directory
    png_directory = results_directory / variable_dir_pattern
    data_directory = results_directory / variable_dir_pattern

    logger.debug(f"results_directory: {results_directory}")
    logger.debug(f"png_directory: {png_directory}")
    logger.debug(f"data_directory: {data_directory}")

    # Locate the CMEC JSON output(s) of the metrics driver.
    results_files = transform_results_files(list(results_directory.glob("*_cmec.json")))

    if not results_files:
        logger.error("Unexpected case: no cmec file found")
        return ExecutionResult.build_from_failure(definition)

    if len(results_files) > 1:
        # Several files (e.g. one per level) are merged into a single one.
        logger.info(f"More than one cmec file found: {results_files}")
        results_file = combine_results_files(results_files, definition.output_directory)
    else:
        results_file = results_files[0]
        logger.debug(f"results_file: {results_file}")

    # Remaining outputs: plots and NetCDF data.
    png_files = list(png_directory.glob("*.png"))
    data_files = list(data_directory.glob("*.nc"))

    cmec_output_bundle, cmec_metric_bundle = process_json_result(results_file, png_files, data_files)

    # Attach the execution facets that the driver output does not carry.
    input_selectors = input_datasets.selector_dict()
    reference_selectors = definition.datasets[SourceDatasetType.PMPClimatology].selector_dict()
    extra_dimensions = {
        "source_id": input_selectors["source_id"],
        "member_id": input_selectors["member_id"],
        "experiment_id": input_selectors["experiment_id"],
        "variable_id": input_selectors["variable_id"],
        "reference_source_id": reference_selectors["source_id"],
    }
    cmec_metric_bundle = cmec_metric_bundle.prepend_dimensions(extra_dimensions)

    return ExecutionResult.build_from_output_bundle(
        definition,
        cmec_output_bundle=cmec_output_bundle,
        cmec_metric_bundle=cmec_metric_bundle,
    )

execute(definition) #

Run the diagnostic on the given configuration.

Parameters:

Name Type Description Default
definition ExecutionDefinition

The configuration to run the diagnostic on.

required

Returns:

Type Description
None

This method returns no value; the diagnostic commands are executed for their side effects.

Source code in packages/climate-ref-pmp/src/climate_ref_pmp/diagnostics/annual_cycle.py
def execute(self, definition: ExecutionDefinition) -> None:
    """
    Run the diagnostic on the given configuration.

    Parameters
    ----------
    definition : ExecutionDefinition
        The configuration to run the diagnostic on.

    Returns
    -------
    None
        The built commands are run for their side effects.
    """
    runs = []
    for command in self.build_cmds(definition):
        runs.append(self.provider.run(command))
    logger.debug(f"runs: {runs}")

ENSO #

Bases: CommandLineDiagnostic

Calculate the ENSO performance metrics for a dataset

Source code in packages/climate-ref-pmp/src/climate_ref_pmp/diagnostics/enso.py
class ENSO(CommandLineDiagnostic):
    """
    Calculate the ENSO performance metrics for a dataset
    """

    # Facets used to dimension the metric values in the output bundle.
    facets = ("source_id", "member_id", "grid_label", "experiment_id", "metric", "reference_datasets")

    def __init__(self, metrics_collection: str, experiments: Collection[str] = ("historical",)) -> None:
        """
        Set up the diagnostic for one ENSO metrics collection.

        Parameters
        ----------
        metrics_collection
            One of ``"ENSO_perf"``, ``"ENSO_tel"`` or ``"ENSO_proc"``; also used
            as the diagnostic's name (and, lower-cased, as its slug).
        experiments
            Experiment identifiers used to filter the CMIP6 input datasets.

        Raises
        ------
        ValueError
            If ``metrics_collection`` is not a known collection.
        """
        self.name = metrics_collection
        self.slug = metrics_collection.lower()
        self.metrics_collection = metrics_collection
        self.parameter_file = "pmp_param_enso.py"
        # Declared here; populated per collection below.
        self.obs_sources: tuple[str, ...]
        self.model_variables: tuple[str, ...]

        if metrics_collection == "ENSO_perf":  # pragma: no cover
            self.model_variables = ("pr", "ts", "tauu")
            self.obs_sources = ("GPCP-Monthly-3-2", "TropFlux-1-0", "HadISST-1-1")
        elif metrics_collection == "ENSO_tel":
            self.model_variables = ("pr", "ts")
            self.obs_sources = ("GPCP-Monthly-3-2", "TropFlux-1-0", "HadISST-1-1")
        elif metrics_collection == "ENSO_proc":
            self.model_variables = ("ts", "tauu", "hfls", "hfss", "rlds", "rlus", "rsds", "rsus")
            self.obs_sources = (
                "GPCP-Monthly-3-2",
                "TropFlux-1-0",
                "HadISST-1-1",
                "CERES-EBAF-4-2",
            )
        else:
            raise ValueError(
                f"Unknown metrics collection: {metrics_collection}. "
                "Valid options are: ENSO_perf, ENSO_tel, ENSO_proc"
            )

        self.data_requirements = self._get_data_requirements(experiments)

    def _get_data_requirements(
        self,
        experiments: Collection[str] = ("historical",),
    ) -> tuple[DataRequirement, DataRequirement]:
        """
        Build the (obs4MIPs, CMIP6) data requirements for this collection.

        Parameters
        ----------
        experiments
            Experiment identifiers used to filter the CMIP6 datasets.

        Returns
        -------
            A pair of requirements: observations first, then the model data
            (with cell-area and land-mask supplementary datasets attached).
        """
        filters = [
            FacetFilter(
                facets={
                    "frequency": "mon",
                    "experiment_id": tuple(experiments),
                    "variable_id": self.model_variables,
                }
            )
        ]

        return (
            DataRequirement(
                source_type=SourceDatasetType.obs4MIPs,
                filters=(
                    FacetFilter(facets={"source_id": self.obs_sources, "variable_id": self.model_variables}),
                ),
                group_by=("activity_id",),
            ),
            DataRequirement(
                source_type=SourceDatasetType.CMIP6,
                filters=tuple(filters),
                group_by=("source_id", "experiment_id", "member_id", "grid_label"),
                constraints=(
                    AddSupplementaryDataset.from_defaults("areacella", SourceDatasetType.CMIP6),
                    AddSupplementaryDataset.from_defaults("sftlf", SourceDatasetType.CMIP6),
                ),
            ),
        )

    def build_cmd(self, definition: ExecutionDefinition) -> Iterable[str]:
        """
        Build the command to run the ENSO diagnostic.

        A JSON file describing the model and observation datasets is written
        into the output directory, then a command invoking ``enso_driver.py``
        with that file is returned.

        Parameters
        ----------
        definition : ExecutionDefinition
            The configuration to run the diagnostic on.

        Returns
        -------
        :
            Command arguments to execute the ENSO driver.
        """
        mc_name = self.metrics_collection

        # ------------------------------------------------
        # Get the input datasets information for the model
        # ------------------------------------------------
        input_datasets = definition.datasets[SourceDatasetType.CMIP6]
        input_selectors = input_datasets.selector_dict()
        source_id = input_selectors["source_id"]
        member_id = input_selectors["member_id"]
        experiment_id = input_selectors["experiment_id"]
        # Supplementary variables are referenced separately below.
        variable_ids = set(input_datasets["variable_id"].unique()) - {"areacella", "sftlf"}
        mod_run = f"{source_id}_{member_id}"

        # We only need one entry for the model run
        dict_mod: dict[str, dict[str, Any]] = {mod_run: {}}

        def extract_variable(dc: DatasetCollection, variable: str) -> list[str]:
            # Paths of all files in the collection for the given variable.
            return dc.datasets[input_datasets["variable_id"] == variable]["path"].to_list()  # type: ignore

        # TO DO: Get the path to the files per variable
        for variable in variable_ids:
            list_files = extract_variable(input_datasets, variable)
            list_areacella = extract_variable(input_datasets, "areacella")
            list_sftlf = extract_variable(input_datasets, "sftlf")

            # NOTE(review): the "path + filename" keys follow the schema the
            # ENSO driver expects — confirm against enso_driver.py.
            if len(list_files) > 0:
                dict_mod[mod_run][variable] = {
                    "path + filename": list_files,
                    "varname": variable,
                    "path + filename_area": list_areacella,
                    "areaname": "areacella",
                    "path + filename_landmask": list_sftlf,
                    "landmaskname": "sftlf",
                }

        # -------------------------------------------------------
        # Get the input datasets information for the observations
        # -------------------------------------------------------
        reference_dataset = definition.datasets[SourceDatasetType.obs4MIPs]
        reference_dataset_names = reference_dataset["source_id"].unique()

        dict_obs: dict[str, dict[str, Any]] = {}

        # TO DO: Get the path to the files per variable and per source
        for obs_name in reference_dataset_names:
            dict_obs[obs_name] = {}
            for variable in variable_ids:
                # Get the list of files for the current variable and observation source
                list_files = reference_dataset.datasets[
                    (reference_dataset["variable_id"] == variable)
                    & (reference_dataset["source_id"] == obs_name)
                ]["path"].to_list()
                # If the list is not empty, add it to the dictionary
                if len(list_files) > 0:
                    dict_obs[obs_name][variable] = {
                        "path + filename": list_files,
                        "varname": variable,
                    }

        # Create input directory
        dict_datasets = {
            "model": dict_mod,
            "observations": dict_obs,
            "metricsCollection": mc_name,
            "experiment_id": experiment_id,
        }

        # Create JSON file for dictDatasets
        json_file = os.path.join(
            definition.output_directory, f"input_{mc_name}_{source_id}_{experiment_id}_{member_id}.json"
        )
        with open(json_file, "w") as f:
            json.dump(dict_datasets, f, indent=4)
        logger.debug(f"JSON file created: {json_file}")

        driver_file = _get_resource("climate_ref_pmp.drivers", "enso_driver.py", use_resources=True)
        return [
            "python",
            driver_file,
            "--metrics_collection",
            mc_name,
            "--experiment_id",
            experiment_id,
            "--input_json_path",
            json_file,
            "--output_directory",
            str(definition.output_directory),
        ]

    def build_execution_result(self, definition: ExecutionDefinition) -> ExecutionResult:
        """
        Build a diagnostic result from the output of the PMP driver

        Parameters
        ----------
        definition
            Definition of the diagnostic execution

        Returns
        -------
            Result of the diagnostic execution
        """
        input_datasets = definition.datasets[SourceDatasetType.CMIP6]
        source_id = input_datasets["source_id"].unique()[0]
        experiment_id = input_datasets["experiment_id"].unique()[0]
        member_id = input_datasets["member_id"].unique()[0]
        mc_name = self.metrics_collection
        # The CMEC file is looked up by collection and run identifiers.
        pattern = f"{mc_name}_{source_id}_{experiment_id}_{member_id}"

        # Find the results files
        results_files = list(definition.output_directory.glob(f"{pattern}_cmec.json"))
        logger.debug(f"Results files: {results_files}")

        if len(results_files) != 1:  # pragma: no cover
            logger.warning(f"A single cmec output file not found: {results_files}")
            return ExecutionResult.build_from_failure(definition)

        # Find the other outputs
        png_files = [definition.as_relative_path(f) for f in definition.output_directory.glob("*.png")]
        data_files = [definition.as_relative_path(f) for f in definition.output_directory.glob("*.nc")]

        cmec_output, cmec_metric = process_json_result(results_files[0], png_files, data_files)

        # Replace the driver's model/realization dimensions with CMIP6 facets.
        input_selectors = definition.datasets[SourceDatasetType.CMIP6].selector_dict()
        cmec_metric_bundle = cmec_metric.remove_dimensions(
            [
                "model",
                "realization",
            ],
        ).prepend_dimensions(
            {
                "source_id": input_selectors["source_id"],
                "member_id": input_selectors["member_id"],
                "grid_label": input_selectors["grid_label"],
                "experiment_id": input_selectors["experiment_id"],
            }
        )

        return ExecutionResult.build_from_output_bundle(
            definition,
            cmec_output_bundle=cmec_output,
            cmec_metric_bundle=cmec_metric_bundle,
        )

build_cmd(definition) #

Build the command to run the ENSO diagnostic on the given configuration.

Parameters:

Name Type Description Default
definition ExecutionDefinition

The configuration to run the diagnostic on.

required

Returns:

Type Description
Iterable[str]

The command arguments used to invoke the ENSO driver.

Source code in packages/climate-ref-pmp/src/climate_ref_pmp/diagnostics/enso.py
def build_cmd(self, definition: ExecutionDefinition) -> Iterable[str]:
    """
    Build the command to run the ENSO diagnostic.

    A JSON file describing the model and observation datasets is written
    into the output directory, then a command invoking ``enso_driver.py``
    with that file is returned.

    Parameters
    ----------
    definition : ExecutionDefinition
        The configuration to run the diagnostic on.

    Returns
    -------
    :
        Command arguments to execute the ENSO driver.
    """
    mc_name = self.metrics_collection

    # ------------------------------------------------
    # Get the input datasets information for the model
    # ------------------------------------------------
    input_datasets = definition.datasets[SourceDatasetType.CMIP6]
    input_selectors = input_datasets.selector_dict()
    source_id = input_selectors["source_id"]
    member_id = input_selectors["member_id"]
    experiment_id = input_selectors["experiment_id"]
    # Supplementary variables are referenced separately below.
    variable_ids = set(input_datasets["variable_id"].unique()) - {"areacella", "sftlf"}
    mod_run = f"{source_id}_{member_id}"

    # We only need one entry for the model run
    dict_mod: dict[str, dict[str, Any]] = {mod_run: {}}

    def extract_variable(dc: DatasetCollection, variable: str) -> list[str]:
        # Paths of all files in the collection for the given variable.
        return dc.datasets[input_datasets["variable_id"] == variable]["path"].to_list()  # type: ignore

    # TO DO: Get the path to the files per variable
    for variable in variable_ids:
        list_files = extract_variable(input_datasets, variable)
        list_areacella = extract_variable(input_datasets, "areacella")
        list_sftlf = extract_variable(input_datasets, "sftlf")

        # NOTE(review): the "path + filename" keys follow the schema the
        # ENSO driver expects — confirm against enso_driver.py.
        if len(list_files) > 0:
            dict_mod[mod_run][variable] = {
                "path + filename": list_files,
                "varname": variable,
                "path + filename_area": list_areacella,
                "areaname": "areacella",
                "path + filename_landmask": list_sftlf,
                "landmaskname": "sftlf",
            }

    # -------------------------------------------------------
    # Get the input datasets information for the observations
    # -------------------------------------------------------
    reference_dataset = definition.datasets[SourceDatasetType.obs4MIPs]
    reference_dataset_names = reference_dataset["source_id"].unique()

    dict_obs: dict[str, dict[str, Any]] = {}

    # TO DO: Get the path to the files per variable and per source
    for obs_name in reference_dataset_names:
        dict_obs[obs_name] = {}
        for variable in variable_ids:
            # Get the list of files for the current variable and observation source
            list_files = reference_dataset.datasets[
                (reference_dataset["variable_id"] == variable)
                & (reference_dataset["source_id"] == obs_name)
            ]["path"].to_list()
            # If the list is not empty, add it to the dictionary
            if len(list_files) > 0:
                dict_obs[obs_name][variable] = {
                    "path + filename": list_files,
                    "varname": variable,
                }

    # Create input directory
    dict_datasets = {
        "model": dict_mod,
        "observations": dict_obs,
        "metricsCollection": mc_name,
        "experiment_id": experiment_id,
    }

    # Create JSON file for dictDatasets
    json_file = os.path.join(
        definition.output_directory, f"input_{mc_name}_{source_id}_{experiment_id}_{member_id}.json"
    )
    with open(json_file, "w") as f:
        json.dump(dict_datasets, f, indent=4)
    logger.debug(f"JSON file created: {json_file}")

    driver_file = _get_resource("climate_ref_pmp.drivers", "enso_driver.py", use_resources=True)
    return [
        "python",
        driver_file,
        "--metrics_collection",
        mc_name,
        "--experiment_id",
        experiment_id,
        "--input_json_path",
        json_file,
        "--output_directory",
        str(definition.output_directory),
    ]

build_execution_result(definition) #

Build a diagnostic result from the output of the PMP driver

Parameters:

Name Type Description Default
definition ExecutionDefinition

Definition of the diagnostic execution

required

Returns:

Type Description
Result of the diagnostic execution
Source code in packages/climate-ref-pmp/src/climate_ref_pmp/diagnostics/enso.py
def build_execution_result(self, definition: ExecutionDefinition) -> ExecutionResult:
    """
    Build a diagnostic result from the output of the PMP driver

    Parameters
    ----------
    definition
        Definition of the diagnostic execution

    Returns
    -------
        Result of the diagnostic execution
    """
    # Reconstruct the output-file prefix from the dataset combination this
    # execution ran on (it must match the name used when the driver was launched).
    cmip6 = definition.datasets[SourceDatasetType.CMIP6]
    source = cmip6["source_id"].unique()[0]
    experiment = cmip6["experiment_id"].unique()[0]
    member = cmip6["member_id"].unique()[0]
    prefix = f"{self.metrics_collection}_{source}_{experiment}_{member}"

    # Exactly one CMEC metrics bundle is expected from a successful driver run
    candidates = list(definition.output_directory.glob(f"{prefix}_cmec.json"))
    logger.debug(f"Results files: {candidates}")

    if len(candidates) != 1:  # pragma: no cover
        logger.warning(f"A single cmec output file not found: {candidates}")
        return ExecutionResult.build_from_failure(definition)

    # Collect the plots and netCDF files written alongside the metrics
    plots = [definition.as_relative_path(p) for p in definition.output_directory.glob("*.png")]
    netcdfs = [definition.as_relative_path(p) for p in definition.output_directory.glob("*.nc")]

    output_bundle, metric_bundle = process_json_result(candidates[0], plots, netcdfs)

    # Re-key the PMP metric dimensions onto the REF facet names
    selectors = definition.datasets[SourceDatasetType.CMIP6].selector_dict()
    metric_bundle = metric_bundle.remove_dimensions(
        [
            "model",
            "realization",
        ],
    ).prepend_dimensions(
        {
            "source_id": selectors["source_id"],
            "member_id": selectors["member_id"],
            "grid_label": selectors["grid_label"],
            "experiment_id": selectors["experiment_id"],
        }
    )

    return ExecutionResult.build_from_output_bundle(
        definition,
        cmec_output_bundle=output_bundle,
        cmec_metric_bundle=metric_bundle,
    )

ExtratropicalModesOfVariability #

Bases: CommandLineDiagnostic

Calculate the extratropical modes of variability for a given area

Source code in packages/climate-ref-pmp/src/climate_ref_pmp/diagnostics/variability_modes.py
class ExtratropicalModesOfVariability(CommandLineDiagnostic):
    """
    Calculate the extratropical modes of variability for a given area

    Each instance targets a single mode of variability. Surface-temperature
    ("ts") modes are evaluated against HadISST-1-1; sea-level-pressure ("psl")
    modes are evaluated against 20CR.
    """

    # Modes diagnosed from surface temperature vs. sea-level pressure
    ts_modes = ("PDO", "NPGO", "AMO")
    psl_modes = ("NAO", "NAM", "PNA", "NPO", "SAM")

    facets = (
        "source_id",
        "member_id",
        "experiment_id",
        "reference_source_id",
        "mode",
        "season",
        "method",
        "statistic",
    )

    def __init__(self, mode_id: str):
        """
        Parameters
        ----------
        mode_id
            Identifier of the mode of variability (case-insensitive); must be
            one of ``ts_modes`` or ``psl_modes``.

        Raises
        ------
        ValueError
            If ``mode_id`` is not a known mode.
        """
        super().__init__()
        self.mode_id = mode_id.upper()
        self.name = f"Extratropical modes of variability: {mode_id}"
        self.slug = f"extratropical-modes-of-variability-{mode_id.lower()}"

        def _get_data_requirements(
            obs_source: str,
            obs_variable: str,
            model_variable: str,
            extra_experiments: str | tuple[str, ...] | list[str] = (),
        ) -> tuple[DataRequirement, DataRequirement]:
            # Build the obs4MIPs + CMIP6 data requirements for one mode.
            # A bare string would otherwise be unpacked character-by-character
            # below, producing bogus experiment_ids such as "a", "m", "i", "p".
            if isinstance(extra_experiments, str):
                extra_experiments = (extra_experiments,)

            filters = [
                FacetFilter(
                    facets={
                        "frequency": "mon",
                        "experiment_id": ("historical", "hist-GHG", *extra_experiments),
                        "variable_id": model_variable,
                    }
                )
            ]

            return (
                DataRequirement(
                    source_type=SourceDatasetType.obs4MIPs,
                    filters=(
                        FacetFilter(facets={"source_id": (obs_source,), "variable_id": (obs_variable,)}),
                    ),
                    group_by=("source_id", "variable_id"),
                ),
                DataRequirement(
                    source_type=SourceDatasetType.CMIP6,
                    filters=tuple(filters),
                    group_by=("source_id", "experiment_id", "member_id", "grid_label"),
                ),
            )

        if self.mode_id in self.ts_modes:
            self.parameter_file = "pmp_param_MoV-ts.py"
            self.data_requirements = _get_data_requirements("HadISST-1-1", "ts", "ts")
        elif self.mode_id in self.psl_modes:
            self.parameter_file = "pmp_param_MoV-psl.py"
            self.data_requirements = _get_data_requirements("20CR", "psl", "psl", extra_experiments=("amip",))
        else:
            raise ValueError(
                f"Unknown mode_id '{self.mode_id}'. Must be one of {self.ts_modes + self.psl_modes}"
            )

    def build_cmd(self, definition: ExecutionDefinition) -> Iterable[str]:
        """
        Build the command to run the diagnostic

        Parameters
        ----------
        definition
            Definition of the diagnostic execution

        Returns
        -------
            Command arguments to execute in the PMP environment

        Raises
        ------
        NotImplementedError
            If the model data spans more than one file.
        """
        input_datasets = definition.datasets[SourceDatasetType.CMIP6]
        source_id = input_datasets["source_id"].unique()[0]
        experiment_id = input_datasets["experiment_id"].unique()[0]
        member_id = input_datasets["member_id"].unique()[0]

        logger.debug(f"input_datasets: {input_datasets}")
        logger.debug(f"source_id: {source_id}")
        logger.debug(f"experiment_id: {experiment_id}")
        logger.debug(f"member_id: {member_id}")

        reference_dataset = definition.datasets[SourceDatasetType.obs4MIPs]
        reference_dataset_name = reference_dataset["source_id"].unique()[0]
        reference_dataset_path = reference_dataset.datasets.iloc[0]["path"]

        logger.debug(f"reference_dataset: {reference_dataset}")
        logger.debug(f"reference_dataset_name: {reference_dataset_name}")
        logger.debug(f"reference_dataset_path: {reference_dataset_path}")

        model_files = input_datasets.path.to_list()

        if len(model_files) != 1:
            # Have some logic to replace the dates in the filename with a wildcard
            raise NotImplementedError("Only one model file is supported at this time.")

        # ``to_list`` always returns a list, so no isinstance check is needed here
        modpath = " ".join(str(p) for p in model_files)

        # The reference path may be a single path or a list of paths
        if isinstance(reference_dataset_path, list):
            reference_data_path = " ".join(str(p) for p in reference_dataset_path)
        else:
            reference_data_path = reference_dataset_path

        # Parameters handed to the PMP variability-modes driver script.
        # Keys with a ``None`` value are passed as bare flags.
        params: dict[str, str | int | None] = {
            "variability_mode": self.mode_id,
            "modpath": modpath,
            "modpath_lf": "none",
            "exp": experiment_id,
            "realization": member_id,
            "modnames": source_id,
            "reference_data_name": reference_dataset_name,
            "reference_data_path": reference_data_path,
            "results_dir": str(definition.output_directory),
            "cmec": None,
            "no_provenance": None,
        }

        # SAM uses a restricted observation period
        if self.mode_id == "SAM":  # pragma: no cover
            params["osyear"] = 1950
            params["oeyear"] = 2005

        # NPO and NPGO are defined as the second EOF mode
        if self.mode_id in ("NPO", "NPGO"):
            params["eofn_obs"] = 2
            params["eofn_mod"] = 2
            params["eofn_mod_max"] = 2

        # Pass the parameters using **kwargs
        return build_pmp_command(
            driver_file="variability_modes_driver.py",
            parameter_file=self.parameter_file,
            **params,
        )

    def build_execution_result(self, definition: ExecutionDefinition) -> ExecutionResult:
        """
        Build a diagnostic result from the output of the PMP driver

        Parameters
        ----------
        definition
            Definition of the diagnostic execution

        Returns
        -------
            Result of the diagnostic execution
        """
        # The driver is expected to produce exactly one CMEC metrics bundle
        results_files = list(definition.output_directory.glob("*_cmec.json"))
        if len(results_files) != 1:  # pragma: no cover
            logger.warning(f"A single cmec output file not found: {results_files}")
            return ExecutionResult.build_from_failure(definition)

        clean_up_json(results_files[0])

        # Find the other outputs (plots and netCDF data files)
        png_files = [definition.as_relative_path(f) for f in definition.output_directory.glob("*.png")]
        data_files = [definition.as_relative_path(f) for f in definition.output_directory.glob("*.nc")]

        cmec_output_bundle, cmec_metric_bundle = process_json_result(results_files[0], png_files, data_files)

        # Re-key the PMP metric dimensions onto the REF facet names
        input_selectors = definition.datasets[SourceDatasetType.CMIP6].selector_dict()
        reference_selectors = definition.datasets[SourceDatasetType.obs4MIPs].selector_dict()
        cmec_metric_bundle = cmec_metric_bundle.remove_dimensions(
            [
                "model",
                "realization",
                "reference",
            ],
        ).prepend_dimensions(
            {
                "source_id": input_selectors["source_id"],
                "member_id": input_selectors["member_id"],
                "experiment_id": input_selectors["experiment_id"],
                "reference_source_id": reference_selectors["source_id"],
            }
        )

        return ExecutionResult.build_from_output_bundle(
            definition,
            cmec_output_bundle=cmec_output_bundle,
            cmec_metric_bundle=cmec_metric_bundle,
        )

build_cmd(definition) #

Build the command to run the diagnostic

Parameters:

Name Type Description Default
definition ExecutionDefinition

Definition of the diagnostic execution

required

Returns:

Type Description
Command arguments to execute in the PMP environment
Source code in packages/climate-ref-pmp/src/climate_ref_pmp/diagnostics/variability_modes.py
def build_cmd(self, definition: ExecutionDefinition) -> Iterable[str]:
    """
    Build the command to run the diagnostic

    Parameters
    ----------
    definition
        Definition of the diagnostic execution

    Returns
    -------
        Command arguments to execute in the PMP environment
    """
    # Identify the single model/experiment/member combination being evaluated
    cmip6 = definition.datasets[SourceDatasetType.CMIP6]
    model_name = cmip6["source_id"].unique()[0]
    experiment = cmip6["experiment_id"].unique()[0]
    realization = cmip6["member_id"].unique()[0]

    logger.debug(f"input_datasets: {cmip6}")
    logger.debug(f"source_id: {model_name}")
    logger.debug(f"experiment_id: {experiment}")
    logger.debug(f"member_id: {realization}")

    obs = definition.datasets[SourceDatasetType.obs4MIPs]
    obs_name = obs["source_id"].unique()[0]
    obs_path = obs.datasets.iloc[0]["path"]

    logger.debug(f"reference_dataset: {obs}")
    logger.debug(f"reference_dataset_name: {obs_name}")
    logger.debug(f"reference_dataset_path: {obs_path}")

    model_files = cmip6.path.to_list()

    if len(model_files) != 1:
        # Have some logic to replace the dates in the filename with a wildcard
        raise NotImplementedError("Only one model file is supported at this time.")

    # Paths may arrive as a list or a single value; join lists into one string
    modpath = " ".join([str(p) for p in model_files]) if isinstance(model_files, list) else model_files
    if isinstance(obs_path, list):
        obs_data_path = " ".join([str(p) for p in obs_path])
    else:
        obs_data_path = obs_path

    # Parameters for the PMP variability-modes driver script
    params: dict[str, str | int | None] = {
        "variability_mode": self.mode_id,
        "modpath": modpath,
        "modpath_lf": "none",
        "exp": experiment,
        "realization": realization,
        "modnames": model_name,
        "reference_data_name": obs_name,
        "reference_data_path": obs_data_path,
        "results_dir": str(definition.output_directory),
        "cmec": None,
        "no_provenance": None,
    }

    # SAM uses a restricted observation period
    if self.mode_id in ["SAM"]:  # pragma: no cover
        params.update(osyear=1950, oeyear=2005)

    # NPO and NPGO are defined as the second EOF mode
    if self.mode_id in ["NPO", "NPGO"]:
        params.update(eofn_obs=2, eofn_mod=2, eofn_mod_max=2)

    # Hand the parameters to the command builder as keyword arguments
    return build_pmp_command(
        driver_file="variability_modes_driver.py",
        parameter_file=self.parameter_file,
        **params,
    )

build_execution_result(definition) #

Build a diagnostic result from the output of the PMP driver

Parameters:

Name Type Description Default
definition ExecutionDefinition

Definition of the diagnostic execution

required

Returns:

Type Description
Result of the diagnostic execution
Source code in packages/climate-ref-pmp/src/climate_ref_pmp/diagnostics/variability_modes.py
def build_execution_result(self, definition: ExecutionDefinition) -> ExecutionResult:
    """
    Build a diagnostic result from the output of the PMP driver

    Parameters
    ----------
    definition
        Definition of the diagnostic execution

    Returns
    -------
        Result of the diagnostic execution
    """
    # The driver is expected to emit exactly one CMEC metrics bundle
    candidates = list(definition.output_directory.glob("*_cmec.json"))
    if len(candidates) != 1:  # pragma: no cover
        logger.warning(f"A single cmec output file not found: {candidates}")
        return ExecutionResult.build_from_failure(definition)

    clean_up_json(candidates[0])

    # Gather the plots and netCDF outputs written next to the metrics
    plots = [definition.as_relative_path(p) for p in definition.output_directory.glob("*.png")]
    netcdfs = [definition.as_relative_path(p) for p in definition.output_directory.glob("*.nc")]

    output_bundle, metric_bundle = process_json_result(candidates[0], plots, netcdfs)

    # Re-key the PMP metric dimensions onto the REF facet names
    model_selectors = definition.datasets[SourceDatasetType.CMIP6].selector_dict()
    obs_selectors = definition.datasets[SourceDatasetType.obs4MIPs].selector_dict()
    metric_bundle = metric_bundle.remove_dimensions(
        [
            "model",
            "realization",
            "reference",
        ],
    ).prepend_dimensions(
        {
            "source_id": model_selectors["source_id"],
            "member_id": model_selectors["member_id"],
            "experiment_id": model_selectors["experiment_id"],
            "reference_source_id": obs_selectors["source_id"],
        }
    )

    return ExecutionResult.build_from_output_bundle(
        definition,
        cmec_output_bundle=output_bundle,
        cmec_metric_bundle=metric_bundle,
    )

Sub-packages#

Sub-package Description
annual_cycle Annual-cycle diagnostics
enso ENSO metrics collections
variability_modes Extratropical modes of variability