"""
-------------------------------------------------------------------------
This file is part of the RAGSDK project.
Copyright (c) 2025 Huawei Technologies Co.,Ltd.
RAGSDK is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
-------------------------------------------------------------------------
"""
import os
from typing import List
import torch
import numpy as np
from langchain_core.embeddings import Embeddings
from loguru import logger
from transformers import AutoTokenizer, is_torch_npu_available, AutoModel
from mx_rag.utils.common import validate_params, MAX_DEVICE_ID, TEXT_MAX_LEN, validate_list_str, \
BOOL_TYPE_CHECK_TIP, STR_MAX_LEN, MAX_PATH_LENGTH, MAX_BATCH_SIZE, GB, get_model_max_input_length
from mx_rag.utils.file_check import SecDirCheck, safetensors_check
try:
import torch_npu
torch.npu.set_compile_mode(jit_compile=False)
except ImportError as e:
logger.warning(f"Failed to import torch_npu: {e}. TextEmbedding will run on cpu.")
except Exception as e:
logger.error(f"Unexpected error while importing torch_npu: {e}. TextEmbedding will run on cpu.")
class SparseEmbedding(Embeddings):
@validate_params(
model_path=dict(validator=lambda x: isinstance(x, str) and 0 <= len(x) <= MAX_PATH_LENGTH,
message="param must be str and str length range [0, 1024]"),
dev_id=dict(validator=lambda x: isinstance(x, int) and 0 <= x <= MAX_DEVICE_ID,
message="param must be int and value range [0, 63]"),
use_fp16=dict(validator=lambda x: isinstance(x, bool), message=BOOL_TYPE_CHECK_TIP),
)
def __init__(self,
model_path: str,
dev_id: int = 0,
use_fp16: bool = True):
self.model_path = model_path
SecDirCheck(self.model_path, 10 * GB).check()
safetensors_check(model_path)
self.tokenizer = AutoTokenizer.from_pretrained(model_path, local_files_only=True)
self.model = AutoModel.from_pretrained(model_path, local_files_only=True)
if use_fp16:
self.model = self.model.half()
try:
if is_torch_npu_available():
self.model.to(f'npu:{dev_id}')
except ImportError:
logger.warning('unable to import torch_npu, please check if torch_npu is properly installed. '
'currently running on cpu.')
self.model = self.model.eval()
self.sparse_linear = torch.nn.Linear(in_features=self.model.config.hidden_size,
out_features=1).to(self.model.device)
sparse_model_path = os.path.join(self.model_path, 'sparse_linear.pt')
sparse_state_dict = torch.load(sparse_model_path, map_location=self.model.device, weights_only=True)
self.sparse_linear.load_state_dict(sparse_state_dict)
@staticmethod
def create(**kwargs):
if "model_path" not in kwargs or not isinstance(kwargs.get("model_path"), str):
logger.error("model_path param error. ")
return None
return SparseEmbedding(**kwargs)
@validate_params(
texts=dict(validator=lambda x: validate_list_str(x, [1, TEXT_MAX_LEN], [1, STR_MAX_LEN]),
message="param must meets: Type is List[str], list length range [1, 1000 * 1000], "
"str length range [1, 128 * 1024 * 1024]"),
batch_size=dict(validator=lambda x: isinstance(x, int) and 1 <= x <= MAX_BATCH_SIZE,
message=f"param must be int and value range [1, {MAX_BATCH_SIZE}]")
)
def embed_documents(self,
texts: List[str],
batch_size: int = 32) -> List[dict[int, float]]:
max_input_length = get_model_max_input_length(self.model.config)
if max_input_length == 0:
raise ValueError("get model max input length failed")
result = self._encode(texts, batch_size, max_input_length)
if len(result) == 0:
raise ValueError("embedding documents text error")
return result
@validate_params(
text=dict(validator=lambda x: isinstance(x, str) and 1 <= len(x) <= STR_MAX_LEN,
message="param must be str and value range [1, 128 * 1024 * 1024]")
)
def embed_query(self, text: str) -> dict[int, float]:
embeddings = self.embed_documents([text])
if not embeddings:
raise ValueError("embedding query text failed")
return embeddings[0]
def _process_token_weights(self, token_weights: np.ndarray, input_ids: list):
result = {}
unused_tokens = {self.tokenizer.cls_token_id, self.tokenizer.eos_token_id, self.tokenizer.pad_token_id,
self.tokenizer.unk_token_id}
for w, idx in zip(token_weights, input_ids):
if idx not in unused_tokens and w > 0:
idx = int(idx)
if idx not in result:
result[idx] = w
elif w > result[idx]:
result[idx] = w
return result
def _process_batch(self, batch_texts: List[str], max_length: int):
all_lexical_weights = []
batch_data = self.tokenizer(
batch_texts,
truncation=True,
padding=True,
return_tensors='pt',
max_length=max_length,
).to(self.model.device)
with torch.no_grad():
last_hidden_state = self.model(**batch_data, return_dict=True).last_hidden_state
sparse_vecs = torch.relu(self.sparse_linear(last_hidden_state))
token_weights = sparse_vecs.squeeze(-1)
all_lexical_weights.extend(list(map(self._process_token_weights, token_weights.detach().cpu().numpy(),
batch_data['input_ids'].cpu().numpy().tolist())))
return all_lexical_weights
def _encode(self, texts: List[str], batch_size: int = 32, max_length: int = 512) -> List[dict]:
result = []
for start_index in range(0, len(texts), batch_size):
batch_texts = texts[start_index:start_index + batch_size]
batch_result = self._process_batch(batch_texts, max_length)
result.extend(batch_result)
return result