Skip to content

climate_ref.models #

Declaration of the models used by the REF.

These models are used to represent the data that is stored in the database.

Base #

Bases: DeclarativeBase

Base class for all models

Source code in packages/climate-ref/src/climate_ref/models/base.py
class Base(DeclarativeBase):
    """
    Base class for all models

    Supplies the shared ``type_annotation_map`` and ``metadata`` (including the
    constraint naming convention) that are declared once here for every model
    that inherits from this class.
    """

    # Python annotation types listed here are mapped to the JSON column type
    type_annotation_map = {  # noqa: RUF012
        dict[str, Any]: JSON,
        list[float | int]: JSON,
        list[float | int | str]: JSON,
    }
    metadata = MetaData(
        # Enforce a common naming convention for constraints
        # https://alembic.sqlalchemy.org/en/latest/naming.html
        # NOTE(review): the backticks in the "ck" template look like they were
        # carried over from the linked example — confirm before changing them,
        # since altering a template changes the generated constraint names.
        naming_convention={
            "ix": "ix_%(column_0_label)s",
            "uq": "uq_%(table_name)s_%(column_0_name)s",
            "ck": "ck_%(table_name)s_`%(constraint_name)s`",
            "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
            "pk": "pk_%(table_name)s",
        }
    )

Dataset #

Bases: Base

Represents a dataset

A dataset is a collection of data files that is used as an input to the benchmarking process. Adding, removing or updating a dataset will trigger a new diagnostic calculation.

A polymorphic association is used to capture the different types of datasets as each dataset type may have different metadata fields. This enables the use of a single table to store all datasets, but still allows for querying specific metadata fields for each dataset type.

Source code in packages/climate-ref/src/climate_ref/models/dataset.py
class Dataset(Base):
    """
    Represents a dataset

    A dataset is a collection of data files that is used as an input to the benchmarking process.
    Adding, removing or updating a dataset will trigger a new diagnostic calculation.

    A polymorphic association is used to capture the different types of datasets as each
    dataset type may have different metadata fields.
    This enables the use of a single table to store all datasets,
    but still allows for querying specific metadata fields for each dataset type.

    The ``dataset_type`` column is the polymorphic discriminator (see ``__mapper_args__``).
    """

    __tablename__ = "dataset"

    # Surrogate primary key
    id: Mapped[int] = mapped_column(primary_key=True)
    slug: Mapped[str] = mapped_column(unique=True)
    """
    Globally unique identifier for the dataset.

    In the case of CMIP6 datasets, this is the instance_id.
    """
    dataset_type: Mapped[SourceDatasetType] = mapped_column(nullable=False, index=True)
    """
    Type of dataset
    """
    # Populated by the database (server-side default) when the row is inserted
    created_at: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
    """
    When the dataset was added to the database
    """
    # Server-side default on insert; refreshed with func.now() on every UPDATE issued for the row
    updated_at: Mapped[datetime.datetime] = mapped_column(server_default=func.now(), onupdate=func.now())
    """
    When the dataset was updated.

    Updating a dataset will trigger a new diagnostic calculation.
    """

    # Universal finalisation flag for all dataset types
    # Only CMIP6 currently uses unfinalised datasets in practice; other types should be finalised on creation.
    finalised: Mapped[bool] = mapped_column(default=True, nullable=False)
    """
    Whether the complete set of metadata for the dataset has been finalised.

    For CMIP6, ingestion may initially create unfinalised datasets (False) until all metadata is extracted.
    For other dataset types (e.g., obs4MIPs, PMP climatology), this should be True upon creation.
    """

    def __repr__(self) -> str:
        """Return a short debugging representation showing the slug and dataset type."""
        return f"<Dataset slug={self.slug} dataset_type={self.dataset_type} >"

    # The value stored in `dataset_type` is the polymorphic discriminator for this hierarchy
    __mapper_args__: ClassVar[Any] = {"polymorphic_on": dataset_type}  # type: ignore

created_at = mapped_column(server_default=func.now()) class-attribute instance-attribute #

When the dataset was added to the database

dataset_type = mapped_column(nullable=False, index=True) class-attribute instance-attribute #

Type of dataset

finalised = mapped_column(default=True, nullable=False) class-attribute instance-attribute #

Whether the complete set of metadata for the dataset has been finalised.

For CMIP6, ingestion may initially create unfinalised datasets (False) until all metadata is extracted. For other dataset types (e.g., obs4MIPs, PMP climatology), this should be True upon creation.

slug = mapped_column(unique=True) class-attribute instance-attribute #

Globally unique identifier for the dataset.

In the case of CMIP6 datasets, this is the instance_id.

updated_at = mapped_column(server_default=func.now(), onupdate=func.now()) class-attribute instance-attribute #

When the dataset was updated.

Updating a dataset will trigger a new diagnostic calculation.

Diagnostic #

Bases: CreatedUpdatedMixin, Base

Represents a diagnostic that can be calculated

Source code in packages/climate-ref/src/climate_ref/models/diagnostic.py
class Diagnostic(CreatedUpdatedMixin, Base):
    """
    Represents a diagnostic that can be calculated

    A diagnostic belongs to exactly one provider; the pair ``(provider_id, slug)``
    is enforced to be unique by the ``diagnostic_ident`` constraint.
    """

    __tablename__ = "diagnostic"
    __table_args__ = (UniqueConstraint("provider_id", "slug", name="diagnostic_ident"),)

    id: Mapped[int] = mapped_column(primary_key=True)
    slug: Mapped[str] = mapped_column()
    """
    Identifier for the diagnostic, unique within its provider

    This will be used to reference the diagnostic in the benchmarking process
    """

    name: Mapped[str] = mapped_column()
    """
    Long name of the diagnostic
    """

    provider_id: Mapped[int] = mapped_column(ForeignKey("provider.id"))
    """
    The provider that provides the diagnostic
    """

    enabled: Mapped[bool] = mapped_column(default=True)
    """
    Whether the diagnostic is enabled or not

    If a diagnostic is not enabled, it will not be used for any calculations.
    """

    provider: Mapped["Provider"] = relationship(back_populates="diagnostics")
    execution_groups: Mapped[list["ExecutionGroup"]] = relationship(back_populates="diagnostic")

    def __repr__(self) -> str:
        """Return a short debugging representation showing the diagnostic slug."""
        # Use the current class name; the previous "<Metric ...>" label was stale
        return f"<Diagnostic slug={self.slug}>"

    def full_slug(self) -> str:
        """
        Get the full slug of the diagnostic, including the provider slug

        Returns
        -------
        str
            Full slug of the diagnostic in the form ``<provider slug>/<diagnostic slug>``
        """
        return f"{self.provider.slug}/{self.slug}"

enabled = mapped_column(default=True) class-attribute instance-attribute #

Whether the diagnostic is enabled or not

If a diagnostic is not enabled, it will not be used for any calculations.

name = mapped_column() class-attribute instance-attribute #

Long name of the diagnostic

provider_id = mapped_column(ForeignKey('provider.id')) class-attribute instance-attribute #

The provider that provides the diagnostic

slug = mapped_column() class-attribute instance-attribute #

Unique identifier for the diagnostic

This will be used to reference the diagnostic in the benchmarking process

full_slug() #

Get the full slug of the diagnostic, including the provider slug

Returns:

Type Description
str

Full slug of the diagnostic

Source code in packages/climate-ref/src/climate_ref/models/diagnostic.py
def full_slug(self) -> str:
    """
    Build the provider-qualified slug for this diagnostic.

    Returns
    -------
    str
        The provider slug and the diagnostic slug joined by ``/``
    """
    qualified = f"{self.provider.slug}/{self.slug}"
    return qualified

Execution #

Bases: CreatedUpdatedMixin, Base

Represents a single execution of a diagnostic

Each result is part of a group of executions that share similar input datasets.

An execution group might be run multiple times as new data becomes available; each run will create an Execution.

Source code in packages/climate-ref/src/climate_ref/models/execution.py
class Execution(CreatedUpdatedMixin, Base):
    """
    Represents a single execution of a diagnostic

    Each execution is part of a group of executions that share similar input datasets.

    An execution group might be run multiple times as new data becomes available;
    each run will create an `Execution`.
    """

    __tablename__ = "execution"

    id: Mapped[int] = mapped_column(primary_key=True)

    output_fragment: Mapped[str] = mapped_column()
    """
    Relative directory to store the output of the execution.

    During execution this directory is relative to the temporary directory.
    If the diagnostic execution is successful, the executions will be moved to the final output directory
    and the temporary directory will be cleaned up.
    This directory may contain multiple input and output files.
    """

    execution_group_id: Mapped[int] = mapped_column(
        ForeignKey(
            "execution_group.id",
            name="fk_execution_id",
        ),
        index=True,
    )
    """
    The execution group that this execution belongs to
    """

    dataset_hash: Mapped[str] = mapped_column(index=True)
    """
    Hash of the datasets used to calculate the diagnostic

    This is used to verify if an existing diagnostic execution has been run with the same datasets.
    """

    successful: Mapped[bool] = mapped_column(nullable=True, index=True)
    """
    Was the run successful

    The column is nullable; it is only assigned by `mark_successful` / `mark_failed`.
    """

    path: Mapped[str] = mapped_column(nullable=True)
    """
    Path to the output bundle

    Relative to the diagnostic execution result output directory
    """

    retracted: Mapped[bool] = mapped_column(default=False)
    """
    Whether the diagnostic execution result has been retracted or not

    This may happen if a dataset has been retracted, or if the diagnostic execution was incorrect.
    Rather than delete the values, they are marked as retracted.
    These data may still be visible in the UI, but should be marked as retracted.
    """

    execution_group: Mapped["ExecutionGroup"] = relationship(back_populates="executions")
    outputs: Mapped[list["ExecutionOutput"]] = relationship(back_populates="execution")
    values: Mapped[list["MetricValue"]] = relationship(back_populates="execution")

    datasets: Mapped[list[Dataset]] = relationship(secondary=execution_datasets)
    """
    The datasets used in this execution
    """

    def register_datasets(self, db: "Database", execution_dataset: ExecutionDatasetCollection) -> None:
        """
        Register the datasets used in the diagnostic calculation with the execution

        One row is inserted into the `execution_datasets` association table for every
        entry of each dataset collection's ``index``.

        Parameters
        ----------
        db
            Database whose session is used to perform the inserts
        execution_dataset
            Collection of datasets used by this execution
        """
        for _, dataset in execution_dataset.items():
            rows = [{"execution_id": self.id, "dataset_id": idx} for idx in dataset.index]
            if not rows:
                # An empty parameter list is treated as "no parameters", which would
                # emit a parameterless INSERT rather than inserting nothing.
                continue
            db.session.execute(execution_datasets.insert(), rows)

    def mark_successful(self, path: pathlib.Path | str) -> None:
        """
        Mark the diagnostic execution as successful

        Parameters
        ----------
        path
            Path to the output bundle; stored as a string in `path`
        """
        # TODO: this needs to accept both a diagnostic and output bundle
        self.successful = True
        self.path = str(path)

    def mark_failed(self) -> None:
        """
        Mark the diagnostic execution as unsuccessful
        """
        self.successful = False

dataset_hash = mapped_column(index=True) class-attribute instance-attribute #

Hash of the datasets used to calculate the diagnostic

This is used to verify if an existing diagnostic execution has been run with the same datasets.

datasets = relationship(secondary=execution_datasets) class-attribute instance-attribute #

The datasets used in this execution

execution_group_id = mapped_column(ForeignKey('execution_group.id', name='fk_execution_id'), index=True) class-attribute instance-attribute #

The execution group that this execution belongs to

output_fragment = mapped_column() class-attribute instance-attribute #

Relative directory to store the output of the execution.

During execution this directory is relative to the temporary directory. If the diagnostic execution is successful, the executions will be moved to the final output directory and the temporary directory will be cleaned up. This directory may contain multiple input and output files.

path = mapped_column(nullable=True) class-attribute instance-attribute #

Path to the output bundle

Relative to the diagnostic execution result output directory

retracted = mapped_column(default=False) class-attribute instance-attribute #

Whether the diagnostic execution result has been retracted or not

This may happen if a dataset has been retracted, or if the diagnostic execution was incorrect. Rather than delete the values, they are marked as retracted. These data may still be visible in the UI, but should be marked as retracted.

successful = mapped_column(nullable=True, index=True) class-attribute instance-attribute #

Was the run successful

mark_failed() #

Mark the diagnostic execution as unsuccessful

Source code in packages/climate-ref/src/climate_ref/models/execution.py
def mark_failed(self) -> None:
    """
    Record that this diagnostic execution did not succeed.

    Only the ``successful`` flag is changed; it is set to ``False``.
    """
    outcome = False
    self.successful = outcome

mark_successful(path) #

Mark the diagnostic execution as successful

Source code in packages/climate-ref/src/climate_ref/models/execution.py
def mark_successful(self, path: pathlib.Path | str) -> None:
    """
    Mark the diagnostic execution as successful
    """
    # TODO: this needs to accept both a diagnostic and output bundle
    self.successful = True
    self.path = str(path)

register_datasets(db, execution_dataset) #

Register the datasets used in the diagnostic calculation with the execution

Source code in packages/climate-ref/src/climate_ref/models/execution.py
def register_datasets(self, db: "Database", execution_dataset: ExecutionDatasetCollection) -> None:
    """
    Register the datasets used in the diagnostic calculation with the execution

    One row is inserted into the `execution_datasets` association table for every
    entry of each dataset collection's ``index``.

    Parameters
    ----------
    db
        Database whose session is used to perform the inserts
    execution_dataset
        Collection of datasets used by this execution
    """
    for _, dataset in execution_dataset.items():
        rows = [{"execution_id": self.id, "dataset_id": idx} for idx in dataset.index]
        if not rows:
            # An empty parameter list is treated as "no parameters", which would
            # emit a parameterless INSERT rather than inserting nothing.
            continue
        db.session.execute(execution_datasets.insert(), rows)

ExecutionGroup #

Bases: CreatedUpdatedMixin, Base

Represents a group of executions with a shared set of input datasets.

When solving, the ExecutionGroups are derived from the available datasets, the defined diagnostics and their data requirements. From the information in the group an execution can be triggered, which is an actual run of a diagnostic calculation with a specific set of input datasets.

When the ExecutionGroup is created, it is marked dirty, meaning there are no current executions available. When an Execution has run successfully for an ExecutionGroup, the dirty mark is removed. After ingesting new data and solving again, if new versions of the input datasets are available, the ExecutionGroup will be marked dirty again.

The diagnostic_id and key form a unique identifier for ExecutionGroups.

Source code in packages/climate-ref/src/climate_ref/models/execution.py
class ExecutionGroup(CreatedUpdatedMixin, Base):
    """
    A set of executions that share the same input datasets.

    During solving, `ExecutionGroup`s are derived from the available datasets,
    the defined diagnostics and their data requirements. The group holds the
    information needed to trigger an execution, i.e. an actual run of a diagnostic
    calculation with a specific set of input datasets.

    A newly created `ExecutionGroup` is marked dirty, meaning there are no
    current executions available. Once an `Execution` has run successfully for the
    group, the dirty mark is removed. If new versions of the input datasets become
    available after ingesting new data and solving again, the group is marked
    dirty again.

    The diagnostic_id and key together uniquely identify an `ExecutionGroup`.
    """

    __tablename__ = "execution_group"
    __table_args__ = (UniqueConstraint("diagnostic_id", "key", name="execution_ident"),)

    id: Mapped[int] = mapped_column(primary_key=True)

    diagnostic_id: Mapped[int] = mapped_column(ForeignKey("diagnostic.id"), index=True)
    """
    The diagnostic that this execution group belongs to
    """

    key: Mapped[str] = mapped_column(index=True)
    """
    Key for the datasets in this Execution group.
    """

    # NOTE(review): the column default is False although the class docstring says new
    # groups are marked dirty — presumably the creating code sets dirty=True; confirm.
    dirty: Mapped[bool] = mapped_column(default=False)
    """
    Whether the execution group should be rerun

    An execution group is dirty if the diagnostic or any of the input datasets has been
    updated since the last execution.
    """

    selectors: Mapped[dict[str, Any]] = mapped_column(default=dict)
    """
    Collection of selectors that define the group

    These selectors are the unique key, value pairs that were selected during the initial groupby
    operation.
    These are also used to define the dataset key.
    """

    diagnostic: Mapped["Diagnostic"] = relationship(back_populates="execution_groups")
    # Ordered by creation time, so the last element is the most recent execution
    executions: Mapped[list["Execution"]] = relationship(
        back_populates="execution_group", order_by="Execution.created_at"
    )

    def should_run(self, dataset_hash: str) -> bool:
        """
        Decide whether this execution group needs a new execution.

        A run is required when any of the following holds (checked in this order):

        * no executions have ever been recorded for the group
        * the most recent execution used a different dataset hash
        * the group is flagged as dirty

        Parameters
        ----------
        dataset_hash
            Hash of the datasets that would be used for the next execution

        Returns
        -------
        bool
            True if the group should be executed, False otherwise
        """
        history = self.executions
        if not history:
            logger.debug(f"Execution group {self.diagnostic.slug}/{self.key} was never executed")
            return True

        hash_changed = history[-1].dataset_hash != dataset_hash
        if hash_changed:
            logger.debug(
                f"Execution group {self.diagnostic.slug}/{self.key} hash mismatch:"
                f" {self.executions[-1].dataset_hash} != {dataset_hash}"
            )
            return True

        if not self.dirty:
            return False

        logger.debug(f"Execution group {self.diagnostic.slug}/{self.key} is dirty")
        return True

diagnostic_id = mapped_column(ForeignKey('diagnostic.id'), index=True) class-attribute instance-attribute #

The diagnostic that this execution group belongs to

dirty = mapped_column(default=False) class-attribute instance-attribute #

Whether the execution group should be rerun

An execution group is dirty if the diagnostic or any of the input datasets has been updated since the last execution.

key = mapped_column(index=True) class-attribute instance-attribute #

Key for the datasets in this Execution group.

selectors = mapped_column(default=dict) class-attribute instance-attribute #

Collection of selectors that define the group

These selectors are the unique key, value pairs that were selected during the initial groupby operation. These are also used to define the dataset key.

should_run(dataset_hash) #

Check if the diagnostic execution group needs to be executed.

The diagnostic execution group should be run if:

  • the execution group is marked as dirty
  • no executions have been performed ever
  • the dataset hash is different from the last run
Source code in packages/climate-ref/src/climate_ref/models/execution.py
def should_run(self, dataset_hash: str) -> bool:
    """
    Decide whether this execution group needs a new execution.

    A run is required when any of the following holds (checked in this order):

    * no executions have ever been recorded for the group
    * the most recent execution used a different dataset hash
    * the group is flagged as dirty

    Parameters
    ----------
    dataset_hash
        Hash of the datasets that would be used for the next execution

    Returns
    -------
    bool
        True if the group should be executed, False otherwise
    """
    history = self.executions
    if not history:
        logger.debug(f"Execution group {self.diagnostic.slug}/{self.key} was never executed")
        return True

    hash_changed = history[-1].dataset_hash != dataset_hash
    if hash_changed:
        logger.debug(
            f"Execution group {self.diagnostic.slug}/{self.key} hash mismatch:"
            f" {self.executions[-1].dataset_hash} != {dataset_hash}"
        )
        return True

    if not self.dirty:
        return False

    logger.debug(f"Execution group {self.diagnostic.slug}/{self.key} is dirty")
    return True

ExecutionOutput #

Bases: DimensionMixin, CreatedUpdatedMixin, Base

An output generated as part of an execution.

This output may be a plot, data file or HTML file. These outputs are defined in the CMEC output bundle.

Outputs can be tagged with dimensions from the controlled vocabulary to enable filtering and organization.

Source code in packages/climate-ref/src/climate_ref/models/execution.py
class ExecutionOutput(DimensionMixin, CreatedUpdatedMixin, Base):
    """
    An output generated as part of an execution.

    This output may be a plot, data file or HTML file.
    These outputs are defined in the CMEC output bundle.

    Outputs can be tagged with dimensions from the controlled vocabulary
    to enable filtering and organization.
    """

    __tablename__ = "execution_output"

    # Names of the dimension columns accepted by `build`; empty at class-definition time
    _cv_dimensions: ClassVar[list[str]] = []

    id: Mapped[int] = mapped_column(primary_key=True)

    execution_id: Mapped[int] = mapped_column(ForeignKey("execution.id"), index=True)

    output_type: Mapped[ResultOutputType] = mapped_column(index=True)
    """
    Type of the output

    This will determine how the output is displayed
    """

    filename: Mapped[str] = mapped_column(nullable=True)
    """
    Path to the output

    Relative to the diagnostic execution result output directory
    """

    short_name: Mapped[str] = mapped_column(nullable=True)
    """
    Short key of the output

    This is unique for a given result and output type
    """

    long_name: Mapped[str] = mapped_column(nullable=True)
    """
    Human readable name describing the plot
    """

    description: Mapped[str] = mapped_column(nullable=True)
    """
    Long description describing the plot
    """

    execution: Mapped["Execution"] = relationship(back_populates="outputs")

    @classmethod
    def build(  # noqa: PLR0913
        cls,
        *,
        execution_id: int,
        output_type: ResultOutputType,
        dimensions: dict[str, str],
        filename: str | None = None,
        short_name: str | None = None,
        long_name: str | None = None,
        description: str | None = None,
    ) -> "ExecutionOutput":
        """
        Build an ExecutionOutput from dimensions and metadata

        This is a helper method that validates the dimensions supplied.

        Parameters
        ----------
        execution_id
            Execution that created the output
        output_type
            Type of the output
        dimensions
            Dimensions that describe the output
        filename
            Path to the output
        short_name
            Short key of the output
        long_name
            Human readable name
        description
            Long description

        Raises
        ------
        KeyError
            If an unknown dimension was supplied.

            Dimensions must exist in the controlled vocabulary.

        Returns
        -------
        ExecutionOutput
            Newly created instance of the class the method was called on
        """
        for k in dimensions:
            if k not in cls._cv_dimensions:
                raise KeyError(f"Unknown dimension column '{k}'")

        # Instantiate via `cls` so a subclass calling build() gets its own type
        return cls(
            execution_id=execution_id,
            output_type=output_type,
            filename=filename,
            short_name=short_name,
            long_name=long_name,
            description=description,
            **dimensions,
        )

description = mapped_column(nullable=True) class-attribute instance-attribute #

Long description describing the plot

filename = mapped_column(nullable=True) class-attribute instance-attribute #

Path to the output

Relative to the diagnostic execution result output directory

long_name = mapped_column(nullable=True) class-attribute instance-attribute #

Human readable name describing the plot

output_type = mapped_column(index=True) class-attribute instance-attribute #

Type of the output

This will determine how the output is displayed

short_name = mapped_column(nullable=True) class-attribute instance-attribute #

Short key of the output

This is unique for a given result and output type

build(*, execution_id, output_type, dimensions, filename=None, short_name=None, long_name=None, description=None) classmethod #

Build an ExecutionOutput from dimensions and metadata

This is a helper method that validates the dimensions supplied.

Parameters:

Name Type Description Default
execution_id int

Execution that created the output

required
output_type ResultOutputType

Type of the output

required
dimensions dict[str, str]

Dimensions that describe the output

required
filename str | None

Path to the output

None
short_name str | None

Short key of the output

None
long_name str | None

Human readable name

None
description str | None

Long description

None

Raises:

Type Description
KeyError

If an unknown dimension was supplied.

Dimensions must exist in the controlled vocabulary.

Returns:

Type Description
Newly created ExecutionOutput
Source code in packages/climate-ref/src/climate_ref/models/execution.py
@classmethod
def build(  # noqa: PLR0913
    cls,
    *,
    execution_id: int,
    output_type: ResultOutputType,
    dimensions: dict[str, str],
    filename: str | None = None,
    short_name: str | None = None,
    long_name: str | None = None,
    description: str | None = None,
) -> "ExecutionOutput":
    """
    Build an ExecutionOutput from dimensions and metadata

    This is a helper method that validates the dimensions supplied.

    Parameters
    ----------
    execution_id
        Execution that created the output
    output_type
        Type of the output
    dimensions
        Dimensions that describe the output
    filename
        Path to the output
    short_name
        Short key of the output
    long_name
        Human readable name
    description
        Long description

    Raises
    ------
    KeyError
        If an unknown dimension was supplied.

        Dimensions must exist in the controlled vocabulary.

    Returns
    -------
    ExecutionOutput
        Newly created instance of the class the method was called on
    """
    for k in dimensions:
        if k not in cls._cv_dimensions:
            raise KeyError(f"Unknown dimension column '{k}'")

    # Instantiate via `cls` so a subclass calling build() gets its own type
    return cls(
        execution_id=execution_id,
        output_type=output_type,
        filename=filename,
        short_name=short_name,
        long_name=long_name,
        description=description,
        **dimensions,
    )

MetricValue #

Bases: DimensionMixin, CreatedUpdatedMixin, Base

Represents a single metric value

This is a base class for different types of metric values (e.g. scalar, series) which are stored in a single table using single table inheritance.

This value has a number of dimensions which are used to query the diagnostic values. These dimensions describe aspects such as the type of statistic being measured, the region of interest or the model from which the statistic is being measured.

The columns in this table are not known statically because the REF can track an arbitrary set of dimensions depending on the controlled vocabulary that will be used. A call to register_cv_dimensions must be made before using this class.

Source code in packages/climate-ref/src/climate_ref/models/metric_value.py
class MetricValue(DimensionMixin, CreatedUpdatedMixin, Base):
    """
    Represents a single metric value

    This is a base class for different types of metric values (e.g. scalar, series) which
    are stored in a single table using single table inheritance.

    This value has a number of dimensions which are used to query the diagnostic values.
    These dimensions describe aspects such as the type of statistic being measured,
    the region of interest or the model from which the statistic is being measured.

    The columns in this table are not known statically because the REF can track an arbitrary
    set of dimensions depending on the controlled vocabulary that will be used.
    A call to `register_cv_dimensions` must be made before using this class.
    """

    __tablename__ = "metric_value"

    # The `type` column is the polymorphic discriminator for this hierarchy
    __mapper_args__: ClassVar[Mapping[str, str]] = {  # type: ignore
        "polymorphic_on": "type",
    }

    # Names of the dimension columns; empty at class-definition time
    # (presumably filled in by `register_cv_dimensions` — defined outside this view)
    _cv_dimensions: ClassVar[list[str]] = []

    id: Mapped[int] = mapped_column(primary_key=True)
    # Execution that produced this value
    execution_id: Mapped[int] = mapped_column(ForeignKey("execution.id"), index=True)

    # Stored as JSON via the `dict[str, Any]` entry in Base.type_annotation_map
    attributes: Mapped[dict[str, Any]] = mapped_column()

    execution: Mapped["Execution"] = relationship(back_populates="values")

    type: Mapped[MetricValueType] = mapped_column(index=True)
    """
    Type of metric value

    This value is used to determine how the metric value should be interpreted.
    """

    def __repr__(self) -> str:
        """Return a debugging representation with the id, execution and dimensions."""
        # `self.dimensions` is not defined in this class; presumably provided by DimensionMixin
        return f"<MetricValue id={self.id} execution={self.execution} dimensions={self.dimensions}>"

type = mapped_column(index=True) class-attribute instance-attribute #

Type of metric value

This value is used to determine how the metric value should be interpreted.

Provider #

Bases: CreatedUpdatedMixin, Base

Represents a provider that can provide diagnostic calculations

Source code in packages/climate-ref/src/climate_ref/models/provider.py
class Provider(CreatedUpdatedMixin, Base):
    """
    Represents a provider that can provide diagnostic calculations

    A provider owns a collection of diagnostics (see ``diagnostics``).
    """

    __tablename__ = "provider"

    # Surrogate primary key
    id: Mapped[int] = mapped_column(primary_key=True)
    slug: Mapped[str] = mapped_column(unique=True)
    """
    Globally unique identifier for the provider.
    """

    name: Mapped[str] = mapped_column()
    """
    Long name of the provider
    """

    version: Mapped[str] = mapped_column(nullable=False)
    """
    Version of the provider.

    This should map to the package version.
    """

    # Reverse side of `Diagnostic.provider`
    diagnostics: Mapped[list["Diagnostic"]] = relationship(back_populates="provider")

    def __repr__(self) -> str:
        """Return a short debugging representation showing the slug and version."""
        return f"<Provider slug={self.slug} version={self.version}>"

name = mapped_column() class-attribute instance-attribute #

Long name of the provider

slug = mapped_column(unique=True) class-attribute instance-attribute #

Globally unique identifier for the provider.

version = mapped_column(nullable=False) class-attribute instance-attribute #

Version of the provider.

This should map to the package version.

ScalarMetricValue #

Bases: MetricValue

A scalar value with associated dimensions

This is a subclass of MetricValue that is used to represent a scalar value.

Source code in packages/climate-ref/src/climate_ref/models/metric_value.py
class ScalarMetricValue(MetricValue):
    """
    A scalar value with associated dimensions

    This is a subclass of MetricValue that is used to represent a scalar value.
    """

    # Rows whose discriminator equals MetricValueType.SCALAR load as this class
    __mapper_args__: ClassVar[Mapping[str, Any]] = {  # type: ignore
        "polymorphic_identity": MetricValueType.SCALAR,
    }

    # This is a scalar value
    value: Mapped[float] = mapped_column(nullable=True)

    def __repr__(self) -> str:
        """Return a debugging representation with the id, execution, dimensions and value."""
        return (
            f"<ScalarMetricValue "
            f"id={self.id} execution={self.execution} dimensions={self.dimensions} value={self.value}>"
        )

    @classmethod
    def build(
        cls,
        *,
        execution_id: int,
        value: float,
        dimensions: dict[str, str],
        attributes: dict[str, Any] | None,
    ) -> "MetricValue":
        """
        Build a MetricValue from a collection of dimensions and a value

        This is a helper method that validates the dimensions supplied and provides an interface
        similar to [climate_ref_core.metric_values.ScalarMetricValue][].

        Parameters
        ----------
        execution_id
            Execution that created the diagnostic value
        value
            The value of the diagnostic
        dimensions
            Dimensions that describe the diagnostic execution result
        attributes
            Optional additional attributes to describe the value,
            but are not in the controlled vocabulary.

        Raises
        ------
        KeyError
            If an unknown dimension was supplied.

            Dimensions must exist in the controlled vocabulary.

        Returns
        -------
        MetricValue
            Newly created instance of the class the method was called on
        """
        for k in dimensions:
            if k not in cls._cv_dimensions:
                raise KeyError(f"Unknown dimension column '{k}'")

        # Instantiate via `cls` so a subclass calling build() gets its own type
        return cls(
            execution_id=execution_id,
            value=value,
            attributes=attributes,
            **dimensions,
        )

build(*, execution_id, value, dimensions, attributes) classmethod #

Build a MetricValue from a collection of dimensions and a value

This is a helper method that validates the dimensions supplied and provides an interface similar to climate_ref_core.metric_values.ScalarMetricValue.

Parameters:

Name Type Description Default
execution_id int

Execution that created the diagnostic value

required
value float

The value of the diagnostic

required
dimensions dict[str, str]

Dimensions that describe the diagnostic execution result

required
attributes dict[str, Any] | None

Optional additional attributes to describe the value, but are not in the controlled vocabulary.

required

Raises:

Type Description
KeyError

If an unknown dimension was supplied.

Dimensions must exist in the controlled vocabulary.

Returns:

Type Description
Newly created MetricValue
Source code in packages/climate-ref/src/climate_ref/models/metric_value.py
@classmethod
def build(
    cls,
    *,
    execution_id: int,
    value: float,
    dimensions: dict[str, str],
    attributes: dict[str, Any] | None,
) -> "MetricValue":
    """
    Build a MetricValue from a collection of dimensions and a value

    This is a helper method that validates the dimensions supplied and provides an interface
    similar to [climate_ref_core.metric_values.ScalarMetricValue][].

    Parameters
    ----------
    execution_id
        Execution that created the diagnostic value
    value
        The value of the diagnostic
    dimensions
        Dimensions that describe the diagnostic execution result
    attributes
        Optional additional attributes to describe the value,
        but are not in the controlled vocabulary.

    Raises
    ------
    KeyError
        If an unknown dimension was supplied.

        Dimensions must exist in the controlled vocabulary.

    Returns
    -------
        Newly created MetricValue
    """
    for k in dimensions:
        if k not in cls._cv_dimensions:
            raise KeyError(f"Unknown dimension column '{k}'")

    return ScalarMetricValue(
        execution_id=execution_id,
        value=value,
        attributes=attributes,
        **dimensions,
    )

SeriesMetricValue #

Bases: MetricValue

A 1d series with associated dimensions

This is a subclass of MetricValue that is used to represent a series. This can be used to represent time series, vertical profiles or other 1d data.

Source code in packages/climate-ref/src/climate_ref/models/metric_value.py
class SeriesMetricValue(MetricValue):
    """
    A one-dimensional series of values with associated dimensions

    This subclass of MetricValue represents a series,
    for example a time series, a vertical profile or any other 1d data.
    """

    __mapper_args__: ClassVar[Mapping[str, Any]] = {  # type: ignore
        "polymorphic_identity": MetricValueType.SERIES,
    }

    # Series payload: the values, the index they are defined on and the name of that index.
    # All three columns are declared nullable
    # (presumably because other MetricValue types share the table -- confirm).
    values: Mapped[list[float | int]] = mapped_column(nullable=True)
    index: Mapped[list[float | int | str]] = mapped_column(nullable=True)
    index_name: Mapped[str] = mapped_column(nullable=True)

    def __repr__(self) -> str:
        # The values themselves are left out of the representation
        summary = " ".join(
            (
                f"id={self.id}",
                f"execution={self.execution}",
                f"dimensions={self.dimensions}",
                f"index_name={self.index_name}",
            )
        )
        return f"<SeriesMetricValue {summary}>"

    @classmethod
    def build(  # noqa: PLR0913
        cls,
        *,
        execution_id: int,
        values: list[float | int],
        index: list[float | int | str],
        index_name: str,
        dimensions: dict[str, str],
        attributes: dict[str, Any] | None,
    ) -> "MetricValue":
        """
        Create a database object that represents a series

        The dimension names are validated against the controlled vocabulary
        (``cls._cv_dimensions``) and the values and index must be of equal length.

        Parameters
        ----------
        execution_id
            Execution that created the diagnostic value
        values
            1-d array of values
        index
            1-d array of index values
        index_name
            Name of the index. Used for presentation purposes
        dimensions
            Dimensions that describe the diagnostic execution result.

            Each key is forwarded to the constructor as a keyword argument.
        attributes
            Optional additional attributes that describe the value,
            but are not part of the controlled vocabulary.

        Raises
        ------
        KeyError
            If an unknown dimension was supplied.

            Dimensions must exist in the controlled vocabulary.
        ValueError
            If the length of values and index do not match.

        Returns
        -------
            Newly created MetricValue
        """
        unrecognised = [name for name in dimensions if name not in cls._cv_dimensions]
        if unrecognised:
            # Only the first offending name (in iteration order) is reported
            raise KeyError(f"Unknown dimension column '{unrecognised[0]}'")

        n_values = len(values)
        n_index = len(index)
        if n_values != n_index:
            raise ValueError(f"Index length ({n_index}) must match values length ({n_values})")

        return SeriesMetricValue(
            execution_id=execution_id,
            values=values,
            index=index,
            index_name=index_name,
            attributes=attributes,
            **dimensions,
        )

build(*, execution_id, values, index, index_name, dimensions, attributes) classmethod #

Build a database object from a series

Parameters:

Name Type Description Default
execution_id int

Execution that created the diagnostic value

required
values list[float | int]

1-d array of values

required
index list[float | int | str]

1-d array of index values

required
index_name str

Name of the index. Used for presentation purposes

required
dimensions dict[str, str]

Dimensions that describe the diagnostic execution result

required
attributes dict[str, Any] | None

Optional additional attributes that describe the value but are not in the controlled vocabulary.

required

Raises:

Type Description
KeyError

If an unknown dimension was supplied.

Dimensions must exist in the controlled vocabulary.

ValueError

If the length of values and index do not match.

Returns:

Type Description
Newly created MetricValue
Source code in packages/climate-ref/src/climate_ref/models/metric_value.py
@classmethod
def build(  # noqa: PLR0913
    cls,
    *,
    execution_id: int,
    values: list[float | int],
    index: list[float | int | str],
    index_name: str,
    dimensions: dict[str, str],
    attributes: dict[str, Any] | None,
) -> "MetricValue":
    """
    Build a database object from a series

    Parameters
    ----------
    execution_id
        Execution that created the diagnostic value
    values
        1-d array of values
    index
        1-d array of index values
    index_name
        Name of the index. Used for presentation purposes
    dimensions
        Dimensions that describe the diagnostic execution result
    attributes
        Optional additional attributes to describe the value,
        but are not in the controlled vocabulary.

    Raises
    ------
    KeyError
        If an unknown dimension was supplied.

        Dimensions must exist in the controlled vocabulary.
    ValueError
        If the length of values and index do not match.

    Returns
    -------
        Newly created MetricValue
    """
    for k in dimensions:
        if k not in cls._cv_dimensions:
            raise KeyError(f"Unknown dimension column '{k}'")

    if len(values) != len(index):
        raise ValueError(f"Index length ({len(index)}) must match values length ({len(values)})")

    return SeriesMetricValue(
        execution_id=execution_id,
        values=values,
        index=index,
        index_name=index_name,
        attributes=attributes,
        **dimensions,
    )

sub-packages#

Sub-package Description
base
dataset
diagnostic
execution
metric_value
mixins Model mixins for shared functionality
provider