;;; 02241130 -- created on June 9, 2021 (commit history)
;;;
;;; The PostgreSQL COPY FROM STDIN implementation, with batches and retries.
;;;
(in-package :pgloader.pgcopy)

;;;
;;; We receive raw input rows from an lparallel queue and push their content
;;; down to PostgreSQL, handling any data-related errors along the way.
;;;
(defun copy-rows-from-queue (copy queue
                             &key
                               disable-triggers
                               on-error-stop
                               (columns
                                (pgloader.sources:copy-column-list copy))
                             &aux
                               (pgconn  (clone-connection
                                         (pgloader.sources:target-db copy)))
                               (table   (pgloader.sources:target copy)))
  "Fetch rows from the QUEUE, prepare them in batches and send them down to
   PostgreSQL, and when that's done update stats.

   COPY is the source object: it provides the target connection (cloned
   into PGCONN), the target TABLE and the default COLUMNS list.
   DISABLE-TRIGGERS is handed to WITH-DISABLED-TRIGGERS.
   ON-ERROR-STOP selects STREAM-ROWS-TO-COPY (stop at the first failure)
   instead of BATCH-ROWS-TO-COPY (batched replay skipping rejected rows);
   it is not used when the target connection variant is :REDSHIFT.

   Returns the list (:WRITER TABLE SECONDS), where SECONDS is the value
   returned by the selected copy function."
  ;; TABLE is already bound to the COPY target above: reuse it rather than
  ;; fetching the target again through an internal accessor.
  (let ((nbcols  (length (table-column-list table)))
        (seconds 0))

    ;; we need to compute some information and have them at the right place
    ;; FIXME: review the API here, that's an half-baked refactoring.
    (prepare-copy-parameters copy)

    (log-message :info "COPY ON ERROR ~:[RESUME NEXT~;STOP~]" on-error-stop)

    (pgloader.pgsql:with-pgsql-connection (pgconn)
      ;; NOTE(review): WITH-SCHEMA appears to bind UNQUALIFIED-TABLE-NAME
      ;; from TABLE for the body below -- confirm against its definition.
      (with-schema (unqualified-table-name table)
        (with-disabled-triggers (unqualified-table-name
                                 :disable-triggers disable-triggers)
          (log-message :info "pgsql:copy-rows-from-queue[~a]: ~a ~a"
                       (lp:kernel-worker-index)
                       (format-table-name table)
                       columns)

          (let ((copy-fun
                 (cond ((eq :redshift (pgconn-variant pgconn))
                        ;;
                        ;; When using Redshift as the target, we lose the
                        ;; COPY FROM STDIN feature, and we have to use S3 as
                        ;; an intermediate step. We then upload content a
                        ;; batch at a time, and don't follow the
                        ;; on-error-stop setting.
                        ;;
                        (log-message :log "copy-rows-from-queue REDSHIFT")
                        (function batch-rows-to-s3-then-copy))

                       (on-error-stop
                        ;;
                        ;; When on-error-stop is true, we don't need to
                        ;; handle batch processing, we can stop as soon as
                        ;; there's a failure.
                        ;;
                        (function stream-rows-to-copy))

                       (t
                        ;;
                        ;; When on-error-stop is nil, we actually implement
                        ;; on-error-resume-next behavior, and for that we
                        ;; need to keep a batch of rows around in order to
                        ;; replay COPYing its content around, skipping rows
                        ;; that are rejected by PostgreSQL.
                        ;;
                        (function batch-rows-to-copy)))))

            ;;
            ;; As all our functions share the same API, we can just funcall
            ;; the selected one here. Its return value is accumulated into
            ;; SECONDS (presumably the time it spent copying -- confirm).
            ;;
            (incf seconds
                  (funcall copy-fun table columns copy nbcols queue))))))

    ;; each writer thread sends its own stop timestamp and the monitor keeps
    ;; only the latest entry
    (update-stats :data table :ws seconds :stop (get-internal-real-time))
    (log-message :debug "Writer[~a] for ~a is done in ~6$s"
                 (lp:kernel-worker-index)
                 (format-table-name table)
                 seconds)
    (list :writer table seconds)))

(defun prepare-copy-parameters (copy)
  "Record COPY-related settings on the COPY object before rows are sent.

   The slots are set one after the other, in this order, because a later
   step may read what an earlier one stored:
     1. TRANSFORMS becomes NIL when every transform function is NIL,
        otherwise it keeps its current value;
     2. PREPROCESSOR receives the result of PGLOADER.SOURCES::PREPROCESS-ROW;
     3. COPY-FORMAT becomes :ESCAPED when DATA-IS-PREFORMATTED-P, else :RAW.

   Returns the new COPY-FORMAT value."
  ;; Collapse an all-NIL transform list to NIL.
  (let ((transform-functions (transforms copy)))
    (setf (transforms copy)
          (when (notevery #'null transform-functions)
            transform-functions)))

  ;; FIXME: we should change the API around preprocess-row, someday.
  (setf (preprocessor copy) (pgloader.sources::preprocess-row copy))

  ;; FIXME: we could change the API around data-is-preformatted-p, but
  ;; that's a bigger change than duplicating the information in the object.
  (setf (copy-format copy)
        (if (data-is-preformatted-p copy) :escaped :raw)))