Skip to content

climate_ref.cli.executions #

View execution groups and their results

ListGroupsFilterOptions dataclass #

Options to filter execution groups in list-groups command

Source code in packages/climate-ref/src/climate_ref/cli/executions.py
@dataclass
class ListGroupsFilterOptions:
    """Options to filter execution groups in list-groups command"""

    diagnostic: list[str] | None = None
    """Filter by diagnostic slug (substring, case-insensitive)"""

    provider: list[str] | None = None
    """Filter by provider slug (substring, case-insensitive)"""

    facets: dict[str, str] | None = None
    """Filter by facet key-value pairs (exact match)"""

diagnostic: list[str] | None = None (class attribute / instance attribute)

Filter by diagnostic slug (substring, case-insensitive)

facets: dict[str, str] | None = None (class attribute / instance attribute)

Filter by facet key-value pairs (exact match)

provider: list[str] | None = None (class attribute / instance attribute)

Filter by provider slug (substring, case-insensitive)

delete_groups(ctx, diagnostic=None, provider=None, filter=None, successful=None, dirty=None, remove_outputs=typer.Option(False, '--remove-outputs', help='Also remove output directories from the filesystem'), force=typer.Option(False, help='Skip confirmation prompt')) #

Delete execution groups matching the specified filters.

This command will delete execution groups and their associated executions. Use filters to specify which groups to delete. At least one filter must be provided to prevent accidental deletion of all groups.

Filters can be combined using AND logic across filter types and OR logic within a filter type.

Source code in packages/climate-ref/src/climate_ref/cli/executions.py
@app.command()
def delete_groups(  # noqa: PLR0912, PLR0913
    ctx: typer.Context,
    diagnostic: Annotated[
        list[str] | None,
        typer.Option(
            help="Filter by diagnostic slug (substring match, case-insensitive). "
            "Multiple values can be provided."
        ),
    ] = None,
    provider: Annotated[
        list[str] | None,
        typer.Option(
            help="Filter by provider slug (substring match, case-insensitive). "
            "Multiple values can be provided."
        ),
    ] = None,
    filter: Annotated[  # noqa: A002
        list[str] | None,
        typer.Option(
            "--filter",
            help="Filter by facet key=value pairs (exact match). Multiple filters can be provided.",
        ),
    ] = None,
    successful: Annotated[
        bool | None,
        typer.Option(
            "--successful/--not-successful",
            help="Filter by successful or unsuccessful executions.",
        ),
    ] = None,
    dirty: Annotated[
        bool | None,
        typer.Option(
            "--dirty/--not-dirty",
            help="Filter to include only dirty or clean execution groups. "
            "These execution groups will be re-computed on the next run.",
        ),
    ] = None,
    remove_outputs: bool = typer.Option(
        False, "--remove-outputs", help="Also remove output directories from the filesystem"
    ),
    force: bool = typer.Option(False, help="Skip confirmation prompt"),
) -> None:
    """
    Delete execution groups matching the specified filters.

    This command will delete execution groups and their associated executions.
    Use filters to specify which groups to delete. At least one filter must be provided
    to prevent accidental deletion of all groups.

    Filters can be combined using AND logic across filter types and OR logic within a filter type.
    """
    session = ctx.obj.database.session
    console = ctx.obj.console

    # Parse "key=value" facet filter strings into a dict; invalid syntax aborts the command
    try:
        facet_filters = parse_facet_filters(filter)
    except ValueError as e:
        logger.error(str(e))
        raise typer.Exit(code=1) from e

    # Safety guard: running with no filters at all would delete every group in the database
    if not any([diagnostic, provider, facet_filters, successful is not None, dirty is not None]):
        logger.warning("THIS WILL DELETE ALL EXECUTION GROUPS IN THE DATABASE")
        raise typer.Exit(code=1)

    # Build filter options (successful/dirty are passed separately below)
    filters = ListGroupsFilterOptions(
        diagnostic=diagnostic,
        provider=provider,
        facets=facet_filters if facet_filters else None,
    )
    logger.debug(f"Applying filters: {filters}")

    # Apply filters to query
    try:
        all_filtered_results = get_execution_group_and_latest_filtered(
            session,
            diagnostic_filters=filters.diagnostic,
            provider_filters=filters.provider,
            facet_filters=filters.facets,
            successful=successful,
            dirty=dirty,
        )
    except Exception as e:  # pragma: no cover
        logger.error(f"Error applying filters: {e}")
        raise typer.Exit(code=1) from e

    # Nothing matched: explain which filters were applied and exit cleanly
    if not all_filtered_results:
        emit_no_results_warning(filters, session.query(ExecutionGroup).count())
        return

    # Convert to DataFrame so the user can preview exactly what will be deleted
    results_df = pd.DataFrame(
        [
            {
                "id": eg.id,
                "key": eg.key,
                "provider": eg.diagnostic.provider.slug,
                "diagnostic": eg.diagnostic.slug,
                "dirty": eg.dirty,
                "successful": result.successful if result else None,
                "created_at": eg.created_at,
                "updated_at": eg.updated_at,
                "selectors": json.dumps(eg.selectors),
            }
            for eg, result in all_filtered_results
        ]
    )

    # Display preview
    console.print("Execution groups to be deleted:")
    pretty_print_df(results_df, console=console)

    count = len(all_filtered_results)
    console.print(f"\nWill delete {count} execution group(s).")

    # Confirm unless --force was given
    if not force and not typer.confirm("Do you want to proceed with deletion?"):
        console.print("Deletion cancelled.")
        return

    # Remove output directories from disk if requested (best effort; failures are logged)
    if remove_outputs:
        config = ctx.obj.config
        for eg, _ in all_filtered_results:
            for execution in eg.executions:
                output_dir = config.paths.results / execution.output_fragment

                # Safety check: never delete anything outside the configured results root
                if not output_dir.is_relative_to(config.paths.results):  # pragma: no cover
                    logger.error(f"Skipping unsafe path: {output_dir}")
                    continue

                if output_dir.exists():
                    try:
                        logger.warning(f"Removing output directory: {output_dir}")
                        shutil.rmtree(output_dir)
                    except Exception as e:
                        logger.error(f"Failed to remove {output_dir}: {e}")

    # Delete execution groups and all related records in dependency order
    # (children first, since the FKs have no cascade delete configured)
    # TODO: Add cascade delete to FK relationships and simplify this code
    with session.begin_nested() if session.in_transaction() else session.begin():
        for eg, _ in all_filtered_results:
            for execution in eg.executions:
                # Delete MetricValues first
                for metric_value in execution.values:
                    session.delete(metric_value)

                # Delete ExecutionOutputs
                for output in execution.outputs:
                    session.delete(output)

                # Delete many-to-many associations with datasets
                session.execute(
                    execution_datasets.delete().where(execution_datasets.c.execution_id == execution.id)
                )

                # Now delete the execution
                session.delete(execution)

            # Finally delete the execution group
            session.delete(eg)

    if remove_outputs:
        console.print(f"[green]Successfully deleted {count} execution group(s) and their output directories.")
    else:
        console.print(f"[green]Successfully deleted {count} execution group(s).")

emit_no_results_warning(filters, total_count) #

Emit informative warning when filters produce no results.

Source code in packages/climate-ref/src/climate_ref/cli/executions.py
def emit_no_results_warning(
    filters: ListGroupsFilterOptions,
    total_count: int,
) -> None:
    """
    Emit informative warning when filters produce no results.
    """
    # Collect a human-readable description of each filter that was actually applied
    applied: list[str] = []
    if filters.diagnostic:
        applied.append(f"diagnostic filters: {filters.diagnostic}")
    if filters.provider:
        applied.append(f"provider filters: {filters.provider}")
    if filters.facets:
        applied.append(f"facet filters: {[f'{key}={value}' for key, value in filters.facets.items()]}")

    logger.warning(
        f"No execution groups match the specified filters. "
        f"Total execution groups in database: {total_count}. "
        f"Applied filters: {', '.join(applied)}"
    )

flag_dirty(ctx, execution_id) #

Flag an execution group for recomputation

Source code in packages/climate-ref/src/climate_ref/cli/executions.py
@app.command()
def flag_dirty(ctx: typer.Context, execution_id: int) -> None:
    """
    Flag an execution group for recomputation
    """
    db_session = ctx.obj.database.session
    console = ctx.obj.console

    with db_session.begin():
        group = db_session.get(ExecutionGroup, execution_id)

        if not group:
            logger.error(f"Execution not found: {execution_id}")
            raise typer.Exit(code=1)

        # Marking the group dirty causes it to be re-computed on the next run
        group.dirty = True

        console.print(_execution_panel(group))

inspect(ctx, execution_id) #

Inspect a specific execution group by its ID

This will display the execution details, datasets, results directory, and logs if available.

Source code in packages/climate-ref/src/climate_ref/cli/executions.py
@app.command()
def inspect(ctx: typer.Context, execution_id: int) -> None:
    """
    Inspect a specific execution group by its ID

    This will display the execution details, datasets, results directory, and logs if available.
    """
    config: Config = ctx.obj.config
    db_session = ctx.obj.database.session
    console = ctx.obj.console

    group = db_session.get(ExecutionGroup, execution_id)
    if not group:
        logger.error(f"Execution not found: {execution_id}")
        raise typer.Exit(code=1)

    console.print(_execution_panel(group))

    # Without at least one execution there is nothing further to show
    if not group.executions:
        logger.error(f"No results found for execution: {execution_id}")
        return

    # Only the most recent execution of the group is inspected
    latest: Execution = group.executions[-1]
    output_dir = config.paths.results / latest.output_fragment

    for panel in (
        _datasets_panel(latest),
        _results_directory_panel(output_dir),
        _log_panel(output_dir),
    ):
        console.print(panel)

list_groups(ctx, column=None, limit=typer.Option(100, help='Limit the number of rows to display'), diagnostic=None, provider=None, filter=None, successful=None, dirty=None) #

List the diagnostic execution groups that have been identified

The data catalog is sorted by the date that the execution group was created (first = newest). If the --column option is provided, only the specified columns will be displayed.

Filters can be combined using AND logic across filter types and OR logic within a filter type.

The output will be in a tabular format.

Source code in packages/climate-ref/src/climate_ref/cli/executions.py
@app.command()
def list_groups(  # noqa: PLR0913
    ctx: typer.Context,
    column: Annotated[
        list[str] | None,
        typer.Option(help="Only include specified columns in the output"),
    ] = None,
    limit: int = typer.Option(100, help="Limit the number of rows to display"),
    diagnostic: Annotated[
        list[str] | None,
        typer.Option(
            help="Filter by diagnostic slug (substring match, case-insensitive). "
            "Multiple values can be provided."
        ),
    ] = None,
    provider: Annotated[
        list[str] | None,
        typer.Option(
            help="Filter by provider slug (substring match, case-insensitive). "
            "Multiple values can be provided."
        ),
    ] = None,
    filter: Annotated[  # noqa: A002
        list[str] | None,
        typer.Option(
            "--filter",
            help="Filter by facet key=value pairs (exact match). Multiple filters can be provided.",
        ),
    ] = None,
    successful: Annotated[
        bool | None,
        typer.Option(
            "--successful/--not-successful",
            help="Filter by successful or unsuccessful executions.",
        ),
    ] = None,
    dirty: Annotated[
        bool | None,
        typer.Option(
            "--dirty/--not-dirty",
            help="Filter to include only dirty or clean execution groups. "
            "These execution groups will be re-computed on the next run.",
        ),
    ] = None,
) -> None:
    """
    List the diagnostic execution groups that have been identified

    The data catalog is sorted by the date that the execution group was created (first = newest).
    If the `--column` option is provided, only the specified columns will be displayed.

    Filters can be combined using AND logic across filter types and OR logic within a filter type.

    The output will be in a tabular format.
    """
    session = ctx.obj.database.session
    console = ctx.obj.console

    # Parse "key=value" facet filter strings into a dict; invalid syntax aborts the command
    try:
        facet_filters = parse_facet_filters(filter)
    except ValueError as e:
        logger.error(str(e))
        raise typer.Exit(code=1) from e

    # Build filter options (successful/dirty are passed separately below)
    filters = ListGroupsFilterOptions(
        diagnostic=diagnostic,
        provider=provider,
        facets=facet_filters if facet_filters else None,
    )
    logger.debug(f"Applying filters: {filters}")

    # Get total count before filtering for warning messages
    total_count = session.query(ExecutionGroup).count()

    # Apply filters to query, then truncate the result set to the display limit
    try:
        all_filtered_results = get_execution_group_and_latest_filtered(
            session,
            diagnostic_filters=filters.diagnostic,
            provider_filters=filters.provider,
            facet_filters=filters.facets,
            successful=successful,
            dirty=dirty,
        )
        execution_groups_results = all_filtered_results[:limit]
    except Exception as e:  # pragma: no cover
        logger.error(f"Error applying filters: {e}")
        raise typer.Exit(code=1) from e

    # Check if any results found; an empty DataFrame keeps the column handling below uniform
    if not execution_groups_results:
        emit_no_results_warning(filters, total_count)
        results_df = pd.DataFrame(
            columns=[
                "id",
                "key",
                "provider",
                "diagnostic",
                "dirty",
                "successful",
                "created_at",
                "updated_at",
                "selectors",
            ]
        )
    else:
        results_df = pd.DataFrame(
            [
                {
                    "id": eg.id,
                    "key": eg.key,
                    "provider": eg.diagnostic.provider.slug,
                    "diagnostic": eg.diagnostic.slug,
                    "dirty": eg.dirty,
                    "successful": result.successful if result else None,
                    "created_at": eg.created_at,
                    "updated_at": eg.updated_at,
                    "selectors": json.dumps(eg.selectors),
                }
                for eg, result in execution_groups_results
            ]
        )

    # Apply column filtering; unknown column names are an error
    if column and not results_df.empty:  # Only apply if df is not empty
        if not all(col in results_df.columns for col in column):
            logger.error(f"Column not found in data catalog: {column}")
            raise typer.Exit(code=1)
        results_df = results_df[column]

    # Display results
    pretty_print_df(results_df, console=console)

    # Show limit warning if applicable
    filtered_count = len(all_filtered_results)
    if filtered_count > limit:
        logger.warning(
            f"Displaying {limit} of {filtered_count} filtered results. "
            f"Use the `--limit` option to display more."
        )

walk_directory(directory, tree) #

Recursively build a Tree with directory contents.

Source code in packages/climate-ref/src/climate_ref/cli/executions.py
def walk_directory(directory: pathlib.Path, tree: Tree) -> None:
    """Recursively build a Tree with directory contents."""
    # Directories before files, then case-insensitive name order
    entries = sorted(
        pathlib.Path(directory).iterdir(),
        key=lambda entry: (entry.is_file(), entry.name.lower()),
    )
    for entry in entries:
        # Skip hidden entries (dotfiles)
        if entry.name.startswith("."):
            continue
        if entry.is_dir():
            # Dunder directories (e.g. __pycache__) are rendered dimmed
            dir_style = "dim" if entry.name.startswith("__") else ""
            branch = tree.add(
                f"[bold magenta]:open_file_folder: [link file://{entry}]{escape(entry.name)}",
                style=dir_style,
                guide_style=dir_style,
            )
            walk_directory(entry, branch)
        else:
            label = Text(entry.name, "green")
            label.highlight_regex(r"\..*$", "bold red")
            label.stylize(f"link file://{entry}")
            size_bytes = entry.stat().st_size
            label.append(f" ({decimal(size_bytes)})", "blue")
            tree.add(label)