climate_ref.models.execution #

Execution #

Bases: CreatedUpdatedMixin, Base

Represents a single execution of a diagnostic

Each execution is part of a group of executions that share similar input datasets.

An execution group might be run multiple times as new data becomes available; each run creates an Execution.

Source code in packages/climate-ref/src/climate_ref/models/execution.py
class Execution(CreatedUpdatedMixin, Base):
    """
    Represents a single execution of a diagnostic

    Each execution is part of a group of executions that share similar input datasets.

    An execution group might be run multiple times as new data becomes available;
    each run will create an `Execution`.
    """

    __tablename__ = "execution"

    id: Mapped[int] = mapped_column(primary_key=True)

    output_fragment: Mapped[str] = mapped_column()
    """
    Relative directory to store the output of the execution.

    During execution this directory is relative to the temporary directory.
    If the diagnostic execution is successful, the outputs will be moved to the final output directory
    and the temporary directory will be cleaned up.
    This directory may contain multiple input and output files.
    """

    execution_group_id: Mapped[int] = mapped_column(
        ForeignKey(
            "execution_group.id",
            name="fk_execution_id",
        ),
        index=True,
    )
    """
    The execution group that this execution belongs to
    """

    dataset_hash: Mapped[str] = mapped_column(index=True)
    """
    Hash of the datasets used to calculate the diagnostic

    This is used to verify if an existing diagnostic execution has been run with the same datasets.
    """

    successful: Mapped[bool] = mapped_column(nullable=True, index=True)
    """
    Whether the run was successful
    """

    path: Mapped[str] = mapped_column(nullable=True)
    """
    Path to the output bundle

    Relative to the diagnostic execution result output directory
    """

    retracted: Mapped[bool] = mapped_column(default=False)
    """
    Whether the diagnostic execution result has been retracted or not

    This may happen if a dataset has been retracted, or if the diagnostic execution was incorrect.
    Rather than delete the values, they are marked as retracted.
    These data may still be visible in the UI, but should be marked as retracted.
    """

    execution_group: Mapped["ExecutionGroup"] = relationship(back_populates="executions")
    outputs: Mapped[list["ExecutionOutput"]] = relationship(back_populates="execution")
    values: Mapped[list["MetricValue"]] = relationship(back_populates="execution")

    datasets: Mapped[list[Dataset]] = relationship(secondary=execution_datasets)
    """
    The datasets used in this execution
    """

    def register_datasets(self, db: "Database", execution_dataset: ExecutionDatasetCollection) -> None:
        """
        Register the datasets used in the diagnostic calculation with the execution
        """
        for _, dataset in execution_dataset.items():
            db.session.execute(
                execution_datasets.insert(),
                [{"execution_id": self.id, "dataset_id": idx} for idx in dataset.index],
            )

    def mark_successful(self, path: pathlib.Path | str) -> None:
        """
        Mark the diagnostic execution as successful
        """
        # TODO: this needs to accept both a diagnostic and output bundle
        self.successful = True
        self.path = str(path)

    def mark_failed(self) -> None:
        """
        Mark the diagnostic execution as unsuccessful
        """
        self.successful = False
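
For illustration, a minimal sketch of the lifecycle of an Execution, assuming an open SQLAlchemy session and an existing ExecutionGroup; the names group, session and run_ok are hypothetical:

execution = Execution(
    execution_group_id=group.id,
    output_fragment="provider/diagnostic/key",  # hypothetical relative output directory
    dataset_hash="abc123",  # hypothetical hash of the input datasets
)
session.add(execution)

# ... run the diagnostic ...
if run_ok:  # hypothetical flag for the outcome of the run
    execution.mark_successful("output.json")  # path relative to the output directory
else:
    execution.mark_failed()
session.commit()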

dataset_hash = mapped_column(index=True) class-attribute instance-attribute #

Hash of the datasets used to calculate the diagnostic

This is used to verify if an existing diagnostic execution has been run with the same datasets.

datasets = relationship(secondary=execution_datasets) class-attribute instance-attribute #

The datasets used in this execution

execution_group_id = mapped_column(ForeignKey('execution_group.id', name='fk_execution_id'), index=True) class-attribute instance-attribute #

The execution group that this execution belongs to

output_fragment = mapped_column() class-attribute instance-attribute #

Relative directory to store the output of the execution.

During execution this directory is relative to the temporary directory. If the diagnostic execution is successful, the outputs will be moved to the final output directory and the temporary directory will be cleaned up. This directory may contain multiple input and output files.

path = mapped_column(nullable=True) class-attribute instance-attribute #

Path to the output bundle

Relative to the diagnostic execution result output directory

retracted = mapped_column(default=False) class-attribute instance-attribute #

Whether the diagnostic execution result has been retracted or not

This may happen if a dataset has been retracted, or if the diagnostic execution was incorrect. Rather than delete the values, they are marked as retracted. These data may still be visible in the UI, but should be marked as retracted.

successful = mapped_column(nullable=True, index=True) class-attribute instance-attribute #

Whether the run was successful

mark_failed() #

Mark the diagnostic execution as unsuccessful

Source code in packages/climate-ref/src/climate_ref/models/execution.py
def mark_failed(self) -> None:
    """
    Mark the diagnostic execution as unsuccessful
    """
    self.successful = False

mark_successful(path) #

Mark the diagnostic execution as successful

Source code in packages/climate-ref/src/climate_ref/models/execution.py
def mark_successful(self, path: pathlib.Path | str) -> None:
    """
    Mark the diagnostic execution as successful
    """
    # TODO: this needs to accept both a diagnostic and output bundle
    self.successful = True
    self.path = str(path)

register_datasets(db, execution_dataset) #

Register the datasets used in the diagnostic calculation with the execution

Source code in packages/climate-ref/src/climate_ref/models/execution.py
def register_datasets(self, db: "Database", execution_dataset: ExecutionDatasetCollection) -> None:
    """
    Register the datasets used in the diagnostic calculation with the execution
    """
    for _, dataset in execution_dataset.items():
        db.session.execute(
            execution_datasets.insert(),
            [{"execution_id": self.id, "dataset_id": idx} for idx in dataset.index],
        )
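
A hedged usage note: register_datasets reads self.id, so the execution must have been flushed (or committed) first; db and dataset_collection are assumed to be an existing Database wrapper and ExecutionDatasetCollection:

session.add(execution)
session.flush()  # ensure execution.id is assigned; register_datasets inserts rows keyed on it
execution.register_datasets(db, dataset_collection)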

ExecutionGroup #

Bases: CreatedUpdatedMixin, Base

Represents a group of executions with a shared set of input datasets.

When solving, the ExecutionGroups are derived from the available datasets, the defined diagnostics and their data requirements. From the information in the group an execution can be triggered, which is an actual run of a diagnostic calculation with a specific set of input datasets.

When the ExecutionGroup is created, it is marked dirty, meaning there are no current executions available. When an Execution has run successfully for an ExecutionGroup, the dirty mark is removed. If, after ingesting new data and solving again, new versions of the input datasets are available, the ExecutionGroup is marked dirty again.

The diagnostic_id and key form a unique identifier for ExecutionGroups.

Source code in packages/climate-ref/src/climate_ref/models/execution.py
class ExecutionGroup(CreatedUpdatedMixin, Base):
    """
    Represents a group of executions with a shared set of input datasets.

    When solving, the `ExecutionGroup`s are derived from the available datasets,
    the defined diagnostics and their data requirements. From the information in the
    group an execution can be triggered, which is an actual run of a diagnostic calculation
    with a specific set of input datasets.

    When the `ExecutionGroup` is created, it is marked dirty, meaning there are no
    current executions available. When an Execution has run successfully for an
    ExecutionGroup, the dirty mark is removed. If, after ingesting new data and
    solving again, new versions of the input datasets are available, the
    ExecutionGroup is marked dirty again.

    The diagnostic_id and key form a unique identifier for `ExecutionGroup`s.
    """

    __tablename__ = "execution_group"
    __table_args__ = (UniqueConstraint("diagnostic_id", "key", name="execution_ident"),)

    id: Mapped[int] = mapped_column(primary_key=True)

    diagnostic_id: Mapped[int] = mapped_column(ForeignKey("diagnostic.id"), index=True)
    """
    The diagnostic that this execution group belongs to
    """

    key: Mapped[str] = mapped_column(index=True)
    """
    Key for the datasets in this Execution group.
    """

    dirty: Mapped[bool] = mapped_column(default=False)
    """
    Whether the execution group should be rerun

    An execution group is dirty if the diagnostic or any of the input datasets has been
    updated since the last execution.
    """

    selectors: Mapped[dict[str, Any]] = mapped_column(default=dict)
    """
    Collection of selectors that define the group

    These selectors are the unique key, value pairs that were selected during the initial groupby
    operation.
    These are also used to define the dataset key.
    """

    diagnostic: Mapped["Diagnostic"] = relationship(back_populates="execution_groups")
    executions: Mapped[list["Execution"]] = relationship(
        back_populates="execution_group", order_by="Execution.created_at"
    )

    def should_run(self, dataset_hash: str) -> bool:
        """
        Check if the diagnostic execution group needs to be executed.

        The diagnostic execution group should be run if:

        * the execution group is marked as dirty
        * no executions have ever been performed
        * the dataset hash is different from the last run
        """
        if not self.executions:
            logger.debug(f"Execution group {self.diagnostic.slug}/{self.key} was never executed")
            return True

        if self.executions[-1].dataset_hash != dataset_hash:
            logger.debug(
                f"Execution group {self.diagnostic.slug}/{self.key} hash mismatch:"
                f" {self.executions[-1].dataset_hash} != {dataset_hash}"
            )
            return True

        if self.dirty:
            logger.debug(f"Execution group {self.diagnostic.slug}/{self.key} is dirty")
            return True

        return False

diagnostic_id = mapped_column(ForeignKey('diagnostic.id'), index=True) class-attribute instance-attribute #

The diagnostic that this execution group belongs to

dirty = mapped_column(default=False) class-attribute instance-attribute #

Whether the execution group should be rerun

An execution group is dirty if the diagnostic or any of the input datasets has been updated since the last execution.

key = mapped_column(index=True) class-attribute instance-attribute #

Key for the datasets in this Execution group.

selectors = mapped_column(default=dict) class-attribute instance-attribute #

Collection of selectors that define the group

These selectors are the unique key, value pairs that were selected during the initial groupby operation. These are also used to define the dataset key.
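
For illustration only, a selectors value might map facet names to the values chosen during the groupby; the exact shape depends on the solver, and the facet names below are hypothetical:

selectors = {"source_id": "ACCESS-ESM1-5", "variable_id": "tas"}  # hypothetical facets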

should_run(dataset_hash) #

Check if the diagnostic execution group needs to be executed.

The diagnostic execution group should be run if:

  • the execution group is marked as dirty
  • no executions have ever been performed
  • the dataset hash is different from the last run

Source code in packages/climate-ref/src/climate_ref/models/execution.py
def should_run(self, dataset_hash: str) -> bool:
    """
    Check if the diagnostic execution group needs to be executed.

    The diagnostic execution group should be run if:

    * the execution group is marked as dirty
    * no executions have ever been performed
    * the dataset hash is different from the last run
    """
    if not self.executions:
        logger.debug(f"Execution group {self.diagnostic.slug}/{self.key} was never executed")
        return True

    if self.executions[-1].dataset_hash != dataset_hash:
        logger.debug(
            f"Execution group {self.diagnostic.slug}/{self.key} hash mismatch:"
            f" {self.executions[-1].dataset_hash} != {dataset_hash}"
        )
        return True

    if self.dirty:
        logger.debug(f"Execution group {self.diagnostic.slug}/{self.key} is dirty")
        return True

    return False
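
A small sketch of how a solver might consult should_run, assuming a group and a freshly computed hash of the candidate input datasets (both names are hypothetical):

current_hash = "abc123"  # hypothetical hash of the candidate datasets
if group.should_run(current_hash):
    # create and run a new Execution for this group (see Execution above)
    ...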

ExecutionOutput #

Bases: DimensionMixin, CreatedUpdatedMixin, Base

An output generated as part of an execution.

This output may be a plot, data file or HTML file. These outputs are defined in the CMEC output bundle.

Outputs can be tagged with dimensions from the controlled vocabulary to enable filtering and organization.

Source code in packages/climate-ref/src/climate_ref/models/execution.py
class ExecutionOutput(DimensionMixin, CreatedUpdatedMixin, Base):
    """
    An output generated as part of an execution.

    This output may be a plot, data file or HTML file.
    These outputs are defined in the CMEC output bundle.

    Outputs can be tagged with dimensions from the controlled vocabulary
    to enable filtering and organization.
    """

    __tablename__ = "execution_output"

    _cv_dimensions: ClassVar[list[str]] = []

    id: Mapped[int] = mapped_column(primary_key=True)

    execution_id: Mapped[int] = mapped_column(ForeignKey("execution.id"), index=True)

    output_type: Mapped[ResultOutputType] = mapped_column(index=True)
    """
    Type of the output

    This will determine how the output is displayed
    """

    filename: Mapped[str] = mapped_column(nullable=True)
    """
    Path to the output

    Relative to the diagnostic execution result output directory
    """

    short_name: Mapped[str] = mapped_column(nullable=True)
    """
    Short key of the output

    This is unique for a given execution and output type
    """

    long_name: Mapped[str] = mapped_column(nullable=True)
    """
    Human readable name describing the plot
    """

    description: Mapped[str] = mapped_column(nullable=True)
    """
    Long description describing the plot
    """

    execution: Mapped["Execution"] = relationship(back_populates="outputs")

    @classmethod
    def build(  # noqa: PLR0913
        cls,
        *,
        execution_id: int,
        output_type: ResultOutputType,
        dimensions: dict[str, str],
        filename: str | None = None,
        short_name: str | None = None,
        long_name: str | None = None,
        description: str | None = None,
    ) -> "ExecutionOutput":
        """
        Build an ExecutionOutput from dimensions and metadata

        This is a helper method that validates the dimensions supplied.

        Parameters
        ----------
        execution_id
            Execution that created the output
        output_type
            Type of the output
        dimensions
            Dimensions that describe the output
        filename
            Path to the output
        short_name
            Short key of the output
        long_name
            Human readable name
        description
            Long description

        Raises
        ------
        KeyError
            If an unknown dimension was supplied.

            Dimensions must exist in the controlled vocabulary.

        Returns
        -------
            Newly created ExecutionOutput
        """
        for k in dimensions:
            if k not in cls._cv_dimensions:
                raise KeyError(f"Unknown dimension column '{k}'")

        return ExecutionOutput(
            execution_id=execution_id,
            output_type=output_type,
            filename=filename,
            short_name=short_name,
            long_name=long_name,
            description=description,
            **dimensions,
        )

description = mapped_column(nullable=True) class-attribute instance-attribute #

Long description describing the plot

filename = mapped_column(nullable=True) class-attribute instance-attribute #

Path to the output

Relative to the diagnostic execution result output directory

long_name = mapped_column(nullable=True) class-attribute instance-attribute #

Human readable name describing the plot

output_type = mapped_column(index=True) class-attribute instance-attribute #

Type of the output

This will determine how the output is displayed

short_name = mapped_column(nullable=True) class-attribute instance-attribute #

Short key of the output

This is unique for a given execution and output type

build(*, execution_id, output_type, dimensions, filename=None, short_name=None, long_name=None, description=None) classmethod #

Build an ExecutionOutput from dimensions and metadata

This is a helper method that validates the dimensions supplied.

Parameters:

  • execution_id (int): Execution that created the output. Required.
  • output_type (ResultOutputType): Type of the output. Required.
  • dimensions (dict[str, str]): Dimensions that describe the output. Required.
  • filename (str | None): Path to the output. Default: None.
  • short_name (str | None): Short key of the output. Default: None.
  • long_name (str | None): Human readable name. Default: None.
  • description (str | None): Long description. Default: None.

Raises:

  • KeyError: If an unknown dimension was supplied. Dimensions must exist in the controlled vocabulary.

Returns:

  • ExecutionOutput: Newly created ExecutionOutput.

Source code in packages/climate-ref/src/climate_ref/models/execution.py
@classmethod
def build(  # noqa: PLR0913
    cls,
    *,
    execution_id: int,
    output_type: ResultOutputType,
    dimensions: dict[str, str],
    filename: str | None = None,
    short_name: str | None = None,
    long_name: str | None = None,
    description: str | None = None,
) -> "ExecutionOutput":
    """
    Build an ExecutionOutput from dimensions and metadata

    This is a helper method that validates the dimensions supplied.

    Parameters
    ----------
    execution_id
        Execution that created the output
    output_type
        Type of the output
    dimensions
        Dimensions that describe the output
    filename
        Path to the output
    short_name
        Short key of the output
    long_name
        Human readable name
    description
        Long description

    Raises
    ------
    KeyError
        If an unknown dimension was supplied.

        Dimensions must exist in the controlled vocabulary.

    Returns
    -------
        Newly created ExecutionOutput
    """
    for k in dimensions:
        if k not in cls._cv_dimensions:
            raise KeyError(f"Unknown dimension column '{k}'")

    return ExecutionOutput(
        execution_id=execution_id,
        output_type=output_type,
        filename=filename,
        short_name=short_name,
        long_name=long_name,
        description=description,
        **dimensions,
    )
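
A hedged example of calling build; the dimension key below is hypothetical and must already be registered in the controlled vocabulary (via _cv_dimensions), otherwise a KeyError is raised:

output = ExecutionOutput.build(
    execution_id=execution.id,
    output_type=ResultOutputType.Plot,
    dimensions={"statistic": "rmse"},  # hypothetical; the key must exist in the CV
    filename="plots/rmse_map.png",  # relative to the execution output directory
    short_name="rmse_map",
    long_name="Global RMSE map",
)
session.add(output)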

ResultOutputType #

Bases: Enum

Types of supported outputs

These map to the categories of output in the CMEC output bundle

Source code in packages/climate-ref/src/climate_ref/models/execution.py
class ResultOutputType(enum.Enum):
    """
    Types of supported outputs

    These map to the categories of output in the CMEC output bundle
    """

    Plot = "plot"
    Data = "data"
    HTML = "html"
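
For example, an execution's outputs can be partitioned by type through the Execution.outputs relationship (a small sketch):

plots = [o for o in execution.outputs if o.output_type == ResultOutputType.Plot]
data_files = [o for o in execution.outputs if o.output_type == ResultOutputType.Data]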

get_execution_group_and_latest(session) #

Query to get the most recent result for each execution group

Parameters:

  • session (Session): The database session to use for the query. Required.

Returns:

  • RowReturningQuery: Query yielding the most recent result for each execution group. Each row is a tuple of the execution group and its most recent execution, which can be None.

Source code in packages/climate-ref/src/climate_ref/models/execution.py
def get_execution_group_and_latest(
    session: Session,
) -> RowReturningQuery[tuple[ExecutionGroup, Execution | None]]:
    """
    Query to get the most recent result for each execution group

    Parameters
    ----------
    session
        The database session to use for the query.

    Returns
    -------
        Query to get the most recent result for each execution group.
        The result is a tuple of the execution group and the most recent result,
        which can be None.
    """
    # Find the most recent result for each execution group
    # This uses an aggregate function because it is more efficient than order by
    subquery = (
        session.query(
            Execution.execution_group_id,
            func.max(Execution.created_at).label("latest_created_at"),
        )
        .group_by(Execution.execution_group_id)
        .subquery()
    )

    # Join the diagnostic execution with the latest result
    query = (
        session.query(ExecutionGroup, Execution)
        .outerjoin(subquery, ExecutionGroup.id == subquery.c.execution_group_id)
        .outerjoin(
            Execution,
            (Execution.execution_group_id == ExecutionGroup.id)
            & (Execution.created_at == subquery.c.latest_created_at),
        )
    )

    return query  # type: ignore
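
A usage sketch, assuming an open SQLAlchemy Session; each row pairs an execution group with its latest execution, which is None for groups that have never run:

for group, latest in get_execution_group_and_latest(session):
    if latest is None:
        status = "never run"
    elif latest.successful:
        status = "ok"
    else:
        status = "failed"  # note: successful may also be None while a run is in flight
    print(group.key, status)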

get_execution_group_and_latest_filtered(session, diagnostic_filters=None, provider_filters=None, facet_filters=None, dirty=None, successful=None) #

Query execution groups with filtering capabilities.

Parameters:

  • session (Session): Database session. Required.
  • diagnostic_filters (list[str] | None): List of diagnostic slug substrings (OR logic, case-insensitive). Default: None.
  • provider_filters (list[str] | None): List of provider slug substrings (OR logic, case-insensitive). Default: None.
  • facet_filters (dict[str, str] | None): Dictionary of facet key-value pairs (AND logic, exact match). Default: None.
  • dirty (bool | None): If True, only return dirty execution groups. If False, only return clean execution groups. If None, do not filter by dirty status. Default: None.
  • successful (bool | None): If True, only return execution groups whose latest execution was successful. If False, only return execution groups whose latest execution was unsuccessful or that have no executions. If None, do not filter by execution success. Default: None.

Returns:

  • list[tuple[ExecutionGroup, Execution | None]]: Tuples of (ExecutionGroup, latest Execution or None).

Notes:

  • Diagnostic and provider filters use substring matching (case-insensitive)
  • Multiple values within the same filter type use OR logic
  • Different filter types use AND logic
  • Facet filters can either be key=value (searches all dataset types) or dataset_type.key=value (searches a specific dataset type)

Source code in packages/climate-ref/src/climate_ref/models/execution.py
def get_execution_group_and_latest_filtered(  # noqa: PLR0913
    session: Session,
    diagnostic_filters: list[str] | None = None,
    provider_filters: list[str] | None = None,
    facet_filters: dict[str, str] | None = None,
    dirty: bool | None = None,
    successful: bool | None = None,
) -> list[tuple[ExecutionGroup, Execution | None]]:
    """
    Query execution groups with filtering capabilities.

    Parameters
    ----------
    session
        Database session
    diagnostic_filters
        List of diagnostic slug substrings (OR logic, case-insensitive)
    provider_filters
        List of provider slug substrings (OR logic, case-insensitive)
    facet_filters
        Dictionary of facet key-value pairs (AND logic, exact match)
    dirty
        If True, only return dirty execution groups.
        If False, only return clean execution groups.
        If None, do not filter by dirty status.
    successful
        If True, only return execution groups whose latest execution was successful.
        If False, only return execution groups whose latest execution was unsuccessful or has no executions.
        If None, do not filter by execution success.

    Returns
    -------
        List of tuples of (ExecutionGroup, latest Execution or None)

    Notes
    -----
    - Diagnostic and provider filters use substring matching (case-insensitive)
    - Multiple values within the same filter type use OR logic
    - Different filter types use AND logic
    - Facet filters can either be key=value (searches all dataset types)
      or dataset_type.key=value (searches specific dataset type)
    """
    # Start with base query
    query = get_execution_group_and_latest(session)

    if diagnostic_filters or provider_filters:
        # Join through to the Diagnostic table
        query = query.join(Diagnostic, ExecutionGroup.diagnostic_id == Diagnostic.id)

    # Apply diagnostic filter (OR logic for multiple values)
    if diagnostic_filters:
        diagnostic_conditions = [
            Diagnostic.slug.ilike(f"%{filter_value.lower()}%") for filter_value in diagnostic_filters
        ]
        query = query.filter(or_(*diagnostic_conditions))

    # Apply provider filter (OR logic for multiple values)
    if provider_filters:
        # Need to join through Diagnostic to Provider
        query = query.join(Provider, Diagnostic.provider_id == Provider.id)

        provider_conditions = [
            Provider.slug.ilike(f"%{filter_value.lower()}%") for filter_value in provider_filters
        ]
        query = query.filter(or_(*provider_conditions))

    if successful is not None:
        if successful:
            query = query.filter(Execution.successful.is_(True))
        else:
            query = query.filter(or_(Execution.successful.is_(False), Execution.successful.is_(None)))

    if dirty is not None:
        if dirty:
            query = query.filter(ExecutionGroup.dirty.is_(True))
        else:
            query = query.filter(or_(ExecutionGroup.dirty.is_(False), ExecutionGroup.dirty.is_(None)))

    if facet_filters:
        # Load all results into memory for Python-based filtering
        # TODO: Update once we have normalised the selector
        results = [r._tuple() for r in query.all()]
        return _filter_executions_by_facets(results, facet_filters)
    else:
        return [r._tuple() for r in query.all()]
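
A hedged example of combining filters; the slug substring and facet value below are hypothetical:

rows = get_execution_group_and_latest_filtered(
    session,
    diagnostic_filters=["enso"],  # hypothetical diagnostic slug substring
    facet_filters={"source_id": "ACCESS-ESM1-5"},  # hypothetical facet filter
    successful=False,
)
for group, latest in rows:
    print(group.key, latest.dataset_hash if latest else None)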