;;; 02241130 created on 2021-06-09 (commit history)
;;;
;;; The PostgreSQL COPY TO implementation, with batches and retries.
;;;
(in-package #:pgloader.pgcopy)

;;;
;;; Compute how many rows we're going to try loading next, depending on
;;; where we are in the batch currently and where is the next-error to be
;;; seen, if that's between current position and the end of the batch.
;;;
(defun next-batch-rows (batch-rows current-batch-pos next-error)
  "Return how many rows to attempt in the next iteration.

   BATCH-ROWS is the total number of rows in the batch, CURRENT-BATCH-POS
   the index of the next row to process, and NEXT-ERROR the index of the
   row known to be erroneous."
  (let ((rows-before-error (- next-error current-batch-pos)))
    (cond
      ;; The known bad row is still ahead of us: every row up to, but
      ;; excluding, NEXT-ERROR can be pushed in a single batch.
      ((plusp rows-before-error) rows-before-error)

      ;; We are sitting exactly on the known bad row: process only that
      ;; one in the next batch.
      ((zerop rows-before-error) 1)

      ;; The known bad row is behind us. There may or may not be further
      ;; errors; try all the remaining rows and find out.
      (t (- batch-rows current-batch-pos)))))

;;;
;;; In case of COPY error, PostgreSQL gives us the line where the error was
;;; found as a CONTEXT message. Let's parse that information to optimize our
;;; batching splitting in case of errors.
;;;
;;;  CONTEXT: COPY errors, line 1, column b: "2006-13-11"
;;;  CONTEXT: COPY byte, line 1: "hello\0world"
;;;
;;; Those error messages are a translation target, though, so we can only
;;; assume to recognize the command tag (COPY), the comma, and a number after
;;; a word that might be Zeile (de), línea (es), ligne (fr), riga (it),
;;; linia (pl), linha (pt), строка (ru), 行 (zh), or something else
;;; entirely.
;;;
(defun parse-copy-error-context (context)
  "Extract the line number from a COPY command CONTEXT error message and
   return it as a zero-based batch position (line number minus one).
   Return NIL when CONTEXT does not match the expected COPY pattern."
  (multiple-value-bind (whole-match registers)
      (cl-ppcre:scan-to-strings "COPY [^,]+, [^ ]+ (\\d+)" context :sharedp t)
    (when whole-match
      ;; The only register is the run of digits following the (possibly
      ;; translated) word for "line".
      (1- (parse-integer (aref registers 0))))))

;;;
;;; The main retry batch function.
;;;
(defun retry-batch-row-by-row (table columns batch start)
  "Load the rows of BATCH from index START up to (batch-count batch), one
   row per COPY command.

   Each row rejected with a POSTGRESQL-RETRYABLE condition is rolled back
   and handed to PROCESS-BAD-ROW. The first rejection is not logged here,
   because this path is only entered after the triggering error has been
   logged already. Returns the number of rejected rows."
  (let ((table-name  (format-table-name table))
        (first-error t)
        (nb-errors   0))
    ;; POS is stepped by the loop only: one row is attempted per iteration,
    ;; so the body must not advance it again.
    (loop :for pos :from start :below (batch-count batch)
       :do (handler-case
               (copy-partial-batch table-name columns batch 1 pos)
             (postgresql-retryable (condition)
               (pomo:execute "ROLLBACK")
               (process-bad-row table condition (aref (batch-data batch) pos))
               (if first-error
                   ;; the first error has been logged about already
                   (setf first-error nil)
                   (log-message :error "PostgreSQL [~s] ~a"
                                table-name condition))
               (incf nb-errors))))
    nb-errors))

(defun retry-batch (table columns batch condition
                    &optional (current-batch-pos 0)
                    &aux (nb-errors 0))
  "Reload BATCH, a batch of rows containing at least one bad row, after a
   COPY into TABLE (restricted to COLUMNS) failed with CONDITION.

   When the CONTEXT of CONDITION carries a COPY line number, that position
   is used to load the good rows around the bad one in as few COPY commands
   as possible, starting at CURRENT-BATCH-POS. Otherwise every row of the
   batch is loaded one at a time.

   Returns the number of erroneous rows found, for statistics purposes."

  (log-message :info "Entering error recovery.")

  ;; Not all COPY errors produce a COPY error message. Foreign key violation
  ;; produce a detailed message containing the data that we can't insert. In
  ;; that case we're going to insert every single row of the batch, one at a
  ;; time, and handle the error(s) individually.
  ;;
  (unless (parse-copy-error-context (database-error-context condition))
    ;; that's all folks, we're done.
    (return-from retry-batch
      (retry-batch-row-by-row table columns batch 0)))

  ;; now deal with the COPY error case where we have a line number and have
  ;; the opportunity to be smart about it.
  (loop
     :with table-name := (format-table-name table)
     :with next-error := (parse-copy-error-context
                          (database-error-context condition))

     :while (< current-batch-pos (batch-count batch))

     :do
     (progn                             ; indenting helper
       (log-message :debug "pos: ~s ; err: ~a" current-batch-pos next-error)
       (when (= current-batch-pos next-error)
         (log-message :info "error recovery at ~d/~d, processing bad row"
                      current-batch-pos (batch-count batch))
         (process-bad-row table
                          condition
                          (aref (batch-data batch) current-batch-pos))
         (incf current-batch-pos)
         (incf nb-errors))

       (let* ((current-batch-rows
               (next-batch-rows (batch-count batch) current-batch-pos next-error)))
         (when (< 0 current-batch-rows)
           (if (< current-batch-pos next-error)
               (log-message :info
                            "error recovery at ~d/~d, next error at ~d, ~
                             loading ~d row~:p"
                            current-batch-pos
                            (batch-count batch)
                            next-error
                            current-batch-rows)
               (log-message :info
                            "error recovery at ~d/~d, trying ~d row~:p"
                            current-batch-pos
                            (batch-count batch)
                            current-batch-rows))

           (handler-case
               (incf current-batch-pos
                     (copy-partial-batch table-name
                                         columns
                                         batch
                                         current-batch-rows
                                         current-batch-pos))

             ;; the batch didn't make it, prepare error handling for next turn
             (postgresql-retryable (next-error-in-batch)
               (pomo:execute "ROLLBACK")
               (log-message :error "PostgreSQL [~s] ~a"
                            table-name
                            next-error-in-batch)
               (let ((next-error-relative
                      (parse-copy-error-context
                       (database-error-context next-error-in-batch))))

                 ;; The new error may come without a COPY line number, in
                 ;; which case we can't locate the bad row: finish the
                 ;; remaining rows one at a time and leave the loop.
                 (unless next-error-relative
                   (incf nb-errors
                         (retry-batch-row-by-row table columns batch
                                                 current-batch-pos))
                   (return))

                 ;; The reported line is relative to the partial batch we
                 ;; just tried, which started at current-batch-pos.
                 (setf condition  next-error-in-batch
                       next-error (+ current-batch-pos next-error-relative)))))))))

  (log-message :info "Recovery found ~d errors in ~d row~:p"
               nb-errors (batch-count batch))

  ;; Return how many rows where erroneous, for statistics purposes
  nb-errors)

(defun copy-partial-batch (table-name columns
                           batch current-batch-rows current-batch-pos)
  "Copy CURRENT-BATCH-ROWS rows of BATCH, starting at index
   CURRENT-BATCH-POS, into TABLE-NAME (restricted to COLUMNS) within a
   BEGIN/COMMIT pair. Returns CURRENT-BATCH-ROWS."
  (pomo:execute "BEGIN;")
  (let ((writer
         (cl-postgres:open-db-writer pomo:*database* table-name columns)))
    (unwind-protect
         (dotimes (offset current-batch-rows)
           (db-write-row writer
                         (aref (batch-data batch)
                               (+ current-batch-pos offset))))

      ;; Cleanup forms, run whether or not writing the rows succeeded.
      ;; close-db-writer is the one signaling cl-postgres-errors
      (cl-postgres:close-db-writer writer)
      (pomo:execute "COMMIT;")))

  ;; the number of rows loaded is the number of rows we were asked to copy
  current-batch-rows)