climate_ref.datasets.base

DatasetAdapter

Bases: Protocol

An adapter to provide a common interface for different dataset types

This allows the same code to work with different dataset types.

Source code in packages/climate-ref/src/climate_ref/datasets/base.py
class DatasetAdapter(Protocol):
    """
    An adapter to provide a common interface for different dataset types

    This allows the same code to work with different dataset types.
    """

    dataset_cls: type[Dataset]
    slug_column: str
    """
    The column in the data catalog that contains the dataset slug.
    The dataset slug is a unique identifier for the dataset that includes the version of the dataset.
    This can be used to group files together that belong to the same dataset.
    """
    dataset_specific_metadata: tuple[str, ...]
    file_specific_metadata: tuple[str, ...] = ()

    version_metadata: str = "version"
    """
    The column in the data catalog that contains the version of the dataset.
    """
    dataset_id_metadata: tuple[str, ...] = ()
    """
    The group of metadata columns that are specific to the dataset excluding the version information.

    Each unique dataset should have the same values for these columns.

    This is generally the columns that describe the `slug` of a dataset,
    excluding the version information.
    """

    def pretty_subset(self, data_catalog: pd.DataFrame) -> pd.DataFrame:
        """
        Get a subset of the data_catalog to pretty print

        Parameters
        ----------
        data_catalog
            Data catalog to subset

        Returns
        -------
        :
            Subset of the data catalog to pretty print

        """
        return data_catalog[
            [
                *self.dataset_id_metadata,
                self.version_metadata,
            ]
        ]

    def find_local_datasets(self, file_or_directory: Path) -> pd.DataFrame:
        """
        Generate a data catalog from the specified file or directory

        This data catalog should contain all the metadata needed by the database.
        The index of the data catalog should be the dataset slug.
        """
        ...

    def validate_data_catalog(self, data_catalog: pd.DataFrame, skip_invalid: bool = False) -> pd.DataFrame:
        """
        Validate a data catalog

        Parameters
        ----------
        data_catalog
            Data catalog to validate
        skip_invalid
            If True, ignore datasets with invalid metadata and remove them from the resulting data catalog.

        Raises
        ------
        ValueError
            If `skip_invalid` is False (default) and the data catalog contains validation errors.

        Returns
        -------
        :
            Validated data catalog
        """
        # Check if the data catalog contains the required columns
        missing_columns = set(self.dataset_specific_metadata + self.file_specific_metadata) - set(
            data_catalog.columns
        )
        if missing_columns:
            raise ValueError(f"Data catalog is missing required columns: {missing_columns}")

        # Verify that the dataset specific columns don't vary by dataset by counting the unique values
        # for each dataset and checking if there are any that have more than one unique value.
        unique_metadata = (
            data_catalog[list(self.dataset_specific_metadata)].groupby(self.slug_column).nunique()
        )
        if unique_metadata.gt(1).any(axis=1).any():
            _log_duplicate_metadata(data_catalog, unique_metadata, self.slug_column)

            if skip_invalid:
                data_catalog = data_catalog[
                    ~data_catalog[self.slug_column].isin(
                        unique_metadata[unique_metadata.gt(1).any(axis=1)].index
                    )
                ]
            else:
                raise ValueError("Dataset specific metadata varies by dataset")

        return data_catalog

    def register_dataset(  # noqa: PLR0915
        self, config: Config, db: Database, data_catalog_dataset: pd.DataFrame
    ) -> DatasetRegistrationResult:
        """
        Register a dataset in the database using the data catalog

        Parameters
        ----------
        config
            Configuration object
        db
            Database instance
        data_catalog_dataset
            A subset of the data catalog containing the metadata for a single dataset

        Returns
        -------
        :
            Registration result with dataset and file change information
        """
        DatasetModel = self.dataset_cls

        self.validate_data_catalog(data_catalog_dataset)
        unique_slugs = data_catalog_dataset[self.slug_column].unique()
        if len(unique_slugs) != 1:
            raise RefException(f"Found multiple datasets in the same directory: {unique_slugs}")
        slug = unique_slugs[0]

        # Upsert the dataset (create a new dataset or update the metadata)
        dataset_metadata = data_catalog_dataset[list(self.dataset_specific_metadata)].iloc[0].to_dict()
        dataset, dataset_state = db.update_or_create(DatasetModel, defaults=dataset_metadata, slug=slug)
        if dataset_state == ModelState.CREATED:
            logger.info(f"Created new dataset: {dataset}")
        elif dataset_state == ModelState.UPDATED:
            logger.info(f"Updating existing dataset: {dataset}")
        db.session.flush()

        # Initialize result tracking
        files_added = []
        files_updated = []
        files_removed = []
        files_unchanged = []

        # Get current files for this dataset
        current_files = db.session.query(DatasetFile).filter_by(dataset_id=dataset.id).all()
        current_file_paths = {f.path: f for f in current_files}

        # Get new file data from data catalog
        new_file_data = data_catalog_dataset.to_dict(orient="records")
        new_file_lookup = {}
        for dataset_file in new_file_data:
            file_path = str(validate_path(dataset_file["path"]))
            new_file_lookup[file_path] = {
                "start_time": dataset_file["start_time"],
                "end_time": dataset_file["end_time"],
            }

        new_file_paths = set(new_file_lookup.keys())
        existing_file_paths = set(current_file_paths.keys())

        # TODO: support removing files that are no longer present
        # We want to keep a record of the dataset if it was used by a diagnostic in the past
        files_to_remove = existing_file_paths - new_file_paths
        if files_to_remove:
            files_removed = list(files_to_remove)
            logger.warning(f"Files to remove: {files_removed}")
            raise NotImplementedError("Removing files is not yet supported")

        # Update existing files if start/end times have changed
        for file_path, existing_file in current_file_paths.items():
            if file_path in new_file_lookup:
                new_times = new_file_lookup[file_path]
                if (
                    existing_file.start_time != new_times["start_time"]
                    or existing_file.end_time != new_times["end_time"]
                ):
                    logger.warning(f"Updating file times for {file_path}")
                    existing_file.start_time = new_times["start_time"]
                    existing_file.end_time = new_times["end_time"]
                    files_updated.append(file_path)
                else:
                    files_unchanged.append(file_path)

        # Add new files (batch operation)
        files_to_add = new_file_paths - existing_file_paths
        if files_to_add:
            files_added = list(files_to_add)
            new_dataset_files = []
            for file_path in files_to_add:
                file_times = new_file_lookup[file_path]
                new_dataset_files.append(
                    DatasetFile(
                        path=file_path,
                        dataset_id=dataset.id,
                        start_time=file_times["start_time"],
                        end_time=file_times["end_time"],
                    )
                )
            db.session.add_all(new_dataset_files)

        # Determine final dataset state
        # If dataset metadata changed, use that state
        # If no metadata changed but files changed, consider it updated
        # If nothing changed, keep the original state (None for existing, CREATED for new)
        final_dataset_state = dataset_state
        if dataset_state is None and (files_added or files_updated or files_removed):
            final_dataset_state = ModelState.UPDATED

        result = DatasetRegistrationResult(
            dataset=dataset,
            dataset_state=final_dataset_state,
            files_added=files_added,
            files_updated=files_updated,
            files_removed=files_removed,
            files_unchanged=files_unchanged,
        )
        change_message = f": ({final_dataset_state.name})" if final_dataset_state else ""
        logger.debug(
            f"Dataset registration complete for {dataset.slug}{change_message} "
            f"{len(files_added)} files added, "
            f"{len(files_updated)} files updated, "
            f"{len(files_removed)} files removed, "
            f"{len(files_unchanged)} files unchanged"
        )

        return result

    def _get_dataset_files(self, db: Database, limit: int | None = None) -> pd.DataFrame:
        dataset_type = self.dataset_cls.__mapper_args__["polymorphic_identity"]

        result = (
            db.session.query(DatasetFile)
            # The join is necessary to be able to order by the dataset columns
            .join(DatasetFile.dataset)
            .where(Dataset.dataset_type == dataset_type)
            # The joinedload is necessary to avoid N+1 queries (one for each dataset)
            # https://docs.sqlalchemy.org/en/14/orm/loading_relationships.html#the-zen-of-joined-eager-loading
            .options(joinedload(DatasetFile.dataset.of_type(self.dataset_cls)))
            .order_by(Dataset.updated_at.desc())
            .limit(limit)
            .all()
        )

        return pd.DataFrame(
            [
                {
                    **{k: getattr(file, k) for k in self.file_specific_metadata},
                    **{k: getattr(file.dataset, k) for k in self.dataset_specific_metadata},
                    "finalised": file.dataset.finalised,
                }
                for file in result
            ],
            index=[file.dataset.id for file in result],
        )

    def _get_datasets(self, db: Database, limit: int | None = None) -> pd.DataFrame:
        result_datasets = (
            db.session.query(self.dataset_cls).order_by(Dataset.updated_at.desc()).limit(limit).all()
        )

        return pd.DataFrame(
            [{k: getattr(dataset, k) for k in self.dataset_specific_metadata} for dataset in result_datasets],
            index=[dataset.id for dataset in result_datasets],
        )

    def load_catalog(
        self, db: Database, include_files: bool = True, limit: int | None = None
    ) -> pd.DataFrame:
        """
        Load the data catalog containing the currently tracked datasets/files from the database

        Iterating over different datasets within the data catalog can be done using a `groupby`
        operation for the `instance_id` column.

        Only the latest version of each dataset is returned.

        The index of the data catalog is the primary key of the dataset.
        This should be maintained during any processing.

        Returns
        -------
        :
            Data catalog containing the metadata for the currently ingested datasets
        """
        with db.session.begin():
            # TODO: Paginate this query to avoid loading all the data at once
            if include_files:
                catalog = self._get_dataset_files(db, limit)
            else:
                catalog = self._get_datasets(db, limit)

        def _get_latest_version(dataset_catalog: pd.DataFrame) -> pd.DataFrame:
            """
            Get the latest version of each dataset based on the version metadata.

            This assumes that the version can be sorted lexicographically.
            """
            latest_version = dataset_catalog[self.version_metadata].max()

            return cast(
                pd.DataFrame, dataset_catalog[dataset_catalog[self.version_metadata] == latest_version]
            )

        # If there are no datasets, return an empty DataFrame
        if catalog.empty:
            return pd.DataFrame(columns=self.dataset_specific_metadata + self.file_specific_metadata)

        # Group by the dataset ID and get the latest version for each dataset
        return catalog.groupby(
            list(self.dataset_id_metadata), group_keys=False, as_index=False, sort=False
        ).apply(_get_latest_version)
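
For illustration, a minimal adapter for a hypothetical observations dataset type might fill in the protocol's attributes as follows. ObsDataset and all column names here are assumptions for the sketch, not part of this module:

class ObsDatasetAdapter(DatasetAdapter):
    # Hypothetical Dataset subclass; any mapped Dataset model would do
    dataset_cls = ObsDataset
    # Column whose value uniquely identifies a dataset (including its version)
    slug_column = "instance_id"
    # Columns shared by every file belonging to the same dataset
    dataset_specific_metadata = ("source_id", "variable_id", "version", "instance_id")
    # Columns that may differ between files of one dataset
    file_specific_metadata = ("path", "start_time", "end_time")
    # Columns identifying a dataset, excluding the version
    dataset_id_metadata = ("source_id", "variable_id")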

dataset_id_metadata = () (class attribute, instance attribute)

The group of metadata columns that are specific to the dataset excluding the version information.

Each unique dataset should have the same values for these columns.

This is generally the columns that describe the slug of a dataset, excluding the version information.

slug_column (instance attribute)

The column in the data catalog that contains the dataset slug. The dataset slug is a unique identifier for the dataset that includes the version of the dataset. This can be used to group files together that belong to the same dataset.

version_metadata = 'version' (class attribute, instance attribute)

The column in the data catalog that contains the version of the dataset.

find_local_datasets(file_or_directory)

Generate a data catalog from the specified file or directory

This data catalog should contain all the metadata needed by the database. The index of the data catalog should be the dataset slug.

Source code in packages/climate-ref/src/climate_ref/datasets/base.py
def find_local_datasets(self, file_or_directory: Path) -> pd.DataFrame:
    """
    Generate a data catalog from the specified file or directory

    This data catalog should contain all the metadata needed by the database.
    The index of the data catalog should be the dataset slug.
    """
    ...
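
A sketch of one possible implementation, which walks a directory tree of netCDF files and builds one catalog row per file. parse_file_metadata is a hypothetical helper assumed to return every column named in dataset_specific_metadata and file_specific_metadata:

from pathlib import Path

import pandas as pd


def find_local_datasets(self, file_or_directory: Path) -> pd.DataFrame:
    files = (
        [file_or_directory]
        if file_or_directory.is_file()
        else sorted(file_or_directory.rglob("*.nc"))
    )
    # One record per file; parse_file_metadata is an assumed helper
    catalog = pd.DataFrame([parse_file_metadata(str(f)) for f in files])
    # The protocol requires the dataset slug as the index
    return catalog.set_index(self.slug_column, drop=False)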

load_catalog(db, include_files=True, limit=None)

Load the data catalog containing the currently tracked datasets/files from the database

Iterating over different datasets within the data catalog can be done using a groupby operation for the instance_id column.

Only the latest version of each dataset is returned.

The index of the data catalog is the primary key of the dataset. This should be maintained during any processing.

Returns:

DataFrame
    Data catalog containing the metadata for the currently ingested datasets

Source code in packages/climate-ref/src/climate_ref/datasets/base.py
def load_catalog(
    self, db: Database, include_files: bool = True, limit: int | None = None
) -> pd.DataFrame:
    """
    Load the data catalog containing the currently tracked datasets/files from the database

    Iterating over different datasets within the data catalog can be done using a `groupby`
    operation for the `instance_id` column.

    Only the latest version of each dataset is returned.

    The index of the data catalog is the primary key of the dataset.
    This should be maintained during any processing.

    Returns
    -------
    :
        Data catalog containing the metadata for the currently ingested datasets
    """
    with db.session.begin():
        # TODO: Paginate this query to avoid loading all the data at once
        if include_files:
            catalog = self._get_dataset_files(db, limit)
        else:
            catalog = self._get_datasets(db, limit)

    def _get_latest_version(dataset_catalog: pd.DataFrame) -> pd.DataFrame:
        """
        Get the latest version of each dataset based on the version metadata.

        This assumes that the version can be sorted lexicographically.
        """
        latest_version = dataset_catalog[self.version_metadata].max()

        return cast(
            pd.DataFrame, dataset_catalog[dataset_catalog[self.version_metadata] == latest_version]
        )

    # If there are no datasets, return an empty DataFrame
    if catalog.empty:
        return pd.DataFrame(columns=self.dataset_specific_metadata + self.file_specific_metadata)

    # Group by the dataset ID and get the latest version for each dataset
    return catalog.groupby(
        list(self.dataset_id_metadata), group_keys=False, as_index=False, sort=False
    ).apply(_get_latest_version)
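
In practice, consuming the catalog looks something like the following, where db is a connected Database and adapter is any DatasetAdapter implementation (both assumed to be set up elsewhere):

catalog = adapter.load_catalog(db, include_files=True)

# One group per dataset; the grouping columns identify a dataset across versions
for dataset_id, files in catalog.groupby(list(adapter.dataset_id_metadata)):
    print(dataset_id, len(files), "files")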

pretty_subset(data_catalog)

Get a subset of the data_catalog to pretty print

Parameters:

data_catalog : DataFrame, required
    Data catalog to subset

Returns:

DataFrame
    Subset of the data catalog to pretty print

Source code in packages/climate-ref/src/climate_ref/datasets/base.py
def pretty_subset(self, data_catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Get a subset of the data_catalog to pretty print

    Parameters
    ----------
    data_catalog
        Data catalog to subset

    Returns
    -------
    :
        Subset of the data catalog to pretty print

    """
    return data_catalog[
        [
            *self.dataset_id_metadata,
            self.version_metadata,
        ]
    ]
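
Given a catalog loaded via load_catalog, this reduces the frame to the identifying columns plus the version, which is convenient for console output (adapter and catalog are assumed, as in the sketch above):

print(adapter.pretty_subset(catalog).drop_duplicates().to_string(index=False))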

register_dataset(config, db, data_catalog_dataset)

Register a dataset in the database using the data catalog

Parameters:

config : Config, required
    Configuration object
db : Database, required
    Database instance
data_catalog_dataset : DataFrame, required
    A subset of the data catalog containing the metadata for a single dataset

Returns:

DatasetRegistrationResult
    Registration result with dataset and file change information

Source code in packages/climate-ref/src/climate_ref/datasets/base.py
def register_dataset(  # noqa: PLR0915
    self, config: Config, db: Database, data_catalog_dataset: pd.DataFrame
) -> DatasetRegistrationResult:
    """
    Register a dataset in the database using the data catalog

    Parameters
    ----------
    config
        Configuration object
    db
        Database instance
    data_catalog_dataset
        A subset of the data catalog containing the metadata for a single dataset

    Returns
    -------
    :
        Registration result with dataset and file change information
    """
    DatasetModel = self.dataset_cls

    self.validate_data_catalog(data_catalog_dataset)
    unique_slugs = data_catalog_dataset[self.slug_column].unique()
    if len(unique_slugs) != 1:
        raise RefException(f"Found multiple datasets in the same directory: {unique_slugs}")
    slug = unique_slugs[0]

    # Upsert the dataset (create a new dataset or update the metadata)
    dataset_metadata = data_catalog_dataset[list(self.dataset_specific_metadata)].iloc[0].to_dict()
    dataset, dataset_state = db.update_or_create(DatasetModel, defaults=dataset_metadata, slug=slug)
    if dataset_state == ModelState.CREATED:
        logger.info(f"Created new dataset: {dataset}")
    elif dataset_state == ModelState.UPDATED:
        logger.info(f"Updating existing dataset: {dataset}")
    db.session.flush()

    # Initialize result tracking
    files_added = []
    files_updated = []
    files_removed = []
    files_unchanged = []

    # Get current files for this dataset
    current_files = db.session.query(DatasetFile).filter_by(dataset_id=dataset.id).all()
    current_file_paths = {f.path: f for f in current_files}

    # Get new file data from data catalog
    new_file_data = data_catalog_dataset.to_dict(orient="records")
    new_file_lookup = {}
    for dataset_file in new_file_data:
        file_path = str(validate_path(dataset_file["path"]))
        new_file_lookup[file_path] = {
            "start_time": dataset_file["start_time"],
            "end_time": dataset_file["end_time"],
        }

    new_file_paths = set(new_file_lookup.keys())
    existing_file_paths = set(current_file_paths.keys())

    # TODO: support removing files that are no longer present
    # We want to keep a record of the dataset if it was used by a diagnostic in the past
    files_to_remove = existing_file_paths - new_file_paths
    if files_to_remove:
        files_removed = list(files_to_remove)
        logger.warning(f"Files to remove: {files_removed}")
        raise NotImplementedError("Removing files is not yet supported")

    # Update existing files if start/end times have changed
    for file_path, existing_file in current_file_paths.items():
        if file_path in new_file_lookup:
            new_times = new_file_lookup[file_path]
            if (
                existing_file.start_time != new_times["start_time"]
                or existing_file.end_time != new_times["end_time"]
            ):
                logger.warning(f"Updating file times for {file_path}")
                existing_file.start_time = new_times["start_time"]
                existing_file.end_time = new_times["end_time"]
                files_updated.append(file_path)
            else:
                files_unchanged.append(file_path)

    # Add new files (batch operation)
    files_to_add = new_file_paths - existing_file_paths
    if files_to_add:
        files_added = list(files_to_add)
        new_dataset_files = []
        for file_path in files_to_add:
            file_times = new_file_lookup[file_path]
            new_dataset_files.append(
                DatasetFile(
                    path=file_path,
                    dataset_id=dataset.id,
                    start_time=file_times["start_time"],
                    end_time=file_times["end_time"],
                )
            )
        db.session.add_all(new_dataset_files)

    # Determine final dataset state
    # If dataset metadata changed, use that state
    # If no metadata changed but files changed, consider it updated
    # If nothing changed, keep the original state (None for existing, CREATED for new)
    final_dataset_state = dataset_state
    if dataset_state is None and (files_added or files_updated or files_removed):
        final_dataset_state = ModelState.UPDATED

    result = DatasetRegistrationResult(
        dataset=dataset,
        dataset_state=final_dataset_state,
        files_added=files_added,
        files_updated=files_updated,
        files_removed=files_removed,
        files_unchanged=files_unchanged,
    )
    change_message = f": ({final_dataset_state.name})" if final_dataset_state else ""
    logger.debug(
        f"Dataset registration complete for {dataset.slug}{change_message} "
        f"{len(files_added)} files added, "
        f"{len(files_updated)} files updated, "
        f"{len(files_removed)} files removed, "
        f"{len(files_unchanged)} files unchanged"
    )

    return result
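
Because this method expects rows for exactly one dataset, ingestion code typically groups a freshly scanned catalog by the slug column first. A sketch, assuming config, db and adapter are already configured:

from pathlib import Path

data_catalog = adapter.find_local_datasets(Path("/some/data/directory"))
for slug, dataset_rows in data_catalog.groupby(adapter.slug_column):
    result = adapter.register_dataset(config, db, dataset_rows)
    if result.total_changes:
        print(f"{slug}: {result.total_changes} file change(s)")
# Transaction handling (commit/rollback) is left to the surrounding code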

validate_data_catalog(data_catalog, skip_invalid=False)

Validate a data catalog

Parameters:

data_catalog : DataFrame, required
    Data catalog to validate
skip_invalid : bool, default False
    If True, ignore datasets with invalid metadata and remove them from the resulting data catalog.

Raises:

ValueError
    If skip_invalid is False (default) and the data catalog contains validation errors.

Returns:

DataFrame
    Validated data catalog

Source code in packages/climate-ref/src/climate_ref/datasets/base.py
def validate_data_catalog(self, data_catalog: pd.DataFrame, skip_invalid: bool = False) -> pd.DataFrame:
    """
    Validate a data catalog

    Parameters
    ----------
    data_catalog
        Data catalog to validate
    skip_invalid
        If True, ignore datasets with invalid metadata and remove them from the resulting data catalog.

    Raises
    ------
    ValueError
        If `skip_invalid` is False (default) and the data catalog contains validation errors.

    Returns
    -------
    :
        Validated data catalog
    """
    # Check if the data catalog contains the required columns
    missing_columns = set(self.dataset_specific_metadata + self.file_specific_metadata) - set(
        data_catalog.columns
    )
    if missing_columns:
        raise ValueError(f"Data catalog is missing required columns: {missing_columns}")

    # Verify that the dataset specific columns don't vary by dataset by counting the unique values
    # for each dataset and checking if there are any that have more than one unique value.
    unique_metadata = (
        data_catalog[list(self.dataset_specific_metadata)].groupby(self.slug_column).nunique()
    )
    if unique_metadata.gt(1).any(axis=1).any():
        _log_duplicate_metadata(data_catalog, unique_metadata, self.slug_column)

        if skip_invalid:
            data_catalog = data_catalog[
                ~data_catalog[self.slug_column].isin(
                    unique_metadata[unique_metadata.gt(1).any(axis=1)].index
                )
            ]
        else:
            raise ValueError("Dataset specific metadata varies by dataset")

    return data_catalog
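
The consistency check at the core of this method can be reproduced on a toy catalog, which makes the failure mode concrete. The column names are illustrative, with instance_id playing the role of slug_column:

import pandas as pd

catalog = pd.DataFrame(
    {
        "instance_id": ["obs.tas.v1", "obs.tas.v1"],
        "source_id": ["ERA5", "ERA-5"],  # varies within one dataset: invalid
        "path": ["tas_1.nc", "tas_2.nc"],
    }
)

# nunique() per slug: any count above 1 means a dataset-specific column varies
unique_metadata = catalog[["instance_id", "source_id"]].groupby("instance_id").nunique()
print(unique_metadata.gt(1).any(axis=1))
# obs.tas.v1 -> True, so validate_data_catalog would raise ValueError here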

DatasetParsingFunction

Bases: Protocol

Protocol for a function that parses metadata from a file or directory

Source code in packages/climate-ref/src/climate_ref/datasets/base.py
class DatasetParsingFunction(Protocol):
    """
    Protocol for a function that parses metadata from a file or directory
    """

    def __call__(self, file: str, **kwargs: Any) -> dict[str, Any]:
        """
        Parse a file or directory and return metadata for the dataset

        Parameters
        ----------
        file
            File or directory to parse

        kwargs
            Additional keyword arguments to pass to the parsing function.

        Returns
        -------
        :
            Metadata for the dataset
        """
        ...

__call__(file, **kwargs)

Parse a file or directory and return metadata for the dataset

Parameters:

file : str, required
    File or directory to parse
kwargs : Any, default {}
    Additional keyword arguments to pass to the parsing function.

Returns:

dict[str, Any]
    Metadata for the dataset

Source code in packages/climate-ref/src/climate_ref/datasets/base.py
def __call__(self, file: str, **kwargs: Any) -> dict[str, Any]:
    """
    Parse a file or directory and return metadata for the dataset

    Parameters
    ----------
    file
        File or directory to parse

    kwargs
        Additional keyword arguments to pass to the parsing function.

    Returns
    -------
    :
        Metadata for the dataset
    """
    ...
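
For example, a function with this shape might extract metadata from a CMIP-style filename. The filename layout and the returned fields are assumptions for the sketch:

from pathlib import Path
from typing import Any


def parse_filename(file: str, **kwargs: Any) -> dict[str, Any]:
    # e.g. "tas_Amon_ACCESS-ESM1-5_historical_r1i1p1f1_gn_185001-201412.nc"
    variable_id, table_id, source_id, experiment_id, *_ = Path(file).stem.split("_")
    return {
        "path": file,
        "variable_id": variable_id,
        "table_id": table_id,
        "source_id": source_id,
        "experiment_id": experiment_id,
    }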

DatasetRegistrationResult

Result of registering a dataset, containing information about file changes

Source code in packages/climate-ref/src/climate_ref/datasets/base.py
@define
class DatasetRegistrationResult:
    """
    Result of registering a dataset, containing information about file changes
    """

    dataset: Dataset
    dataset_state: ModelState | None
    files_added: list[str]
    files_updated: list[str]
    files_removed: list[str]
    files_unchanged: list[str]

    @property
    def total_changes(self) -> int:
        """Total number of file changes (added + updated + removed)"""
        return len(self.files_added) + len(self.files_updated) + len(self.files_removed)

total_changes (property)

Total number of file changes (added + updated + removed)
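
A typical use is deciding whether downstream work is needed after ingestion; result here is assumed to come from a register_dataset call:

if result.total_changes == 0 and result.dataset_state is None:
    print(f"{result.dataset.slug}: already up to date")
else:
    print(
        f"{result.dataset.slug}: {len(result.files_added)} added, "
        f"{len(result.files_updated)} updated, {len(result.files_removed)} removed"
    )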