ERP-node/frontend/lib/services/dataflowJobQueue.ts

436 lines
12 KiB
TypeScript
Raw Normal View History

2025-09-18 10:05:50 +09:00
/**
* 🔥 최적화: 백그라운드
*
*
* .
*/
import { ButtonActionType, ButtonTypeConfig, DataflowExecutionResult } from "@/types/screen";
import { apiClient } from "@/lib/api/client";
export type JobPriority = "high" | "normal" | "low";
export type JobStatus = "pending" | "processing" | "completed" | "failed";
export interface DataflowJob {
id: string;
buttonId: string;
actionType: ButtonActionType;
config: ButtonTypeConfig;
contextData: Record<string, any>;
companyCode: string;
priority: JobPriority;
status: JobStatus;
createdAt: number;
startedAt?: number;
completedAt?: number;
result?: DataflowExecutionResult;
error?: string;
retryCount: number;
maxRetries: number;
}
export interface QueueMetrics {
totalJobs: number;
pendingJobs: number;
processingJobs: number;
completedJobs: number;
failedJobs: number;
averageProcessingTime: number; // 평균 처리 시간 (ms)
throughput: number; // 처리량 (jobs/min)
}
/**
* 🔥
*
* -
* - ( 3 )
* -
* -
*/
export class DataflowJobQueue {
private queue: DataflowJob[] = [];
private processing = false;
private readonly maxConcurrentJobs = 3;
private activeJobs = new Map<string, DataflowJob>();
private completedJobs: DataflowJob[] = [];
private maxCompletedJobs = 100; // 최대 완료된 작업 보관 개수
private metrics: QueueMetrics = {
totalJobs: 0,
pendingJobs: 0,
processingJobs: 0,
completedJobs: 0,
failedJobs: 0,
averageProcessingTime: 0,
throughput: 0,
};
// 상태 변경 이벤트 리스너
private statusChangeListeners = new Map<string, (job: DataflowJob) => void>();
/**
* 🔥 ( )
*/
enqueue(
buttonId: string,
actionType: ButtonActionType,
config: ButtonTypeConfig,
contextData: Record<string, any>,
companyCode: string,
priority: JobPriority = "normal",
maxRetries: number = 3,
): string {
const jobId = this.generateJobId();
const now = Date.now();
const job: DataflowJob = {
id: jobId,
buttonId,
actionType,
config,
contextData,
companyCode,
priority,
status: "pending",
createdAt: now,
retryCount: 0,
maxRetries,
};
// 큐에 추가
this.queue.push(job);
this.metrics.totalJobs++;
this.metrics.pendingJobs++;
// 우선순위 정렬
this.sortQueueByPriority();
// 비동기 처리 시작
setTimeout(() => this.processQueue(), 0);
console.log(`📋 Job enqueued: ${jobId} (priority: ${priority})`);
return jobId;
}
/**
* 🔥
*/
getJobStatus(jobId: string): { status: JobStatus; result?: any; progress?: number } {
// 활성 작업에서 찾기
const activeJob = this.activeJobs.get(jobId);
if (activeJob) {
return {
status: activeJob.status,
result: activeJob.result,
progress: this.calculateProgress(activeJob),
};
}
// 완료된 작업에서 찾기
const completedJob = this.completedJobs.find((job) => job.id === jobId);
if (completedJob) {
return {
status: completedJob.status,
result: completedJob.result,
progress: 100,
};
}
// 대기 중인 작업에서 찾기
const pendingJob = this.queue.find((job) => job.id === jobId);
if (pendingJob) {
const queuePosition = this.queue.indexOf(pendingJob) + 1;
return {
status: "pending",
progress: 0,
};
}
throw new Error(`Job not found: ${jobId}`);
}
/**
* 🔥
*/
onStatusChange(jobId: string, callback: (job: DataflowJob) => void): () => void {
this.statusChangeListeners.set(jobId, callback);
// 해제 함수 반환
return () => {
this.statusChangeListeners.delete(jobId);
};
}
/**
* 🔥 ( )
*/
private async processQueue(): Promise<void> {
if (this.processing || this.queue.length === 0) return;
if (this.activeJobs.size >= this.maxConcurrentJobs) return;
this.processing = true;
try {
// 처리할 수 있는 만큼 작업 선택
const availableSlots = this.maxConcurrentJobs - this.activeJobs.size;
const jobsToProcess = this.queue.splice(0, availableSlots);
if (jobsToProcess.length > 0) {
console.log(`🔄 Processing ${jobsToProcess.length} jobs (${this.activeJobs.size} active)`);
// 병렬 처리
const promises = jobsToProcess.map((job) => this.executeJob(job));
await Promise.allSettled(promises);
}
} finally {
this.processing = false;
// 큐에 더 많은 작업이 있으면 계속 처리
if (this.queue.length > 0 && this.activeJobs.size < this.maxConcurrentJobs) {
setTimeout(() => this.processQueue(), 10);
}
}
}
/**
* 🔥
*/
private async executeJob(job: DataflowJob): Promise<void> {
const startTime = performance.now();
// 활성 작업으로 이동
this.activeJobs.set(job.id, job);
this.updateJobStatus(job, "processing");
this.metrics.pendingJobs--;
this.metrics.processingJobs++;
job.startedAt = Date.now();
try {
console.log(`⚡ Starting job: ${job.id}`);
// 실제 제어관리 실행
const result = await this.executeDataflowLogic(job);
// 성공 처리
job.result = result;
job.completedAt = Date.now();
this.updateJobStatus(job, "completed");
const executionTime = performance.now() - startTime;
this.updateProcessingTimeMetrics(executionTime);
console.log(`✅ Job completed: ${job.id} (${executionTime.toFixed(2)}ms)`);
} catch (error) {
console.error(`❌ Job failed: ${job.id}`, error);
job.error = error.message || "Unknown error";
job.retryCount++;
// 재시도 로직
if (job.retryCount < job.maxRetries) {
console.log(`🔄 Retrying job: ${job.id} (${job.retryCount}/${job.maxRetries})`);
// 지수 백오프로 재시도 지연
const retryDelay = Math.pow(2, job.retryCount) * 1000; // 2^n 초
setTimeout(() => {
job.status = "pending";
this.queue.unshift(job); // 우선순위로 다시 큐에 추가
this.processQueue();
}, retryDelay);
return;
}
// 최대 재시도 횟수 초과 시 실패 처리
job.completedAt = Date.now();
this.updateJobStatus(job, "failed");
this.metrics.failedJobs++;
} finally {
// 활성 작업에서 제거
this.activeJobs.delete(job.id);
this.metrics.processingJobs--;
// 완료된 작업 목록에 추가
this.addToCompletedJobs(job);
}
}
/**
* 🔥
*/
private async executeDataflowLogic(job: DataflowJob): Promise<DataflowExecutionResult> {
const { config, contextData, companyCode } = job;
try {
const response = await apiClient.post("/api/button-dataflow/execute-background", {
buttonId: job.buttonId,
actionType: job.actionType,
buttonConfig: config,
contextData,
companyCode,
timing: config.dataflowTiming || "after",
});
if (response.data.success) {
return response.data.data as DataflowExecutionResult;
} else {
throw new Error(response.data.message || "Dataflow execution failed");
}
} catch (error) {
if (error.response?.data?.message) {
throw new Error(error.response.data.message);
}
throw error;
}
}
/**
* 🔥
*/
private updateJobStatus(job: DataflowJob, status: JobStatus): void {
job.status = status;
// 리스너에게 알림
const listener = this.statusChangeListeners.get(job.id);
if (listener) {
listener(job);
}
}
/**
*
*/
private sortQueueByPriority(): void {
const priorityWeights = { high: 3, normal: 2, low: 1 };
this.queue.sort((a, b) => {
// 우선순위 우선
const priorityDiff = priorityWeights[b.priority] - priorityWeights[a.priority];
if (priorityDiff !== 0) return priorityDiff;
// 같은 우선순위면 생성 시간 순
return a.createdAt - b.createdAt;
});
}
/**
* ID
*/
private generateJobId(): string {
return `job_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
/**
* ()
*/
private calculateProgress(job: DataflowJob): number {
if (job.status === "completed") return 100;
if (job.status === "failed") return 0;
if (job.status === "pending") return 0;
if (job.status === "processing") {
// 처리 중인 경우 경과 시간 기반으로 추정
const elapsed = Date.now() - (job.startedAt || job.createdAt);
const estimatedDuration = 5000; // 5초로 추정
return Math.min(90, (elapsed / estimatedDuration) * 100);
}
return 0;
}
/**
*
*/
private addToCompletedJobs(job: DataflowJob): void {
this.completedJobs.push(job);
if (job.status === "completed") {
this.metrics.completedJobs++;
}
// 오래된 완료 작업 제거
if (this.completedJobs.length > this.maxCompletedJobs) {
this.completedJobs.shift();
}
}
/**
*
*/
private updateProcessingTimeMetrics(processingTime: number): void {
if (this.metrics.averageProcessingTime === 0) {
this.metrics.averageProcessingTime = processingTime;
} else {
// 이동 평균
this.metrics.averageProcessingTime = this.metrics.averageProcessingTime * 0.9 + processingTime * 0.1;
}
// 처리량 계산 (간단한 추정)
this.metrics.throughput = 60000 / this.metrics.averageProcessingTime; // jobs/min
}
/**
* 🔥
*/
getMetrics(): QueueMetrics {
this.metrics.pendingJobs = this.queue.length;
this.metrics.processingJobs = this.activeJobs.size;
return { ...this.metrics };
}
/**
* 🔥 ()
*/
getQueueInfo(): {
pending: DataflowJob[];
active: DataflowJob[];
recentCompleted: DataflowJob[];
} {
return {
pending: [...this.queue],
active: Array.from(this.activeJobs.values()),
recentCompleted: this.completedJobs.slice(-10), // 최근 10개
};
}
/**
* 🔥
*/
cancelJob(jobId: string): boolean {
// 대기 중인 작업에서 제거
const queueIndex = this.queue.findIndex((job) => job.id === jobId);
if (queueIndex !== -1) {
this.queue.splice(queueIndex, 1);
this.metrics.pendingJobs--;
console.log(`❌ Job cancelled: ${jobId}`);
return true;
}
// 활성 작업은 취소할 수 없음 (이미 실행 중)
return false;
}
/**
* 🔥
*/
clearQueue(): number {
const cancelledCount = this.queue.length;
this.queue = [];
this.metrics.pendingJobs = 0;
console.log(`🗑️ Cleared ${cancelledCount} pending jobs`);
return cancelledCount;
}
}
// 🔥 전역 싱글톤 인스턴스
export const dataflowJobQueue = new DataflowJobQueue();
// 🔥 개발 모드에서 큐 정보를 전역 객체에 노출
if (typeof window !== "undefined" && process.env.NODE_ENV === "development") {
(window as any).dataflowQueue = {
getMetrics: () => dataflowJobQueue.getMetrics(),
getQueueInfo: () => dataflowJobQueue.getQueueInfo(),
clearQueue: () => dataflowJobQueue.clearQueue(),
};
}