02241130创建于 2021年6月9日历史提交
;;;
;;; Generic API for pgloader sources
;;; Methods for source types with multiple files input
;;;

(in-package :pgloader.load)

(defmethod copy-database ((copy md-copy)
                          &key
                            (on-error-stop *on-error-stop*)
                            truncate
                            disable-triggers
			    drop-indexes

                            max-parallel-create-index

                            ;; generic API, but ignored here
                            (worker-count 4)
                            (concurrency 1)

                            data-only
			    schema-only
                            create-tables
			    include-drop
                            foreign-keys
			    create-indexes
			    reset-sequences
                            materialize-views
                            set-table-oids
                            including
                            excluding)
  "Copy the contents of the COPY formated file to PostgreSQL."
  (declare (ignore data-only schema-only
                   create-tables include-drop foreign-keys
                   create-indexes reset-sequences materialize-views
                   set-table-oids including excluding))

  (let* ((*on-error-stop* on-error-stop)
         (pgconn (target-db copy))
         pgsql-catalog)

    (handler-case
        (with-pgsql-connection (pgconn)
          (setf pgsql-catalog
                (fetch-pgsql-catalog (db-name pgconn)
                                     :table (target copy)
                                     :variant (pgconn-variant pgconn)
                                     :pgversion (pgconn-major-version pgconn)))

          ;; if the user didn't tell us the column list of the table, now is
          ;; a proper time to set it in the copy object
          (unless (and (slot-boundp copy 'columns)
                       (slot-value copy 'columns))
            (setf (columns copy)
                  (mapcar (lambda (col)
                            ;; we need to handle the md-copy format for the
                            ;; column list, which allow for user given
                            ;; options: each column is a list which car is
                            ;; the column name.
                            (list (column-name col)))
                          (table-field-list (first (table-list pgsql-catalog))))))

          (log-message :data "CATALOG: ~s" pgsql-catalog)

          ;; this sets (table-index-list (target copy))
          (maybe-drop-indexes pgsql-catalog :drop-indexes drop-indexes)

          ;; now is the proper time to truncate, before parallel operations
          (when truncate
            (truncate-tables pgsql-catalog)))

      (cl-postgres:database-error (e)
        (log-message :fatal "Failed to prepare target PostgreSQL table.")
        (log-message :fatal "~a" e)
        (return-from copy-database)))

    ;; Keep the PostgreSQL table target around in the copy instance,
    ;; with the following subtleties to deal with:
    ;;   1. the catalog fetching did fill-in PostgreSQL columns as fields
    ;;   2. we might target fewer pg columns than the table actually has
    (let ((table (first (table-list pgsql-catalog))))
      (setf (table-column-list table)
            (loop :for column-name :in (mapcar #'first (columns copy))
               :collect (find column-name (table-field-list table)
                              :key #'column-name
                              :test #'string=)))
      (setf (target copy) table))

    ;; expand the specs of our source, we might have to care about several
    ;; files actually.
    (let* ((lp:*kernel* (make-kernel worker-count))
           (channel     (lp:make-channel))
           (path-list   (expand-spec (source copy)))
           (task-count  0))
      (with-stats-collection ("Files Processed" :section :post
                                                :use-result-as-read t
                                                :use-result-as-rows t)
        (loop :for path-spec :in path-list
           :count t
           :do (let ((table-source (clone-copy-for copy path-spec)))
                 (when (and (header table-source) (null (fields table-source)))
                   (parse-header table-source))
                 (incf task-count
                       (copy-from table-source
                                  :concurrency concurrency
                                  :kernel lp:*kernel*
                                  :channel channel
                                  :on-error-stop on-error-stop
                                  :disable-triggers disable-triggers)))))

      ;; end kernel
      (with-stats-collection ("COPY Threads Completion" :section :post
                                                        :use-result-as-read t
                                                        :use-result-as-rows t)
        (loop :repeat task-count
           :do (handler-case
                   (destructuring-bind (task table seconds)
                       (lp:receive-result channel)
                     (log-message :debug
                                  "Finished processing ~a for ~s ~50T~6$s"
                                  task (format-table-name table) seconds))
                 (condition (e)
                   (log-message :fatal "~a" e)))
           :finally (progn
                      (lp:end-kernel :wait nil)
                      (return task-count))))
      (lp:end-kernel :wait t))

    ;; re-create the indexes from the target table entry
    (create-indexes-again (target-db copy)
                          pgsql-catalog
                          :max-parallel-create-index max-parallel-create-index
                          :drop-indexes drop-indexes)))