"""
-------------------------------------------------------------------------
This file is part of the RAGSDK project.
Copyright (c) 2025 Huawei Technologies Co.,Ltd.
RAGSDK is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
-------------------------------------------------------------------------
"""
from datetime import datetime, timedelta
from typing import Iterator
import xlrd
from langchain_community.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from loguru import logger
from openpyxl import load_workbook, Workbook
from openpyxl.cell import MergedCell
from mx_rag.document.loader.base_loader import BaseLoader as mxBaseLoader
from mx_rag.utils import file_check
from mx_rag.utils.common import validate_params, STR_TYPE_CHECK_TIP_1024, MAX_ROW_NUM, MAX_COL_NUM
OPENPYXL_EXTENSION = (".xlsx",)
XLRD_EXTENSION = (".xls",)
class ExcelLoader(BaseLoader, mxBaseLoader):
@validate_params(
line_sep=dict(validator=lambda x: isinstance(x, str) and 0 < len(x) <= 1024, message=STR_TYPE_CHECK_TIP_1024)
)
def __init__(self, file_path: str, line_sep: str = "**;"):
super().__init__(file_path)
self.line_sep = str(line_sep)
@staticmethod
def _exceltime_to_datetime(exceltime):
"""
将excel储存的时间格式转换为可读的格式
:param exceltime: (0,1)区间的浮点数,表示时间在一天中的位置
:return: 以 时:分 的格式返回字符串
"""
base_date = datetime(1899, 12, 30)
time = base_date + timedelta(exceltime)
return time.strftime("%H:%M")
@staticmethod
def _get_xlsx_blank_rows_and_cols(worksheet):
"""
功能:获取所有空行、空列的索引
"""
row_count = 0
col_count = 0
null_rows = {}
for row in worksheet.iter_rows(values_only=True):
row_count += 1
if all(not cell for cell in row):
null_rows[row_count] = True
blank_cols = {}
for col in worksheet.iter_cols(values_only=True):
col_count += 1
if all(not cell for cell in col):
blank_cols[col_count] = True
return null_rows, blank_cols
@staticmethod
def _get_xls_blank_rows_and_cols(worksheet):
"""
功能:获取所有空行、空列的索引
"""
null_rows = {}
blank_cols = {}
for i in range(worksheet.nrows):
row = worksheet.row_values(i)
if all(not cell for cell in row):
null_rows[i] = True
for i in range(worksheet.ncols):
col = worksheet.col_values(i)
if all(not cell for cell in col):
blank_cols[i] = True
return null_rows, blank_cols
@staticmethod
def _parse_xlsx_cell(sheet: Workbook, row: int, col: int):
"""
功能:读取xlsx cell值,如果是合并项,使用左上cell值替代
"""
cell = sheet.cell(row=row, column=col)
if not isinstance(cell, MergedCell):
return cell.value
for merged_range in sheet.merged_cells.ranges:
if cell.coordinate in merged_range:
cell = sheet.cell(row=merged_range.min_row, column=merged_range.min_col)
return cell.value
return cell.value
@staticmethod
def _parse_xls_cell(sheet, row: int, col: int):
"""
功能:读取xls cell值,如果是合并项,使用左上cell值替代
"""
cell = sheet.cell(rowx=row, colx=col)
if cell.value:
return cell.value
for crange in sheet.merged_cells:
rlo, rhi, clo, chi = crange
if rlo <= row < rhi and clo <= col < chi:
return sheet.cell(rowx=rlo, colx=clo).value
return cell.value
@staticmethod
def _get_xlsx_first_not_blank_row_and_col(ws):
"""
功能:读取xlsx 非空白首行、首列索引
"""
first_row = 1
first_col = 1
for row in ws.iter_rows(values_only=True):
if any(cell for cell in row):
break
first_row += 1
for col in ws.iter_cols(values_only=True):
if any(cell for cell in col):
break
first_col += 1
return first_row, first_col
@staticmethod
def _get_xls_first_not_blank_row_and_col(ws):
"""
功能:获取xls 非空白首行、首列索引
"""
first_row = 0
first_col = 0
for i in range(ws.nrows):
if any(cell for cell in ws.row_values(i)):
first_row = i
break
for i in range(ws.ncols):
if any(cell for cell in ws.col_values(i)):
first_col = i
break
return first_row, first_col
def lazy_load(self) -> Iterator[Document]:
"""
:返回:逐行读取表,返回 string list
"""
file_check.SecFileCheck(self.file_path, self.MAX_SIZE).check()
file_format = xlrd.inspect_format(self.file_path)
if not file_format or (file_format != "xlsx" and file_format != 'xls'):
raise ValueError('file type is not xlsx and xls')
if self.file_path.endswith(XLRD_EXTENSION):
return self._load_xls()
elif self.file_path.endswith(OPENPYXL_EXTENSION):
if self._is_zip_bomb():
raise ValueError(f"file is a risk of zip bombs")
else:
return self._load_xlsx()
else:
raise TypeError(f"'{self.file_path}' file type is not one of (xlsx, xls).")
def _get_xlsx_title(self, ws, first_row, first_col):
title = []
for col in range(first_col, ws.max_column + 1):
ti = self._parse_xlsx_cell(ws, first_row, col)
title.append(str(ti) if ti is not None else "")
return title
def _get_xls_title(self, ws, first_row, first_col):
title = []
for col in range(first_col, ws.ncols):
ti = str(self._parse_xls_cell(ws, first_row, col))
title.append(str(ti) if ti is not None else "")
return title
def _load_xlsx_one_sheet(self, ws, sheet_name):
"""
功能:读取一个xlsx表单的值,每行值以title:value;title:value....的格式
"""
lines = []
if ws.max_row > MAX_ROW_NUM:
logger.error(f"Exceeded maximum row limit of {MAX_ROW_NUM} in sheet '{sheet_name}'"
f" of file '{self.file_path}': {ws.max_row} rows found.")
return lines
if ws.max_column > MAX_COL_NUM:
logger.error(f"Exceeded maximum row limit of {MAX_COL_NUM} in sheet '{sheet_name}'"
f" of file '{self.file_path}': {ws.max_column} rows found.")
return lines
blank_rows, blank_cols = self._get_xlsx_blank_rows_and_cols(ws)
first_row, first_col = self._get_xlsx_first_not_blank_row_and_col(ws)
if ws.max_row - len(blank_rows.keys()) < 2:
return lines
title = self._get_xlsx_title(ws, first_row, first_col)
column_end = ws.max_column + 1
for row_index in range(first_row + 1, ws.max_row + 1):
if row_index in blank_rows.keys():
continue
text_line = ""
for col_index in range(1, column_end):
if col_index in blank_cols.keys():
continue
val = self._parse_xlsx_cell(ws, row_index, col_index)
ti = title[col_index - first_col]
if not val:
continue
text_line += str(ti).strip() + ":" + str(val).strip() + ";"
lines.append(text_line)
return lines
def _load_xls_one_sheet(self, ws):
"""
功能:读取一个xls表单的值,每行值以title:value;title:value....的格式
"""
lines = []
if ws.nrows > MAX_ROW_NUM:
logger.error(f"Exceeded maximum row limit of {MAX_ROW_NUM} in sheet '{ws.name}'"
f" of file '{self.file_path}': {ws.nrows} rows found.")
return lines
if ws.ncols > MAX_COL_NUM:
logger.error(f"Exceeded maximum row limit of {MAX_COL_NUM} in sheet '{ws.name}'"
f" of file '{self.file_path}': {ws.ncols} rows found.")
return lines
blank_rows, blank_cols = self._get_xls_blank_rows_and_cols(ws)
first_row, first_col = self._get_xls_first_not_blank_row_and_col(ws)
if ws.nrows - len(blank_rows.keys()) < 2:
logger.info(f"In file ['{self.file_path}'], sheet ['{ws.name}'],"
f" not enough valid rows (at least two rows required). ")
return lines
title = self._get_xls_title(ws, first_row, first_col)
ncols = ws.ncols
for row_index in range(first_row + 1, ws.nrows):
if row_index in blank_rows.keys():
continue
text_line = ""
for col_index in range(ncols):
if col_index in blank_cols.keys():
continue
val = self._parse_xls_cell(ws, row_index, col_index)
if not val:
continue
ti = title[col_index - first_col]
if ti in ["time"] and 0 <= float(val) <= 1:
text_line += str(ti) + ":" + str(self._exceltime_to_datetime(float(val))) + ";"
else:
text_line += str(ti).strip() + ":" + str(val).strip() + ";"
lines.append(text_line)
return lines
def _load_xls(self):
wb = None
try:
wb = xlrd.open_workbook(self.file_path, formatting_info=True)
if wb.nsheets > self.MAX_PAGE_NUM:
logger.error(f"file '{self.file_path}' sheets number more than limit")
return
for i in range(wb.nsheets):
ws = wb.sheet_by_index(i)
lines = self._load_xls_one_sheet(ws)
if not lines:
logger.info(f"In file ['{self.file_path}'] sheet ['{ws.name}'] is empty")
continue
for line in lines:
yield Document(page_content=line,
metadata={"source": self.file_path, "sheet": ws.name, "type": "text"})
except xlrd.biffh.XLRDError as e:
logger.error(f"Excel parsing error for file '{self.file_path}': {e}")
return
except IndexError as e:
logger.error(f"Sheet index error in file '{self.file_path}': {str(e)}")
return
except Exception as e:
logger.error(f"An error occurred while loading file '{self.file_path}': {e}")
return
finally:
if wb:
wb.release_resources()
logger.info(f"file '{self.file_path}' Loading completed")
def _load_xlsx(self):
wb = None
try:
wb = load_workbook(self.file_path, data_only=True, keep_links=False)
if len(wb.sheetnames) > self.MAX_PAGE_NUM:
logger.error(f"file '{self.file_path}' sheets number more than limit")
return
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
lines = self._load_xlsx_one_sheet(ws, sheet_name)
if not lines:
logger.info(f"In file ['{self.file_path}'] sheet ['{sheet_name}'] is empty")
continue
for line in lines:
yield Document(page_content=line, metadata={"source": self.file_path, "sheet": sheet_name})
except KeyError as e:
logger.error(f"Sheet not found in file '{self.file_path}': {str(e)}")
return
except Exception as e:
logger.error(f"An error occurred while loading file '{self.file_path}': {e}")
return
finally:
if wb:
wb.close()
logger.info(f"file '{self.file_path}' Loading completed")