API Reference Source

lib/dialects/postgres/connection-manager.js

  1. 'use strict';
  2.  
  3. const _ = require('lodash');
  4. const AbstractConnectionManager = require('../abstract/connection-manager');
  5. const { logger } = require('../../utils/logger');
  6. const debug = logger.debugContext('connection:pg');
  7. const Promise = require('../../promise');
  8. const sequelizeErrors = require('../../errors');
  9. const semver = require('semver');
  10. const dataTypes = require('../../data-types');
  11. const moment = require('moment-timezone');
  12.  
  13. class ConnectionManager extends AbstractConnectionManager {
  14. constructor(dialect, sequelize) {
  15. sequelize.config.port = sequelize.config.port || 5432;
  16. super(dialect, sequelize);
  17.  
  18. const pgLib = this._loadDialectModule('pg');
  19. this.lib = this.sequelize.config.native ? pgLib.native : pgLib;
  20.  
  21. this._clearDynamicOIDs();
  22. this._clearTypeParser();
  23. this.refreshTypeParser(dataTypes.postgres);
  24. }
  25.  
  26. // Expose this as a method so that the parsing may be updated when the user has added additional, custom types
  27. _refreshTypeParser(dataType) {
  28. const arrayParserBuilder = parser => {
  29. return value => this.lib.types.arrayParser.create(value, parser).parse();
  30. };
  31. const rangeParserBuilder = parser => {
  32. return value => dataType.parse(value, { parser });
  33. };
  34.  
  35. // Set range parsers
  36. if (dataType.key.toLowerCase() === 'range') {
  37. for (const name in this.nameOidMap) {
  38. const entry = this.nameOidMap[name];
  39. if (! entry.rangeOid) continue;
  40.  
  41. const rangeParser = rangeParserBuilder(this.getTypeParser(entry.oid));
  42. const arrayRangeParser = arrayParserBuilder(rangeParser);
  43.  
  44. this.oidParserMap.set(entry.rangeOid, rangeParser);
  45. if (! entry.arrayRangeOid) continue;
  46. this.oidParserMap.set(entry.arrayRangeOid, arrayRangeParser);
  47. }
  48. return;
  49. }
  50.  
  51. // Create parsers for normal or enum data types
  52. const parser = value => dataType.parse(value);
  53. const arrayParser = arrayParserBuilder(parser);
  54.  
  55. // Set enum parsers
  56. if (dataType.key.toLowerCase() === 'enum') {
  57. this.enumOids.oids.forEach(oid => {
  58. this.oidParserMap.set(oid, parser);
  59. });
  60. this.enumOids.arrayOids.forEach(arrayOid => {
  61. this.oidParserMap.set(arrayOid, arrayParser);
  62. });
  63. return;
  64. }
  65.  
  66. // Set parsers for normal data types
  67. dataType.types.postgres.forEach(name => {
  68. if (! this.nameOidMap[name]) return;
  69. this.oidParserMap.set(this.nameOidMap[name].oid, parser);
  70.  
  71. if (! this.nameOidMap[name].arrayOid) return;
  72. this.oidParserMap.set(this.nameOidMap[name].arrayOid, arrayParser);
  73. });
  74. }
  75.  
  76. _clearTypeParser() {
  77. this.oidParserMap = new Map();
  78. }
  79.  
  80. getTypeParser(oid, ...args) {
  81. if (this.oidParserMap.get(oid)) return this.oidParserMap.get(oid);
  82.  
  83. return this.lib.types.getTypeParser(oid, ...args);
  84. }
  85.  
  86. connect(config) {
  87. config.user = config.username;
  88. const connectionConfig = _.pick(config, [
  89. 'user', 'password', 'host', 'database', 'port'
  90. ]);
  91.  
  92. connectionConfig.types = {
  93. getTypeParser: ConnectionManager.prototype.getTypeParser.bind(this)
  94. };
  95.  
  96. if (config.dialectOptions) {
  97. _.merge(connectionConfig,
  98. _.pick(config.dialectOptions, [
  99. // see [http://www.postgresql.org/docs/9.3/static/runtime-config-logging.html#GUC-APPLICATION-NAME]
  100. 'application_name',
  101. // choose the SSL mode with the PGSSLMODE environment variable
  102. // object format: [https://github.com/brianc/node-postgres/blob/master/lib/connection.js#L79]
  103. // see also [http://www.postgresql.org/docs/9.3/static/libpq-ssl.html]
  104. 'ssl',
  105. // In addition to the values accepted by the corresponding server,
  106. // you can use "auto" to determine the right encoding from the
  107. // current locale in the client (LC_CTYPE environment variable on Unix systems)
  108. 'client_encoding',
  109. // !! DO NOT SET THIS TO TRUE !!
  110. // (unless you know what you're doing)
  111. // see [http://www.postgresql.org/message-id/flat/bc9549a50706040852u27633f41ib1e6b09f8339d845@mail.gmail.com#bc9549a50706040852u27633f41ib1e6b09f8339d845@mail.gmail.com]
  112. 'binary',
  113. // This should help with backends incorrectly considering idle clients to be dead and prematurely disconnecting them.
  114. // this feature has been added in pg module v6.0.0, check pg/CHANGELOG.md
  115. 'keepAlive',
  116. // Times out queries after a set time in milliseconds. Added in pg v7.3
  117. 'statement_timeout'
  118. ]));
  119. }
  120.  
  121. return new Promise((resolve, reject) => {
  122. let responded = false;
  123.  
  124. const connection = new this.lib.Client(connectionConfig);
  125.  
  126. const parameterHandler = message => {
  127. switch (message.parameterName) {
  128. case 'server_version':
  129. if (this.sequelize.options.databaseVersion === 0) {
  130. const version = semver.coerce(message.parameterValue).version;
  131. this.sequelize.options.databaseVersion = semver.valid(version)
  132. ? version
  133. : this.defaultVersion;
  134. }
  135. break;
  136. case 'standard_conforming_strings':
  137. connection['standard_conforming_strings'] = message.parameterValue;
  138. break;
  139. }
  140. };
  141.  
  142. const endHandler = () => {
  143. debug('connection timeout');
  144. if (!responded) {
  145. reject(new sequelizeErrors.ConnectionTimedOutError(new Error('Connection timed out')));
  146. }
  147. };
  148.  
  149. // If we didn't ever hear from the client.connect() callback the connection timeout
  150. // node-postgres does not treat this as an error since no active query was ever emitted
  151. connection.once('end', endHandler);
  152.  
  153. if (!this.sequelize.config.native) {
  154. // Receive various server parameters for further configuration
  155. connection.connection.on('parameterStatus', parameterHandler);
  156. }
  157.  
  158. connection.connect(err => {
  159. responded = true;
  160.  
  161. if (!this.sequelize.config.native) {
  162. // remove parameter handler
  163. connection.connection.removeListener('parameterStatus', parameterHandler);
  164. }
  165.  
  166. if (err) {
  167. if (err.code) {
  168. switch (err.code) {
  169. case 'ECONNREFUSED':
  170. reject(new sequelizeErrors.ConnectionRefusedError(err));
  171. break;
  172. case 'ENOTFOUND':
  173. reject(new sequelizeErrors.HostNotFoundError(err));
  174. break;
  175. case 'EHOSTUNREACH':
  176. reject(new sequelizeErrors.HostNotReachableError(err));
  177. break;
  178. case 'EINVAL':
  179. reject(new sequelizeErrors.InvalidConnectionError(err));
  180. break;
  181. default:
  182. reject(new sequelizeErrors.ConnectionError(err));
  183. break;
  184. }
  185. } else {
  186. reject(new sequelizeErrors.ConnectionError(err));
  187. }
  188. } else {
  189. debug('connection acquired');
  190. connection.removeListener('end', endHandler);
  191. resolve(connection);
  192. }
  193. });
  194. }).tap(connection => {
  195. let query = '';
  196.  
  197. if (this.sequelize.options.standardConformingStrings !== false && connection['standard_conforming_strings'] !== 'on') {
  198. // Disable escape characters in strings
  199. // see https://github.com/sequelize/sequelize/issues/3545 (security issue)
  200. // see https://www.postgresql.org/docs/current/static/runtime-config-compatible.html#GUC-STANDARD-CONFORMING-STRINGS
  201. query += 'SET standard_conforming_strings=on;';
  202. }
  203.  
  204. if (this.sequelize.options.clientMinMessages !== false) {
  205. query += `SET client_min_messages TO ${this.sequelize.options.clientMinMessages};`;
  206. }
  207.  
  208. if (!this.sequelize.config.keepDefaultTimezone) {
  209. const isZone = !!moment.tz.zone(this.sequelize.options.timezone);
  210. if (isZone) {
  211. query += `SET TIME ZONE '${this.sequelize.options.timezone}';`;
  212. } else {
  213. query += `SET TIME ZONE INTERVAL '${this.sequelize.options.timezone}' HOUR TO MINUTE;`;
  214. }
  215. }
  216.  
  217. if (query) {
  218. return connection.query(query);
  219. }
  220. }).tap(connection => {
  221. if (Object.keys(this.nameOidMap).length === 0 &&
  222. this.enumOids.oids.length === 0 &&
  223. this.enumOids.arrayOids.length === 0) {
  224. return this._refreshDynamicOIDs(connection);
  225. }
  226. }).tap(connection => {
  227. // Don't let a Postgres restart (or error) to take down the whole app
  228. connection.on('error', error => {
  229. connection._invalid = true;
  230. debug(`connection error ${error.code || error.message}`);
  231. this.pool.destroy(connection);
  232. });
  233. });
  234. }
  235.  
  236. disconnect(connection) {
  237. if (connection._ending) {
  238. debug('connection tried to disconnect but was already at ENDING state');
  239. return Promise.resolve();
  240. }
  241.  
  242. return Promise.fromCallback(callback => connection.end(callback));
  243. }
  244.  
  245. validate(connection) {
  246. return !connection._invalid && !connection._ending;
  247. }
  248.  
  249. _refreshDynamicOIDs(connection) {
  250. const databaseVersion = this.sequelize.options.databaseVersion;
  251. const supportedVersion = '8.3.0';
  252.  
  253. // Check for supported version
  254. if ( (databaseVersion && semver.gte(databaseVersion, supportedVersion)) === false) {
  255. return Promise.resolve();
  256. }
  257.  
  258. // Refresh dynamic OIDs for some types
  259. // These include Geometry / Geography / HStore / Enum / Citext / Range
  260. return (connection || this.sequelize).query(
  261. 'WITH ranges AS (' +
  262. ' SELECT pg_range.rngtypid, pg_type.typname AS rngtypname,' +
  263. ' pg_type.typarray AS rngtyparray, pg_range.rngsubtype' +
  264. ' FROM pg_range LEFT OUTER JOIN pg_type ON pg_type.oid = pg_range.rngtypid' +
  265. ')' +
  266. 'SELECT pg_type.typname, pg_type.typtype, pg_type.oid, pg_type.typarray,' +
  267. ' ranges.rngtypname, ranges.rngtypid, ranges.rngtyparray' +
  268. ' FROM pg_type LEFT OUTER JOIN ranges ON pg_type.oid = ranges.rngsubtype' +
  269. ' WHERE (pg_type.typtype IN(\'b\', \'e\'));'
  270. ).then(results => {
  271. let result = Array.isArray(results) ? results.pop() : results;
  272.  
  273. // When searchPath is prepended then two statements are executed and the result is
  274. // an array of those two statements. First one is the SET search_path and second is
  275. // the SELECT query result.
  276. if (Array.isArray(result)) {
  277. if (result[0].command === 'SET') {
  278. result = result.pop();
  279. }
  280. }
  281.  
  282. const newNameOidMap = {};
  283. const newEnumOids = { oids: [], arrayOids: [] };
  284.  
  285. for (const row of result.rows) {
  286. // Mapping enums, handled separatedly
  287. if (row.typtype === 'e') {
  288. newEnumOids.oids.push(row.oid);
  289. if (row.typarray) newEnumOids.arrayOids.push(row.typarray);
  290. continue;
  291. }
  292.  
  293. // Mapping base types and their arrays
  294. newNameOidMap[row.typname] = { oid: row.oid };
  295. if (row.typarray) newNameOidMap[row.typname].arrayOid = row.typarray;
  296.  
  297. // Mapping ranges(of base types) and their arrays
  298. if (row.rngtypid) {
  299. newNameOidMap[row.typname].rangeOid = row.rngtypid;
  300. if (row.rngtyparray) newNameOidMap[row.typname].arrayRangeOid = row.rngtyparray;
  301. }
  302. }
  303.  
  304. // Replace all OID mappings. Avoids temporary empty OID mappings.
  305. this.nameOidMap = newNameOidMap;
  306. this.enumOids = newEnumOids;
  307.  
  308. this.refreshTypeParser(dataTypes.postgres);
  309. });
  310. }
  311.  
  312. _clearDynamicOIDs() {
  313. this.nameOidMap = {};
  314. this.enumOids = { oids: [], arrayOids: [] };
  315. }
  316. }
  317.  
  318. module.exports = ConnectionManager;
  319. module.exports.ConnectionManager = ConnectionManager;
  320. module.exports.default = ConnectionManager;