02241130创建于 2021年6月9日历史提交
;;;
;;; Generic API for pgloader sources
;;; Methods for database source types (with introspection)
;;;

(in-package :pgloader.load)

;;;
;;; Prepare the PostgreSQL database before streaming the data into it.
;;;
(defmethod prepare-pgsql-database ((copy db-copy)
                                   (catalog catalog)
                                   &key
                                     truncate
                                     create-tables
                                     create-schemas
                                     drop-schema
                                     drop-indexes
                                     set-table-oids
                                     materialize-views
                                     foreign-keys
                                     include-drop)
  "Prepare the target PostgreSQL database: create tables casting datatypes
   from the MySQL definitions, prepare index definitions and create target
   tables for materialized views.

   That function mutates index definitions in ALL-INDEXES."
  (log-message :notice "Prepare PostgreSQL database.")

  (with-pgsql-transaction (:pgconn (target-db copy))

    (finalize-catalogs catalog (pgconn-variant (target-db copy)))

    (if create-tables
        (progn
          (when create-schemas
            (with-stats-collection ("Create Schemas" :section :pre
                                                     :use-result-as-read t
                                                     :use-result-as-rows t)
              (create-schemas catalog
                              :include-drop drop-schema
                              :client-min-messages :error)))

          ;; create new SQL types (ENUMs, SETs) if needed and before we
          ;; get to the table definitions that will use them
          (with-stats-collection ("Create SQL Types" :section :pre
                                                     :use-result-as-read t
                                                     :use-result-as-rows t)
            ;; some SQL types come from extensions (ip4r, hstore, etc)
            (create-extensions catalog
                               :include-drop include-drop
                               :if-not-exists t
                               :client-min-messages :error)

            (create-sqltypes catalog
                             :include-drop include-drop
                             :client-min-messages :error))

          ;; now the tables
          (with-stats-collection ("Create tables" :section :pre
                                                  :use-result-as-read t
                                                  :use-result-as-rows t)
            (create-tables catalog
                           :include-drop include-drop
                           :client-min-messages :error)))

        (progn
          ;; if we're not going to create the tables, now is the time to
          ;; remove the constraints: indexes, primary keys, foreign keys
          ;;
          ;; to be able to do that properly, get the constraints from
          ;; the pre-existing target database catalog
          (let* ((pgversion   (pgconn-major-version (target-db copy)))
                 (pgsql-catalog
                  (fetch-pgsql-catalog (db-name (target-db copy))
                                       :source-catalog catalog
                                       :pgversion pgversion)))
            (merge-catalogs catalog pgsql-catalog))

          ;; now the foreign keys and only then the indexes, because a
          ;; drop constraint on a primary key cascades to the drop of
          ;; any foreign key that targets the primary key
          (when foreign-keys
            (with-stats-collection ("Drop Foreign Keys" :section :pre
                                                        :use-result-as-read t
                                                        :use-result-as-rows t)
              (drop-pgsql-fkeys catalog :log-level :notice)))

          (when drop-indexes
            (with-stats-collection ("Drop Indexes" :section :pre
                                                   :use-result-as-read t
                                                   :use-result-as-rows t)
              ;; we want to error out early in case we can't DROP the
              ;; index, don't CASCADE
              (drop-indexes catalog :cascade nil :log-level :notice)))

          (when truncate
            (with-stats-collection ("Truncate" :section :pre
                                               :use-result-as-read t
                                               :use-result-as-rows t)
              (truncate-tables catalog)))))

    ;; Some database sources allow the same index name being used
    ;; against several tables, so we add the PostgreSQL table OID in the
    ;; index name, to differenciate. Set the table oids now.
    (when (and create-tables set-table-oids)
      (with-stats-collection ("Set Table OIDs" :section :pre
                                               :use-result-as-read t
                                               :use-result-as-rows t)
        (set-table-oids catalog :variant (pgconn-variant (target-db copy)))))

    ;; We might have to MATERIALIZE VIEWS
    (when (and create-tables materialize-views)
      (with-stats-collection ("Create MatViews Tables" :section :pre
                                                       :use-result-as-read t
                                                       :use-result-as-rows t)
        (create-views catalog
                      :include-drop include-drop
                      :client-min-messages :error))))

  ;; Citus Support
  ;;
  ;; We need a separate transaction here in some cases, because of the
  ;; distributed DDL support from Citus, to avoid the following error:
  ;;
  ;; ERROR Database error 25001: cannot establish a new connection for
  ;; placement 2299, since DDL has been executed on a connection that is in
  ;; use
  ;;
  (when (catalog-distribution-rules catalog)
    (with-pgsql-transaction (:pgconn (target-db copy))
      (with-stats-collection ("Citus Distribute Tables" :section :pre)
        (create-distributed-table (catalog-distribution-rules catalog)))))

  ;; log the catalog we just fetched and (maybe) merged
  (log-message :data "CATALOG: ~s" catalog))


(defmethod complete-pgsql-database ((copy db-copy)
                                    (catalog catalog)
                                    pkeys
                                    &key
                                      foreign-keys
                                      create-indexes
                                      create-triggers
                                      reset-sequences)
  "After loading the data into PostgreSQL, we can now reset the sequences
     and declare foreign keys."
  ;;
  ;; Now Reset Sequences, the good time to do that is once the whole data
  ;; has been imported and once we have the indexes in place, as max() is
  ;; able to benefit from the indexes. In particular avoid doing that step
  ;; while CREATE INDEX statements are in flight (avoid locking).
  ;;
  (log-message :notice "Completing PostgreSQL database.")

  (when reset-sequences
    (reset-sequences (clone-connection (target-db copy)) catalog))

  (handler-case
      (with-pgsql-transaction (:pgconn (clone-connection (target-db copy)))
        ;;
        ;; Turn UNIQUE indexes into PRIMARY KEYS now
        ;;
        (when create-indexes
          (pgsql-execute-with-timing :post "Primary Keys" pkeys
                                     :log-level :notice))

        ;;
        ;; Foreign Key Constraints
        ;;
        ;; We need to have finished loading both the reference and the
        ;; refering tables to be able to build the foreign keys, so wait
        ;; until all tables and indexes are imported before doing that.
        ;;
        (when foreign-keys
          (create-pgsql-fkeys catalog
                              :section :post
                              :label "Create Foreign Keys"
                              :log-level :notice))

        ;;
        ;; Triggers and stored procedures -- includes special default values
        ;;
        (when create-triggers
          (create-triggers catalog
                           :section :post
                           :label "Create Triggers"))

        ;;
        ;; Add schemas that needs to be in the search_path to the database
        ;; search_path, when using PostgreSQL. Redshift doesn't know how to
        ;; do that, unfortunately.
        ;;
        (unless (eq :redshift (pgconn-variant (target-db copy)))
          (add-to-search-path catalog
                              :section :post
                              :label "Set Search Path"))

        ;;
        ;; And now, comments on tables and columns.
        ;;
        (comment-on-tables-and-columns catalog
                                       :section :post
                                       :label "Install Comments"))

    (postgresql-unavailable (condition)

      (log-message :error "~a" condition)
      (log-message :error
                   "Complete PostgreSQL database reconnecting to PostgreSQL.")

      ;; in order to avoid Socket error in "connect": ECONNREFUSED if we
      ;; try just too soon, wait a little
      (sleep 2)


      ;;
      ;; Reset Sequence can be done several times safely, and the rest of the
      ;; operations run in a single transaction, so if the connection was lost,
      ;; nothing has been done. Retry.
      ;;
      (complete-pgsql-database copy
                               catalog
                               pkeys
                               :foreign-keys foreign-keys
                               :create-indexes create-indexes
                               :create-triggers create-triggers
                               :reset-sequences reset-sequences))))


(defun process-catalog (copy catalog &key alter-table alter-schema distribute)
  "Do all the PostgreSQL catalog tweaking here: casts, index WHERE clause
   rewriting, pgloader level alter schema and alter table commands."
  (log-message :info "Processing source catalogs")

  ;; cast the catalog into something PostgreSQL can work on
  (cast catalog)

  ;; support code for index filters (where clauses)
  (process-index-definitions catalog :sql-dialect (class-name (class-of copy)))

  ;; we may have to alter schemas
  (when alter-schema
    (alter-schema catalog alter-schema))

  ;; if asked, now alter the catalog with given rules: the alter-table
  ;; keyword parameter actually contains a set of alter table rules.
  (when alter-table
    (alter-table catalog alter-table))

  ;; we also support schema changes necessary for Citus distribution
  (when distribute
    (log-message :info "Applying distribution rules")
    (setf (catalog-distribution-rules catalog)
          (citus-distribute-schema catalog distribute))))


;;;
;;; Generic enough implementation of the copy-database method.
;;;
(defmethod copy-database ((copy db-copy)
			  &key
                            (on-error-stop    *on-error-stop*)
                            (worker-count     4)
                            (concurrency      1)
                            (multiple-readers nil)
                            max-parallel-create-index
			    (truncate         nil)
			    (disable-triggers nil)
			    (data-only        nil)
			    (schema-only      nil)
                            (create-schemas   t)
			    (create-tables    t)
			    (include-drop     t)
                            (drop-schema      nil)
			    (create-indexes   t)
                            (index-names      :uniquify)
			    (reset-sequences  t)
			    (foreign-keys     t)
                            (reindex          nil)
                            (after-schema     nil)
                            distribute
			    including
			    excluding
                            set-table-oids
                            alter-table
                            alter-schema
			    materialize-views)
  "Export database source data and Import it into PostgreSQL"
  (log-message :log "Migrating from ~a" (source-db copy))
  (log-message :log "Migrating into ~a" (target-db copy))
  (let* ((*on-error-stop* on-error-stop)
         (copy-data      (or data-only (not schema-only)))
         (create-ddl     (or schema-only (not data-only)))
         (create-tables  (and create-tables create-ddl))
         (create-schemas (and create-schemas create-ddl))
         ;; foreign keys has a special meaning in data-only mode
         (foreign-keys   (if (eq :redshift (pgconn-variant (target-db copy)))
                             nil
                             foreign-keys))
         (drop-indexes   (if (eq :redshift (pgconn-variant (target-db copy)))
                             nil
                             (or reindex
                                 (and include-drop create-ddl))))
         (create-indexes (if (eq :redshift (pgconn-variant (target-db copy)))
                             nil
                             (or reindex
                                 (and create-indexes drop-indexes create-ddl))))

         (reset-sequences (if (eq :redshift (pgconn-variant (target-db copy)))
                              nil
                              reset-sequences))

         (*preserve-index-names*
          (or (eq :preserve index-names)
              ;; if we didn't create the tables, we are re-installing the
              ;; pre-existing indexes
              (not create-tables)))

         (copy-kernel  (make-kernel worker-count))
         (copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel)))
         (catalog      (handler-case
                           (fetch-metadata
                            copy
                            (make-catalog
                             :name (typecase (source-db copy)
                                     (db-connection
                                      (db-name (source-db copy)))
                                     (fd-connection
                                      (pathname-name
                                       (fd-path (source-db copy))))))
                            :materialize-views materialize-views
                            :create-indexes create-indexes
                            :foreign-keys foreign-keys
                            :including including
                            :excluding excluding)
                         (mssql::mssql-error (e)
                           (log-message :error "MSSQL ERROR: ~a" e)
                           (log-message :log "You might need to review the FreeTDS protocol version in your freetds.conf file, see http://www.freetds.org/userguide/choosingtdsprotocol.htm")
                           (return-from copy-database))
                         #+pgloader-image
                         (condition (e)
                           (log-message :error
                                        "~a: ~a"
                                        (conn-type (source-db copy))
                                        e)
                           (return-from copy-database))))
         pkeys
         (writers-count (make-hash-table :size (count-tables catalog)))
         (max-indexes   (when create-indexes
                          (max-indexes-per-table catalog)))
         (idx-kernel    (when (and max-indexes (< 0 max-indexes))
                          (make-kernel (or max-parallel-create-index
                                           max-indexes))))
         (idx-channel   (when idx-kernel
                          (let ((lp:*kernel* idx-kernel))
                            (lp:make-channel))))

         (task-count    0))

    ;; apply catalog level transformations to support the database migration
    ;; that's CAST rules, index WHERE clause rewriting and ALTER commands
    (handler-case
        (process-catalog copy catalog
                         :alter-table alter-table
                         :alter-schema alter-schema
                         :distribute distribute)

      #+pgloader-image
      ((or citus-rule-table-not-found citus-rule-is-missing-from-list) (e)
        (log-message :fatal "~a" e)
        (return-from copy-database))

      #+pgloader-image
      (condition (e)
        (log-message :fatal "Failed to process catalogs: ~a" e)
        (return-from copy-database)))

    ;; if asked, first drop/create the tables on the PostgreSQL side
    (handler-case
        (progn
          (prepare-pgsql-database copy
                                  catalog
                                  :truncate truncate
                                  :create-tables create-tables
                                  :create-schemas create-schemas
                                  :drop-indexes drop-indexes
                                  :drop-schema drop-schema
                                  :include-drop include-drop
                                  :foreign-keys foreign-keys
                                  :set-table-oids set-table-oids
                                  :materialize-views materialize-views)

          ;; if there's an AFTER SCHEMA DO/EXECUTE command, now is the time
          ;; to run it.
          (when after-schema
            (pgloader.parser::execute-sql-code-block (target-db copy)
                                                     :pre
                                                     after-schema
                                                     "after schema")))
      ;;
      ;; In case some error happens in the preparatory transaction, we
      ;; need to stop now and refrain from trying to load the data into
      ;; an incomplete schema.
      ;;
      (cl-postgres:database-error (e)
        (declare (ignore e))		; a log has already been printed
        (log-message :fatal "Failed to create the schema, see above.")

        ;; we might have some cleanup to do...
        (cleanup copy catalog :materialize-views materialize-views)

        (return-from copy-database)))

    (loop
       :for table :in (append (table-list catalog)
                              ;; when materialized views are not supported,
                              ;; view-list is empty here
                              (view-list catalog))

       :do (let ((table-source (instanciate-table-copy-object copy table)))
             ;; first COPY the data from source to PostgreSQL, using copy-kernel
             (if (not copy-data)
                 ;; start indexing straight away then
                 (when create-indexes
                   (alexandria:appendf
                    pkeys
                    (create-indexes-in-kernel (target-db copy)
                                              table
                                              idx-kernel
                                              idx-channel)))

                 ;; prepare the writers-count hash-table, as we start
                 ;; copy-from, we have concurrency tasks writing.
                 (progn                 ; when copy-data
                   (setf (gethash table writers-count) concurrency)

                   (incf task-count
                         (copy-from table-source
                                    :concurrency concurrency
                                    :multiple-readers multiple-readers
                                    :kernel copy-kernel
                                    :channel copy-channel
                                    :on-error-stop on-error-stop
                                    :disable-triggers disable-triggers))))))

    ;; now end the kernels
    ;; and each time a table is done, launch its indexing
    (when copy-data
      (let ((lp:*kernel* copy-kernel))
        (with-stats-collection ("COPY Threads Completion" :section :post
                                                          :use-result-as-read t
                                                          :use-result-as-rows t)
          (loop :repeat task-count
             :do (destructuring-bind (task table seconds)
                     (lp:receive-result copy-channel)
                   (log-message :debug
                                "Finished processing ~a for ~s ~50T~6$s"
                                task (format-table-name table) seconds)
                   (when (eq :writer task)
                     ;;
                     ;; Start the CREATE INDEX parallel tasks only when
                     ;; the data has been fully copied over to the
                     ;; corresponding table, that's when the writers
                     ;; count is down to zero.
                     ;;
                     (decf (gethash table writers-count))
                     (log-message :debug "writers-counts[~a] = ~a"
                                  (format-table-name table)
                                  (gethash table writers-count))

                     (when (and create-indexes
                                (zerop (gethash table writers-count)))
                       (log-message :notice "DONE copying ~a"
                                    (format-table-name table))
                       (alexandria:appendf
                        pkeys
                        (create-indexes-in-kernel (target-db copy)
                                                  table
                                                  idx-kernel
                                                  idx-channel)))))
             :finally (progn
                        (lp:end-kernel :wait nil)
                        (return worker-count))))))

    (log-message :info "Done with COPYing data, waiting for indexes")

    (when create-indexes
      (let ((lp:*kernel* idx-kernel))
        ;; wait until the indexes are done being built...
        ;; don't forget accounting for that waiting time.
        (with-stats-collection ("Index Build Completion" :section :post
                                                         :use-result-as-read t
                                                         :use-result-as-rows t)
          (loop :for count :below (count-indexes catalog)
             :do (lp:receive-result idx-channel))
          (lp:end-kernel :wait t)
          (log-message :info "Done waiting for indexes")
          (count-indexes catalog))))

    ;;
    ;; Complete the PostgreSQL database before handing over.
    ;;
    (complete-pgsql-database copy
                             catalog
                             pkeys
                             :foreign-keys foreign-keys
                             :create-indexes create-indexes
                             ;; only create triggers (for default values)
                             ;; when we've been responsible for creating the
                             ;; tables -- otherwise assume the schema is
                             ;; good as it is
                             :create-triggers create-tables
                             :reset-sequences reset-sequences)

    ;;
    ;; Time to cleanup!
    ;;
    (cleanup copy catalog :materialize-views materialize-views)))