;;; 02241130 -- created on 2021-06-09 (commit history)
;;;
;;; Citus support in pgloader lets users declare what needs to change in
;;; the source schema in terms of Citus concepts: reference tables and
;;; distributed tables.
;;;

#|
   distribute billers using id
   distribute bills using biller_id
   distribute receivable_accounts using biller_id
   distribute payments using biller_id

   distribute splits using biller_id
                      from receivable_accounts

   distribute ach_accounts as reference table
|#


(in-package #:pgloader.citus)

;;;
;;; Main data structures to host our distribution rules.
;;;
;; Rule for a table declared as a Citus reference table. The TABLE slot
;; holds a placeholder at first; CITUS-SET-TABLE replaces it with the table
;; found in the catalog.
(defstruct citus-reference-rule table)
;; Rule for a distributed table: TABLE is the table to distribute, USING is
;; the distribution key (an object answering to COLUMN-NAME), and FROM is
;; the list of tables to traverse when backfilling the distribution key
;; (possibly empty).
(defstruct citus-distributed-rule table using from)

(defun citus-distribute-schema (catalog distribution-rules)
  "Apply the user provided DISTRIBUTION-RULES to CATALOG, together with the
   rules derived from them by following foreign keys. Each table gets at
   most one rule applied. Return the list of rules that were applied, in
   the order in which they were applied."
  (flet ((rule-key (rule)
           ;; rules are matched on the source name of their table
           (table-source-name (citus-rule-table rule))))
    (let ((applied '())
          (derived-rules
           ;; resolve each rule's tables in the catalog first, then walk
           ;; the foreign keys from the rule's table
           (mapcan (lambda (rule)
                     (citus-set-table rule catalog)
                     (copy-list
                      (compute-foreign-rules rule (citus-rule-table rule))))
                   distribution-rules)))
      ;;
      ;; Apply rules only once, otherwise we get:
      ;;
      ;;   ERROR Database error 42P16: table "campaigns" is already distributed
      ;;
      ;; In the PostgreSQL source case, we have the table OIDs already at
      ;; this point, but in the general case we don't. Use the names to
      ;; match what we did up to now. User provided rules come first, so
      ;; they win over derived rules for the same table.
      ;;
      (dolist (rule (append distribution-rules derived-rules)
               (nreverse applied))
        (unless (member (rule-key rule) applied
                        :key #'rule-key
                        :test #'equal)
          (push rule applied)
          (apply-citus-rule rule))))))

(define-condition citus-rule-table-not-found (error)
  ((schema-name :initarg :schema-name
                :accessor citus-rule-table-not-found-schema-name)
   (table-name :initarg :table-name
               :accessor citus-rule-table-not-found-table-name))
  (:documentation
   "Signaled when the table named in a distribution rule can't be found in
    the catalog.")
  (:report
   (lambda (condition stream)
     ;; read the slots through their accessors; names are printed readably
     (let ((*print-circle* nil))
       (format stream
               "Could not find table ~s in schema ~s for distribution rules."
               (citus-rule-table-not-found-table-name condition)
               (citus-rule-table-not-found-schema-name condition))))))

(defun citus-find-table (catalog table)
  "Return the table from CATALOG that matches the placeholder TABLE.

   The lookup uses TABLE's schema name and its source name; when the source
   name is a cons, its CDR is used as the table name. Signals a
   CITUS-RULE-TABLE-NOT-FOUND error when either the schema or the table
   can't be found in CATALOG."
  (let* ((source-name (table-source-name table))
         (table-name  (etypecase source-name
                        (string source-name)
                        (cons   (cdr source-name))))
         (schema-name (schema-name (table-schema table)))
         (schema      (find-schema catalog schema-name)))
    ;; Only look for the table when the schema lookup returned something: a
    ;; missing schema is reported as a missing table rather than handing a
    ;; null schema over to FIND-TABLE.
    (or (and schema (find-table schema table-name))
        (error 'citus-rule-table-not-found
               :table-name table-name
               :schema-name schema-name))))

(defgeneric citus-rule-table (rule)
  (:documentation "Return the table that the Citus RULE is about."))

(defmethod citus-rule-table ((rule citus-reference-rule))
  (citus-reference-rule-table rule))

(defmethod citus-rule-table ((rule citus-distributed-rule))
  (citus-distributed-rule-table rule))

(defgeneric citus-set-table (rule catalog)
  (:documentation
   "Replace the placeholder table(s) of the Citus RULE with the matching
    tables found in CATALOG. Return the table now stored in RULE."))

(defmethod citus-set-table ((rule citus-reference-rule) (catalog catalog))
  (setf (citus-reference-rule-table rule)
        (citus-find-table catalog (citus-reference-rule-table rule))))

(defmethod citus-set-table ((rule citus-distributed-rule) (catalog catalog))
  (flet ((resolve (placeholder)
           (citus-find-table catalog placeholder)))
    (let ((placeholder (citus-distributed-rule-table rule))
          (from-list   (citus-distributed-rule-from rule)))
      ;; the FROM list is updated in place, entry by entry, before the
      ;; rule's own table is looked up
      (map-into from-list #'resolve from-list)
      (setf (citus-distributed-rule-table rule) (resolve placeholder)))))

(defmethod print-object ((rule citus-reference-rule) stream)
  "Print the reference RULE unreadably, in a form close to the rule syntax."
  (print-unreadable-object (rule stream :type t :identity t)
    (format stream
            "distribute ~a as reference"
            (format-table-name (citus-reference-rule-table rule)))))

(defmethod print-object ((rule citus-distributed-rule) stream)
  "Print the distributed RULE unreadably, in a form close to the rule
   syntax. The :from part is only printed when the FROM list isn't empty."
  (print-unreadable-object (rule stream :type t :identity t)
    (let ((table-name (format-table-name (citus-distributed-rule-table rule)))
          (key-name   (column-name (citus-distributed-rule-using rule)))
          (from-names (mapcar #'format-table-name
                              (citus-distributed-rule-from rule))))
      (format stream
              "distribute ~a :using ~a~@[ :from ~{~a~^, ~}~]"
              table-name
              key-name
              from-names))))


;;;
;;; When distributing a table on a given key, we can follow the foreign keys
;;; pointing to this table. For instance, when computing the following
;;; rule:
;;;
;;;    distribute companies using id
;;;
;;; We then want to add the set of rules that we find by walking the foreign
;;; keys:
;;;
;;;   distribute campaigns using company_id
;;;   distribute ads using company_id from campaigns
;;;   distribute clicks using company_id from ads, campaigns
;;;   distribute impressions using company_id from ads, campaigns
;;;
(defgeneric compute-foreign-rules (rule table &key)
  (:documentation
   "Return the list of rules that derive from the distribution rule RULE
    when following the foreign keys that point to TABLE.")

  (:method ((rule citus-reference-rule) (table table) &key)
    "A reference table doesn't impact the schema: no rule derives from it."
    nil))

(defmethod compute-foreign-rules ((rule citus-distributed-rule)
                                  (table table)
                                  &key fkey-list)
  "Find every foreign key that points to TABLE and return a list of new
   rules for the source tables of those foreign keys.

   FKEY-LIST is the chain of foreign keys followed so far, most recent
   first; it is empty when TABLE is the table of RULE itself. Nothing is
   derived unless TABLE has a primary key that contains a column named like
   the distribution key of RULE.

   The rules for the tables pointing directly to TABLE come first in the
   result, followed by the rules found by recursing into those tables.

   A foreign key whose source table is TABLE itself, or a table already
   traversed in FKEY-LIST, is skipped: following it again would recurse
   without end on self-referencing or circular foreign keys."
  (let ((pkey  (find-if #'index-primary (table-index-list table))))

    (when (and pkey (member (column-name (citus-distributed-rule-using rule))
                            (index-columns pkey)
                            :test #'string=))
      (loop :for fkey :in (index-fk-deps pkey)
         :for source-table := (fkey-table fkey)
         :for new-fkey-list := (cons fkey fkey-list)
         ;; The tables visited so far are TABLE and the foreign tables of
         ;; the fkeys in FKEY-LIST. Table objects are compared with EQ,
         ;; NOTE(review): this assumes foreign keys share the catalog's
         ;; table objects -- confirm.
         :unless (or (eq source-table table)
                     (member source-table fkey-list
                             :key #'fkey-foreign-table
                             :test #'eq))
         :collect (make-distributed-table-from-fkey rule new-fkey-list)
         :into new-rule-list
         :and :append (compute-foreign-rules rule source-table
                                             :fkey-list new-fkey-list)
         :into dep-rule-list
         :finally (return (append new-rule-list dep-rule-list))))))

(defun make-distributed-table-from-fkey (rule fkey-list)
  "Return a new Citus distributed table rule built from the existing RULE
   and FKEY-LIST, a chain of foreign keys leading to RULE's table."
  ;;
  ;; FKEY-LIST goes from the table we make a rule for, which is the
  ;; fkey-table of its first entry, up to the distributed root table. The
  ;; last entry is the foreign key that points to the root table.
  ;;
  ;; The new distribution key is named after the column of that last
  ;; foreign key that references the root's distribution key: when the root
  ;; is distributed using id, that might be root_id or something.
  ;;
  ;; The FROM list of the new rule is made of the tables referenced along
  ;; the way, the root table itself excluded.
  ;;
  (let* ((root-fkey     (first (last fkey-list)))
         (root-key-name (column-name (citus-distributed-rule-using rule)))
         (key-position  (position root-key-name
                                  (fkey-foreign-columns root-fkey)
                                  :test #'string=))
         (new-key-name  (nth key-position (fkey-columns root-fkey)))
         (traversed     (mapcar #'fkey-foreign-table (butlast fkey-list))))
    (make-citus-distributed-rule :table (fkey-table (first fkey-list))
                                 :using (make-column :name new-key-name)
                                 :from traversed)))


;;;
;;; Apply a citus distribution rule to given table, and store the rule
;;; itself to the table-citus-rule slot so that we later know to generate a
;;; proper SELECT query that includes the backfilling.
;;;
(define-condition citus-rule-is-missing-from-list (error)
  ((rule  :initarg :rule :accessor citus-rule))
  (:documentation
   "Signaled when a distributed rule needs a FROM clause to find the
    definition of its distribution key, and none is usable.")
  (:report
   (lambda (condition stream)
     (let ((*print-circle* nil))
       (let* ((rule       (citus-rule condition))
              (key-name   (column-name (citus-distributed-rule-using rule)))
              (table-name (format-table-name
                           (citus-distributed-rule-table rule))))
         ;; the message first says what failed, then spells out the
         ;; incomplete rule
         (format stream
                 "Failed to add column ~s to table ~a for lack of a FROM clause in the distribute rule:~%    distribute ~a using ~a from ?"
                 key-name
                 table-name
                 table-name
                 key-name))))))

(defgeneric apply-citus-rule (rule)
  (:documentation "Apply the Citus distribution RULE to the RULE's table.")

  (:method ((rule citus-reference-rule))
    ;; A reference table only needs to remember its rule. Return T.
    (setf (table-citus-rule (citus-reference-rule-table rule)) rule)
    t))

(defmethod apply-citus-rule ((rule citus-distributed-rule))
  "Store RULE in its table and make sure that the distribution key is part
   of the table's primary key. When the table doesn't have the distribution
   key column, add it from the definition found in the last table of the
   rule's FROM list, or signal CITUS-RULE-IS-MISSING-FROM-LIST when no
   definition can be found there."
  (let ((table (citus-distributed-rule-table rule)))
    (setf (table-citus-rule table) rule)

    (let ((key-name (column-name (citus-distributed-rule-using rule))))
      (cond
        ;; The USING column already exists in our model: add it to the
        ;; PKEY definition, in first position.
        ((find key-name (table-field-list table)
               :test #'string=
               :key #'field-name)
         (add-column-to-pkey table key-name))

        ;; The column doesn't exist. The :FROM slot of the rule is a list
        ;; of tables to "traverse" when backfilling the data, following the
        ;; foreign-key relationships from TABLE to the source of the
        ;; distribution key: look the column definition up in the last
        ;; entry of that list.
        (t
         (let* ((origin-table
                 (car (last (citus-distributed-rule-from rule))))
                (definition
                 (and origin-table
                      (find key-name (table-field-list origin-table)
                            :test #'string=
                            :key #'column-name))))
           ;;
           ;; Without a definition we can't add the column: either the
           ;; distribute ... using ... clause is lacking its FROM part, or
           ;; the last FROM table doesn't have that column.
           ;;
           (unless definition
             (error
              (make-condition 'citus-rule-is-missing-from-list :rule rule)))

           (let ((new-column
                  (make-column :name (column-name definition)
                               :type-name (column-type-name definition)
                               :nullable (column-nullable definition)
                               :transform (column-transform definition))))
             ;; Here also the new column goes first in the PKEY definition.
             (add-column-to-pkey table (column-name new-column))

             ;;
             ;; The distribution key is backfilled with a JOIN when we
             ;; SELECT from the source table, so the new column is added in
             ;; front of both the fields and the columns.
             ;;
             (push new-column (table-field-list table))
             (push new-column (table-column-list table)))))))))


(defun add-column-to-pkey (table column-name)
  "Add COLUMN-NAME in the first position of the TABLE's primary key index,
   and of the foreign keys that depend on that index. Does nothing when
   TABLE has no primary key or when the key already has COLUMN-NAME.
   Return nil."
  (let ((pkey (find-if #'index-primary (table-index-list table))))
    (when (and pkey
               (not (find column-name (index-columns pkey) :test #'string=)))
      (push column-name (index-columns pkey))

      ;; the SQL and constraint definition from the origin schema are now
      ;; out of date: remove them, we need to redo them
      (setf (index-sql pkey) nil
            (index-condef pkey) nil)

      ;; the foreign keys using this index get the new column too, on both
      ;; sides of the reference
      (dolist (fkey (index-fk-deps pkey))
        (push column-name (fkey-columns fkey))
        (push column-name (fkey-foreign-columns fkey))
        (setf (fkey-condef fkey) nil)))))


;;;
;;; Format a query for backfilling the data right from pgloader:
;;;
;;;   SELECT dist_key, * FROM source JOIN pivot ON ...
;;;
(defun format-citus-join-clause (table distribution-rule)
  "Return a string of JOIN clauses, one per table in the FROM list of
   DISTRIBUTION-RULE, used to backfill the distribution key data in tables
   that are referencing (even indirectly) the main distribution table."
  (with-output-to-string (s)
    ;; walk the FROM list: each table is joined through the foreign key of
    ;; the previous table (starting with TABLE) that references it
    (let ((current-table table))
      (dolist (rel (citus-distributed-rule-from distribution-rule))
        (let* ((rel-name (ensure-unquoted (table-name rel)))
               (fkey     (find rel-name
                               (table-fkey-list current-table)
                               :test #'string=
                               :key (lambda (candidate)
                                      (ensure-unquoted
                                       (table-name
                                        (fkey-foreign-table candidate))))))
               (ftable   (fkey-foreign-table fkey))
               (first    t))
          (format s
                  " JOIN ~s.~s"
                  (schema-source-name (table-schema ftable))
                  (table-source-name ftable))
          ;;
          ;; Skip the first column in the fkey definition, that's the
          ;; distribution key that was just added by pgloader: we don't
          ;; have it on the source database, we are going to create it on
          ;; the target database.
          ;;
          (mapc (lambda (c fc)
                  (format s
                          " ~:[AND~;ON~] ~a.~a = ~a.~a"
                          first
                          (table-source-name (fkey-table fkey))
                          c
                          (table-source-name ftable)
                          fc)
                  ;; only the first condition is introduced with ON
                  (setf first nil))
                (cdr (fkey-columns fkey))
                (cdr (fkey-foreign-columns fkey))))
        (setf current-table rel)))))

(defun citus-format-sql-select (source-table target-table)
  "Return the SQL statement used to fetch the data of SOURCE-TABLE,
   including the JOINs that backfill the distribution key, following the
   Citus rule found in TARGET-TABLE."

  ;;
  ;; SELECT from.id, id, ... from source join from-table ...
  ;;
  ;; Because of the join(s), every column name is prefixed with a table
  ;; name. The first column in the output is read from the main FROM table
  ;; (the last one in the rule), the other ones from the source table.
  ;;
  (let* ((rule         (table-citus-rule target-table))
         (origin-table (car (last (citus-distributed-rule-from rule))))
         (fields       (table-field-list source-table))
         (dist-key-col (format nil "~a.~a"
                               (table-name origin-table)
                               (column-name (first fields))))
         (source-cols  (loop :for field :in (rest fields)
                          :collect (format nil "~a.~a"
                                           (table-name source-table)
                                           (column-name field))))
         (joins        (format-citus-join-clause source-table rule)))
    (format nil
            "SELECT ~{~a::text~^, ~} FROM ~s.~s ~a"
            (cons dist-key-col source-cols)
            (schema-source-name (table-schema source-table))
            (table-source-name source-table)
            joins)))

;;;
;;; Predicate to see if a table needs backfilling
;;;
(defun citus-backfill-table-p (table)
  "Return T when TABLE has a Citus distributed rule with a non-empty FROM
   list, meaning that its distribution key should be backfilled, and nil
   otherwise."
  (let ((rule (table-citus-rule table)))
    (and rule
         (typep rule 'citus-distributed-rule)
         (citus-distributed-rule-from rule)
         t)))