Conversation
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main    #1621      +/-   ##
==========================================
- Coverage   67.36%   67.32%   -0.05%
==========================================
  Files         152      153       +1
  Lines       25320    25406      +86
==========================================
+ Hits        17058    17105      +47
- Misses       8262     8301      +39
```

☔ View full report in Codecov by Sentry.
Should we add to docs?
Pull request overview
This pull request introduces a new CodeTransformAgent that extends the BaseCodeAgent to enable LLM-generated pandas code for data transformations. The agent generates Python code to transform DataFrames, executes it safely, and exposes the result as a DuckDB view.
Changes:
- Adds `PandasExecutor` class to safely execute LLM-generated pandas transformation code
- Introduces `CodeTransformAgent` for generating and executing pandas data transformations
- Includes prompt templates for code generation and safety validation
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| lumen/ai/code_executor.py | Adds PandasExecutor class with pandas/numpy import restrictions and DataFrame output validation |
| lumen/ai/agents/code_transform.py | Implements CodeTransformAgent with TransformSpec model, code generation, and DuckDB source creation |
| lumen/ai/agents/__init__.py | Exports CodeTransformAgent in the agents module |
| lumen/ai/prompts/CodeTransformAgent/main.jinja2 | Defines main prompt template with pandas transformation instructions |
| lumen/ai/prompts/CodeTransformAgent/code_safety.jinja2 | Defines safety validation prompt extending BaseViewAgent template |
````python
class CodeTransformAgent(BaseCodeAgent):
    """
    Generates pandas transformation code, executes it, and exposes the result as a DuckDB view.
    """

    conditions = param.List(
        default=[
            "If no pipeline is available you MUST use a SQL agent to make the data available first",
            "Use when the user asks to clean, reshape, or transform data that is not easily achievable in SQL",
            "Use for feature engineering, column creation, filtering, or aggregation in pandas",
            "Use when the user wants a new derived table from the current dataset",
        ]
    )

    purpose = param.String(
        default="Generates pandas code to transform the current DataFrame into a new table."
    )

    prompts = param.Dict(
        default={
            "main": {
                "response_model": TransformSpec,
                "template": PROMPTS_DIR / "CodeTransformAgent" / "main.jinja2",
            },
            "code_safety": {
                "response_model": CodeSafetyCheck,
                "template": PROMPTS_DIR / "CodeTransformAgent" / "code_safety.jinja2",
            },
        }
    )

    user = param.String(default="Transform")

    _executor_class = PandasExecutor

    _output_type = LumenOutput

    input_schema = CodeTransformInputs
    output_schema = CodeTransformOutputs

    async def _generate_code_spec(
        self,
        messages: list[Message],
        context: TContext,
        pipeline: Pipeline,
        errors: list[str] | None = None,
    ) -> dict[str, Any] | None:
        errors_context = self._build_errors_context(pipeline, context, errors)
        try:
            available_tables = pipeline.source.get_tables()
        except Exception:
            available_tables = []

        with self._add_step(title="Generating transformation code", steps_layout=self._steps_layout) as step:
            system_prompt = await self._render_prompt(
                "main",
                messages,
                context,
                table=pipeline.table,
                tables=available_tables,
                **errors_context,
            )

            model_spec = self.prompts["main"].get("llm_spec", self.llm_spec_key)
            response = self.llm.stream(
                messages,
                system=system_prompt,
                model_spec=model_spec,
                response_model=TransformSpec,
            )

            async for output in response:
                step.stream(output.chain_of_thought, replace=True)

            step.stream(f"\n```python\n{output.code}\n```\n", replace=False)

            system = None
            if self.code_execution == "llm":
                system = await self._render_prompt(
                    "code_safety",
                    messages,
                    context,
                    code=output.code,
                )

            df = await get_data(pipeline)
            transformed = await self._execute_code(output.code, df, system=system, step=step)

            if transformed is None:
                raise UserCancelledError("Code execution rejected by user.")

            table_slug = normalize_table_name(output.table_slug or f"{pipeline.table}_transformed")

            source = DuckDBSource(uri=":memory:", mirrors={table_slug: transformed}, ephemeral=True)
            new_pipeline = Pipeline(source=source, table=table_slug)

            return {
                "code": output.code,
                "pipeline": new_pipeline,
                "source": source,
                "table": table_slug,
                "data": transformed,
            }

    async def respond(
        self,
        messages: list[Message],
        context: TContext,
        step_title: str | None = None,
    ) -> tuple[list[Any], CodeTransformOutputs]:
        pipeline = context.get("pipeline")
        if not pipeline:
            raise ValueError("Context did not contain a pipeline.")

        result = await self._generate_code_spec(messages, context, pipeline)
        if result is None:
            return [], {}

        out = self._output_type(component=result["pipeline"], title=step_title)
        out_context = {
            "code": result["code"],
            "pipeline": result["pipeline"],
            "source": result["source"],
            "table": result["table"],
            "data": await describe_data(result["data"]),
        }
        return [out], out_context
````
The new CodeTransformAgent lacks test coverage. The test file lumen/tests/ai/test_agents.py contains tests for other agents (ChatAgent, SQLAgent, VegaLiteAgent, AnalysisAgent), but no tests exist for CodeTransformAgent. At minimum, tests should cover: basic transformation execution, error handling when pipeline is missing, table_slug normalization, and validation that the transformed data is correctly exposed as a DuckDB view.
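A minimal sketch of what one such test could look like — the stub below only mimics the `respond()` contract described in this PR (it is not the real agent), just to show the shape of a missing-pipeline test:

```python
import asyncio


class StubCodeTransformAgent:
    """Stand-in mimicking the respond() contract under review, not the real agent."""

    async def respond(self, messages, context, step_title=None):
        pipeline = context.get("pipeline")
        if not pipeline:
            raise ValueError("Context did not contain a pipeline.")
        return [], {}


def test_respond_requires_pipeline():
    agent = StubCodeTransformAgent()
    try:
        asyncio.run(agent.respond([], {}))
    except ValueError as e:
        assert "pipeline" in str(e)
    else:
        raise AssertionError("expected ValueError when no pipeline is in context")


test_respond_requires_pipeline()
```

The real version would instantiate `CodeTransformAgent` with a mocked LLM, following the patterns already used for SQLAgent and VegaLiteAgent in `test_agents.py`.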
```python
        if result is None:
            return [], {}
```
When the result from _generate_code_spec is None (line 171), the method returns an empty list and empty dict. However, the return type annotation indicates it should return CodeTransformOutputs, not an empty dict. This type inconsistency could cause issues for code that expects a properly typed output. Consider either raising an exception or returning a properly structured CodeTransformOutputs instance with None/default values.
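One hedged option, assuming `CodeTransformOutputs` is (or could be made) a `TypedDict`: declaring the fields with `total=False` keeps the early-return path type-correct without inventing placeholder values. The field names below mirror the `out_context` keys in `respond()` and are illustrative only:

```python
from typing import Any, TypedDict


class CodeTransformOutputs(TypedDict, total=False):
    # Sketch of the output schema; field names mirror the out_context keys
    # in respond(), not the actual class from this PR.
    code: str
    pipeline: Any
    source: Any
    table: str
    data: Any


def empty_outputs() -> CodeTransformOutputs:
    # With all keys optional, an empty dict is a valid CodeTransformOutputs.
    return CodeTransformOutputs()


print(empty_outputs())  # {}
```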
```python
        if transformed is None:
            raise UserCancelledError("Code execution rejected by user.")

        table_slug = normalize_table_name(output.table_slug or f"{pipeline.table}_transformed")
```
The fallback table name uses string concatenation which could produce invalid table names if pipeline.table contains special characters. While normalize_table_name will clean this up, consider using a more explicit format that's clearer about the intent, such as f"{normalize_table_name(pipeline.table)}_transformed" to ensure the base table name is also normalized before concatenation.
Suggested change:
```diff
-        table_slug = normalize_table_name(output.table_slug or f"{pipeline.table}_transformed")
+        table_slug = normalize_table_name(
+            output.table_slug or f"{normalize_table_name(pipeline.table)}_transformed"
+        )
```
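To illustrate the point, here is a rough stand-in for `normalize_table_name` (the assumed behavior — lowercase, collapse non-alphanumerics to underscores — is hypothetical, not taken from the PR):

```python
import re


def normalize_table_name(name: str) -> str:
    # Stand-in for lumen's normalize_table_name: lowercase and collapse any
    # run of non-alphanumeric characters to "_" (assumed behavior).
    return re.sub(r"[^a-z0-9]+", "_", name.lower()).strip("_")


# Normalizing the base name before concatenation keeps the fallback slug clean
# even when the source table name contains spaces or punctuation:
base = "My Orders (2024)"
fallback = f"{normalize_table_name(base)}_transformed"
print(fallback)  # my_orders_2024_transformed
```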
```python
        table_slug = normalize_table_name(output.table_slug or f"{pipeline.table}_transformed")

        source = DuckDBSource(uri=":memory:", mirrors={table_slug: transformed}, ephemeral=True)
```
The new Pipeline created on line 150 uses the DuckDBSource with an in-memory URI (":memory:"). This creates an isolated pipeline that won't have access to any other tables from the original source. If users need to reference multiple tables or join the transformed data with other tables, they won't be able to do so. Consider documenting this limitation in the docstring, or providing a way to preserve access to other tables from the original source.
Suggested change:
```diff
-        source = DuckDBSource(uri=":memory:", mirrors={table_slug: transformed}, ephemeral=True)
+        # Reuse the original DuckDBSource URI when possible so that the new pipeline
+        # can still access other tables from the original source. Fall back to an
+        # in-memory database if the original source is not a DuckDBSource or does
+        # not expose a URI.
+        source_uri = ":memory:"
+        if isinstance(pipeline.source, DuckDBSource):
+            try:
+                source_uri = pipeline.source.uri  # type: ignore[attr-defined]
+            except AttributeError:
+                source_uri = ":memory:"
+        source = DuckDBSource(uri=source_uri, mirrors={table_slug: transformed}, ephemeral=True)
```
```python
    table_slug: str = Field(
        description=(
            "Short, descriptive snake_case name for the transformed table "
            "(e.g. filtered_orders_2024)."
        )
```
The LLM is asked to provide a "snake_case" table_slug (line 27-31), but there's no validation that the LLM actually follows this instruction. The normalize_table_name function will fix most issues, but it also converts to lowercase and replaces special characters with underscores, potentially creating a table name that differs significantly from what the LLM intended. Consider adding validation feedback if the normalized name differs from the LLM-provided name, so the LLM can learn to provide properly formatted names.
```python
    conditions = param.List(
        default=[
            "If no pipeline is available you MUST use a SQL agent to make the data available first",
            "Use when the user asks to clean, reshape, or transform data that is not easily achievable in SQL",
```
The condition on line 64 states "Use when the user asks to clean, reshape, or transform data that is not easily achievable in SQL". However, this condition is vague - what defines "not easily achievable in SQL"? This could lead to confusion about when to use this agent versus a SQL agent. Consider providing more specific examples or criteria, such as "Use for complex transformations requiring iterative logic, custom Python functions, or operations not supported by SQL (e.g., advanced string manipulation, custom aggregations)".
Suggested change:
```diff
-            "Use when the user asks to clean, reshape, or transform data that is not easily achievable in SQL",
+            "Use when the user asks to clean, reshape, or transform data in ways that are cumbersome or unsupported in SQL (e.g. iterative or row-wise logic, custom Python functions, advanced string manipulation, complex multi-step feature engineering, or custom aggregations)",
```
```python
        out_context = {
            "code": result["code"],
            "pipeline": result["pipeline"],
            "source": result["source"],
            "table": result["table"],
            "data": await describe_data(result["data"]),
```
The output context on line 180 calls describe_data on the transformed data, but there's no error handling if describe_data fails. Given that this is a new DataFrame created by LLM-generated code, it could potentially have unexpected types or structures that might cause describe_data to fail. Consider wrapping this call in a try-except block to gracefully handle any potential errors, similar to the pattern used elsewhere in the codebase.
Suggested change:
```diff
-        out_context = {
-            "code": result["code"],
-            "pipeline": result["pipeline"],
-            "source": result["source"],
-            "table": result["table"],
-            "data": await describe_data(result["data"]),
+        try:
+            described_data = await describe_data(result["data"])
+        except Exception:
+            described_data = result["data"]
+        out_context = {
+            "code": result["code"],
+            "pipeline": result["pipeline"],
+            "source": result["source"],
+            "table": result["table"],
+            "data": described_data,
```
```python
        try:
            available_tables = pipeline.source.get_tables()
        except Exception:
            available_tables = []
```
The error handling when getting available tables wraps all exceptions with a bare except clause and returns an empty list. This could hide legitimate errors (like network issues with a remote database) and make debugging difficult. Consider logging the exception or being more specific about which exceptions to catch (e.g., only catching expected exceptions like AttributeError if the source doesn't support get_tables).
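One way that could look — treat a missing `get_tables()` as the normal "no tables" case, but log anything else before falling back (`list_tables` and `NoTables` are illustrative names, not from the PR):

```python
import logging

logger = logging.getLogger(__name__)


def list_tables(source) -> list[str]:
    # Narrower error handling: a source without get_tables() is a normal
    # "no tables" case; anything else is logged before falling back.
    try:
        return source.get_tables()
    except AttributeError:
        return []
    except Exception:
        logger.exception("Failed to list tables from %r", source)
        return []


class NoTables:
    """A source that does not implement get_tables()."""


print(list_tables(NoTables()))  # []
```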
```python
class CodeTransformInputs(ContextModel):
    data: Any
    pipeline: Pipeline
    table: str
```
The CodeTransformInputs schema includes 'table' as a required field, but this field doesn't appear to be used anywhere in the agent implementation. The agent retrieves the table from 'pipeline.table' instead. Either remove this unused field from the schema or add validation to ensure consistency between context['table'] and context['pipeline'].table if both are expected to be present.
```
Requirements:
- Use pandas (`pd`) and optionally numpy (`np`); both are available.
- Assign the final transformed DataFrame to `df_out`.
```
The prompt instruction states "Assign the final transformed DataFrame to df_out" but doesn't explicitly mention that the input DataFrame is available as df. While this might be implied, being explicit about both the input variable name (df) and output variable name (df_out) would improve clarity and reduce potential confusion for the LLM.
Suggested change:
```diff
-- Assign the final transformed DataFrame to `df_out`.
+- The input DataFrame is available as `df`; assign the final transformed DataFrame to `df_out`.
```
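The `df`/`df_out` convention presumably mirrors how the executor runs the generated code; a rough sketch of that contract (this is not `PandasExecutor`'s actual implementation):

```python
import numpy as np
import pandas as pd


def run_transform(code: str, df: pd.DataFrame) -> pd.DataFrame:
    # Bind the input as `df`, expose only pandas/numpy, then read `df_out`
    # back out of the namespace after execution (assumed contract).
    namespace = {"pd": pd, "np": np, "df": df}
    exec(code, namespace)
    out = namespace.get("df_out")
    if not isinstance(out, pd.DataFrame):
        raise ValueError("Transformation code must assign a DataFrame to `df_out`.")
    return out


df = pd.DataFrame({"x": [1, 2, 3]})
result = run_transform("df_out = df[df['x'] > 1]", df)
print(len(result))  # 2
```

With both variable names stated explicitly in the prompt, the LLM does not have to infer the input binding from the requirement on `df_out`.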
Builds on the `BaseCodeAgent` by introducing a `CodeTransformAgent` that generates code to transform pandas data.