0e5de4a8创建于 2025年11月16日历史提交
import 'reflect-metadata';
import cluster, { type Worker } from 'cluster';
import compression from 'compression';
import cors from 'cors';
import express from 'express';
import helmet from 'helmet';
import { Container } from 'typedi';
import config from './config';
import Logger from './loaders/logger';
import { monitoringMiddleware } from './middlewares/monitoring';
import { type GrpcServerService } from './services/grpc';
import { type HttpServerService } from './services/http';

interface WorkerMetadata {
  id: number;
  pid: number;
  serviceType: string;
  startTime: Date;
}

class Application {
  private app: express.Application;
  private httpServerService?: HttpServerService;
  private grpcServerService?: GrpcServerService;
  private isShuttingDown = false;
  private workerMetadataMap = new Map<number, WorkerMetadata>();
  private httpWorker?: Worker;

  constructor() {
    this.app = express();
    // 创建一个全局中间件,删除查询参数中的t
    this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {
      if (req.query.t) {
        delete req.query.t;
      }
      next();
    });
  }

  async start() {
    try {
      if (cluster.isPrimary) {
        await this.initializeDatabase();
      }
      if (cluster.isPrimary) {
        this.startMasterProcess();
      } else {
        await this.startWorkerProcess();
      }
    } catch (error) {
      Logger.error('Failed to start application:', error);
      process.exit(1);
    }
  }

  private startMasterProcess() {
    // Fork gRPC worker first and wait for it to be ready
    const grpcWorker = this.forkWorker('grpc');
    
    // Wait for gRPC worker to signal it's ready before starting HTTP worker
    this.waitForWorkerReady(grpcWorker, 30000)
      .then(() => {
        Logger.info('✌️ gRPC worker is ready, starting HTTP worker');
        this.httpWorker = this.forkWorker('http');
      })
      .catch((error) => {
        Logger.error('✌️ Failed to wait for gRPC worker:', error);
        process.exit(1);
      });

    cluster.on('exit', (worker, code, signal) => {
      const metadata = this.workerMetadataMap.get(worker.id);
      if (metadata) {
        if (!this.isShuttingDown) {
          Logger.error(
            `✌️ ${metadata.serviceType} worker ${worker.process.pid} died (${signal || code
            }). Restarting...`,
          );
          // If gRPC worker died, restart it and wait for it to be ready
          if (metadata.serviceType === 'grpc') {
            const newGrpcWorker = this.forkWorker('grpc');
            this.waitForWorkerReady(newGrpcWorker, 30000)
              .then(() => {
                Logger.info('✌️ gRPC worker restarted and ready');
                // Re-register cron jobs by notifying the HTTP worker
                if (this.httpWorker) {
                  try {
                    this.httpWorker.send('reregister-crons');
                    Logger.info('✌️ Sent reregister-crons message to HTTP worker');
                  } catch (error) {
                    Logger.error('✌️ Failed to send reregister-crons message:', error);
                  }
                }
              })
              .catch((error) => {
                Logger.error('✌️ Failed to restart gRPC worker:', error);
                process.exit(1);
              });
          } else {
            // For HTTP worker, just restart it
            const newWorker = this.forkWorker(metadata.serviceType);
            this.httpWorker = newWorker;
            Logger.info(`✌️ Restarted ${metadata.serviceType} worker (PID: ${newWorker.process.pid})`);
          }
        }

        this.workerMetadataMap.delete(worker.id);
      }
    });

    this.setupMasterShutdown();
  }

  private waitForWorkerReady(worker: Worker, timeoutMs: number): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const messageHandler = (msg: any) => {
        if (msg === 'ready') {
          worker.removeListener('message', messageHandler);
          clearTimeout(timeoutId);
          resolve();
        }
      };
      worker.on('message', messageHandler);
      
      // Timeout after specified milliseconds
      const timeoutId = setTimeout(() => {
        worker.removeListener('message', messageHandler);
        reject(new Error(`Worker failed to start within ${timeoutMs / 1000} seconds`));
      }, timeoutMs);
    });
  }

  private forkWorker(serviceType: string): Worker {
    const worker = cluster.fork({ SERVICE_TYPE: serviceType });

    this.workerMetadataMap.set(worker.id, {
      id: worker.id,
      pid: worker.process.pid!,
      serviceType,
      startTime: new Date(),
    });

    return worker;
  }

  private async initializeDatabase() {
    const dbLoader = await import('./loaders/db');
    await dbLoader.default();
  }

  private setupMiddlewares() {
    this.app.use(helmet({
      contentSecurityPolicy: false,
    }));
    this.app.use(cors(config.cors));
    this.app.use(compression());
    this.app.use(monitoringMiddleware);
  }

  private setupMasterShutdown() {
    const shutdown = async () => {
      if (this.isShuttingDown) return;
      this.isShuttingDown = true;

      const workers = Object.values(cluster.workers || {});
      const workerPromises: Promise<void>[] = [];

      workers.forEach((worker) => {
        if (worker) {
          const exitPromise = new Promise<void>((resolve) => {
            worker.once('exit', () => {
              Logger.info(`✌️ Worker ${worker.process.pid} exited`);
              resolve();
            });

            try {
              worker.send('shutdown');
            } catch (error) {
              Logger.warn(
                `✌️ Failed to send shutdown to worker ${worker.process.pid}:`,
                error,
              );
            }
          });

          workerPromises.push(exitPromise);
        }
      });

      try {
        await Promise.race([
          Promise.all(workerPromises),
          new Promise<void>((resolve) => {
            setTimeout(() => {
              Logger.warn('✌️ Worker shutdown timeout reached');
              resolve();
            }, 10000);
          }),
        ]);
        process.exit(0);
      } catch (error) {
        Logger.error('✌️ Error during worker shutdown:', error);
        process.exit(1);
      }
    };

    process.on('SIGTERM', shutdown);
    process.on('SIGINT', shutdown);
  }

  private async startWorkerProcess() {
    const serviceType = process.env.SERVICE_TYPE;
    if (!serviceType || !['http', 'grpc'].includes(serviceType)) {
      Logger.error('✌️ Invalid SERVICE_TYPE:', serviceType);
      process.exit(1);
    }

    Logger.info(`✌️ ${serviceType} worker started (PID: ${process.pid})`);

    try {
      if (serviceType === 'http') {
        await this.startHttpService();
      } else {
        await this.startGrpcService();
      }

      process.send?.('ready');
    } catch (error) {
      Logger.error(`✌️ ${serviceType} worker failed:`, error);
      process.exit(1);
    }
  }

  private async startHttpService() {
    this.setupMiddlewares();

    const { HttpServerService } = await import('./services/http');
    this.httpServerService = Container.get(HttpServerService);

    const appLoader = await import('./loaders/app');
    await appLoader.default({ app: this.app });

    const server = await this.httpServerService.initialize(
      this.app,
      config.port,
    );

    const serverLoader = await import('./loaders/server');
    await (serverLoader.default as any)({ server });
    this.setupWorkerShutdown('http');
  }

  private async startGrpcService() {
    const { GrpcServerService } = await import('./services/grpc');
    this.grpcServerService = Container.get(GrpcServerService);

    await this.grpcServerService.initialize();
    this.setupWorkerShutdown('grpc');
  }

  private setupWorkerShutdown(serviceType: string) {
    process.on('message', async (msg) => {
      if (msg === 'shutdown') {
        this.gracefulShutdown(serviceType);
      } else if (msg === 'reregister-crons' && serviceType === 'http') {
        // Re-register cron jobs when gRPC worker restarts
        try {
          Logger.info('✌️ Received reregister-crons message, re-registering cron jobs...');
          const CronService = (await import('./services/cron')).default;
          const cronService = Container.get(CronService);
          await cronService.autosave_crontab();
          Logger.info('✌️ Cron jobs re-registered successfully');
        } catch (error) {
          Logger.error('✌️ Failed to re-register cron jobs:', error);
        }
      }
    });

    const shutdown = () => this.gracefulShutdown(serviceType);
    process.on('SIGTERM', shutdown);
    process.on('SIGINT', shutdown);
  }

  private async gracefulShutdown(serviceType: string) {
    if (this.isShuttingDown) return;
    this.isShuttingDown = true;

    try {
      if (serviceType === 'http') {
        await this.httpServerService?.shutdown();
      } else {
        await this.grpcServerService?.shutdown();
      }
      process.exit(0);
    } catch (error) {
      Logger.error(`✌️ [${serviceType}] Error during shutdown:`, error);
      process.exit(1);
    }
  }
}

const app = new Application();
app.start().catch((error) => {
  Logger.error('🙅‍♀️ Application failed to start:', error);
  process.exit(1);
});