Complete API documentation for pycubrid — a pure Python DB-API 2.0 driver for CUBRID.
- Module-Level Attributes
- Module-Level Constructor
- Async Module Constructor
- Connection Class
- Cursor Class
- AsyncConnection Class
- AsyncCursor Class
- Lob Class
- TimingStats Class
- Exception Hierarchy
- Type Objects
- Type Constructors
These attributes are defined at the module level as required by PEP 249.
| Attribute | Value | Description |
|---|---|---|
apilevel |
"2.0" |
DB-API specification version |
threadsafety |
1 |
Threads may share the module but not connections |
paramstyle |
"qmark" |
Question mark parameter style: WHERE name = ? |
__version__ |
"1.3.0" |
Package version string |
import pycubrid
print(pycubrid.apilevel) # "2.0"
print(pycubrid.threadsafety) # 1
print(pycubrid.paramstyle) # "qmark"
print(pycubrid.__version__) # "1.3.0"def connect(
host: str = "localhost",
port: int = 33000,
database: str = "",
user: str = "dba",
password: str = "",
decode_collections: bool = False,
json_deserializer: Any = None,
ssl: bool | ssl_module.SSLContext | None = None,
**kwargs: Any,
) -> ConnectionCreate a new database connection.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
host |
str |
"localhost" |
CUBRID server hostname or IP address |
port |
int |
33000 |
CUBRID broker port |
database |
str |
"" |
Database name |
user |
str |
"dba" |
Database user |
password |
str |
"" |
Database password |
decode_collections |
bool |
False |
Decode SET/MULTISET/SEQUENCE columns into Python collections |
json_deserializer |
Any |
None |
Callable used to decode JSON columns on fetch; when unset JSON is returned as str |
ssl |
bool | ssl_module.SSLContext | None |
None |
Opt-in TLS for sync broker connections |
**kwargs |
Any |
— | Additional parameters such as connect_timeout, read_timeout, fetch_size, enable_timing, no_backslash_escapes, and autocommit |
When False (the default), collection columns are returned as raw CAS wire bytes for backward
compatibility. When True, pycubrid decodes supported SET, MULTISET, and SEQUENCE payloads
into Python containers.
json_deserializer controls JSON column decoding on fetch:
| Setting | Result |
|---|---|
None (default) |
Return JSON columns as str |
callable |
Pass the raw JSON string to the callable and return its result |
Returns: A new Connection instance.
Raises: OperationalError if the connection cannot be established.
import pycubrid
# Minimal connection
conn = pycubrid.connect(database="testdb")
# Full connection with timeout
conn = pycubrid.connect(
host="192.168.1.100",
port=33000,
database="production",
user="app_user",
password="secret",
connect_timeout=5.0,
)async def connect(
host: str = "localhost",
port: int = 33000,
database: str = "",
user: str = "dba",
password: str = "",
decode_collections: bool = False,
json_deserializer: Any = None,
ssl: bool | ssl_module.SSLContext | None = None,
**kwargs: Any,
) -> AsyncConnectionCreate and open an async connection.
- Returns a connected
AsyncConnection. - Accepts the same collection / JSON decoding kwargs as
pycubrid.connect(). - Supports
autocommit=Trueviaawait conn.set_autocommit(True)during construction. - Provides a similar async surface to the sync API, including
await conn.ping(reconnect=...);create_lob()remains sync-only, and auto-commit changes go throughawait conn.set_autocommit(...)instead of a property setter. - Async TLS is not yet supported; passing
ssl=Trueor anSSLContextraisesNotSupportedError.
import asyncio
import pycubrid.aio
async def main() -> None:
conn = await pycubrid.aio.connect(database="testdb")
cur = conn.cursor()
await cur.execute("SELECT 1")
print(await cur.fetchone())
await cur.close()
await conn.close()
asyncio.run(main())pycubrid.connection.Connection
Represents a single connection to a CUBRID database via the CAS broker protocol.
class Connection:
def __init__(
self,
host: str,
port: int,
database: str,
user: str,
password: str,
autocommit: bool = False,
**kwargs: Any,
) -> NoneNote: Do not instantiate
Connectiondirectly. Usepycubrid.connect()instead.
Selected **kwargs:
| Name | Type | Default | Description |
|---|---|---|---|
enable_timing |
bool | None |
None |
Enable optional driver-level timing instrumentation for connect, execute, fetch, and close operations. When None, falls back to the PYCUBRID_ENABLE_TIMING environment variable (truthy values: 1, true, yes, case-insensitive). When disabled, the timing module is not imported and connection.timing_stats is None (zero overhead). See Timing & Profiling Hooks. |
ssl |
bool | ssl.SSLContext | None |
None |
TLS configuration for sync connections. See Connection guide. |
read_timeout |
float | None |
None |
Socket read timeout in seconds. |
fetch_size |
int |
100 |
Server-side fetch batch size. |
json_deserializer |
Callable[[str], Any] | None |
None |
Opt-in JSON column decoder. |
decode_collections |
bool |
False |
Decode SET/MULTISET/SEQUENCE columns into Python collections. |
def connect(self) -> NoneEstablish a TCP CAS session with broker handshake and open the database. Called automatically by the constructor. Calling on an already-connected instance is a no-op.
Raises: OperationalError on network failure.
def close(self) -> NoneClose the connection and all tracked cursors. Sends a CloseDatabasePacket to the server. Calling on an already-closed connection is a no-op. After close(), any further method calls will raise InterfaceError.
conn = pycubrid.connect(database="testdb")
# ... work ...
conn.close() # Connection and all cursors are closeddef commit(self) -> NoneCommit the current transaction. Sends a CommitPacket to the server.
Raises: InterfaceError if the connection is closed.
def rollback(self) -> NoneRoll back the current transaction. Sends a RollbackPacket to the server.
Raises: InterfaceError if the connection is closed.
def cursor(self) -> CursorCreate and return a new Cursor bound to this connection. The cursor is tracked by the connection and will be closed when the connection closes.
Returns: A new Cursor instance.
Raises: InterfaceError if the connection is closed.
conn = pycubrid.connect(database="testdb")
cur = conn.cursor()
cur.execute("SELECT 1 + 1")
print(cur.fetchone()) # (2,)
cur.close()def get_server_version(self) -> strReturn the server engine version string (e.g., "11.2.0.0378").
conn = pycubrid.connect(database="testdb")
print(conn.get_server_version()) # "11.2.0.0378"def get_last_insert_id(self) -> strReturn the last auto-increment value generated by an INSERT statement, as a string.
cur.execute("INSERT INTO users (name) VALUES ('alice')")
conn.commit()
print(conn.get_last_insert_id()) # "1"def ping(self, reconnect: bool = True) -> boolPerform a lightweight CHECK_CAS health check without executing SQL.
- Returns
Truewhen the CAS connection is alive. - When
reconnect=True, attempts reconnection before returningFalse.
if not conn.ping():
raise RuntimeError("database connection is unavailable")def create_lob(self, lob_type: int) -> LobCreate a new LOB (Large Object) on the server.
Parameters:
| Parameter | Type | Description |
|---|---|---|
lob_type |
int |
LOB type code: 23 for BLOB, 24 for CLOB |
Returns: A new Lob instance.
from pycubrid.constants import CUBRIDDataType
lob = conn.create_lob(CUBRIDDataType.CLOB) # 24
lob.write(b"Hello, CUBRID!")Important:
Lobobjects cannot be passed as query parameters. Insert strings/bytes directly into CLOB/BLOB columns instead. See TYPES.md for details.
def get_schema_info(
self,
schema_type: int,
table_name: str = "",
pattern_match_flag: int = 1,
) -> GetSchemaPacketQuery schema information from the server.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
schema_type |
int |
— | Schema type code (see CCISchemaType) |
table_name |
str |
"" |
Table name filter |
pattern_match_flag |
int |
1 |
Pattern match flag |
Returns: A GetSchemaPacket with query_handle and tuple_count attributes.
from pycubrid.constants import CCISchemaType
packet = conn.get_schema_info(CCISchemaType.CLASS)
print(f"Found {packet.tuple_count} tables")Available CCISchemaType values:
| Code | Name | Description |
|---|---|---|
| 1 | CLASS |
Tables |
| 2 | VCLASS |
Views |
| 4 | ATTRIBUTE |
Columns |
| 11 | CONSTRAINT |
Constraints |
| 16 | PRIMARY_KEY |
Primary keys |
| 17 | IMPORTED_KEYS |
Foreign keys (imported) |
| 18 | EXPORTED_KEYS |
Foreign keys (exported) |
@property
def autocommit(self) -> bool
@autocommit.setter
def autocommit(self, value: bool) -> NoneGet or set the auto-commit mode. When enabled, each statement is committed immediately. Setting this property sends a SetDbParameterPacket and CommitPacket to flush the transaction state on the server.
conn = pycubrid.connect(database="testdb")
print(conn.autocommit) # False
conn.autocommit = True
# Statements now auto-commit@property
def timing_stats(self) -> TimingStats | NoneReturn the per-connection TimingStats accumulator when the connection
was opened with enable_timing=True (or with PYCUBRID_ENABLE_TIMING set). Returns None
when timing is disabled — in which case the timing module is never imported and the hot
path incurs no overhead.
import pycubrid
conn = pycubrid.connect(database="testdb", enable_timing=True)
cur = conn.cursor()
cur.execute("SELECT 1")
cur.fetchall()
stats = conn.timing_stats
assert stats is not None
print(stats)
# TimingStats(connect=1 calls, 12.345ms total, 12.345ms avg,
# execute=1 calls, 0.987ms total, 0.987ms avg,
# fetch=1 calls, 0.123ms total, 0.123ms avg,
# close=0 calls)
print(stats.execute_count, stats.execute_total_ns) # 1 987000
stats.reset() # clear all countersSee Timing & Profiling Hooks for the full guide.
Connection implements the context manager protocol (__enter__ / __exit__).
- On successful exit: calls
commit()thenclose(). - On exception: calls
rollback()thenclose().
with pycubrid.connect(database="testdb") as conn:
cur = conn.cursor()
cur.execute("INSERT INTO users (name) VALUES ('bob')")
# Auto-commits on exit
# conn is closed herepycubrid.cursor.Cursor
Represents a database cursor for executing SQL statements and fetching results.
class Cursor:
def __init__(self, connection: Connection) -> NoneNote: Do not instantiate
Cursordirectly. Useconnection.cursor()instead.
def execute(
self,
operation: str,
parameters: Sequence[Any] | None = None,
) -> CursorPrepare and execute a SQL statement.
Parameters:
| Parameter | Type | Description |
|---|---|---|
operation |
str |
SQL statement with optional ? placeholders |
parameters |
Sequence or None |
Non-string positional parameter values to bind |
Returns: The cursor itself (for chaining).
Raises:
InterfaceErrorif the cursor is closedProgrammingErroron SQL errors or parameter mismatch
# Simple query
cur.execute("SELECT * FROM users")
# Parameterized query (qmark style)
cur.execute("SELECT * FROM users WHERE age > ?", [21])
# INSERT with parameters
cur.execute("INSERT INTO users (name, age) VALUES (?, ?)", ["alice", 30])Important: mappings such as
dictare not supported for parameter binding. Passing a mapping raisesProgrammingError("parameters must be a sequence").
Supported parameter types:
| Python Type | SQL Literal |
|---|---|
None |
NULL |
bool |
1 / 0 |
str |
'escaped' |
bytes |
X'hex' |
int, float |
Numeric literal |
Decimal |
Numeric literal |
datetime.date |
DATE'YYYY-MM-DD' |
datetime.time |
TIME'HH:MM:SS' |
datetime.datetime |
DATETIME'YYYY-MM-DD HH:MM:SS.mmm' |
def executemany(
self,
operation: str,
seq_of_parameters: Sequence[Sequence[Any]],
) -> CursorExecute the same SQL statement repeatedly with different parameter sets. Each element must be a
non-string sequence. For non-SELECT statements, rowcount is set to the cumulative total of
affected rows.
data = [("alice", 30), ("bob", 25), ("carol", 28)]
cur.executemany("INSERT INTO users (name, age) VALUES (?, ?)", data)
print(cur.rowcount) # 3def nextset(self) -> NoneDB-API compatibility method. pycubrid does not expose multiple server result sets through this
interface, so nextset() always returns None after checking that the cursor is still open.
assert cur.nextset() is Nonedef executemany_batch(
self,
sql_list: list[str],
auto_commit: bool | None = None,
) -> list[tuple[int, int]]Execute multiple distinct SQL statements in a single batch request to the server. This is more efficient than calling execute() in a loop when the SQL statements differ.
Parameters:
| Parameter | Type | Description |
|---|---|---|
sql_list |
list[str] |
List of complete SQL statements |
auto_commit |
bool or None |
Override auto-commit for this batch (default: connection setting) |
Returns: List of (statement_type, result_count) tuples.
results = cur.executemany_batch([
"CREATE TABLE t1 (id INT AUTO_INCREMENT PRIMARY KEY, val VARCHAR(50))",
"INSERT INTO t1 (val) VALUES ('hello')",
"INSERT INTO t1 (val) VALUES ('world')",
])
# results: [(4, 0), (20, 1), (20, 1)]
# statement_type 4 = CREATE_CLASS, 20 = INSERTNote:
executemany_batchis a pycubrid extension, not part of PEP 249.
def fetchone(self) -> tuple[Any, ...] | NoneFetch the next row of a query result set. Returns None when no more rows are available. Automatically fetches more rows from the server when the local buffer is exhausted (100 rows per fetch).
cur.execute("SELECT name, age FROM users")
row = cur.fetchone()
if row:
name, age = rowdef fetchmany(self, size: int | None = None) -> list[tuple[Any, ...]]Fetch the next size rows. Defaults to cursor.arraysize if size is not specified.
cur.execute("SELECT * FROM users")
batch = cur.fetchmany(10) # Up to 10 rowsdef fetchall(self) -> list[tuple[Any, ...]]Fetch all remaining rows of a query result. Returns an empty list if no rows remain.
cur.execute("SELECT * FROM users")
all_rows = cur.fetchall()
for row in all_rows:
print(row)def callproc(
self,
procname: str,
parameters: Sequence[Any] = (),
) -> Sequence[Any]Call a stored procedure. Constructs and executes a CALL procname(?, ?, ...) statement.
Returns: The original parameters sequence (as per PEP 249).
cur.callproc("my_procedure", [1, "hello"])def setinputsizes(self, sizes: Any) -> NoneDB-API no-op. Accepted for compatibility.
def setoutputsize(self, size: int, column: int | None = None) -> NoneDB-API no-op. Accepted for compatibility.
def close(self) -> NoneClose the cursor and release the active query handle. Calling on an already-closed cursor is a no-op.
@property
def description(self) -> tuple[DescriptionItem, ...] | NoneReturn result-set metadata for the last executed statement, or None if no query has been executed or the last statement did not return rows.
Each item is a 7-tuple:
(name, type_code, display_size, internal_size, precision, scale, null_ok)
# str int None None int int bool| Index | Field | Type | Description |
|---|---|---|---|
| 0 | name |
str |
Column name |
| 1 | type_code |
int |
CUBRID data type code (see CUBRIDDataType) |
| 2 | display_size |
None |
Not used |
| 3 | internal_size |
None |
Not used |
| 4 | precision |
int |
Column precision |
| 5 | scale |
int |
Column scale |
| 6 | null_ok |
bool |
Whether the column is nullable |
cur.execute("SELECT name, age FROM users")
for col in cur.description:
print(f"{col[0]}: type={col[1]}, precision={col[4]}, nullable={col[6]}")@property
def rowcount(self) -> intNumber of rows affected by the last execute() call. Returns -1 for SELECT statements or if no statement has been executed.
@property
def lastrowid(self) -> int | NoneLast generated auto-increment identifier for an INSERT statement. None if no INSERT has been executed or the table has no auto-increment column.
cur.execute("INSERT INTO users (name) VALUES ('alice')")
print(cur.lastrowid) # e.g., 1@property
def arraysize(self) -> int
@arraysize.setter
def arraysize(self, value: int) -> NoneDefault number of rows for fetchmany(). Defaults to 1.
Raises: ProgrammingError if set to a value less than 1.
Cursor implements the iterator protocol, allowing you to iterate directly over result rows:
cur.execute("SELECT name, age FROM users")
for name, age in cur:
print(f"{name} is {age} years old")Calls fetchone() internally. Raises StopIteration when no more rows.
with conn.cursor() as cur:
cur.execute("SELECT 1")
print(cur.fetchone())
# cur is automatically closedpycubrid.aio.connection.AsyncConnection
Async counterpart to Connection for use with asyncio, with a similar surface rather than full method-for-method parity.
| Method | Signature | Notes |
|---|---|---|
cursor() |
def cursor(self) -> AsyncCursor |
Returns an async cursor immediately; do not await it |
commit() |
async def commit(self) -> None |
Commit current transaction |
rollback() |
async def rollback(self) -> None |
Roll back current transaction |
close() |
async def close(self) -> None |
Close connection and tracked cursors |
ping() |
async def ping(self, reconnect: bool = True) -> bool |
Native CHECK_CAS health check with optional reconnect |
get_server_version() |
async def get_server_version(self) -> str |
Fetch engine version |
get_last_insert_id() |
async def get_last_insert_id(self) -> str |
Fetch last AUTO_INCREMENT value |
get_schema_info() |
async def get_schema_info(...) -> GetSchemaPacket |
Returns the parsed packet object |
set_autocommit() |
async def set_autocommit(self, value: bool) -> None |
Sends SetDbParameterPacket and CommitPacket |
AsyncConnection exposes async ping() parity with sync Connection.ping(). create_lob() remains sync-only.
async with await pycubrid.aio.connect(database="testdb") as conn:
cur = conn.cursor()
await cur.execute("INSERT INTO logs (msg) VALUES (?)", ["ok"])
await conn.set_autocommit(True)
await cur.close()AsyncConnection.autocommit is read-only; use await conn.set_autocommit(True) to change it.
Like the sync setter, this sends both SetDbParameterPacket and CommitPacket.
async def ping(self, reconnect: bool = True) -> boolPerform a lightweight native CHECK_CAS health check without executing SQL.
- Returns
Truewhen the CAS connection is alive. - Always issues the native
CHECK_CASround-trip when the socket is open, regardless of the broker's transaction status (CAS_INFO). - When
reconnect=False, suppresses the implicit broker-handoff reconnect that normally fires onCAS_INFO=INACTIVE; returnsFalseonly if the socket is closed orCHECK_CASitself fails. - When
reconnect=True, attempts close + reconnect on socket/protocol failure before returningFalse.
if not await conn.ping(reconnect=False):
await conn.ping(reconnect=True)pycubrid.aio.cursor.AsyncCursor
Async counterpart to Cursor.
| Method | Signature | Notes |
|---|---|---|
execute() |
async def execute(self, operation: str, parameters: Sequence[Any] | None = None) -> AsyncCursor |
Sequence-only parameter binding, same rules as sync cursor |
executemany() |
async def executemany(self, operation: str, seq_of_parameters: Sequence[Sequence[Any]]) -> AsyncCursor |
Batch path for non-SELECT statements |
fetchone() |
async def fetchone(self) -> tuple[Any, ...] | None |
Returns one row or None |
fetchmany() |
async def fetchmany(self, size: int | None = None) -> list[tuple[Any, ...]] |
Uses arraysize by default |
fetchall() |
async def fetchall(self) -> list[tuple[Any, ...]] |
Returns remaining rows |
nextset() |
async def nextset(self) -> None |
DB-API compatibility method; always returns None |
close() |
async def close(self) -> None |
Releases active query handle |
cur = conn.cursor() # not awaitable
await cur.execute("SELECT * FROM users WHERE age > ?", [21])
rows = await cur.fetchall()
assert await cur.nextset() is Nonepycubrid.lob.Lob
Represents a CUBRID Large Object (BLOB or CLOB).
@classmethod
def create(cls, connection: Connection, lob_type: int) -> LobCreate a new LOB object on the server. Prefer using connection.create_lob() instead.
Parameters:
| Parameter | Type | Description |
|---|---|---|
connection |
Connection |
Active database connection |
lob_type |
int |
CUBRIDDataType.BLOB (23) or CUBRIDDataType.CLOB (24) |
Raises: ValueError if lob_type is not BLOB or CLOB.
def write(self, data: bytes, offset: int = 0) -> intWrite bytes to the LOB starting from offset.
Returns: Number of bytes written.
def read(self, length: int, offset: int = 0) -> bytesRead up to length bytes from the LOB starting from offset.
Returns: The read bytes.
@property
def lob_handle(self) -> bytesThe raw LOB handle bytes used for server communication.
@property
def lob_type(self) -> intThe LOB type code (23 = BLOB, 24 = CLOB).
LOB columns return a dict on fetch, not a Lob object:
cur.execute("SELECT clob_col FROM my_table")
row = cur.fetchone()
lob_info = row[0]
# {'lob_type': 24, 'lob_length': 1234, 'file_locator': '...', 'packed_lob_handle': b'...'}To insert LOB data, pass strings/bytes directly:
cur.execute("INSERT INTO my_table (clob_col) VALUES (?)", ["large text content"])pycubrid.timing.TimingStats — also re-exported as pycubrid.TimingStats.
Per-connection accumulator for optional driver-level timing instrumentation. Instances are
created automatically when a connection is opened with enable_timing=True (or with
PYCUBRID_ENABLE_TIMING set) and are exposed via Connection.timing_stats.
When to use: quick in-process diagnosis of where wall-clock time is going (connect vs execute vs fetch vs close) without reaching for cProfile or external profilers. For deep hot-path analysis, prefer the standalone profiling scripts in Performance Investigation.
All counters are plain int attributes. Durations are nanoseconds captured via
time.perf_counter_ns(). Updates are serialised by an internal threading.Lock so the
object is safe to read from a thread other than the one driving the connection.
| Attribute | Type | Description |
|---|---|---|
connect_count / connect_total_ns |
int |
Total number of Connection.connect() calls and cumulative elapsed time. Recorded even on failure. |
execute_count / execute_total_ns |
int |
Cursor.execute() and executemany() calls and cumulative elapsed time. |
fetch_count / fetch_total_ns |
int |
fetchone() / fetchmany() / fetchall() calls combined and cumulative elapsed time. |
close_count / close_total_ns |
int |
Connection.close() calls and cumulative elapsed time. |
def reset(self) -> NoneReset every counter to zero atomically. Useful for measuring a specific code section after a warm-up phase.
Returns a human-readable summary, e.g.:
TimingStats(connect=1 calls, 12.345ms total, 12.345ms avg,
execute=10 calls, 4.200ms total, 0.420ms avg,
fetch=10 calls, 1.500ms total, 0.150ms avg,
close=0 calls)
pycubrid.aio.AsyncConnection accepts the same enable_timing keyword and exposes the
same timing_stats property with identical semantics. Async durations include event-loop
scheduling latency — treat them as end-to-end client-side latency, not pure server time.
pycubrid implements the full PEP 249 exception hierarchy:
graph TD
exception[Exception]
warning[Warning]
error[Error]
interface[InterfaceError]
database[DatabaseError]
data[DataError]
operational[OperationalError]
integrity[IntegrityError]
internal[InternalError]
programming[ProgrammingError]
notsupported[NotSupportedError]
exception --> warning
exception --> error
error --> interface
error --> database
database --> data
database --> operational
database --> integrity
database --> internal
database --> programming
database --> notsupported
All exceptions have msg (str) and code (int) attributes. DatabaseError and its subclasses additionally have errno (int | None) and sqlstate (str | None).
class Warning(Exception):
def __init__(self, msg: str = "", code: int = 0) -> NoneRaised for important warnings (e.g., data truncation during insertion).
class Error(Exception):
def __init__(self, msg: str = "", code: int = 0) -> NoneBase class for all pycubrid errors.
class InterfaceError(Error)Raised for errors related to the database interface — calling methods on closed connections/cursors, invalid arguments, etc.
class DatabaseError(Error):
def __init__(
self,
msg: str = "",
code: int = 0,
errno: int | None = None,
sqlstate: str | None = None,
) -> NoneBase class for database-side errors. Includes errno and sqlstate for server-reported error details.
class DataError(DatabaseError)Raised for data processing problems (division by zero, numeric overflow, etc.).
class OperationalError(DatabaseError)Raised for database operation errors (unexpected disconnect, memory errors, transaction failures, connection lost).
class IntegrityError(DatabaseError)Raised when relational integrity is affected (foreign key violation, duplicate key, constraint violation).
class InternalError(DatabaseError)Raised for internal database errors (invalid cursor state, out-of-sync transactions).
class ProgrammingError(DatabaseError)Raised for programming errors (SQL syntax errors, table not found, wrong number of parameters, unsupported parameter types).
class NotSupportedError(DatabaseError)Raised when an unsupported method or API is called.
pycubrid automatically classifies server errors based on the error message:
| Keywords in error message | Exception raised |
|---|---|
unique, duplicate, foreign key, constraint violation |
IntegrityError |
syntax, unknown class, does not exist, not found |
ProgrammingError |
| All others | DatabaseError |
PEP 249 type objects for comparing cursor.description type codes:
| Type Object | Description | Matching CUBRID types |
|---|---|---|
STRING |
String types | CHAR, STRING, NCHAR, VARNCHAR, ENUM, CLOB, JSON |
BINARY |
Binary types | BIT, VARBIT, BLOB |
NUMBER |
Numeric types | SHORT, INT, BIGINT, FLOAT, DOUBLE, NUMERIC, MONETARY |
DATETIME |
Date/time types | DATE, TIME, DATETIME, TIMESTAMP |
ROWID |
Row ID types | OBJECT |
import pycubrid
cur.execute("SELECT name FROM users")
type_code = cur.description[0][1]
if type_code == pycubrid.STRING:
print("It's a string column")PEP 249 constructor functions:
| Constructor | Return Type | Description |
|---|---|---|
Date(y, m, d) |
datetime.date |
Construct a date |
Time(h, m, s) |
datetime.time |
Construct a time |
Timestamp(y,m,d,h,m,s) |
datetime.datetime |
Construct a timestamp |
DateFromTicks(t) |
datetime.date |
Date from Unix ticks |
TimeFromTicks(t) |
datetime.time |
Time from Unix ticks |
TimestampFromTicks(t) |
datetime.datetime |
Timestamp from Unix ticks |
Binary(s) |
bytes |
Binary string from bytes |
import pycubrid
d = pycubrid.Date(2025, 1, 15)
t = pycubrid.Time(14, 30, 0)
ts = pycubrid.Timestamp(2025, 1, 15, 14, 30, 0)
b = pycubrid.Binary(b"\x00\x01\x02")