11package tink .sql .drivers .node ;
22
3- import #if haxe3 js .lib .Error #else js .Error #end as JsError ;
4- import haxe .DynamicAccess ;
53import haxe .Int64 ;
6- import haxe .extern .EitherType ;
7- import haxe .io .Bytes ;
8- import js .node .Buffer ;
9- import js .node .events .EventEmitter ;
104import js .node .stream .Readable .Readable ;
5+ import js .node .events .EventEmitter ;
6+ import js .node .Buffer ;
117import js .node .tls .SecureContext ;
12- import tink .sql .Info ;
8+ import haxe .DynamicAccess ;
9+ import haxe .extern .EitherType ;
10+ import haxe .io .Bytes ;
1311import tink .sql .Query ;
12+ import tink .sql .Info ;
1413import tink .sql .Types ;
15- import tink .sql .expr .ExprTyper ;
16- import tink .sql .format .MySqlFormatter ;
1714import tink .sql .format .Sanitizer ;
18- import tink .sql .format .SqlFormatter ;
19- import tink .sql .parse .ResultParser ;
2015import tink .streams .Stream ;
16+ import tink .sql .format .MySqlFormatter ;
17+ import tink .sql .expr .ExprTyper ;
18+ import tink .sql .parse .ResultParser ;
19+
20+ import #if haxe3 js .lib .Error #else js .Error #end as JsError ;
2121
2222using tink .CoreApi ;
2323
2424typedef NodeSettings = MySqlSettings & {
2525 final ? connectionLimit : Int ;
2626 final ? ssl : EitherType <String , SecureContextOptions >;
27- final ? maxPreparedStatements : Int ;
28- final ? rowsAsArray : Bool ;
29- final ? queueLimit : Int ;
30- final ? waitForConnections : Bool ;
31- final ? enableKeepAlive : Bool ;
32- final ? keepAliveInitialDelay : Int ;
3327}
3428
3529class MySql implements Driver {
30+
3631 public final type : Driver . DriverType = MySql ;
37-
32+
3833 final settings : NodeSettings ;
3934
4035 public function new (settings ) {
@@ -58,30 +53,20 @@ class MySql implements Driver {
5853 multipleStatements : true ,
5954 supportBigNumbers : true ,
6055 bigNumberStrings : true ,
61-
62- maxPreparedStatements : settings .maxPreparedStatements ?? 0 ,
63- rowsAsArray : settings .rowsAsArray ?? false ,
64-
65- queueLimit : settings .queueLimit ?? 0 ,
66- waitForConnections : settings .waitForConnections ?? true ,
67- enableKeepAlive : settings .enableKeepAlive ?? true ,
68- keepAliveInitialDelay : settings .keepAliveInitialDelay ?? 10000
6956 });
70-
71- #if debug
72- pool .on (' acquire' , function (connection ) {
73- js. Node .console .log (' Connection ${connection .threadId } acquired' );
74- });
75- pool .on (' connection' , function (connection ) {
76- js. Node .console .log (' Connection ${connection .threadId } created' );
77- });
78- pool .on (' enqueue' , function () {
79- js. Node .console .log (' Waiting for available connection slot' );
80- });
81- pool .on (' release' , function (connection ) {
82- js. Node .console .log (' Connection ${connection .threadId } released' );
83- });
84- #end
57+
58+ // pool.on('acquire', function (connection) {
59+ // js.Node.console.log('Connection ${connection.threadId} acquired');
60+ // });
61+ // pool.on('connection', function (connection) {
62+ // js.Node.console.log('Connection ${connection.threadId} created');
63+ // });
64+ // pool.on('enqueue', function () {
65+ // js.Node.console.log('Waiting for available connection slot');
66+ // });
67+ // pool.on('release', function (connection) {
68+ // js.Node.console.log('Connection ${connection.threadId} released');
69+ // });
8570
8671 return new MySqlConnectionPool (info , pool );
8772 }
@@ -92,37 +77,42 @@ class MySqlConnectionPool<Db> implements Connection.ConnectionPool<Db> {
9277 final pool : NativeConnectionPool ;
9378 final formatter : MySqlFormatter ;
9479 final parser : ResultParser <Db >;
80+
9581
9682 public function new (info , pool ) {
9783 this .info = info ;
9884 this .pool = pool ;
9985 this .formatter = new MySqlFormatter ();
10086 this .parser = new ResultParser ();
10187 }
102-
88+
89+
10390 public function getFormatter ()
10491 return formatter ;
105-
92+
10693 public function execute <Result >(query : Query <Db , Result >): Result {
10794 final cnx = getNativeConnection ();
10895 return new MySqlConnection (info , cnx , true ).execute (query );
10996 }
110-
97+
11198 public function isolate (): Pair <Connection <Db >, CallbackLink > {
11299 final cnx = getNativeConnection ();
113- return new Pair ((new MySqlConnection (info , cnx , false ) : Connection <Db >), (() -> cnx .handle (o -> switch o {
114- case Success (native ): native .release ();
115- case Failure (_ ): // nothing to do
116- }) : CallbackLink ));
100+ return new Pair (
101+ (new MySqlConnection (info , cnx , false ): Connection <Db >),
102+ (() -> cnx .handle (o -> switch o {
103+ case Success (native ): native .release ();
104+ case Failure (_ ): // nothing to do
105+ }): CallbackLink )
106+ );
117107 }
118-
108+
119109 function getNativeConnection () {
120110 return new Promise ((resolve , reject ) -> {
121111 var cancelled = false ;
122112 pool .getConnection ((err , cnx ) -> {
123- if (cancelled )
113+ if (cancelled )
124114 cnx .release ();
125- else if (err != null )
115+ else if (err != null )
126116 reject (Error .ofJsError (err ));
127117 else
128118 resolve (cnx );
@@ -136,8 +126,8 @@ class MySqlConnectionPool<Db> implements Connection.ConnectionPool<Db> {
136126 return new MySqlConnection (info , cnx , true ).executeSql (sql );
137127 }
138128}
139-
140129class MySqlConnection <Db > implements Connection <Db > implements Sanitizer {
130+
141131 final info : DatabaseInfo ;
142132 final cnx : Promise <NativeConnection >;
143133 final formatter : MySqlFormatter ;
@@ -153,14 +143,13 @@ class MySqlConnection<Db> implements Connection<Db> implements Sanitizer {
153143 }
154144
155145 public function value (v : Any ): String {
156- if (Std .is (v , Date )) return
157- #if mysql_session_timezone NativeDriver .escape ((v : Date ).toString ())
158- #else ' DATE_ADD(FROM_UNIXTIME(0), INTERVAL ${(v : Date ).getTime () / 1000 } SECOND)' #end;
146+ if (Std .is (v , Date ))
147+ return ' DATE_ADD(FROM_UNIXTIME(0), INTERVAL ${(v : Date ).getTime ()/ 1000 } SECOND)' ;
159148
160149 if (Int64 .isInt64 (v ))
161150 return Int64 .toStr (v );
162151
163- return NativeDriver .escape (if (Std .is (v , Bytes )) Buffer .hxFromBytes (v ) else v );
152+ return NativeDriver .escape (if (Std .is (v , Bytes )) Buffer .hxFromBytes (v ) else v );
164153 }
165154
166155 public function ident (s : String ): String
@@ -173,40 +162,29 @@ class MySqlConnection<Db> implements Connection<Db> implements Sanitizer {
173162 return Failure (Error .withData (error .message , error ));
174163
175164 public function execute <Result >(query : Query <Db , Result >): Result {
176- inline function fetch <T >(): Promise <T >
177- return run (queryOptions (query ));
165+ inline function fetch <T >(): Promise <T > return run (queryOptions (query ));
178166 return switch query {
179167 case Select (_ ) | Union (_ ):
180- final parse : DynamicAccess <Any > -> {} = parser .queryParser (query , formatter .isNested (query ));
168+ final parse : DynamicAccess <Any >-> {} = parser .queryParser (query , formatter .isNested (query ));
181169 stream (queryOptions (query )).map (parse );
170+
182171 case CallProcedure (_ ):
183172 Stream .promise (fetch ().next ((res : Array <Array <Any >>) -> {
184173 final iterator = res [0 ].iterator ();
185174 final parse = parser .queryParser (query , formatter .isNested (query ));
186175 Stream .ofIterator ({
187176 hasNext : () -> iterator .hasNext (),
188- next : () -> parse (iterator .next ())
177+ next : () -> parse (iterator .next ())
189178 });
190179 }));
191180 case Transaction (_ ) | CreateTable (_ , _ ) | DropTable (_ ) | AlterTable (_ , _ ) | TruncateTable (_ ):
192181 fetch ().next (_ -> Noise );
193- case Insert (insert ):
194- fetch ().next (res -> {
195- var insertId : Dynamic = res .insertId ;
196- final p : Promise <Dynamic > = switch (SqlFormatter .getAutoIncPrimaryKeyCol (insert .table )) {
197- case {type : DInt (Big , _ , _ , _ )}:
198- // insertId is always a number even if bigNumberStrings is enabled
199- // https://github.com/mysqljs/mysql/issues/2460
200- new Id64 (Int64 .fromFloat (insertId ));
201- case _ :
202- new Id (insertId );
203- }
204- p ;
205- });
182+ case Insert (_ ):
183+ fetch ().next (res -> new Id (res .insertId ));
206184 case Update (_ ):
207- fetch ().next (res -> {rowsAffected : (res .changedRows : Int )});
185+ fetch ().next (res -> {rowsAffected : (res .changedRows : Int )});
208186 case Delete (_ ):
209- fetch ().next (res -> {rowsAffected : (res .affectedRows : Int )});
187+ fetch ().next (res -> {rowsAffected : (res .affectedRows : Int )});
210188 case ShowColumns (_ ):
211189 fetch ().next ((res : Array <MysqlColumnInfo >) -> res .map (formatter .parseColumn ));
212190 case ShowIndex (_ ):
@@ -218,7 +196,7 @@ class MySqlConnection<Db> implements Connection<Db> implements Sanitizer {
218196 return run ({sql : sql });
219197 }
220198
221- function queryOptions (query : Query <Db , Dynamic >): QueryOptions {
199+ function queryOptions (query : Query <Db , Dynamic >): QueryOptions {
222200 final sql = formatter .format (query ).toString (this );
223201 #if sql_debug
224202 trace (sql );
@@ -231,32 +209,29 @@ class MySqlConnection<Db> implements Connection<Db> implements Sanitizer {
231209 }
232210 }
233211
234- function stream <T >(options : QueryOptions ): Stream <T , Error > {
212+ function stream <T >(options : QueryOptions ): Stream <T , Error > {
235213 return cnx .next (cnx -> {
236214 final query = cnx .query (options );
237215 Stream .ofNodeStream (' query' , query .stream ({highWaterMark : 1024 }), {onEnd : autoRelease ? cnx .release : null });
238216 });
239217 }
240218
241- function run <T >(options : QueryOptions ): Promise <T >
219+ function run <T >(options : QueryOptions ): Promise <T >
242220 return cnx .next (cnx -> {
243221 new Promise ((resolve , reject ) -> {
244222 cnx .query (options , (err , res ) -> {
245- if (autoRelease )
246- cnx .release ();
247- if (err != null )
248- reject (Error .ofJsError (err ));
249- else
250- resolve (cast res );
223+ if (autoRelease ) cnx .release ();
224+ if (err != null ) reject (Error .ofJsError (err ));
225+ else resolve (cast res );
251226 });
252227 null ; // irreversible, we always want to wait for the query to finish
253228 });
254229 });
255230
256- function typeCast (field , next ): Any {
231+ function typeCast (field , next ): Any {
257232 return switch field .type {
258233 case ' GEOMETRY' :
259- switch (field .buffer () : Buffer ) {
234+ switch (field .buffer (): Buffer ) {
260235 case null : null ;
261236 case v : @:privateAccess new ResultParser ().parseGeometryValue (v .hxToBytes ());
262237 }
@@ -267,7 +242,7 @@ class MySqlConnection<Db> implements Connection<Db> implements Sanitizer {
267242 }
268243}
269244
270- @:jsRequire (" mysql2 " )
245+ @:jsRequire (" mysql " )
271246private extern class NativeDriver {
272247 static function escape (value : Any ): String ;
273248 static function escapeId (ident : String ): String ;
@@ -299,15 +274,10 @@ private typedef NativeConfig = {
299274 final ? multipleStatements : Bool ;
300275 final ? flags : String ;
301276 final ? ssl : Any ;
302- final ? maxPreparedStatements : Int ;
303- final ? rowsAsArray : Bool ;
304- final ? enableKeepAlive : Bool ;
305- final ? keepAliveInitialDelay : Int ;
306277}
307-
308278private typedef NativePoolConfig = NativeConfig & {
309279 final ? acquireTimeout : Int ;
310- final ? waitForConnections : Bool ;
280+ final ? waitForConnections : Int ;
311281 final ? connectionLimit : Int ;
312282 final ? queueLimit : Int ;
313283}
@@ -321,17 +291,16 @@ private typedef QueryOptions = {
321291extern class NativeConnectionPool extends js.node.events. EventEmitter <NativeConnectionPool > {
322292 function getConnection (cb : JsError -> NativeConnection -> Void ): Void ;
323293}
324-
325294extern class NativeConnection {
326- @:overload (function (q : QueryOptions , cb : JsError -> Dynamic -> Void ): Void {})
327- function query <Row >(q : QueryOptions ): NativeQuery <Row >;
295+ @:overload (function (q : QueryOptions , cb : JsError -> Dynamic -> Void ): Void {})
296+ function query <Row >(q : QueryOptions ): NativeQuery <Row >;
328297 function pause (): Void ;
329298 function resume (): Void ;
330299 function release (): Void ;
331300}
332-
333301extern class NativeQuery <Row > extends EventEmitter <NativeQuery <Row >> {
334302 function stream (? opt : {? highWaterMark : Int }): NativeStream ;
335303}
336304
337305extern class NativeStream extends Readable <NativeStream > {}
306+
0 commit comments