436 lines
12 KiB
TypeScript
436 lines
12 KiB
TypeScript
/**
|
|
* 🔥 성능 최적화: 백그라운드 작업 큐 시스템
|
|
*
|
|
* 제어관리 작업을 백그라운드에서 처리하여
|
|
* 사용자에게 즉시 응답을 제공합니다.
|
|
*/
|
|
|
|
import { ButtonActionType, ButtonTypeConfig, DataflowExecutionResult } from "@/types/screen";
|
|
import { apiClient } from "@/lib/api/client";
|
|
|
|
export type JobPriority = "high" | "normal" | "low";
|
|
export type JobStatus = "pending" | "processing" | "completed" | "failed";
|
|
|
|
export interface DataflowJob {
|
|
id: string;
|
|
buttonId: string;
|
|
actionType: ButtonActionType;
|
|
config: ButtonTypeConfig;
|
|
contextData: Record<string, any>;
|
|
companyCode: string;
|
|
priority: JobPriority;
|
|
status: JobStatus;
|
|
createdAt: number;
|
|
startedAt?: number;
|
|
completedAt?: number;
|
|
result?: DataflowExecutionResult;
|
|
error?: string;
|
|
retryCount: number;
|
|
maxRetries: number;
|
|
}
|
|
|
|
export interface QueueMetrics {
|
|
totalJobs: number;
|
|
pendingJobs: number;
|
|
processingJobs: number;
|
|
completedJobs: number;
|
|
failedJobs: number;
|
|
averageProcessingTime: number; // 평균 처리 시간 (ms)
|
|
throughput: number; // 처리량 (jobs/min)
|
|
}
|
|
|
|
/**
|
|
* 🔥 백그라운드 작업 큐
|
|
*
|
|
* - 우선순위 기반 처리
|
|
* - 배치 처리 (최대 3개 동시)
|
|
* - 자동 재시도
|
|
* - 실시간 상태 추적
|
|
*/
|
|
export class DataflowJobQueue {
|
|
private queue: DataflowJob[] = [];
|
|
private processing = false;
|
|
private readonly maxConcurrentJobs = 3;
|
|
private activeJobs = new Map<string, DataflowJob>();
|
|
private completedJobs: DataflowJob[] = [];
|
|
private maxCompletedJobs = 100; // 최대 완료된 작업 보관 개수
|
|
|
|
private metrics: QueueMetrics = {
|
|
totalJobs: 0,
|
|
pendingJobs: 0,
|
|
processingJobs: 0,
|
|
completedJobs: 0,
|
|
failedJobs: 0,
|
|
averageProcessingTime: 0,
|
|
throughput: 0,
|
|
};
|
|
|
|
// 상태 변경 이벤트 리스너
|
|
private statusChangeListeners = new Map<string, (job: DataflowJob) => void>();
|
|
|
|
/**
|
|
* 🔥 작업 큐에 추가 (즉시 반환)
|
|
*/
|
|
enqueue(
|
|
buttonId: string,
|
|
actionType: ButtonActionType,
|
|
config: ButtonTypeConfig,
|
|
contextData: Record<string, any>,
|
|
companyCode: string,
|
|
priority: JobPriority = "normal",
|
|
maxRetries: number = 3,
|
|
): string {
|
|
const jobId = this.generateJobId();
|
|
const now = Date.now();
|
|
|
|
const job: DataflowJob = {
|
|
id: jobId,
|
|
buttonId,
|
|
actionType,
|
|
config,
|
|
contextData,
|
|
companyCode,
|
|
priority,
|
|
status: "pending",
|
|
createdAt: now,
|
|
retryCount: 0,
|
|
maxRetries,
|
|
};
|
|
|
|
// 큐에 추가
|
|
this.queue.push(job);
|
|
this.metrics.totalJobs++;
|
|
this.metrics.pendingJobs++;
|
|
|
|
// 우선순위 정렬
|
|
this.sortQueueByPriority();
|
|
|
|
// 비동기 처리 시작
|
|
setTimeout(() => this.processQueue(), 0);
|
|
|
|
console.log(`📋 Job enqueued: ${jobId} (priority: ${priority})`);
|
|
return jobId;
|
|
}
|
|
|
|
/**
|
|
* 🔥 작업 상태 조회
|
|
*/
|
|
getJobStatus(jobId: string): { status: JobStatus; result?: any; progress?: number } {
|
|
// 활성 작업에서 찾기
|
|
const activeJob = this.activeJobs.get(jobId);
|
|
if (activeJob) {
|
|
return {
|
|
status: activeJob.status,
|
|
result: activeJob.result,
|
|
progress: this.calculateProgress(activeJob),
|
|
};
|
|
}
|
|
|
|
// 완료된 작업에서 찾기
|
|
const completedJob = this.completedJobs.find((job) => job.id === jobId);
|
|
if (completedJob) {
|
|
return {
|
|
status: completedJob.status,
|
|
result: completedJob.result,
|
|
progress: 100,
|
|
};
|
|
}
|
|
|
|
// 대기 중인 작업에서 찾기
|
|
const pendingJob = this.queue.find((job) => job.id === jobId);
|
|
if (pendingJob) {
|
|
const queuePosition = this.queue.indexOf(pendingJob) + 1;
|
|
return {
|
|
status: "pending",
|
|
progress: 0,
|
|
};
|
|
}
|
|
|
|
throw new Error(`Job not found: ${jobId}`);
|
|
}
|
|
|
|
/**
|
|
* 🔥 작업 상태 변경 리스너 등록
|
|
*/
|
|
onStatusChange(jobId: string, callback: (job: DataflowJob) => void): () => void {
|
|
this.statusChangeListeners.set(jobId, callback);
|
|
|
|
// 해제 함수 반환
|
|
return () => {
|
|
this.statusChangeListeners.delete(jobId);
|
|
};
|
|
}
|
|
|
|
/**
|
|
* 🔥 큐 처리 (배치 처리)
|
|
*/
|
|
private async processQueue(): Promise<void> {
|
|
if (this.processing || this.queue.length === 0) return;
|
|
if (this.activeJobs.size >= this.maxConcurrentJobs) return;
|
|
|
|
this.processing = true;
|
|
|
|
try {
|
|
// 처리할 수 있는 만큼 작업 선택
|
|
const availableSlots = this.maxConcurrentJobs - this.activeJobs.size;
|
|
const jobsToProcess = this.queue.splice(0, availableSlots);
|
|
|
|
if (jobsToProcess.length > 0) {
|
|
console.log(`🔄 Processing ${jobsToProcess.length} jobs (${this.activeJobs.size} active)`);
|
|
|
|
// 병렬 처리
|
|
const promises = jobsToProcess.map((job) => this.executeJob(job));
|
|
await Promise.allSettled(promises);
|
|
}
|
|
} finally {
|
|
this.processing = false;
|
|
|
|
// 큐에 더 많은 작업이 있으면 계속 처리
|
|
if (this.queue.length > 0 && this.activeJobs.size < this.maxConcurrentJobs) {
|
|
setTimeout(() => this.processQueue(), 10);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 🔥 개별 작업 실행
|
|
*/
|
|
private async executeJob(job: DataflowJob): Promise<void> {
|
|
const startTime = performance.now();
|
|
|
|
// 활성 작업으로 이동
|
|
this.activeJobs.set(job.id, job);
|
|
this.updateJobStatus(job, "processing");
|
|
this.metrics.pendingJobs--;
|
|
this.metrics.processingJobs++;
|
|
|
|
job.startedAt = Date.now();
|
|
|
|
try {
|
|
console.log(`⚡ Starting job: ${job.id}`);
|
|
|
|
// 실제 제어관리 실행
|
|
const result = await this.executeDataflowLogic(job);
|
|
|
|
// 성공 처리
|
|
job.result = result;
|
|
job.completedAt = Date.now();
|
|
this.updateJobStatus(job, "completed");
|
|
|
|
const executionTime = performance.now() - startTime;
|
|
this.updateProcessingTimeMetrics(executionTime);
|
|
|
|
console.log(`✅ Job completed: ${job.id} (${executionTime.toFixed(2)}ms)`);
|
|
} catch (error) {
|
|
console.error(`❌ Job failed: ${job.id}`, error);
|
|
|
|
job.error = error.message || "Unknown error";
|
|
job.retryCount++;
|
|
|
|
// 재시도 로직
|
|
if (job.retryCount < job.maxRetries) {
|
|
console.log(`🔄 Retrying job: ${job.id} (${job.retryCount}/${job.maxRetries})`);
|
|
|
|
// 지수 백오프로 재시도 지연
|
|
const retryDelay = Math.pow(2, job.retryCount) * 1000; // 2^n 초
|
|
setTimeout(() => {
|
|
job.status = "pending";
|
|
this.queue.unshift(job); // 우선순위로 다시 큐에 추가
|
|
this.processQueue();
|
|
}, retryDelay);
|
|
|
|
return;
|
|
}
|
|
|
|
// 최대 재시도 횟수 초과 시 실패 처리
|
|
job.completedAt = Date.now();
|
|
this.updateJobStatus(job, "failed");
|
|
this.metrics.failedJobs++;
|
|
} finally {
|
|
// 활성 작업에서 제거
|
|
this.activeJobs.delete(job.id);
|
|
this.metrics.processingJobs--;
|
|
|
|
// 완료된 작업 목록에 추가
|
|
this.addToCompletedJobs(job);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 🔥 실제 데이터플로우 로직 실행
|
|
*/
|
|
private async executeDataflowLogic(job: DataflowJob): Promise<DataflowExecutionResult> {
|
|
const { config, contextData, companyCode } = job;
|
|
|
|
try {
|
|
const response = await apiClient.post("/api/button-dataflow/execute-background", {
|
|
buttonId: job.buttonId,
|
|
actionType: job.actionType,
|
|
buttonConfig: config,
|
|
contextData,
|
|
companyCode,
|
|
timing: config.dataflowTiming || "after",
|
|
});
|
|
|
|
if (response.data.success) {
|
|
return response.data.data as DataflowExecutionResult;
|
|
} else {
|
|
throw new Error(response.data.message || "Dataflow execution failed");
|
|
}
|
|
} catch (error) {
|
|
if (error.response?.data?.message) {
|
|
throw new Error(error.response.data.message);
|
|
}
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 🔥 작업 상태 업데이트
|
|
*/
|
|
private updateJobStatus(job: DataflowJob, status: JobStatus): void {
|
|
job.status = status;
|
|
|
|
// 리스너에게 알림
|
|
const listener = this.statusChangeListeners.get(job.id);
|
|
if (listener) {
|
|
listener(job);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 우선순위별 큐 정렬
|
|
*/
|
|
private sortQueueByPriority(): void {
|
|
const priorityWeights = { high: 3, normal: 2, low: 1 };
|
|
|
|
this.queue.sort((a, b) => {
|
|
// 우선순위 우선
|
|
const priorityDiff = priorityWeights[b.priority] - priorityWeights[a.priority];
|
|
if (priorityDiff !== 0) return priorityDiff;
|
|
|
|
// 같은 우선순위면 생성 시간 순
|
|
return a.createdAt - b.createdAt;
|
|
});
|
|
}
|
|
|
|
/**
|
|
* 작업 ID 생성
|
|
*/
|
|
private generateJobId(): string {
|
|
return `job_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
|
|
}
|
|
|
|
/**
|
|
* 진행률 계산 (추정)
|
|
*/
|
|
private calculateProgress(job: DataflowJob): number {
|
|
if (job.status === "completed") return 100;
|
|
if (job.status === "failed") return 0;
|
|
if (job.status === "pending") return 0;
|
|
if (job.status === "processing") {
|
|
// 처리 중인 경우 경과 시간 기반으로 추정
|
|
const elapsed = Date.now() - (job.startedAt || job.createdAt);
|
|
const estimatedDuration = 5000; // 5초로 추정
|
|
return Math.min(90, (elapsed / estimatedDuration) * 100);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* 완료된 작업 목록에 추가
|
|
*/
|
|
private addToCompletedJobs(job: DataflowJob): void {
|
|
this.completedJobs.push(job);
|
|
|
|
if (job.status === "completed") {
|
|
this.metrics.completedJobs++;
|
|
}
|
|
|
|
// 오래된 완료 작업 제거
|
|
if (this.completedJobs.length > this.maxCompletedJobs) {
|
|
this.completedJobs.shift();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 처리 시간 메트릭 업데이트
|
|
*/
|
|
private updateProcessingTimeMetrics(processingTime: number): void {
|
|
if (this.metrics.averageProcessingTime === 0) {
|
|
this.metrics.averageProcessingTime = processingTime;
|
|
} else {
|
|
// 이동 평균
|
|
this.metrics.averageProcessingTime = this.metrics.averageProcessingTime * 0.9 + processingTime * 0.1;
|
|
}
|
|
|
|
// 처리량 계산 (간단한 추정)
|
|
this.metrics.throughput = 60000 / this.metrics.averageProcessingTime; // jobs/min
|
|
}
|
|
|
|
/**
|
|
* 🔥 큐 통계 조회
|
|
*/
|
|
getMetrics(): QueueMetrics {
|
|
this.metrics.pendingJobs = this.queue.length;
|
|
this.metrics.processingJobs = this.activeJobs.size;
|
|
|
|
return { ...this.metrics };
|
|
}
|
|
|
|
/**
|
|
* 🔥 상세 큐 정보 조회 (디버깅용)
|
|
*/
|
|
getQueueInfo(): {
|
|
pending: DataflowJob[];
|
|
active: DataflowJob[];
|
|
recentCompleted: DataflowJob[];
|
|
} {
|
|
return {
|
|
pending: [...this.queue],
|
|
active: Array.from(this.activeJobs.values()),
|
|
recentCompleted: this.completedJobs.slice(-10), // 최근 10개
|
|
};
|
|
}
|
|
|
|
/**
|
|
* 🔥 특정 작업 취소
|
|
*/
|
|
cancelJob(jobId: string): boolean {
|
|
// 대기 중인 작업에서 제거
|
|
const queueIndex = this.queue.findIndex((job) => job.id === jobId);
|
|
if (queueIndex !== -1) {
|
|
this.queue.splice(queueIndex, 1);
|
|
this.metrics.pendingJobs--;
|
|
console.log(`❌ Job cancelled: ${jobId}`);
|
|
return true;
|
|
}
|
|
|
|
// 활성 작업은 취소할 수 없음 (이미 실행 중)
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* 🔥 모든 대기 작업 취소
|
|
*/
|
|
clearQueue(): number {
|
|
const cancelledCount = this.queue.length;
|
|
this.queue = [];
|
|
this.metrics.pendingJobs = 0;
|
|
console.log(`🗑️ Cleared ${cancelledCount} pending jobs`);
|
|
return cancelledCount;
|
|
}
|
|
}
|
|
|
|
// 🔥 전역 싱글톤 인스턴스
|
|
export const dataflowJobQueue = new DataflowJobQueue();
|
|
|
|
// 🔥 개발 모드에서 큐 정보를 전역 객체에 노출
|
|
if (typeof window !== "undefined" && process.env.NODE_ENV === "development") {
|
|
(window as any).dataflowQueue = {
|
|
getMetrics: () => dataflowJobQueue.getMetrics(),
|
|
getQueueInfo: () => dataflowJobQueue.getQueueInfo(),
|
|
clearQueue: () => dataflowJobQueue.clearQueue(),
|
|
};
|
|
}
|