use ek_computation::shmq::{RdmaQueue, RdmaQueueError};
use std::io::{self, BufRead, BufReader, Write};
use std::net::TcpStream;
use std::thread;
use std::time::Duration;
/// Capacity passed to `RdmaQueue::new` when creating the receiver queue.
const QUEUE_CAPACITY: usize = 10;
/// Connection details exchanged with the controller over TCP, one JSON object per line.
///
/// Both fields carry JSON text themselves, so the outer message is JSON that
/// contains JSON-encoded strings. `main` decodes the controller's `endpoint` as an
/// `ibverbs::QueuePairEndpoint` and its `memory_region` as an `ibverbs::RemoteMemoryRegion`.
// NOTE: the field names are the JSON keys shared with the controller; renaming
// them breaks the exchange.
#[derive(serde::Serialize, serde::Deserialize)]
struct ConnectionInfo {
    /// JSON-encoded RDMA queue-pair endpoint.
    endpoint: String,
    /// JSON-encoded RDMA memory region.
    memory_region: String,
}
/// Prints `prompt` on its own line and returns the next line typed on stdin.
///
/// Leading and trailing whitespace, including the line terminator, is stripped.
/// An empty result means the user just pressed Enter (or stdin was at EOF).
///
/// # Errors
/// Propagates any error from reading stdin.
fn get_user_input(prompt: &str) -> io::Result<String> {
    println!("{prompt}");
    let mut answer = String::new();
    io::stdin()
        .read_line(&mut answer)
        .map(|_bytes_read| answer.trim().to_owned())
}
/// Line both sides send over TCP once their RDMA queue is connected.
const READY_MARKER: &str = "RDMA_READY";
/// Number of messages after which the receive loop stops early.
const EXPECTED_MESSAGES: usize = 4;
/// Upper bound on `recv` polls (successful or not) before giving up.
const MAX_RECV_ATTEMPTS: u32 = 100;
/// Pause between polls when the queue is empty or `recv` fails.
const POLL_INTERVAL: Duration = Duration::from_millis(100);

/// Entry point for the RDMA worker (receiver) side of the demo.
///
/// Flow:
/// 1. Create a local [`RdmaQueue`] and JSON-encode its endpoint and memory region.
/// 2. Prompt for the controller address and connect over TCP.
/// 3. Exchange [`ConnectionInfo`] as newline-delimited JSON. The controller's line
///    is read first, then the worker's line is sent.
/// 4. Connect the RDMA queue to the controller.
/// 5. Complete the `RDMA_READY` handshake, then close the TCP socket.
/// 6. Poll the queue until `EXPECTED_MESSAGES` arrive or `MAX_RECV_ATTEMPTS` polls
///    have been made.
///
/// # Errors
/// Returns any I/O, serialization or RDMA-setup error. If the controller closes
/// the TCP connection mid-handshake, the error kind is `UnexpectedEof`.
fn main() -> io::Result<()> {
    env_logger::init();
    println!("🔧 Starting RDMA Worker (Receiver)");

    let mut receiver_queue: RdmaQueue<String> = RdmaQueue::new(None, QUEUE_CAPACITY, false)?;
    println!(
        "✅ Created receiver queue with capacity: {}",
        receiver_queue.capacity()
    );

    let local_endpoint = receiver_queue.endpoint()?;
    let local_memory = receiver_queue.memory_region();
    println!("📡 Generated local RDMA endpoint and memory region");

    // Each part is JSON-encoded on its own and embedded as a string field; the
    // controller's info is decoded the same way further down.
    let endpoint_json = serde_json::to_string(&local_endpoint)
        .map_err(|e| io::Error::other(format!("Failed to serialize endpoint: {}", e)))?;
    let memory_json = serde_json::to_string(&local_memory)
        .map_err(|e| io::Error::other(format!("Failed to serialize memory region: {}", e)))?;
    let worker_info = ConnectionInfo {
        endpoint: endpoint_json,
        memory_region: memory_json,
    };

    println!("\n🌐 TCP Connection Setup");
    let (controller_ip, controller_port) = prompt_controller_address()?;
    println!(
        "\n🔗 Connecting to controller at {}:{}...",
        controller_ip, controller_port
    );
    // A (host, port) pair goes through `ToSocketAddrs`, so IPv6 literals such as
    // `::1` and host names work. A "host:port" string breaks for IPv6 because the
    // address would need brackets.
    let stream = TcpStream::connect((controller_ip.as_str(), controller_port))?;
    println!("✅ Connected to controller");

    println!("\n🔄 Exchanging RDMA connection information...");
    // Keep ONE BufReader for the whole TCP exchange and write through `get_mut()`.
    // Re-wrapping the socket or calling `into_inner()` would silently drop any
    // bytes the reader had already buffered.
    let mut reader = BufReader::new(stream);
    let controller_info_line = read_protocol_line(&mut reader)?;
    let controller_info: ConnectionInfo = serde_json::from_str(&controller_info_line)
        .map_err(|e| io::Error::other(format!("Failed to parse controller info: {}", e)))?;
    println!("📥 Received controller RDMA info");

    let worker_info_json = serde_json::to_string(&worker_info)
        .map_err(|e| io::Error::other(format!("Failed to serialize worker info: {}", e)))?;
    send_protocol_line(reader.get_mut(), &worker_info_json)?;
    println!("📤 Sent worker RDMA info to controller");

    let controller_endpoint: ibverbs::QueuePairEndpoint =
        serde_json::from_str(&controller_info.endpoint)
            .map_err(|e| io::Error::other(format!("Failed to parse controller endpoint: {}", e)))?;
    let controller_memory: ibverbs::RemoteMemoryRegion =
        serde_json::from_str(&controller_info.memory_region).map_err(|e| {
            io::Error::other(format!("Failed to parse controller memory region: {}", e))
        })?;
    println!("✅ Successfully parsed controller RDMA connection info");

    println!("\n🔗 Establishing RDMA connection...");
    match receiver_queue.connect(controller_endpoint, controller_memory) {
        Ok(()) => println!("✅ RDMA connection established successfully!"),
        Err(e) => {
            println!("❌ Failed to establish RDMA connection: {}", e);
            return Err(e);
        }
    }

    // From here on the queue is connected. If the TCP handshake fails, tear it
    // down rather than returning with it still connected.
    if let Err(e) = exchange_ready(&mut reader) {
        receiver_queue.disconnect();
        return Err(e);
    }
    println!("✅ Both sides ready for RDMA communication");
    drop(reader);
    println!("🔌 TCP connection closed, switching to RDMA");

    println!("\n📥 Starting to receive messages via RDMA...");
    let received_count = receive_messages(&mut receiver_queue);
    if received_count == 0 {
        println!("⚠️ No messages received. Possible issues:");
        println!(" 1. Controller not sending messages");
        println!(" 2. RDMA devices not properly configured");
        println!(" 3. RDMA connection establishment failed");
        println!(" 4. Network connectivity issues");
    } else {
        println!(
            "\n🎉 Worker successfully received {} messages via RDMA!",
            received_count
        );
    }

    println!("\n⏳ Keeping connection alive for 2 seconds...");
    thread::sleep(Duration::from_secs(2));
    receiver_queue.disconnect();
    println!("🔄 Worker shutting down");
    Ok(())
}

/// Prompts for the controller IP and port. Empty input falls back to `127.0.0.1` and `8888`.
///
/// # Errors
/// Returns `InvalidInput` if the entered port is not a valid `u16`, and
/// propagates stdin read errors.
fn prompt_controller_address() -> io::Result<(String, u16)> {
    let ip = get_user_input("📝 Enter controller IP address (default: 127.0.0.1):")?;
    let ip = if ip.is_empty() {
        "127.0.0.1".to_string()
    } else {
        ip
    };

    let port = get_user_input("📝 Enter controller port (default: 8888):")?;
    let port: u16 = if port.is_empty() {
        8888
    } else {
        port.parse().map_err(|e| {
            io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid port: {}", e))
        })?
    };
    Ok((ip, port))
}

/// Reads one newline-terminated protocol line and returns it with surrounding whitespace trimmed.
///
/// # Errors
/// Returns `UnexpectedEof` if the peer closed the connection before sending a
/// line, and propagates other read errors.
fn read_protocol_line(reader: &mut impl BufRead) -> io::Result<String> {
    let mut line = String::new();
    // `read_line` returns Ok(0) only at EOF, i.e. the controller hung up.
    if reader.read_line(&mut line)? == 0 {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "controller closed the TCP connection",
        ));
    }
    Ok(line.trim().to_string())
}

/// Writes `line` plus a newline to `stream` and flushes it so the peer sees it immediately.
fn send_protocol_line(stream: &mut TcpStream, line: &str) -> io::Result<()> {
    writeln!(stream, "{}", line)?;
    stream.flush()
}

/// Completes the ready handshake: waits for the controller's `READY_MARKER`, then echoes it back.
///
/// # Errors
/// Returns "Controller not ready" if the controller sends any other line. Also
/// returns any error from `read_protocol_line` or the write.
fn exchange_ready(reader: &mut BufReader<TcpStream>) -> io::Result<()> {
    if read_protocol_line(reader)? != READY_MARKER {
        return Err(io::Error::other("Controller not ready"));
    }
    send_protocol_line(reader.get_mut(), READY_MARKER)
}

/// Polls `queue` until `EXPECTED_MESSAGES` messages arrive or `MAX_RECV_ATTEMPTS` polls are spent.
///
/// Every poll counts toward the budget, successful or not. Returns the number of
/// messages received.
fn receive_messages(queue: &mut RdmaQueue<String>) -> usize {
    let mut received_count = 0;
    for attempt in 1..=MAX_RECV_ATTEMPTS {
        match queue.recv() {
            Ok(message) => {
                received_count += 1;
                println!("✅ Received message {}: '{}'", received_count, message);
                if received_count >= EXPECTED_MESSAGES {
                    println!("🎉 Received expected number of messages!");
                    break;
                }
            }
            Err(RdmaQueueError::Empty) => {
                // Report progress only every 20 empty polls to keep output readable.
                if attempt % 20 == 0 {
                    println!("⏳ Waiting for RDMA messages... (attempt {})", attempt);
                }
                thread::sleep(POLL_INTERVAL);
            }
            Err(e) => {
                println!("❌ Error receiving message (attempt {}): {}", attempt, e);
                thread::sleep(POLL_INTERVAL);
            }
        }
    }
    received_count
}