import multiprocessing as mp
import os
import time
from multiprocessing import Event, Process
os.environ["VLLM_USE_MODELSCOPE"] = "True"
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
def clean_up():
import gc
import torch
from vllm.distributed.parallel_state import destroy_distributed_environment, destroy_model_parallel
destroy_model_parallel()
destroy_distributed_environment()
gc.collect()
torch.npu.empty_cache()
def run_prefill(prefill_done, process_close):
os.environ["ASCEND_RT_VISIBLE_DEVICES"] = "0"
from vllm import LLM, SamplingParams
from vllm.config import KVTransferConfig
prompts = [
"Hello, how are you today?",
"Hi, what is your name?",
"Tell me a very long story.",
"what is your favourite book?",
]
sampling_params = SamplingParams(temperature=0, top_p=0.95, max_tokens=1)
ktc = KVTransferConfig(
kv_connector="MooncakeConnectorV1",
kv_role="kv_producer",
kv_port="30000",
engine_id="0",
kv_connector_extra_config={"prefill": {"dp_size": 1, "tp_size": 1}, "decode": {"dp_size": 1, "tp_size": 1}},
)
llm = LLM(
model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
kv_transfer_config=ktc,
max_model_len=2000,
gpu_memory_utilization=0.8,
tensor_parallel_size=1,
)
llm.generate(prompts, sampling_params)
print("Prefill node is finished.")
prefill_done.set()
try:
while not process_close.is_set():
time.sleep(1)
except KeyboardInterrupt:
print("Script stopped by user.")
finally:
print("Cleanup prefill resources")
del llm
clean_up()
def run_decode(prefill_done):
os.environ["ASCEND_RT_VISIBLE_DEVICES"] = "1"
from vllm import LLM, SamplingParams
from vllm.config import KVTransferConfig
prompts = [
"Hello, how are you today?",
"Hi, what is your name?",
"Tell me a very long story.",
"what is your favourite book?",
]
sampling_params = SamplingParams(temperature=0, top_p=0.95)
ktc = KVTransferConfig(
kv_connector="MooncakeConnectorV1",
kv_role="kv_consumer",
kv_port="30100",
engine_id="1",
kv_connector_extra_config={"prefill": {"dp_size": 1, "tp_size": 1}, "decode": {"dp_size": 1, "tp_size": 1}},
)
llm = LLM(
model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
kv_transfer_config=ktc,
max_model_len=2000,
gpu_memory_utilization=0.8,
tensor_parallel_size=1,
)
print("Waiting for prefill node to finish...")
prefill_done.wait()
outputs = llm.generate(prompts, sampling_params)
for output in outputs:
prompt = output.prompt
generated_text = output.outputs[0].text
print(f"Prompt: {prompt!r}, Generated text: {generated_text!r}")
del llm
clean_up()
if __name__ == "__main__":
mp.get_context("spawn")
prefill_done = Event()
process_close = Event()
prefill_process = Process(
target=run_prefill,
args=(
prefill_done,
process_close,
),
)
decode_process = Process(target=run_decode, args=(prefill_done,))
prefill_process.start()
decode_process.start()
decode_process.join()
process_close.set()
prefill_process.join()
prefill_process.terminate()
print("All process done!")