02241130创建于 2021年6月9日历史提交
;;;
;;; Tools to handle MySQL data fetching
;;;

(in-package :pgloader.source.csv)

(defclass csv-connection (md-connection) ())

(defmethod initialize-instance :after ((csv csv-connection) &key)
  "Assign the type slot to sqlite."
  (setf (slot-value csv 'type) "csv"))

;;;
;;; Implementing the pgloader source API
;;;
(defclass copy-csv (md-copy)
  ((source-type :accessor source-type	  ; one of :inline, :stdin, :regex
		:initarg :source-type)	  ;  or :filename
   (separator   :accessor csv-separator	  ; CSV separator
	        :initarg :separator	  ;
	        :initform nil)            ;
   (newline     :accessor csv-newline     ; CSV line ending
                :initarg :newline         ;
                :initform #\Newline)
   (quote       :accessor csv-quote	  ; CSV quoting
	        :initarg :quote		  ;
	        :initform cl-csv:*quote*) ;
   (escape      :accessor csv-escape	  ; CSV quote escaping
	        :initarg :escape	  ;
	        :initform cl-csv:*quote-escape*)
   (escape-mode :accessor csv-escape-mode ; CSV quote escaping mode
	        :initarg :escape-mode     ;
	        :initform cl-csv::*escape-mode*)
   (trim-blanks :accessor csv-trim-blanks ; CSV blank and NULLs
		:initarg :trim-blanks	  ;
		:initform t))
  (:documentation "pgloader CSV Data Source"))

(defmethod clone-copy-for ((csv copy-csv) path-spec)
  "Create a copy of CSV for loading data from PATH-SPEC."
  (let ((csv-for-path-spec
         (change-class (call-next-method csv path-spec) 'copy-csv)))
    (loop :for slot-name :in '(source-type
                               separator
                               newline
                               quote
                               escape
                               escape-mode
                               trim-blanks)
       :do (when (slot-boundp csv slot-name)
             (setf (slot-value csv-for-path-spec slot-name)
                   (slot-value csv slot-name))))

    ;; return the new instance!
    csv-for-path-spec))

;;;
;;; Read a file format in CSV format, and call given function on each line.
;;;
(defmethod parse-header ((csv copy-csv))
  "Parse the header line given csv setup."
  ;; a field entry is a list of field name and options
  (with-connection (cnx (source csv)
                        :direction :input
                        :external-format (encoding csv)
                        :if-does-not-exist nil)
    (let ((input (md-strm cnx)))
      (loop :repeat (skip-lines csv) :do (read-line input nil nil))
      (let* ((header-line (read-line input nil nil))
             (field-name-list
              (mapcar #'list            ; we need each field to be a list
                      (car              ; parsing a single line
                       (cl-csv:read-csv header-line
                                        :separator (csv-separator csv)
                                        :quote (csv-quote csv)
                                        :escape (csv-escape csv)
                                        :unquoted-empty-string-is-nil t
                                        :quoted-empty-string-is-nil nil
                                        :trim-outer-whitespace (csv-trim-blanks csv)
                                        :newline (csv-newline csv))))))
        (log-message :notice "Parsed header columns ~s" (fields csv))
        (setf (fields csv) field-name-list )))))

(defmethod process-rows ((csv copy-csv) stream process-fn)
  "Process rows from STREAM according to COPY specifications and PROCESS-FN."
  (let ((separator (csv-separator csv))
        (quote     (csv-quote csv))
        (escape    (csv-escape csv)))
    (unless separator
      ;; try to guess the CSV format
      (let ((nb-columns (length (columns csv))))
        (destructuring-bind (&key
                             ((:separator sep) #\Tab)
                             ((:quote q)       cl-csv:*quote*)
                             ((:escape esc)    cl-csv:*quote-escape*))
            (guess-csv-params stream nb-columns)
          (setf separator sep
                quote     q
                escape    esc)

          (log-message :notice "Guessed following CSV parameters:~%  ~a"
                       (format nil "~
  fields terminated by '~a',
  fields optionally enclosed by '~a',
  fields escaped by '~a'"
                               sep q esc)))))

    (handler-case
        (handler-bind ((cl-csv:csv-parse-error
                        #'(lambda (c)
                            (log-message :error "~a" c)
                            (update-stats :data (target csv) :errs 1)
                            (cl-csv::continue))))
          (cl-csv:read-csv stream
                           :row-fn process-fn
                           :separator separator
                           :quote quote
                           :escape escape
                           :escape-mode (csv-escape-mode csv)
                           :unquoted-empty-string-is-nil t
                           :quoted-empty-string-is-nil nil
                           :trim-outer-whitespace (csv-trim-blanks csv)
                           :newline (csv-newline csv)))
      (condition (e)
        (progn
          (log-message :fatal "~a" e)
          (update-stats :data (target csv) :errs 1))))))