;;; Commit 02241130, created on 2021-06-09 (see commit history).
;;;
;;; Tools to handle internal queueing, using lparallel.queue
;;;
(in-package :pgloader.pgcopy)

;;;
;;; The pgloader architecture uses a reader thread and a writer thread. The
;;; reader fills in batches of data from the source of data, and the writer
;;; pushes the data down to PostgreSQL using the COPY protocol.
;;;
(defstruct (batch
             ;; MAKE-BATCH is a by-order-of-arguments constructor. Callers
             ;; may pass :MAX-COUNT; it defaults to the randomized value
             ;; returned by INIT-BATCH-MAX-COUNT. DATA is always freshly
             ;; allocated as a vector of MAX-COUNT elements. START, COUNT and
             ;; BYTES do not appear in the lambda list, so they are set from
             ;; their slot initforms below.
             (:constructor
              make-batch (&key
                          (max-count (init-batch-max-count))
                          &aux
                          (data
                           (make-array max-count
                                       :element-type '(simple-array
                                                       (unsigned-byte 8)))))))
  "A fixed-capacity batch of formatted rows waiting to be sent to PostgreSQL."
  (start     (get-internal-real-time) :type fixnum) ; creation time, in internal time units
  (data      nil                      :type array)  ; row storage, filled by PUSH-ROW
  (count     0                        :type fixnum) ; number of rows pushed so far
  (max-count 0                        :type fixnum) ; row capacity, checked by BATCH-FULL-P
  (bytes     0                        :type fixnum)) ; running sum of the ROW-BYTES pushed

;;;
;;; The simplest way to avoid all batches being sent at the same time to
;;; PostgreSQL is to make them of different sizes. Here we tweak the batch
;;; size from *copy-batch-rows* to that effect.
;;;
(defun init-batch-max-count (&optional (batch-rows *copy-batch-rows*))
  "Return the row capacity for a new batch.

   The result is BATCH-ROWS scaled by a random factor in [0.7, 1.3) and
   truncated to an integer. It is never less than 1, so that a batch can
   always hold at least one row."
  ;; 0.7 <= 0.7 + (random 0.6) < 1.3
  ;; Without the floor at 1, small BATCH-ROWS values (e.g. 1) truncate to 0
  ;; about half of the time. A zero-capacity batch is immediately reported
  ;; full by BATCH-FULL-P, and PUSH-ROW would then index past the end of its
  ;; zero-length DATA vector.
  (max 1 (truncate (* batch-rows (+ 0.7 (random 0.6))))))

(defun batch-oversized-p (batch)
  "Return true when *COPY-BATCH-SIZE* is set and the number of bytes
   accumulated in BATCH has reached or exceeded it.

   When *COPY-BATCH-SIZE* is nil (its default), no batch is ever considered
   over-sized and nil is returned."
  (let ((limit *copy-batch-size*))
    (if limit
        (>= (batch-bytes batch) limit)
        nil)))

(defun batch-full-p (batch)
  "Return true when BATCH cannot take another row.

   That is the case when its row count has reached its MAX-COUNT, or when
   BATCH-OVERSIZED-P reports its byte size at or over the configured limit."
  (let ((rows-exhausted (= (batch-max-count batch) (batch-count batch))))
    (if rows-exhausted
        t
        (batch-oversized-p batch))))

(defun push-row (batch row row-bytes)
  "Append ROW to BATCH and return BATCH's new byte total.

   ROW is stored at index COUNT of BATCH's data vector. COUNT is then
   incremented by one and BYTES by ROW-BYTES. There is no capacity check
   here beyond AREF's own, so callers are expected to check BATCH-FULL-P
   first."
  (let ((slot-index (batch-count batch)))
    (setf (aref (batch-data batch) slot-index) row
          (batch-count batch) (1+ slot-index))
    (incf (batch-bytes batch) row-bytes)))


;;;
;;; Integration of batch with COPY row format
;;;
(defun format-row-in-batch (copy nbcols row current-batch)
  "Format ROW with PREPARE-AND-FORMAT-ROW (given COPY and NBCOLS) and, when a
   formatted row comes back, push it with its byte size onto CURRENT-BATCH.

   Returns PUSH-ROW's value, or nil when no formatted row was produced."
  (multiple-value-bind (formatted row-bytes)
      (prepare-and-format-row copy nbcols row)
    (and formatted
         (push-row current-batch formatted row-bytes))))

(defun add-row-to-current-batch (table columns copy nbcols batch row
                                 &key send-batch-fn format-row-fn)
  "Add ROW to BATCH, first flushing BATCH if it is already full.

   When BATCH is full, SEND-BATCH-FN is called with TABLE, COLUMNS and BATCH,
   and a fresh batch from MAKE-BATCH takes its place. ROW is then formatted
   by calling FORMAT-ROW-FN with COPY, NBCOLS and ROW. When that yields a
   non-nil row, the row and its byte count are pushed onto the current
   batch.

   Returns two values:
   - the current batch (BATCH itself or the fresh one);
   - the time accumulated while sending, garbage collecting (SBCL only) and
     formatting. This is the sum of SEND-BATCH-FN's return value and the
     ELAPSED-TIME-SINCE measurements."
  (let ((target  batch)
        (elapsed 0))
    ;; Flush a full batch before adding anything to it, and start a new one.
    (when (batch-full-p target)
      (incf elapsed (funcall send-batch-fn table columns target))
      (setf target (make-batch))

      ;; Right after a flush is a convenient moment for a full collection;
      ;; its duration is included in the returned time.
      #+sbcl
      (let ((gc-start (get-internal-real-time)))
        (sb-ext:gc :full t)
        (incf elapsed (elapsed-time-since gc-start))))

    ;; Formatting time is accounted for as well.
    (let ((format-start (get-internal-real-time)))
      (multiple-value-bind (formatted-row row-bytes)
          (funcall format-row-fn copy nbcols row)
        (when formatted-row
          (push-row target formatted-row row-bytes)))
      (incf elapsed (elapsed-time-since format-start)))

    (values target elapsed)))