-
Notifications
You must be signed in to change notification settings - Fork 609
Expand file tree
/
Copy path clickhouse_driver.py
More file actions
172 lines (125 loc) · 5.83 KB
/
clickhouse_driver.py
File metadata and controls
172 lines (125 loc) · 5.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import sentry_sdk
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable
from sentry_sdk.tracing import Span
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.utils import capture_internal_exceptions, ensure_integration_enabled
from typing import TYPE_CHECKING, TypeVar
# Workaround so the typing helpers used below also work on older Python
# versions, without introducing a hard dependency on `typing_extensions`.
# Based on: https://stackoverflow.com/a/71944042/300572
if not TYPE_CHECKING:
    # Runtime stand-ins; type checkers take the `else` branch instead.

    class ParamSpec:
        """Minimal runtime replacement for ``typing.ParamSpec``."""

        def __init__(self, _):
            # Provided so that ``P.args`` / ``P.kwargs`` resolve if evaluated.
            self.args = self.kwargs = None

    class _Callable:
        """Object whose subscription (e.g. ``Callable[P, T]``) evaluates to None."""

        def __getitem__(self, _):
            return None

    Callable = _Callable()
else:
    from collections.abc import Iterator
    from typing import Any, ParamSpec, Callable


# Import the driver pieces this integration patches; if clickhouse-driver
# cannot be imported, signal `DidNotEnable` instead of failing with ImportError.
try:
    from clickhouse_driver import VERSION  # type: ignore[import-not-found]
    from clickhouse_driver.client import Client  # type: ignore[import-not-found]
    from clickhouse_driver.connection import Connection  # type: ignore[import-not-found]
except ImportError:
    raise DidNotEnable("clickhouse-driver not installed.")


class ClickhouseDriverIntegration(Integration):
    """Sentry integration that creates spans for clickhouse-driver queries."""

    identifier = "clickhouse_driver"
    origin = f"auto.db.{identifier}"

    @staticmethod
    def setup_once() -> None:
        """Check the installed driver version and patch its query hooks."""
        _check_minimum_version(ClickhouseDriverIntegration, VERSION)

        # Every query starts with the Connection's `send_query`.
        Connection.send_query = _wrap_start(Connection.send_query)

        # Query parameters, if any, are sent to clickhouse via `send_data`.
        _wrap_send_data()

        # A query ends in the Client's `receive_end_of_query` (no result
        # expected) or `receive_result` (result expected). In 0.2.7, insert
        # queries are handled separately via `receive_end_of_insert_query`.
        end_of_query_hooks = ["receive_end_of_query", "receive_result"]
        if hasattr(Client, "receive_end_of_insert_query"):
            end_of_query_hooks.append("receive_end_of_insert_query")

        for hook_name in end_of_query_hooks:
            setattr(Client, hook_name, _wrap_end(getattr(Client, hook_name)))


P = ParamSpec("P")
T = TypeVar("T")
def _wrap_start(f: "Callable[P, T]") -> "Callable[P, T]":
    """Wrap ``Connection.send_query`` so that each sent query opens a DB span.

    The span is named after the query text and stored on the connection as
    ``_sentry_span``, where the end-of-query wrappers and the ``send_data``
    patch look it up. The query id is recorded when given; the query
    parameters only when PII may be sent.

    If the wrapped call raises, the end-of-query hooks are presumably never
    reached for this query, so the span is detached from the connection,
    marked ``internal_error`` and finished here before the exception is
    re-raised. Otherwise it would be silently dropped as an unfinished span.

    :param f: the original ``send_query``; positional arguments are read as
        ``(connection, query, query_id, params)``.
    :return: a wrapper with the same call signature as ``f``.
    """

    # NOTE(review): the decorator presumably calls `f` unwrapped when the
    # integration is not enabled — confirm in sentry_sdk.utils.
    @ensure_integration_enabled(ClickhouseDriverIntegration, f)
    def _inner(*args: "P.args", **kwargs: "P.kwargs") -> "T":
        connection = args[0]
        query = args[1]
        query_id = args[2] if len(args) > 2 else kwargs.get("query_id")
        params = args[3] if len(args) > 3 else kwargs.get("params")

        span = sentry_sdk.start_span(
            op=OP.DB,
            name=query,
            origin=ClickhouseDriverIntegration.origin,
        )

        # Stash the span so later hooks on this connection can find it.
        connection._sentry_span = span  # type: ignore[attr-defined]

        _set_db_data(span, connection)

        span.set_data("query", query)

        if query_id:
            span.set_data("db.query_id", query_id)

        if params and should_send_default_pii():
            span.set_data("db.params", params)

        # run the original code
        try:
            return f(*args, **kwargs)
        except Exception:
            # Close the span for the failed send; instrumentation errors here
            # must not mask the original exception, which is re-raised below.
            with capture_internal_exceptions():
                connection._sentry_span = None  # type: ignore[attr-defined]
                span.set_status("internal_error")
                span.finish()
            raise

    return _inner


def _wrap_end(f: "Callable[P, T]") -> "Callable[P, T]":
    """Wrap a ``Client`` end-of-query method so it closes the query's span.

    The span is the one stored on ``instance.connection`` as ``_sentry_span``.

    On success, the span is:

    * detached from the connection;
    * given the result (only when it is not None and PII may be sent);
    * turned into a ``query`` breadcrumb;
    * finished.

    If the wrapped call raises, the span is detached, marked
    ``internal_error`` and finished before the exception is re-raised, so
    failed queries are not left as never-finished spans.

    :param f: the unbound ``Client`` method to wrap.
    :return: a wrapper with the same call signature as ``f``.
    """

    def _inner_end(*args: "P.args", **kwargs: "P.kwargs") -> "T":
        try:
            res = f(*args, **kwargs)
        except Exception:
            # Best-effort cleanup; never let it replace the original exception.
            with capture_internal_exceptions():
                failed_connection = args[0].connection
                failed_span = getattr(failed_connection, "_sentry_span", None)
                if failed_span is not None:
                    failed_connection._sentry_span = None
                    failed_span.set_status("internal_error")
                    failed_span.finish()
            raise

        instance = args[0]
        span = getattr(instance.connection, "_sentry_span", None)  # type: ignore[attr-defined]

        if span is not None:
            # Detach first, so a later end-of-query call on this connection
            # (without a new send_query) cannot touch this finished span.
            instance.connection._sentry_span = None  # type: ignore[attr-defined]

            if res is not None and should_send_default_pii():
                span.set_data("db.result", res)

            with capture_internal_exceptions():
                # The query text becomes the breadcrumb message; the remaining
                # span data is passed as a snapshot, not the span's own dict.
                span.scope.add_breadcrumb(
                    message=span._data.pop("query"),
                    category="query",
                    data=dict(span._data),
                )

            span.finish()

        return res

    return _inner_end


def _wrap_send_data() -> None:
    """Patch ``Client.send_data`` to enrich the span of the running query.

    When the connection carries a ``_sentry_span``, the DB attributes are
    (re)applied. Only when PII may be sent, the items of ``data`` are also
    recorded under ``db.params``:

    * lists/tuples are copied in right away;
    * any other iterable is wrapped in a generator that records each item as
      the driver consumes it, so the iterator is not consumed up front.

    Collection is best-effort. Failures are reported through
    ``capture_internal_exceptions`` instead of breaking the insert. An
    existing ``db.params`` value is copied rather than mutated in place,
    since it may be an object supplied by the caller.
    """
    original_send_data = Client.send_data

    def _inner_send_data(  # type: ignore[no-untyped-def] # clickhouse-driver does not type send_data
        self, sample_block, data, types_check=False, columnar=False, *args, **kwargs
    ):
        span = getattr(self.connection, "_sentry_span", None)

        if span is not None:
            with capture_internal_exceptions():
                _set_db_data(span, self.connection)

                if should_send_default_pii():
                    stored = span._data.get("db.params", [])
                    # A value stored from send_query's `params` is not
                    # necessarily list-like; only extend sequences, and only
                    # via a copy.
                    if isinstance(stored, (list, tuple)):
                        db_params = list(stored)

                        if isinstance(data, (list, tuple)):
                            db_params.extend(data)
                        else:  # data is a generic iterator
                            orig_data = data

                            def wrapped_generator() -> "Iterator[Any]":
                                for item in orig_data:
                                    db_params.append(item)
                                    yield item

                            # Replace the original iterator with the recording one.
                            data = wrapped_generator()

                        span.set_data("db.params", db_params)

        return original_send_data(
            self, sample_block, data, types_check, columnar, *args, **kwargs
        )

    Client.send_data = _inner_send_data


def _set_db_data(span: "Span", connection: "Connection") -> None:
    """Attach ClickHouse database and server attributes to ``span``.

    The system and driver names are fixed. Host, port, database and user are
    read from the clickhouse-driver ``connection``.
    """
    attributes = (
        (SPANDATA.DB_SYSTEM, "clickhouse"),
        (SPANDATA.DB_DRIVER_NAME, "clickhouse-driver"),
        (SPANDATA.SERVER_ADDRESS, connection.host),
        (SPANDATA.SERVER_PORT, connection.port),
        (SPANDATA.DB_NAME, connection.database),
        (SPANDATA.DB_USER, connection.user),
    )
    for key, value in attributes:
        span.set_data(key, value)