;;;
;;; Tools to handle MySQL data fetching
;;;

(in-package :pgloader.source.mysql)

;;;
;;; Implement the specific methods
;;;
(defmethod concurrency-support ((mysql copy-mysql) concurrency)
  "Splits the read work thanks WHERE clauses when possible and relevant,
   return nil if we decide to read all in a single thread, and a list of as
   many copy-mysql instances as CONCURRENCY otherwise. Each copy-mysql
   instance in the returned list embeds specifications about how to read
   only its partition of the source data."
  (unless (= 1 concurrency)
    (let* ((indexes (table-index-list (target mysql)))
           (pkey    (first (remove-if-not #'index-primary indexes)))
           (pcol    (when pkey (first (index-columns pkey))))
           (coldef  (when pcol
                      (find pcol
                            (table-column-list (target mysql))
                            :key #'column-name
                            :test #'string=)))
           (ptype   (when (and coldef (stringp (column-type-name coldef)))
                      (column-type-name coldef))))
      (when (member ptype (list "integer" "bigint" "serial" "bigserial")
                    :test #'string=)
        ;; the table has a primary key over a integer data type we are able
        ;; to generate WHERE clause and range index scans.
        (with-connection (*connection* (source-db mysql))
          (let* ((col (mysql-column-name
                       (nth (position coldef (table-column-list (target mysql)))
                            (fields mysql))))
                 (sql (format nil "select min(`~a`), max(`~a`) from `~a`"
                              col col (table-source-name (source mysql)))))
            (destructuring-bind (min max)
                (let ((result (first (mysql-query sql))))
                  ;; result is (min max), or (nil nil) if table is empty
                  (if (or (null (first result))
                          (null (second result)))
                      result
                      (mapcar #'parse-integer result)))
              ;; generate a list of ranges from min to max
              (when (and min max)
                (let ((range-list (split-range min max *rows-per-range*)))
                  (unless (< (length range-list) concurrency)
                    ;; affect those ranges to each reader, we have CONCURRENCY
                    ;; of them
                    (let ((partitions (distribute range-list concurrency)))
                      (loop :for part :in partitions :collect
                         (make-instance 'copy-mysql
                                        :source-db  (clone-connection
                                                     (source-db mysql))
                                        :target-db  (target-db mysql)
                                        :source     (source mysql)
                                        :target     (target mysql)
                                        :fields     (fields mysql)
                                        :columns    (columns mysql)
                                        :transforms (transforms mysql)
                                        :encoding   (encoding mysql)
                                        :range-list (cons col part))))))))))))))

(defun call-with-encoding-handler (copy-mysql table-name func)
  (handler-bind
      ;; Newer versions of qmynd handle the babel error and signal this one
      ;; with more details and an improved reporting:
      ((qmynd-impl::decoding-error
        #'(lambda (c)
            (update-stats :data (target copy-mysql) :errs 1)
            (log-message :error "~a" c)
            (invoke-restart 'qmynd-impl::use-nil)))

       ;; Older versions of qmynd reported babel errors directly
       (babel-encodings:end-of-input-in-character
        #'(lambda (c)
            (update-stats :data (target copy-mysql) :errs 1)
            (log-message :error "~a" c)
            (invoke-restart 'qmynd-impl::use-nil)))

       (babel-encodings:character-decoding-error
        #'(lambda (c)
            (update-stats :data (target copy-mysql) :errs 1)
            (let* ((encoding (babel-encodings:character-coding-error-encoding c))
                   (position (babel-encodings:character-coding-error-position c))
                   (buffer   (babel-encodings:character-coding-error-buffer c))
                   (character
                    (when (and position (< position (length buffer)))
                      (aref buffer position))))
              (log-message :error
                           "While decoding text data from MySQL table ~s: ~%~
Illegal ~a character starting at position ~a~@[: ~a~].~%"
                           table-name encoding position character))
            (invoke-restart 'qmynd-impl::use-nil))))
    (funcall func)))

(defmacro with-encoding-handler ((copy-mysql table-name) &body forms)
  `(call-with-encoding-handler ,copy-mysql ,table-name (lambda () ,@forms)))

(defmethod map-rows ((mysql copy-mysql) &key process-row-fn)
  "Extract MySQL data and call PROCESS-ROW-FN function with a single
   argument (a list of column values) for each row."
  (let ((table-name (table-source-name (source mysql)))
        (qmynd:*mysql-encoding*
         (when (encoding mysql)
           #+sbcl (encoding mysql)
           #+ccl  (ccl:external-format-character-encoding (encoding mysql)))))

    (with-connection (*connection* (source-db mysql))
      (when qmynd:*mysql-encoding*
        (log-message :notice "Force encoding to ~a for ~a"
                     qmynd:*mysql-encoding* table-name))
      (let* ((cols (get-column-list mysql))
             (sql  (format nil "SELECT ~{~a~^, ~} FROM `~a`" cols table-name)))

        (if (range-list mysql)
            ;; read a range at a time, in a loop
            (destructuring-bind (colname . ranges) (range-list mysql)
              (loop :for (min max) :in ranges :do
                 (let ((sql (format nil "~a WHERE `~a` >= ~a AND `~a` < ~a"
                                    sql colname min colname max)))
                   (with-encoding-handler (mysql table-name)
                     (mysql-query sql
                                  :row-fn process-row-fn
                                  :result-type 'vector)))))

            ;; read it all, no WHERE clause
            (with-encoding-handler (mysql table-name)
              (mysql-query sql
                           :row-fn process-row-fn
                           :result-type 'vector)))))))



(defmethod copy-column-list ((mysql copy-mysql))
  "We are sending the data in the MySQL columns ordering here."
  (mapcar #'apply-identifier-case (mapcar #'mysql-column-name (fields mysql))))


(defmethod fetch-metadata ((mysql copy-mysql)
                           (catalog catalog)
                           &key
                             materialize-views
                             (create-indexes   t)
                             (foreign-keys     t)
                             including
                             excluding)
  "MySQL introspection to prepare the migration."
  (let ((schema        (add-schema catalog (catalog-name catalog)
                                   :in-search-path t))
        (including     (filter-list-to-where-clause mysql including))
        (excluding     (filter-list-to-where-clause mysql excluding :not t)))
    (with-stats-collection ("fetch meta data"
                            :use-result-as-rows t
                            :use-result-as-read t
                            :section :pre)
      (with-connection (*connection* (source-db mysql))
        ;; If asked to MATERIALIZE VIEWS, now is the time to create them in
        ;; MySQL, when given definitions rather than existing view names.
        (when (and materialize-views (not (eq :all materialize-views)))
          (create-matviews materialize-views mysql))

        ;; fetch table and columns metadata, covering table and column comments
        (fetch-columns schema mysql
                       :including including
                       :excluding excluding)
        ;; fetch partition metadata
        (fetch-partitions schema mysql
                         :including including
                         :excluding excluding)
        ;; fetch view (and their columns) metadata, covering comments too
        (let* ((view-names (unless (eq :all materialize-views)
                             (mapcar #'matview-source-name materialize-views)))
               (including
                (loop :for (schema-name . view-name) :in view-names
                   :collect (make-string-match-rule :target view-name)))
               (including-clause (filter-list-to-where-clause mysql including)))
          (cond (view-names
                 (fetch-columns schema mysql
                                :including including-clause
                                :excluding excluding
                                :table-type :view))

                ((eq :all materialize-views)
                 (fetch-columns schema mysql :table-type :view))))

        (when foreign-keys
          (fetch-foreign-keys schema mysql
                              :including including
                              :excluding excluding))

        (when create-indexes
          (fetch-indexes schema mysql
                         :including including
                         :excluding excluding))

        ;; return how many objects we're going to deal with in total
        ;; for stats collection
        (+ (count-tables catalog)
           (count-views catalog)
           (count-indexes catalog)
           (count-fkeys catalog))))

    catalog))

(defmethod cleanup ((mysql copy-mysql) (catalog catalog) &key materialize-views)
  "When there is a PostgreSQL error at prepare-pgsql-database step, we might
   need to clean-up any view created in the MySQL connection for the
   migration purpose."
  (when materialize-views
    (with-connection (*connection* (source-db mysql))
      (drop-matviews materialize-views mysql))))

(defvar *decoding-as* nil
  "Special per-table encoding/decoding overloading rules for MySQL.")

(defun apply-decoding-as-filters (table-name filters)
  "Return a generialized boolean which is non-nil only if TABLE-NAME matches
   one of the FILTERS."
  (flet ((apply-filter (filter) (matches filter table-name)))
    (some #'apply-filter filters)))

(defmethod instanciate-table-copy-object ((copy copy-mysql) (table table))
  "Create an new instance for copying TABLE data."
  (let ((new-instance (change-class (call-next-method copy table) 'copy-mysql)))
    (setf (encoding new-instance)
          ;; force the data encoding when asked to
          (when *decoding-as*
            (loop :for (encoding . filters) :in *decoding-as*
               :when (apply-decoding-as-filters (table-name table) filters)
               :return encoding)))
    new-instance))