Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions astro-airflow-mcp/src/astro_airflow_mcp/cli/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@ def list_dag_runs(
typer.Option("--offset", "-o", help="Offset for pagination"),
] = 0,
order_by: Annotated[
str | None,
typer.Option("--order-by", help="Sort field (prefix - for descending, e.g., -start_date)"),
] = None,
str,
typer.Option(
"--order-by",
help=(
"Sort field; prefix with '-' for descending. Defaults to '-start_date' "
"so the most recent runs come first."
),
),
] = "-start_date",
state: Annotated[
str | None,
typer.Option("--state", "-s", help="Filter by state: running, success, failed, queued"),
Expand Down
28 changes: 24 additions & 4 deletions astro-airflow-mcp/src/astro_airflow_mcp/tools/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@ def _list_dag_runs_impl(
dag_id: str | None = None,
limit: int = DEFAULT_LIMIT,
offset: int = DEFAULT_OFFSET,
order_by: str | None = None,
) -> str:
"""Internal implementation for listing DAG runs from Airflow.

Args:
dag_id: Optional DAG ID to filter runs for a specific DAG
limit: Maximum number of DAG runs to return (default: 100)
offset: Offset for pagination (default: 0)
order_by: Sort field; prefix with '-' for descending. ``None`` falls back
to the Airflow API default (``id`` ascending, i.e. oldest first).

Returns:
JSON string containing the list of DAG runs with their metadata
"""
try:
adapter = _get_adapter()
data = adapter.list_dag_runs(dag_id=dag_id, limit=limit, offset=offset)
data = adapter.list_dag_runs(dag_id=dag_id, limit=limit, offset=offset, order_by=order_by)

if "dag_runs" in data:
return _wrap_list_response(data["dag_runs"], "dag_runs", data)
Expand Down Expand Up @@ -213,7 +216,12 @@ def _trigger_dag_and_wait_impl(


@mcp.tool()
def list_dag_runs(dag_id: str | None = None) -> str:
def list_dag_runs(
dag_id: str | None = None,
limit: int = DEFAULT_LIMIT,
offset: int = DEFAULT_OFFSET,
order_by: str = "-start_date",
) -> str:
"""Get execution history and status of DAG runs (workflow executions).

Use this tool when the user asks about:
Expand All @@ -237,11 +245,23 @@ def list_dag_runs(dag_id: str | None = None) -> str:
Args:
dag_id: Optional DAG ID to filter runs for a specific DAG.
If not provided, returns runs across all DAGs.
limit: Maximum number of DAG runs to return (default: 100).
offset: Offset for pagination (default: 0). Use together with `limit`
to page through DAGs that have more than `limit` runs.
order_by: Sort field; prefix with '-' for descending order.
Defaults to '-start_date' so the most recent runs are
returned first. The Airflow API default would be 'id'
ascending (oldest first), which is rarely what callers want.

Returns:
JSON with list of DAG runs, sorted by most recent
JSON with list of DAG runs, sorted most-recent-first by default
"""
return _list_dag_runs_impl(dag_id=dag_id)
return _list_dag_runs_impl(
dag_id=dag_id,
limit=limit,
offset=offset,
order_by=order_by,
)


@mcp.tool()
Expand Down
48 changes: 48 additions & 0 deletions astro-airflow-mcp/tests/test_cli_runs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Tests for the `af runs` CLI commands."""

from unittest.mock import MagicMock

from typer.testing import CliRunner

from astro_airflow_mcp.cli.main import app

runner = CliRunner()


class TestRunsListCommand:
"""Tests for `af runs list`."""

def test_defaults_to_newest_first(self, mocker):
"""Default order_by is '-start_date' so the most recent runs come first."""
mock_adapter = MagicMock()
mock_adapter.list_dag_runs.return_value = {
"dag_runs": [{"dag_run_id": "manual__2024-12-31"}],
"total_entries": 1,
}
mocker.patch("astro_airflow_mcp.cli.runs.get_adapter", return_value=mock_adapter)

result = runner.invoke(app, ["runs", "list", "-d", "example_dag"])

assert result.exit_code == 0
mock_adapter.list_dag_runs.assert_called_once_with(
dag_id="example_dag",
limit=100,
offset=0,
order_by="-start_date",
)

def test_custom_order_by_overrides_default(self, mocker):
"""Caller can override the default sort (e.g. to get oldest first)."""
mock_adapter = MagicMock()
mock_adapter.list_dag_runs.return_value = {"dag_runs": [], "total_entries": 0}
mocker.patch("astro_airflow_mcp.cli.runs.get_adapter", return_value=mock_adapter)

result = runner.invoke(app, ["runs", "list", "--order-by", "id"])

assert result.exit_code == 0
mock_adapter.list_dag_runs.assert_called_once_with(
dag_id=None,
limit=100,
offset=0,
order_by="id",
)
98 changes: 98 additions & 0 deletions astro-airflow-mcp/tests/test_consolidated_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,104 @@ def test_diagnose_dag_run_not_found(self, mocker):
assert "error" in data["run_info"]


class TestListDagRunsTool:
"""Tests for list_dag_runs MCP tool."""

def test_list_dag_runs_defaults_to_newest_first(self, mocker):
"""Default order_by is '-start_date' so callers get recent runs, not oldest."""
mock_adapter = MagicMock()
mock_adapter.list_dag_runs.return_value = {
"dag_runs": [{"dag_run_id": "manual__2024-12-31"}],
"total_entries": 1,
}

mocker.patch("astro_airflow_mcp.tools.dag_run._get_adapter", return_value=mock_adapter)

list_fn = get_tool_fn(dag_run_module, "list_dag_runs")
list_fn("example_dag")

mock_adapter.list_dag_runs.assert_called_once_with(
dag_id="example_dag",
limit=100,
offset=0,
order_by="-start_date",
)

def test_list_dag_runs_passes_limit_and_offset(self, mocker):
"""Caller-supplied limit and offset are forwarded to the adapter."""
mock_adapter = MagicMock()
mock_adapter.list_dag_runs.return_value = {"dag_runs": [], "total_entries": 0}

mocker.patch("astro_airflow_mcp.tools.dag_run._get_adapter", return_value=mock_adapter)

list_fn = get_tool_fn(dag_run_module, "list_dag_runs")
list_fn("example_dag", limit=25, offset=50)

mock_adapter.list_dag_runs.assert_called_once_with(
dag_id="example_dag",
limit=25,
offset=50,
order_by="-start_date",
)

def test_list_dag_runs_custom_order_by_overrides_default(self, mocker):
"""Caller can override the default sort (e.g. to get oldest first)."""
mock_adapter = MagicMock()
mock_adapter.list_dag_runs.return_value = {"dag_runs": [], "total_entries": 0}

mocker.patch("astro_airflow_mcp.tools.dag_run._get_adapter", return_value=mock_adapter)

list_fn = get_tool_fn(dag_run_module, "list_dag_runs")
list_fn(order_by="id")

mock_adapter.list_dag_runs.assert_called_once_with(
dag_id=None,
limit=100,
offset=0,
order_by="id",
)

def test_list_dag_runs_impl_passes_order_by_none(self, mocker):
"""order_by=None reaches the adapter as None; BaseAdapter._call drops Nones."""
mock_adapter = MagicMock()
mock_adapter.list_dag_runs.return_value = {"dag_runs": [], "total_entries": 0}

mocker.patch("astro_airflow_mcp.tools.dag_run._get_adapter", return_value=mock_adapter)

dag_run_module._list_dag_runs_impl(dag_id="example_dag", order_by=None)

mock_adapter.list_dag_runs.assert_called_once_with(
dag_id="example_dag",
limit=100,
offset=0,
order_by=None,
)

def test_list_dag_runs_no_dag_runs_key(self, mocker):
"""Falls back to a descriptive string when the API returns no dag_runs key."""
mock_adapter = MagicMock()
mock_adapter.list_dag_runs.return_value = {"unexpected": "shape"}

mocker.patch("astro_airflow_mcp.tools.dag_run._get_adapter", return_value=mock_adapter)

list_fn = get_tool_fn(dag_run_module, "list_dag_runs")
result = list_fn("example_dag")

assert "No DAG runs found" in result

def test_list_dag_runs_adapter_error(self, mocker):
"""Exceptions from the adapter are returned as a string, not raised."""
mock_adapter = MagicMock()
mock_adapter.list_dag_runs.side_effect = Exception("boom")

mocker.patch("astro_airflow_mcp.tools.dag_run._get_adapter", return_value=mock_adapter)

list_fn = get_tool_fn(dag_run_module, "list_dag_runs")
result = list_fn("example_dag")

assert "boom" in result


class TestDeleteDagRunTool:
"""Tests for delete_dag_run MCP tool."""

Expand Down
Loading