;;; Commit 02241130, created on 9 June 2021 (commit history).
;;;
;;; Tools to handle PostgreSQL queries
;;;
(in-package :pgloader.pgsql)

(defparameter *pgconn-variant* :pgdg
  "Which flavour of PostgreSQL server we are connected to: :pgdg for the
  PostgreSQL Global Development Group server, or :redshift. The value comes
  from parsing the output of SELECT version(); at open-connection time, is
  stored in the pgsql-connection instance, and is bound to this variable by
  with-pgsql-connection.")

;;;
;;; PostgreSQL Tools connecting to a database
;;;
(defclass pgsql-connection (db-connection)
  (;; SSL mode: :try and :yes enable SSL (see ssl-enable-p); NIL is sent
   ;; to the server as :no by open-connection.
   (use-ssl        :initarg :use-ssl       :initform nil
                   :accessor pgconn-use-ssl)
   (table-name     :initarg :table-name    :initform nil
                   :accessor pgconn-table-name)
   ;; The next three slots are filled in by set-postgresql-version once
   ;; connected; they stay NIL (rather than unbound) before that, so that
   ;; clone-connection can copy a connection that was never opened.
   (version-string :initarg :pg-version    :initform nil
                   :accessor pgconn-version-string)
   (major-version  :initarg :major-version :initform nil
                   :accessor pgconn-major-version)
   (variant        :initarg :variant       :initform nil
                   :accessor pgconn-variant))
  (:documentation "PostgreSQL connection for pgloader"))

(defmethod initialize-instance :after ((pgconn pgsql-connection) &key)
  "Tag every freshly made pgsql-connection with the \"pgsql\" type."
  (setf (slot-value pgconn 'type)
        "pgsql"))

(defmethod clone-connection ((c pgsql-connection))
  "Clone C with the next method, turn the copy into a pgsql-connection and
   carry over the PostgreSQL specific slots: SSL mode, table name, version
   string, major version and variant. Returns the copy."
  (let ((copy (call-next-method c)))
    ;; change-class modifies COPY in place.
    (change-class copy 'pgsql-connection)
    (setf (pgconn-use-ssl copy)        (pgconn-use-ssl c)
          (pgconn-table-name copy)     (pgconn-table-name c)
          (pgconn-version-string copy) (pgconn-version-string c)
          (pgconn-major-version copy)  (pgconn-major-version c)
          (pgconn-variant copy)        (pgconn-variant c))
    copy))

(defmethod ssl-enable-p ((pgconn pgsql-connection))
  "True when PGCONN's SSL mode is :try or :yes (the matching tail of the
   mode list is what is returned); NIL otherwise."
  (let ((ssl-mode (pgconn-use-ssl pgconn)))
    (member ssl-mode '(:try :yes))))

(defun new-pgsql-connection (pgconn)
  "Return a brand new pgsql-connection with the same user, password, host,
   port, database name, SSL mode and table name as PGCONN, so that the new
   object gets its own connection handle instead of sharing PGCONN's."
  (let ((user       (db-user pgconn))
        (pass       (db-pass pgconn))
        (host       (db-host pgconn))
        (port       (db-port pgconn))
        (name       (db-name pgconn))
        (use-ssl    (pgconn-use-ssl pgconn))
        (table-name (pgconn-table-name pgconn)))
    (make-instance 'pgsql-connection
                   :user user
                   :pass pass
                   :host host
                   :port port
                   :name name
                   :use-ssl use-ssl
                   :table-name table-name)))

;;;
;;; Implement SSL Client Side certificates
;;; http://www.postgresql.org/docs/current/static/libpq-ssl.html#LIBPQ-SSL-FILE-USAGE
;;;
(defvar *pgsql-client-certificate* "~/.postgresql/postgresql.crt"
  "Path of the client side SSL certificate to present to PostgreSQL.")

(defvar *pgsql-client-key* "~/.postgresql/postgresql.key"
  "Path of the private key that goes with the client side SSL certificate.")

;;;
;;; PostgreSQL error types for pgloader.
;;;
(deftype postgresql-retryable ()
  "PostgreSQL error conditions after which a batch can be retried."
  '(or cl-postgres-error::data-exception
       cl-postgres-error::integrity-violation
       cl-postgres-error::internal-error
       cl-postgres-error::insufficient-resources
       cl-postgres-error::program-limit-exceeded))

(deftype postgresql-unavailable ()
  "Conditions meaning PostgreSQL went away in the middle of our processing,
   for instance because the server is being restarted."
  '(or cl-postgres-error::server-shutdown
       cl-postgres-error::admin-shutdown
       cl-postgres-error::crash-shutdown
       cl-postgres-error::operator-intervention
       cl-postgres-error::cannot-connect-now
       cl-postgres-error::database-connection-error
       cl-postgres-error::database-connection-lost
       cl-postgres-error::database-socket-error))

;;;
;;; We need to distinguish some special cases of PostgreSQL errors within
;;; Class 53 — Insufficient Resources: in case of "too many connections" we
;;; typically want to leave room for another worker to finish and free one
;;; connection, then try again.
;;;
;;; http://www.postgresql.org/docs/9.4/interactive/errcodes-appendix.html
;;;
;;; The "leave room to finish and try again" heuristic is currently quite
;;; simplistic, but at least it works in my test cases.
;;;
;; Condition for SQLSTATE 53300 (too_many_connections), declared under
;; insufficient-resources (presumably as its parent condition class —
;; deferror is internal to cl-postgres, confirm there). open-connection
;; catches it to retry the connection.
(cl-postgres-error::deferror "53300"
    too-many-connections cl-postgres-error:insufficient-resources)
;; Same for SQLSTATE 53400 (configuration_limit_exceeded), also retried by
;; open-connection.
(cl-postgres-error::deferror "53400"
    configuration-limit-exceeded cl-postgres-error:insufficient-resources)

(defvar *retry-connect-times* 5
  "How many connection attempts open-connection makes before giving up.")

(defvar *retry-connect-delay* 0.5
  "Delay, in seconds, to wait before making another connection attempt.")

(defmethod open-connection ((pgconn pgsql-connection) &key username)
  "Open a PostgreSQL connection for PGCONN and return PGCONN.

   USERNAME, when given, overrides the connection's own user. Attempts that
   fail with too-many-connections or configuration-limit-exceeded are
   retried, up to *RETRY-CONNECT-TIMES* attempts in total, sleeping
   *RETRY-CONNECT-DELAY* seconds after each such failure; an error is
   signaled when no attempt succeeds. SSL certificate verification errors
   are logged and re-signaled. Once connected, *PG-SETTINGS* are applied to
   the session and the server version is recorded into PGCONN."
  (let* (#+unix
         (cl-postgres::*unix-socket-dir*  (get-unix-socket-dir pgconn))
         (crt-file (expand-user-homedir-pathname *pgsql-client-certificate*))
         (key-file (expand-user-homedir-pathname *pgsql-client-key*))
         ;; Client side certificate and key are only used with SSL, and
         ;; only when the files exist.
         (pomo::*ssl-certificate-file* (when (and (ssl-enable-p pgconn)
                                                  (probe-file crt-file))
                                         (uiop:native-namestring crt-file)))
         (pomo::*ssl-key-file*         (when (and (ssl-enable-p pgconn)
                                                  (probe-file key-file))
                                         (uiop:native-namestring key-file)))
         ;;
         ;; It's ok to set :verify-mode to NONE here because
         ;; cl+ssl:*make-ssl-client-stream-verify-default* defaults to
         ;; :require and takes precedence.
         ;;
         ;; Only when --no-ssl-cert-verification is passed as a command line
         ;; option do we set cl+ssl:*make-ssl-client-stream-verify-default*
         ;; to NIL, then allowing the NONE behaviour set here.
         ;;
         (ssl-context
          (CL+SSL:MAKE-CONTEXT :disabled-protocols nil
                               :verify-mode CL+SSL:+SSL-VERIFY-NONE+)))
    (flet ((connect (pgconn username)
             ;; One connection attempt: returns the new connection handle,
             ;; or NIL (the value of SLEEP) after a retryable failure.
             (handler-case
                 ;; in some cases (client_min_messages set to debug5
                 ;; for example), PostgreSQL might send us some
                 ;; WARNINGs already when opening a new connection
                 (handler-bind ((cl-postgres:postgresql-warning
                                 #'(lambda (w)
                                     (log-message :warning "~a" w)
                                     (muffle-warning))))
                   (pomo:connect (db-name pgconn)
                                 (or username (db-user pgconn))
                                 (db-pass pgconn)
                                 (let ((host (db-host pgconn)))
                                   (if (and (consp host) (eq :unix (car host)))
                                       :unix
                                       host))
                                 :port (db-port pgconn)
                                 :use-ssl (or (pgconn-use-ssl pgconn) :no)))

               ((or too-many-connections configuration-limit-exceeded) (e)
                 (log-message :error
                              "Failed to connect to ~a: ~a; will try again in ~fs"
                              pgconn e *retry-connect-delay*)
                 (sleep *retry-connect-delay*))

               (CL+SSL:SSL-ERROR-VERIFY (e)
                 (log-message :error
                              "Connecting to PostgreSQL ~a: ~a"
                              (db-host pgconn) e)
                 (log-message :log "You may try --no-ssl-cert-verification")
                 (error e)))))
      ;; Install the SSL context once around the whole retry loop rather
      ;; than around each attempt: with :auto-free-p t the context is freed
      ;; when WITH-GLOBAL-CONTEXT exits, so a per-attempt wrapper would
      ;; free it after a failed attempt and hand an already freed context
      ;; to the next retry.
      (CL+SSL:WITH-GLOBAL-CONTEXT (ssl-context :auto-free-p t)
        (loop :while (null (conn-handle pgconn))
              :repeat *retry-connect-times*
              :do (setf (conn-handle pgconn) (connect pgconn username)))))

    (unless (conn-handle pgconn)
      (error "Failed ~d times to connect to ~a" *retry-connect-times* pgconn))

    (log-message :debug "CONNECTED TO ~s" pgconn)
    (set-session-gucs *pg-settings* :database (conn-handle pgconn))
    (set-postgresql-version pgconn)

    pgconn))

(defmethod close-connection ((pgconn pgsql-connection))
  "Disconnect PGCONN's open handle from PostgreSQL, forget the handle, and
   return PGCONN. It is an error to close a connection that has no handle."
  (assert (not (null (conn-handle pgconn))))
  (prog1 pgconn
    (pomo:disconnect (conn-handle pgconn))
    (setf (conn-handle pgconn) nil)))

(defmethod query ((pgconn (eql nil)) sql &key)
  "With no connection object, log SQL and run it on whatever connection is
   already current in POMO:*DATABASE*, as set up around the call by
   `with-connection' or `with-transaction'."
  (progn
    (log-message :sql "~a" sql)
    (pomo:query sql)))

(defmethod query ((pgconn pgsql-connection) sql &key)
  "Log SQL, then run it on PGCONN's own connection handle."
  (let ((pomo:*database* (conn-handle pgconn)))
    (progn
      (log-message :sql "~a" sql)
      (pomo:query sql))))

(defmacro handling-pgsql-notices (&body forms)
  "Evaluate FORMS while logging PostgreSQL conditions.

   Database errors, except those meaning the server is unavailable, are
   logged at the :error level and then keep propagating: they are not
   handled here. PostgreSQL warnings are logged at the :warning level and
   muffled. This macro neither opens a transaction nor changes any setting."
  `(handler-bind
       (((and cl-postgres:database-error
              (not postgresql-unavailable))
          #'(lambda (e)
              ;; Returning normally declines the condition, so the error
              ;; still reaches outer handlers after being logged.
              (log-message :error "~a" e)))
        (cl-postgres:postgresql-warning
          #'(lambda (w)
              (log-message :warning "~a" w)
              (muffle-warning))))
     (progn ,@forms)))

(defmacro with-pgsql-transaction ((&key pgconn database) &body forms)
  "Run FORMS inside a PostgreSQL transaction, logging BEGIN first.

   When DATABASE is given it is used as the connection handle (bound to
   POMO:*DATABASE*, with PostgreSQL notices handled) and PGCONN is ignored.
   Otherwise a connection is opened on PGCONN with `with-pgsql-connection'."
  (let ((transaction `(pomo:with-transaction ()
                        (log-message :debug "BEGIN")
                        ,@forms)))
    (if (not database)
        ;; no database given, create a new database connection
        `(with-pgsql-connection (,pgconn)
           ,transaction)
        `(let ((pomo:*database* ,database))
           (handling-pgsql-notices
             ,transaction)))))

(defmacro with-pgsql-connection ((pgconn) &body forms)
  "Evaluate FORMS within `with-connection' on PGCONN, with POMO:*DATABASE*
   bound to the connection handle, *PGCONN-VARIANT* bound to the
   connection's variant, and PostgreSQL notices handled by
   `handling-pgsql-notices'."
  (let ((conn-var (gensym "pgsql-conn")))
    `(with-connection (,conn-var ,pgconn)
       (let ((pomo:*database*  (conn-handle ,conn-var))
             (*pgconn-variant* (pgconn-variant ,conn-var)))
         (handling-pgsql-notices ,@forms)))))

(defun get-unix-socket-dir (pgconn)
  "Value to use for cl-postgres::*unix-socket-dir* when connecting PGCONN.
   If PGCONN's host is a (:unix . path) cons, return the directory namestring
   of that path; otherwise return the variable's current value unchanged."
  (let ((host (db-host pgconn)))
    (cond ((not (and (consp host) (eq :unix (car host))))
           cl-postgres::*unix-socket-dir*)
          (t
           (directory-namestring (fad:pathname-as-directory (cdr host)))))))

(defun set-session-gucs (alist &key transaction database)
  "Set the GUCs in ALIST, a list of (name . value) pairs.

   Uses SET, or SET LOCAL when TRANSACTION is non-nil. Statements run on
   DATABASE when given, otherwise on the current POMO:*DATABASE*. The
   search_path value is spliced in unquoted; every other value is wrapped
   in single quotes. Values are not escaped."
  (let ((pomo:*database* (or database pomo:*database*)))
    (loop
       :for (name . value) :in alist
       :for statement := (format nil
                                 (if (string-equal "search_path" name)
                                     ;; for search_path, don't quote the value
                                     "SET~:[~; LOCAL~] ~a TO ~a"
                                     ;; general case: quote the value
                                     "SET~:[~; LOCAL~] ~a TO '~a'")
                                 transaction name value)
       :do (progn
             ;; Pass the statement as an argument, as the other log-message
             ;; calls do, not as the format control: a GUC value containing
             ;; a tilde would otherwise be read as a format directive.
             (log-message :debug "~a" statement)
             (pomo:execute statement)))))



;;;
;;; The parser is still hard-coded to support only PostgreSQL targets
;;;
(defun sanitize-user-gucs (gucs)
  "Forbid certain actions such as setting a client_encoding different from utf8.

   Returns a fresh alist that starts with (\"client_encoding\" . \"utf8\"),
   followed by GUCS minus any client_encoding that is not utf8 or utf-8
   (each of those is dropped with a warning), and ending with
   (\"application_name\" . \"pgloader\") unless an application_name is
   already present."
  (flet ((forced-encoding-p (name value)
           ;; A client_encoding entry that is not some spelling of utf8.
           (and (string-equal name "client_encoding")
                (not (member value '("utf-8" "utf8") :test #'string-equal)))))
    (let ((kept '()))
      (dolist (guc gucs)
        (let ((name  (car guc))
              (value (cdr guc)))
          (if (forced-encoding-p name value)
              (log-message :warning
                           "pgloader always talk to PostgreSQL in utf-8, client_encoding has been forced to 'utf8'.")
              (push (cons name value) kept))))
      (let ((sanitized (cons (cons "client_encoding" "utf8") (nreverse kept))))
        ;; Provide "pgloader" as the application_name unless one was given.
        (if (assoc "application_name" sanitized :test #'string-equal)
            sanitized
            (append sanitized (list (cons "application_name" "pgloader"))))))))


;;;
;;; DDL support with stats (timing, object count)
;;;
(defun pgsql-connect-and-execute-with-timing (pgconn section label sql)
  "Open a new connection on PGCONN and run SQL in a transaction through
   pgsql-execute-with-timing, logging at the :notice level.

   Whenever PostgreSQL turns out to be unavailable, log the condition, wait
   two seconds and start over with a fresh connection, without limit."
  (block attempt
    (loop
      (handler-case
          (return-from attempt
            (with-pgsql-connection (pgconn)
              (pomo:with-transaction ()
                (pgsql-execute-with-timing section label sql
                                           :log-level :notice))))
        (postgresql-unavailable (condition)
          (log-message :error "~a" condition)
          (log-message :error "Reconnecting to PostgreSQL")
          ;; Retrying immediately tends to fail with a Socket error in
          ;; "connect": ECONNREFUSED, so give the server a moment.
          (sleep 2))))))

(defun pgsql-execute-with-timing (section label sql-list
                                  &key
                                    (log-level :sql)
                                    on-error-stop
                                    client-min-messages)
  "Run SQL-LIST (one statement or a list of them) with pgsql-execute and
   register into the stats for SECTION and LABEL the success and error
   counts, the number of statements read, and the elapsed time."
  (let* ((statements (alexandria:ensure-list sql-list))
         (elapsed
           ;; TIMING's second value is the elapsed time in seconds.
           (nth-value 1
             (timing
               (multiple-value-bind (nb-ok nb-errors)
                   (pgsql-execute statements
                                  :log-level log-level
                                  :on-error-stop on-error-stop
                                  :client-min-messages client-min-messages)
                 (update-stats section label :rows nb-ok :errs nb-errors))))))
    (update-stats section label :read (length statements) :secs elapsed)))

(defun pgsql-execute (sql
                      &key
                        (log-level :sql)
                        client-min-messages
                        (on-error-stop t))
  "Execute given SQL list of statements in current transaction.

   SQL is a single statement or a list of statements; each one is logged at
   LOG-LEVEL before being executed.

   When ON-ERROR-STOP is non-nil (the default), we stop at the first sql
   statement that fails: the error is not handled here. That's because this
   facility is meant for DDL. With ON-ERROR-STOP nil, each statement runs
   within a savepoint: a failing one is logged and rolled back, and
   processing continues with the next statement.

   When CLIENT-MIN-MESSAGES is given (a symbol such as :warning), it is set
   with SET LOCAL while the statements run, and the value in effect before
   is then put back with SET LOCAL, leaving any session-level setting
   untouched. Both steps are skipped when *PGCONN-VARIANT* is :redshift.

   Returns two values: the number of statements that succeeded and the
   number that failed."
  (let* ((sql-list     (alexandria::ensure-list sql))
         (nb-ok        0)
         (nb-errors    0)
         (set-cmm-p    (and client-min-messages
                            (not (eq :redshift *pgconn-variant*))))
         ;; Remember the current value so that we can restore it. A RESET
         ;; here would be a session-level SET to the default value, which
         ;; survives the commit and discards any client_min_messages the
         ;; user set for the session.
         (previous-cmm (when set-cmm-p
                         (pomo:query
                          "select current_setting('client_min_messages')"
                          :single))))
    (when set-cmm-p
      (pomo:execute
       (format nil "SET LOCAL client_min_messages TO ~a;"
               (symbol-name client-min-messages))))

    (if on-error-stop
        (loop :for sql :in sql-list
           :do (progn
                 (log-message log-level "~a" sql)
                 (pomo:execute sql))
           ;; never executed in case of error, which signals out of here
           :finally (incf nb-ok (length sql-list)))

        ;; handle failures and just continue
        (loop :for sql :in sql-list
           :do (progn
                 (pomo:execute "savepoint pgloader;")
                 (handler-case
                     (progn
                       (log-message log-level "~a" sql)
                       (pomo:execute sql)
                       (pomo:execute "release savepoint pgloader;")
                       (incf nb-ok))
                   (cl-postgres:database-error (e)
                     (incf nb-errors)
                     (log-message :error "PostgreSQL ~a" e)
                     (pomo:execute "rollback to savepoint pgloader;"))))))

    (when set-cmm-p
      (pomo:execute
       (format nil "SET LOCAL client_min_messages TO ~a;" previous-cmm)))

    (values nb-ok nb-errors)))


;;;
;;; PostgreSQL version specific support, that we get once connected
;;;

;;;
;;; Given that RedShift is a PostgreSQL fork(), we can use PostgreSQL
;;; protocol to talk to it. That said, it's a strange beast:
;;;
;;;  pgloader=> show server_version;
;;;  ERROR:  must be superuser to examine "server_version"
;;;
;;; So we're back to parsing the output of select version();
;;;
(defun set-postgresql-version (pgconn)
  "Ask the server behind PGCONN for select version(), then store the raw
   version string, the parsed major version and the variant into PGCONN."
  (let* ((pomo:*database* (or (conn-handle pgconn) pomo:*database*))
         (version-string  (pomo:query "select version()" :single)))
    (multiple-value-bind (full-version major variant)
        (parse-postgresql-version-string version-string)
      (declare (ignore full-version))
      (setf (pgconn-version-string pgconn) version-string
            (pgconn-major-version pgconn)  major
            (pgconn-variant pgconn)        variant))))

;;; Parse the version string and return the complete and major version
;;; "numbers" as strings, in that order, and the variant as a third value,
;;; which is either :redshift or :pgdg. NIL is returned when the string does
;;; not match the expected layout. Sample inputs:
;;;
;;;  PostgreSQL 8.0.2 on i686-pc-linux-gnu, compiled by GCC gcc (GCC) 3.4.2 20041017 (Red Hat 3.4.2-6.fc3), Redshift 1.0.2058
;;;  PostgreSQL 10.1 on x86_64-apple-darwin14.5.0, compiled by Apple LLVM version 7.0.0 (clang-700.1.76), 64-bit
;;;  PostgreSQL 10.6 (Ubuntu 10.6-1.pgdg14.04+1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 4.8.4-2ubuntu1~14.04.4) 4.8.4, 64-bit
;;;  PostgreSQL 10.6, compiled by Visual C++ build 1800, 64-bit
(defun parse-postgresql-version-string (version-string)
  "Parse PostgreSQL select version() output.

   Returns three values: the full version as a string (\"10.6\", \"8.0.2\"),
   the major version as a string, and the variant (:redshift when the text
   after the compiler information mentions Redshift, :pgdg otherwise).
   Returns NIL when VERSION-STRING does not match the expected layout.

   From PostgreSQL 10 onward the major version is the first number (\"10\");
   before that it is the first two numbers (\"8.0\", \"9.6\"). A suffix glued
   to the version number, as in \"13beta1\" or \"15devel\", is accepted and
   left out of the returned full version."
  (cl-ppcre:register-groups-bind (full-version maybe-os maybe-variant)
      ;; [^ ,]* skips a pre-release suffix glued to the version number; on
      ;; release version strings it matches the empty string.
      ("PostgreSQL ([0-9.]+)[^ ,]*( [^,]+)?, [^,]+, (.*)" version-string)
    (declare (ignore maybe-os))
    (let* ((version-dots  (split-sequence:split-sequence #\. full-version))
           (major-number  (parse-integer (first version-dots) :junk-allowed t))
           (major-version (if (and major-number
                                   (< major-number 10)
                                   (second version-dots))
                              (format nil "~a.~a"
                                      (first version-dots)
                                      (second version-dots))
                              (first version-dots)))
           (variant       (if (cl-ppcre:scan "Redshift" maybe-variant)
                              :redshift
                              :pgdg)))
      (values full-version major-version variant))))

(defun list-typenames-without-btree-support ()
  "Return an alist of (typename . access-methods) for the PostgreSQL data
   types that lack btree support, so that indexes on them can later be
   created with CREATE INDEX ... USING gist(...) or another access method."
  (mapcar (lambda (row)
            (cons (first row) (second row)))
          (pomo:query (sql "/pgsql/list-typenames-without-btree-support.sql"))))

;;; Reserved words of Amazon Redshift, returned by list-reserved-keywords
;;; when pg_get_keywords() fails and select version() mentions Redshift.
(defvar *redshift-reserved-keywords*
  '("aes128"
    "aes256"
    "all"
    "allowoverwrite"
    "analyse"
    "analyze"
    "and"
    "any"
    "array"
    "as"
    "asc"
    "authorization"
    "backup"
    "between"
    "binary"
    "blanksasnull"
    "both"
    "bytedict"
    "bzip2"
    "case"
    "cast"
    "check"
    "collate"
    "column"
    "constraint"
    "create"
    "credentials"
    "cross"
    "current_date"
    "current_time"
    "current_timestamp"
    "current_user"
    "current_user_id"
    "default"
    "deferrable"
    "deflate"
    "defrag"
    "delta"
    "delta32k"
    "desc"
    "disable"
    "distinct"
    "do"
    "else"
    "emptyasnull"
    "enable"
    "encode"
    "encrypt"
    "encryption"
    "end"
    "except"
    "explicit"
    "false"
    "for"
    "foreign"
    "freeze"
    "from"
    "full"
    "globaldict256"
    "globaldict64k"
    "grant"
    "group"
    "gzip"
    "having"
    "identity"
    "ignore"
    "ilike"
    "in"
    "initially"
    "inner"
    "intersect"
    "into"
    "is"
    "isnull"
    "join"
    "leading"
    "left"
    "like"
    "limit"
    "localtime"
    "localtimestamp"
    "lun"
    "luns"
    "lzo"
    "lzop"
    "minus"
    "mostly13"
    "mostly32"
    "mostly8"
    "natural"
    "new"
    "not"
    "notnull"
    "null"
    "nulls"
    "off"
    "offline"
    "offset"
    "oid"
    "old"
    "on"
    "only"
    "open"
    "or"
    "order"
    "outer"
    "overlaps"
    "parallel"
    "partition"
    "percent"
    "permissions"
    "placing"
    "primary"
    "raw"
    "readratio"
    "recover"
    "references"
    "respect"
    "rejectlog"
    "resort"
    "restore"
    "right"
    "select"
    "session_user"
    "similar"
    "snapshot"
    "some"
    "sysdate"
    "system"
    "table"
    "tag"
    "tdes"
    "text255"
    "text32k"
    "then"
    "timestamp"
    "to"
    "top"
    "trailing"
    "true"
    "truncatecolumns"
    "union"
    "unique"
    "user"
    "using"
    "verbose"
    "wallet"
    "when"
    "where"
    "with"
    "without")
  "See https://docs.aws.amazon.com/redshift/latest/dg/r_pg_keywords.html")

(defun list-reserved-keywords (pgconn)
  "Connect with PGCONN and return PostgreSQL's reserved keywords as a list of
   strings: the words of pg_get_keywords() whose catcode is R or T.

   When that query fails with a syntax-error-or-access-violation class
   error, fall back to a hard-coded list: *redshift-reserved-keywords* when
   select version() mentions Redshift, otherwise a list captured from a
   PostgreSQL 9.5devel server."
  (with-pgsql-connection (pgconn)
    (handler-case
        (pomo:query "select word
                   from pg_get_keywords()
                  where catcode IN ('R', 'T')" :column)
      ;; support for Amazon Redshift
      (cl-postgres-error::syntax-error-or-access-violation (e)
        (declare (ignore e))
        ;; Expected case: SQLSTATE 42883, undefined_function:
        ;;    Database error 42883: function pg_get_keywords() does not exist

        (let ((version-string (pomo:query "select version()" :single)))

          (if (cl-ppcre:scan "Redshift" version-string)
              *redshift-reserved-keywords*

              ;;
              ;; In case the pg_get_keywords() function isn't supported but
              ;; we're not talking to a Redshift database, use the following
              ;; list of keywords, that comes from a manual query against a
              ;; local PostgreSQL server (version 9.5devel). It's better to
              ;; have this list than nothing at all.
              ;;
              (list "all"
                    "analyse"
                    "analyze"
                    "and"
                    "any"
                    "array"
                    "as"
                    "asc"
                    "asymmetric"
                    "authorization"
                    "binary"
                    "both"
                    "case"
                    "cast"
                    "check"
                    "collate"
                    "collation"
                    "column"
                    "concurrently"
                    "constraint"
                    "create"
                    "cross"
                    "current_catalog"
                    "current_date"
                    "current_role"
                    "current_schema"
                    "current_time"
                    "current_timestamp"
                    "current_user"
                    "default"
                    "deferrable"
                    "desc"
                    "distinct"
                    "do"
                    "else"
                    "end"
                    "except"
                    "false"
                    "fetch"
                    "for"
                    "foreign"
                    "freeze"
                    "from"
                    "full"
                    "grant"
                    "group"
                    "having"
                    "ilike"
                    "in"
                    "initially"
                    "inner"
                    "intersect"
                    "into"
                    "is"
                    "isnull"
                    "join"
                    "lateral"
                    "leading"
                    "left"
                    "like"
                    "limit"
                    "localtime"
                    "localtimestamp"
                    "natural"
                    "not"
                    "notnull"
                    "null"
                    "offset"
                    "on"
                    "only"
                    "or"
                    "order"
                    "outer"
                    "overlaps"
                    "placing"
                    "primary"
                    "references"
                    "returning"
                    "right"
                    "select"
                    "session_user"
                    "similar"
                    "some"
                    "symmetric"
                    "table"
                    "then"
                    "to"
                    "trailing"
                    "true"
                    "union"
                    "unique"
                    "user"
                    "using"
                    "variadic"
                    "verbose"
                    "when"
                    "where"
                    "window"
                    "with")))))))