Practical SQLAlchemy 2.0 ORM patterns for CUBRID. This guide covers table definitions,
relationships, CRUD operations, and CUBRID-specific DML extensions using the modern
DeclarativeBase / mapped_column API.
For connection setup, see CONNECTION.md. For type mapping details, see TYPES.md. For DML extensions reference, see DML_EXTENSIONS.md.
- Quick Setup
- Table Definitions
- Basic CRUD
- Relationships
- ON DUPLICATE KEY UPDATE
- MERGE Statement
- REPLACE INTO
- Collection Types
- Working with LOBs
- Eager Loading
- Hybrid Properties
- CUBRID-Specific Gotchas
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
engine = create_engine("cubrid://dba:password@localhost:33000/demodb")Use DeclarativeBase and mapped_column (SQLAlchemy 2.0 style).
from __future__ import annotations
from datetime import datetime
from sqlalchemy import ForeignKey, String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy_cubrid import CLOB, MONETARY, SET, SMALLINT, STRING
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(100))
email: Mapped[str] = mapped_column(String(255), unique=True)
bio: Mapped[str | None] = mapped_column(CLOB, default=None)
# CUBRID has no native BOOLEAN — use SMALLINT (0/1)
is_active: Mapped[int] = mapped_column(SMALLINT, default=1)
created_at: Mapped[datetime] = mapped_column(server_default=func.sysdate())
posts: Mapped[list[Post]] = relationship(back_populates="author")
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
title: Mapped[str] = mapped_column(String(200))
body: Mapped[str] = mapped_column(STRING)
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
author: Mapped[User] = relationship(back_populates="posts")
tags: Mapped[list[Tag]] = relationship(secondary="post_tags", back_populates="posts")from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy_cubrid import BIT, BLOB, MONETARY, MULTISET, OBJECT, SEQUENCE, SET
metadata = MetaData()
products = Table(
"products",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("price", MONETARY),
Column("colors", SET("red", "green", "blue")),
Column("sizes", MULTISET("S", "M", "L", "XL")),
Column("image", BLOB),
Column("flags", BIT(8)),
)from sqlalchemy.orm import Session
with Session(engine) as session:
user = User(name="Alice", email="[email protected]")
session.add(user)
session.commit()
# Access auto-generated ID (CUBRID uses AUTO_INCREMENT)
print(user.id)from sqlalchemy import select
with Session(engine) as session:
# Single row
stmt = select(User).where(User.email == "[email protected]")
user = session.scalars(stmt).one()
# All rows with filtering
stmt = select(User).where(User.is_active == 1).order_by(User.name)
users = session.scalars(stmt).all()with Session(engine) as session:
user = session.get(User, 1)
user.name = "Alice Updated"
session.commit()with Session(engine) as session:
user = session.get(User, 1)
session.delete(user)
session.commit()with Session(engine) as session:
user = User(name="Bob", email="[email protected]")
user.posts.append(Post(title="First Post", body="Hello world"))
user.posts.append(Post(title="Second Post", body="More content"))
session.add(user)
session.commit()
# Query with relationship
stmt = select(User).where(User.name == "Bob")
bob = session.scalars(stmt).one()
for post in bob.posts:
print(f"{post.title} by {post.author.name}")class Tag(Base):
__tablename__ = "tags"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
posts: Mapped[list[Post]] = relationship(secondary="post_tags", back_populates="tags")
# Association table
post_tags = Table(
"post_tags",
Base.metadata,
Column("post_id", ForeignKey("posts.id"), primary_key=True),
Column("tag_id", ForeignKey("tags.id"), primary_key=True),
)with Session(engine) as session:
tag = Tag(name="python")
post = session.get(Post, 1)
post.tags.append(tag)
session.commit()Note: CUBRID has no
RETURNINGclause. SQLAlchemy usesSELECT LAST_INSERT_ID()to retrieve auto-generated primary keys after INSERT. This works transparently — the ORM handles it via the dialect'spostfetch_lastrowidmechanism.
Use sqlalchemy_cubrid.insert() (not SQLAlchemy's built-in insert()).
from sqlalchemy_cubrid import insert
with Session(engine) as session:
stmt = insert(User).values(
id=1,
name="Alice",
email="[email protected]",
).on_duplicate_key_update(
name="Alice Updated",
email="[email protected]",
)
session.execute(stmt)
session.commit()stmt = insert(User).values(id=1, name="Alice", email="[email protected]")
stmt = stmt.on_duplicate_key_update(
name=stmt.inserted.name, # Use the value being inserted
)Note:
stmt.inserted.<column>is the supported way to renderVALUES(column)in CUBRID ODKU clauses.
Upsert with full WHEN MATCHED / WHEN NOT MATCHED control.
from sqlalchemy import select
from sqlalchemy_cubrid import merge
with Session(engine) as session:
source = select(User).where(User.is_active == 1).subquery()
stmt = (
merge(Post.__table__)
.using(source)
.on(Post.__table__.c.author_id == source.c.id)
.when_matched_then_update({"title": source.c.name})
.when_not_matched_then_insert({
"title": source.c.name,
"body": "Auto-created",
"author_id": source.c.id,
})
)
session.execute(stmt)
session.commit()Deletes existing row if primary key conflicts, then inserts. Use with caution — it deletes and re-inserts (unlike ODKU which updates in place).
from sqlalchemy_cubrid import replace
with Session(engine) as session:
stmt = replace(User.__table__).values(
id=1,
name="Alice Replaced",
email="[email protected]",
)
session.execute(stmt)
session.commit()CUBRID provides SET, MULTISET, and SEQUENCE collection types.
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy_cubrid import MULTISET, SEQUENCE, SET
metadata = MetaData()
inventory = Table(
"inventory",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("colors", SET("VARCHAR")), # Unordered, unique elements
Column("sizes", MULTISET("VARCHAR")), # Unordered, duplicates allowed
Column("history", SEQUENCE("VARCHAR")), # Ordered, duplicates allowed
)CREATE TABLE inventory (
id INTEGER AUTO_INCREMENT NOT NULL,
colors SET(VARCHAR),
sizes MULTISET(VARCHAR),
history SEQUENCE(VARCHAR),
PRIMARY KEY (id)
)from sqlalchemy_cubrid import CLOB
class Article(Base):
__tablename__ = "articles"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str] = mapped_column(CLOB) # Large text contentfrom sqlalchemy_cubrid import BLOB
class Attachment(Base):
__tablename__ = "attachments"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
filename: Mapped[str] = mapped_column(String(255))
data: Mapped[bytes] = mapped_column(BLOB)with Session(engine) as session:
with open("photo.jpg", "rb") as f:
attachment = Attachment(filename="photo.jpg", data=f.read())
session.add(attachment)
session.commit()Avoid N+1 queries with joinedload and selectinload.
from sqlalchemy.orm import joinedload, selectinload
with Session(engine) as session:
# Joined load — single query with JOIN
stmt = select(User).options(joinedload(User.posts)).where(User.id == 1)
user = session.scalars(stmt).unique().one()
# Select-in load — separate SELECT with IN clause (better for collections)
stmt = select(User).options(selectinload(User.posts))
users = session.scalars(stmt).unique().all()
# Nested eager loading
stmt = (
select(User)
.options(selectinload(User.posts).selectinload(Post.tags))
.where(User.is_active == 1)
)
users = session.scalars(stmt).unique().all()Computed properties that work both in Python and SQL.
from sqlalchemy.ext.hybrid import hybrid_property
class Product(Base):
__tablename__ = "products_v2"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
price: Mapped[float] = mapped_column()
tax_rate: Mapped[float] = mapped_column(default=0.1)
@hybrid_property
def price_with_tax(self) -> float:
return self.price * (1 + self.tax_rate)
@price_with_tax.inplace.expression
@classmethod
def _price_with_tax_expression(cls):
return cls.price * (1 + cls.tax_rate)with Session(engine) as session:
# Works in Python
product = session.get(Product, 1)
print(product.price_with_tax) # Computed in Python
# Works in SQL queries
stmt = select(Product).where(Product.price_with_tax > 100)
expensive = session.scalars(stmt).all()CUBRID does not support INSERT ... RETURNING or UPDATE ... RETURNING.
The ORM retrieves auto-generated keys via SELECT LAST_INSERT_ID() automatically.
Impact: Bulk inserts with insert().returning() are not available. Use standard
session.add_all() or insert().values([...]) without returning.
CUBRID maps Boolean to SMALLINT (0/1). Use integer values in queries:
# Correct
stmt = select(User).where(User.is_active == 1)
# Also works — SQLAlchemy handles the conversion
stmt = select(User).where(User.is_active == True) # noqa: E712CUBRID 10.2+ supports a native JSON column type. Since sqlalchemy-cubrid v1.2.0, JSON is fully mapped:
from sqlalchemy_cubrid import JSON
class Config(Base):
__tablename__ = "configs"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
data = mapped_column(JSON) # Native JSON column
# Path access compiles to JSON_EXTRACT:
session.query(Config).filter(Config.data["key"] == "value")CUBRID < 10.2 workaround: Store JSON as
STRING/VARCHARand serialize/deserialize in Python. See Troubleshooting.
CUBRID uses SET, MULTISET, and SEQUENCE instead of SQL ARRAY:
| Type | Ordered | Duplicates | Use Case |
|---|---|---|---|
SET |
✗ | ✗ | Unique tags, categories |
MULTISET |
✗ | ✓ | Counts, repeated values |
SEQUENCE |
✓ | ✓ | Ordered lists, history |
CUBRID implicitly commits all DDL statements (CREATE TABLE, ALTER TABLE, etc.).
This means Base.metadata.create_all(engine) commits immediately — it cannot be
rolled back. The Alembic integration sets transactional_ddl = False accordingly.
CUBRID does not support CREATE TEMPORARY TABLE. Use regular tables with
a cleanup strategy if you need temporary storage.
CUBRID folds identifiers to lowercase (unlike the SQL standard which uses uppercase). Quoted identifiers preserve case but are rarely needed.
from sqlalchemy import func
from sqlalchemy_cubrid import insert
with Session(engine) as session:
stmt = insert(User.__table__).values(id=1, name="Alice", email="[email protected]")
stmt = stmt.on_duplicate_key_update(
name=stmt.inserted.name,
updated_at=func.current_datetime(),
)
session.execute(stmt)
session.commit()BATCH = 500
with Session(engine) as session:
for i in range(0, len(payload_rows), BATCH):
chunk = payload_rows[i : i + BATCH]
session.add_all(User(name=row["name"], email=row["email"]) for row in chunk)
session.flush() # keep memory bounded
session.commit()from sqlalchemy import select
with Session(engine) as session:
account = session.scalars(
select(Account).where(Account.id == 10).with_for_update()
).one()
account.balance -= 100
session.commit()!!! warning "Avoid RETURNING-based ORM patterns"
CUBRID does not support RETURNING. Prefer flush() + mapped identity values.
!!! warning "Use CUBRID extension APIs for upsert"
For ODKU behavior, use sqlalchemy_cubrid.insert() rather than SQLAlchemy's generic insert().
!!! tip "Prefer selectinload for large collections"
joinedload can multiply rows significantly on one-to-many joins.
See also: Feature Support Matrix for a complete comparison with MySQL, PostgreSQL, and SQLite.