|
1 | 1 | from __future__ import absolute_import |
2 | 2 |
|
| 3 | +import logging |
3 | 4 | from collections import defaultdict |
4 | 5 | from datetime import datetime |
5 | | -import logging |
6 | 6 | from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple |
7 | 7 |
|
8 | 8 | import singlestoredb |
@@ -211,7 +211,7 @@ def update( |
211 | 211 | event_ts timestamp NULL DEFAULT NULL, |
212 | 212 | created_ts timestamp NULL DEFAULT NULL, |
213 | 213 | PRIMARY KEY(entity_key, feature_name), |
214 | | - INDEX {_quote_identifier(table_name + '_ek')} (entity_key))""" |
| 214 | + INDEX {_quote_identifier(table_name + "_ek")} (entity_key))""" |
215 | 215 | ) |
216 | 216 |
|
217 | 217 | for table in tables_to_delete: |
@@ -313,16 +313,22 @@ def _quote_identifier(identifier: str) -> str: |
313 | 313 | return f"`{escaped}`" |
314 | 314 |
|
315 | 315 |
|
316 | | -def _drop_discovered_versioned_tables(cur: Cursor, project: str, table: FeatureView) -> None: |
| 316 | +def _drop_discovered_versioned_tables( |
| 317 | + cur: Cursor, project: str, table: FeatureView |
| 318 | +) -> None: |
317 | 319 | base_table_name = online_store_table_id(project, table, enable_versioning=False) |
318 | | - like_pattern = f"{base_table_name}_v%" |
| 320 | + escaped_base_table_name = base_table_name.replace("\\", "\\\\") |
| 321 | + escaped_base_table_name = escaped_base_table_name.replace("%", "\\%") |
| 322 | + escaped_base_table_name = escaped_base_table_name.replace("_", "\\_") |
| 323 | + like_pattern = f"{escaped_base_table_name}\\_v%" |
319 | 324 | try: |
320 | | - cur.execute("SHOW TABLES LIKE %s", (like_pattern,)) |
| 325 | + cur.execute("SHOW TABLES LIKE %s ESCAPE '\\\\'", (like_pattern,)) |
321 | 326 | rows = cur.fetchall() or [] |
322 | 327 | for row in rows: |
323 | 328 | table_name = row[0] |
| 329 | + index_name = f"{table_name}_ek" |
324 | 330 | cur.execute( |
325 | | - f"DROP INDEX IF EXISTS {_quote_identifier(table_name + '_ek')} ON {_quote_identifier(table_name)};" |
| 331 | + f"DROP INDEX IF EXISTS {_quote_identifier(index_name)} ON {_quote_identifier(table_name)};" |
326 | 332 | ) |
327 | 333 | cur.execute(f"DROP TABLE IF EXISTS {_quote_identifier(table_name)}") |
328 | 334 | except Exception as e: |
|
0 commit comments