;;;
;;; Tools to handle MySQL data fetching
;;;
(in-package :pgloader.source.csv)
(defclass csv-connection (md-connection) ())
(defmethod initialize-instance :after ((csv csv-connection) &key)
"Assign the type slot to sqlite."
(setf (slot-value csv 'type) "csv"))
;;;
;;; Implementing the pgloader source API
;;;
(defclass copy-csv (md-copy)
((source-type :accessor source-type ; one of :inline, :stdin, :regex
:initarg :source-type) ; or :filename
(separator :accessor csv-separator ; CSV separator
:initarg :separator ;
:initform nil) ;
(newline :accessor csv-newline ; CSV line ending
:initarg :newline ;
:initform #\Newline)
(quote :accessor csv-quote ; CSV quoting
:initarg :quote ;
:initform cl-csv:*quote*) ;
(escape :accessor csv-escape ; CSV quote escaping
:initarg :escape ;
:initform cl-csv:*quote-escape*)
(escape-mode :accessor csv-escape-mode ; CSV quote escaping mode
:initarg :escape-mode ;
:initform cl-csv::*escape-mode*)
(trim-blanks :accessor csv-trim-blanks ; CSV blank and NULLs
:initarg :trim-blanks ;
:initform t))
(:documentation "pgloader CSV Data Source"))
(defmethod clone-copy-for ((csv copy-csv) path-spec)
"Create a copy of CSV for loading data from PATH-SPEC."
(let ((csv-for-path-spec
(change-class (call-next-method csv path-spec) 'copy-csv)))
(loop :for slot-name :in '(source-type
separator
newline
quote
escape
escape-mode
trim-blanks)
:do (when (slot-boundp csv slot-name)
(setf (slot-value csv-for-path-spec slot-name)
(slot-value csv slot-name))))
;; return the new instance!
csv-for-path-spec))
;;;
;;; Read a file format in CSV format, and call given function on each line.
;;;
(defmethod parse-header ((csv copy-csv))
"Parse the header line given csv setup."
;; a field entry is a list of field name and options
(with-connection (cnx (source csv)
:direction :input
:external-format (encoding csv)
:if-does-not-exist nil)
(let ((input (md-strm cnx)))
(loop :repeat (skip-lines csv) :do (read-line input nil nil))
(let* ((header-line (read-line input nil nil))
(field-name-list
(mapcar #'list ; we need each field to be a list
(car ; parsing a single line
(cl-csv:read-csv header-line
:separator (csv-separator csv)
:quote (csv-quote csv)
:escape (csv-escape csv)
:unquoted-empty-string-is-nil t
:quoted-empty-string-is-nil nil
:trim-outer-whitespace (csv-trim-blanks csv)
:newline (csv-newline csv))))))
(log-message :notice "Parsed header columns ~s" (fields csv))
(setf (fields csv) field-name-list )))))
(defmethod process-rows ((csv copy-csv) stream process-fn)
"Process rows from STREAM according to COPY specifications and PROCESS-FN."
(let ((separator (csv-separator csv))
(quote (csv-quote csv))
(escape (csv-escape csv)))
(unless separator
;; try to guess the CSV format
(let ((nb-columns (length (columns csv))))
(destructuring-bind (&key
((:separator sep) #\Tab)
((:quote q) cl-csv:*quote*)
((:escape esc) cl-csv:*quote-escape*))
(guess-csv-params stream nb-columns)
(setf separator sep
quote q
escape esc)
(log-message :notice "Guessed following CSV parameters:~% ~a"
(format nil "~
fields terminated by '~a',
fields optionally enclosed by '~a',
fields escaped by '~a'"
sep q esc)))))
(handler-case
(handler-bind ((cl-csv:csv-parse-error
#'(lambda (c)
(log-message :error "~a" c)
(update-stats :data (target csv) :errs 1)
(cl-csv::continue))))
(cl-csv:read-csv stream
:row-fn process-fn
:separator separator
:quote quote
:escape escape
:escape-mode (csv-escape-mode csv)
:unquoted-empty-string-is-nil t
:quoted-empty-string-is-nil nil
:trim-outer-whitespace (csv-trim-blanks csv)
:newline (csv-newline csv)))
(condition (e)
(progn
(log-message :fatal "~a" e)
(update-stats :data (target csv) :errs 1))))))