;;; 02241130 created on 2021-06-09 (commit history)
;;;
;;; Read from a PostgreSQL database.
;;;

(in-package :pgloader.source.pgsql)

;;
;; Copy object used when the data source is itself a PostgreSQL database.
;; It adds no slot of its own on top of DB-COPY; the class only exists so
;; that the methods in this file can specialize on it.
;;
(defclass copy-pgsql (db-copy) ()
  (:documentation "pgloader PostgreSQL Data Source"))

(defmethod map-rows ((pgsql copy-pgsql) &key process-row-fn)
  "Query the source PostgreSQL table and call PROCESS-ROW-FN once per row,
   with a single argument: a vector holding the column values of that row."
  (flet ((plain-select-sql ()
           ;;
           ;; No JOIN needed: select every known field, cast to text, from
           ;; the schema-qualified source table.
           ;;
           (format nil
                   "SELECT ~{~s::text~^, ~} FROM ~s.~s"
                   (mapcar #'column-name (fields pgsql))
                   (schema-source-name (table-schema (source pgsql)))
                   (table-source-name (source pgsql)))))
    (let ((row-handler
           ;;
           ;; Postmodern row reader: for each row of the result set, fill a
           ;; fresh vector with one entry per field and hand that vector
           ;; over to PROCESS-ROW-FN.
           ;;
           (cl-postgres:row-reader (columns)
             (let ((width (length columns)))
               (loop :while (cl-postgres:next-row)
                  :do (let ((record (make-array width)))
                        (dotimes (idx width)
                          (setf (aref record idx)
                                (cl-postgres:next-field (aref columns idx))))
                        (funcall process-row-fn record)))))))

      (with-pgsql-connection ((source-db pgsql))
        (let ((sql (if (citus-backfill-table-p (target pgsql))
                       ;;
                       ;; SELECT dist_key, * FROM source JOIN dist ON ...
                       ;;
                       (citus-format-sql-select (source pgsql) (target pgsql))

                       (plain-select-sql))))
          (log-message :sql "~a" sql)
          (cl-postgres:exec-query pomo:*database* sql row-handler))))))

(defmethod copy-column-list ((pgsql copy-pgsql))
  "Return the quoted column names of the source fields, in source order."
  (loop :for field :in (fields pgsql)
     :collect (ensure-quoted (column-name field))))

(defmethod fetch-metadata ((pgsql copy-pgsql)
                           (catalog catalog)
                           &key
                             materialize-views
                             only-tables
                             create-indexes
                             foreign-keys
                             including
                             excluding)
  "Introspect the source PostgreSQL database to prepare the migration.

   The listing helpers are called on CATALOG within a single transaction on
   the source connection. MATERIALIZE-VIEWS is either :ALL or a list of view
   definitions; ONLY-TABLES is accepted and ignored. Returns CATALOG."
  (declare (ignore only-tables))
  (with-stats-collection ("fetch meta data"
                          :use-result-as-rows t
                          :use-result-as-read t
                          :section :pre)
    (with-pgsql-transaction (:pgconn (source-db pgsql))
      (let* ((variant     (pgconn-variant (source-db pgsql)))
             (pgversion   (pgconn-major-version (source-db pgsql)))
             (pgdg-p      (eq :pgdg variant))
             (all-views-p (eq :all materialize-views)))
        ;;
        ;; An explicit list of views has to be created in the source
        ;; database first; nothing to create when given :all or nothing.
        ;;
        (unless (or (null materialize-views) all-views-p)
          (create-matviews materialize-views pgsql))

        ;; SQL types are only listed for the :pgdg connection variant
        (when pgdg-p
          (list-all-sqltypes catalog
                             :including including
                             :excluding excluding))

        ;; columns of the regular tables
        (list-all-columns catalog
                          :including including
                          :excluding excluding)

        ;;
        ;; Columns of the views: either only the named ones, through a
        ;; dedicated including filter, or all of them without any filter.
        ;;
        (let* ((view-names  (unless all-views-p
                              (mapcar #'matview-source-name
                                      materialize-views)))
               (view-filter (make-including-expr-from-view-names view-names)))
          (cond (view-names
                 (list-all-columns catalog
                                   :including view-filter
                                   :table-type :view))

                (all-views-p
                 (list-all-columns catalog :table-type :view))))

        (when create-indexes
          (list-all-indexes catalog
                            :including including
                            :excluding excluding
                            :pgversion pgversion))

        ;; foreign keys are only listed for the :pgdg connection variant
        (when (and pgdg-p foreign-keys)
          (list-all-fkeys catalog
                          :including including
                          :excluding excluding))

        ;; the stats collection uses this total number of objects as its
        ;; result
        (+ (count-tables catalog)
           (count-views catalog)
           (count-indexes catalog)
           (count-fkeys catalog)))))

  ;; the method itself returns the catalog, not the object count
  catalog)


(defmethod cleanup ((pgsql copy-pgsql) (catalog catalog) &key materialize-views)
  "Drop the views listed in MATERIALIZE-VIEWS from the source PostgreSQL
   connection. Meant to be used when a PostgreSQL error happens at the
   prepare-pgsql-database step, so that views created only for the
   migration are not left behind. Does nothing without MATERIALIZE-VIEWS."
  (and materialize-views
       (with-pgsql-transaction (:pgconn (source-db pgsql))
         (drop-matviews materialize-views pgsql))))