02241130创建于 2021年6月9日历史提交
;;
;; Abstract classes to define the API to connect to a data source
;;
(in-package :pgloader.connection)

;;;
;;; Generic API
;;;
(defclass connection ()
  ((type   :initarg :type :accessor conn-type)
   (handle :initarg :conn :accessor conn-handle :initform nil))
  (:documentation "pgloader connection parameters, base class"))

(define-condition connection-error (error)
  ((type :initarg :type :reader connection-error-type)
   (mesg :initarg :mesg :reader connection-error-mesg)))

(defgeneric open-connection (connection &key)
  (:documentation "Open a connection to the data source."))

(defgeneric close-connection (connection)
  (:documentation "Close a connection to the data source."))

(defgeneric check-connection (connection)
  (:documentation "Check that we can actually connect."))

(defgeneric clone-connection (connection)
  (:documentation "Instanciate a new connection object with similar properties."))


;;;
;;; File based objects
;;;
(defclass fd-connection (connection)
  ((uri  :initarg :uri  :accessor fd-uri)
   (arch :initarg :arch :accessor fd-arch)
   (path :initarg :path :accessor fd-path))
  (:documentation "pgloader connection parameters for a file based data source."))

(defmethod clone-connection ((fd fd-connection))
  (let ((clone (make-instance 'fd-connection :type (conn-type fd))))
    (loop :for slot :in '(uri arch path)
       :do (when (slot-boundp fd slot)
             (setf (slot-value clone slot) (slot-value fd slot))))
    clone))

(define-condition fd-connection-error (connection-error)
  ((path :initarg :path :reader connection-error-path))
  (:report (lambda (err stream)
             (format stream "Failed to open ~a file ~s: ~a"
                     (connection-error-type err)
                     (connection-error-path err)
                     (connection-error-mesg err)))))

(defmethod print-object ((fd fd-connection) stream)
  (print-unreadable-object (fd stream :type t :identity t)
    (let ((url (cond ((and (slot-boundp fd 'path) (slot-value fd 'path))
                      (slot-value fd 'path))
                     ((and (slot-boundp fd 'arch) (slot-value fd 'arch))
                      (slot-value fd 'arch))
                     ((and (slot-boundp fd 'uri) (slot-value fd 'uri))
                      (slot-value fd 'uri)))))
     (with-slots (type) fd
       (format stream "~a://~a" type url)))))

(defgeneric fetch-file (fd-connection)
  (:documentation "Support for HTTP URI for files."))

(defgeneric expand (fd-connection)
  (:documentation "Support for file archives."))

(defmethod expand ((fd fd-connection))
  "Expand the archive for the FD connection."
  (when (and (slot-boundp fd 'arch) (slot-value fd 'arch))
    (let ((archive-directory (expand-archive (fd-arch fd))))
      ;; if there's a single file in the archive, it must the the path
      (let ((files (uiop:directory-files archive-directory)))
        (if (= 1 (length files))
            (setf (fd-path fd) (first files))
            (setf (fd-path fd) archive-directory)))))
  fd)

(defmethod fetch-file ((fd fd-connection))
  "When the fd-connection has an URI slot, download its file."
  (when (and (slot-boundp fd 'uri) (slot-value fd 'uri))
    (let ((local-filename (http-fetch-file (fd-uri fd))))
      (if (archivep local-filename)
          (setf (fd-arch fd) local-filename)
          (setf (fd-path fd) local-filename))))
  fd)

;;;
;;; database connections
;;;
(defclass db-connection (connection)
  ((name :initarg :name :accessor db-name)
   (host :initarg :host :accessor db-host)
   (port :initarg :port :accessor db-port)
   (user :initarg :user :accessor db-user)
   (pass :initarg :pass :accessor db-pass))
  (:documentation "pgloader connection parameters for a database service."))

(defmethod clone-connection ((c db-connection))
  (make-instance 'db-connection
                 :type (conn-type c)
                 :name (db-name c)
                 :host (db-host c)
                 :port (db-port c)
                 :user (db-user c)
                 :pass (db-pass c)))

(defmethod print-object ((c db-connection) stream)
  (print-unreadable-object (c stream :type t :identity t)
    (with-slots (type name host port user) c
      (let ((host (typecase host
                    (cons (format nil "~a:~a"
                                  (string-downcase (car host))
                                  (cdr host)))
                    (t    host))))
       (format stream "~a://~a@~a:~a/~a" type user host port name)))))

(define-condition db-connection-error (connection-error)
  ((host :initarg :host :reader connection-error-host)
   (port :initarg :port :reader connection-error-port)
   (user :initarg :user :reader connection-error-user))
  (:report (lambda (err stream)
             (format stream "Failed to connect to ~a at ~s ~@[(port ~d)~]~@[ as user ~s~]: ~a"
                     (connection-error-type err)
                     (connection-error-host err)
                     (connection-error-port err)
                     (connection-error-user err)
                     (connection-error-mesg err)))))

(defgeneric query (db-connection sql &key)
  (:documentation "Query DB-CONNECTION with SQL query"))


;;;
;;; Tools for every connection classes
;;;
(defmacro with-connection ((var connection &rest args &key &allow-other-keys)
                           &body forms)
  "Connect to DB-CONNECTION and handle any condition when doing so, and when
   connected execute FORMS in a protected way so that we always disconnect
   at the end."
  (let ((conn (gensym "conn")))
    `(let* ((,conn ,connection)
            (,var (handler-case
                      ;; in some cases (client_min_messages set to debug5
                      ;; for example), PostgreSQL might send us some
                      ;; WARNINGs already when opening a new connection
                      (handler-bind ((cl-postgres:postgresql-warning
                                      #'(lambda (w)
                                          (log-message :warning "~a" w)
                                          (muffle-warning))))
                        (apply #'open-connection ,conn (list ,@args)))
                    (condition (e)
                      (cond ((typep ,connection 'fd-connection)
                             (error 'fd-connection-error
                                    :mesg (format nil "~a" e)
                                    :type (conn-type ,conn)
                                    :path (fd-path ,conn)))

                            ((typep ,connection 'db-connection)
                             (error 'db-connection-error
                                    :mesg (format nil "~a" e)
                                    :type (conn-type ,conn)
                                    :host (db-host ,conn)
                                    :port (db-port ,conn)
                                    :user (db-user ,conn)))

                            (t
                             (error 'connection-error
                                    :mesg (format nil "~a" e)
                                    :type (conn-type ,conn))))))))
       (unwind-protect
            (progn ,@forms)
         (close-connection ,var)))))

(defmethod check-connection ((fd fd-connection))
  "Check that it is possible to connect to db-connection C."
  (log-message :log "Attempting to open ~a" fd)
  (handler-case
      (with-connection (cnx fd)
        (log-message :log "Success, opened ~a." fd))
    (condition (e)
      (log-message :fatal "Failed to connect to ~a: ~a" fd e))))

(defmethod check-connection ((c db-connection))
  "Check that it is possible to connect to db-connection C."
  (log-message :log "Attempting to connect to ~a" c)
  (handler-case
      (with-connection (cnx c)
        (log-message :log "Success, opened ~a." c)
        (let ((sql "SELECT 1;"))
          (log-message :log "Running a simple query: ~a" sql)
          (handler-case
              (query cnx sql)
            (condition (e)
              (log-message :fatal "SQL failed on ~a: ~a" c e)))))
    (condition (e)
      (log-message :fatal "Failed to connect to ~a: ~a" c e))))