// @ts-nocheck /** * 플로우 실행 서비스 * 단계별 데이터 카운트 및 리스트 조회 */ import db from "../database/db"; import { FlowStepDataCount, FlowStepDataList } from "../types/flow"; import { FlowDefinitionService } from "./flowDefinitionService"; import { FlowStepService } from "./flowStepService"; import { FlowConditionParser } from "./flowConditionParser"; import { executeExternalQuery } from "./externalDbHelper"; import { getPlaceholder, buildPaginationClause } from "./dbQueryBuilder"; export class FlowExecutionService { private flowDefinitionService: FlowDefinitionService; private flowStepService: FlowStepService; constructor() { this.flowDefinitionService = new FlowDefinitionService(); this.flowStepService = new FlowStepService(); } /** * 특정 플로우 단계에 해당하는 데이터 카운트 */ async getStepDataCount(flowId: number, stepId: number): Promise { // 1. 플로우 정의 조회 const flowDef = await this.flowDefinitionService.findById(flowId); if (!flowDef) { throw new Error(`Flow definition not found: ${flowId}`); } console.log("🔍 [getStepDataCount] Flow Definition:", { flowId, dbSourceType: flowDef.dbSourceType, dbConnectionId: flowDef.dbConnectionId, tableName: flowDef.tableName, }); // 2. 플로우 단계 조회 const step = await this.flowStepService.findById(stepId); if (!step) { throw new Error(`Flow step not found: ${stepId}`); } if (step.flowDefinitionId !== flowId) { throw new Error(`Step ${stepId} does not belong to flow ${flowId}`); } // 3. 테이블명 결정: 단계에 지정된 테이블이 있으면 사용, 없으면 플로우의 기본 테이블 사용 const tableName = step.tableName || flowDef.tableName; // 4. 조건 JSON을 SQL WHERE절로 변환 const { where, params } = FlowConditionParser.toSqlWhere( step.conditionJson ); // 5. 카운트 쿼리 실행 (내부 또는 외부 DB) const query = `SELECT COUNT(*) as count FROM ${tableName} WHERE ${where}`; console.log("🔍 [getStepDataCount] Query Info:", { tableName, query, params, isExternal: flowDef.dbSourceType === "external", connectionId: flowDef.dbConnectionId, }); let result: any; if (flowDef.dbSourceType === "external" && flowDef.dbConnectionId) { // 외부 DB 조회 console.log( "✅ [getStepDataCount] Using EXTERNAL DB:", flowDef.dbConnectionId ); const externalResult = await executeExternalQuery( flowDef.dbConnectionId, query, params ); console.log("📦 [getStepDataCount] External result:", externalResult); result = externalResult.rows; } else { // 내부 DB 조회 console.log("✅ [getStepDataCount] Using INTERNAL DB"); result = await db.query(query, params); } const count = parseInt(result[0].count || result[0].COUNT); console.log("✅ [getStepDataCount] Final count:", count); return count; } /** * 특정 플로우 단계에 해당하는 데이터 리스트 */ async getStepDataList( flowId: number, stepId: number, page: number = 1, pageSize: number = 20 ): Promise { // 1. 플로우 정의 조회 const flowDef = await this.flowDefinitionService.findById(flowId); if (!flowDef) { throw new Error(`Flow definition not found: ${flowId}`); } // 2. 플로우 단계 조회 const step = await this.flowStepService.findById(stepId); if (!step) { throw new Error(`Flow step not found: ${stepId}`); } if (step.flowDefinitionId !== flowId) { throw new Error(`Step ${stepId} does not belong to flow ${flowId}`); } // 3. 테이블명 결정: 단계에 지정된 테이블이 있으면 사용, 없으면 플로우의 기본 테이블 사용 const tableName = step.tableName || flowDef.tableName; // 4. 조건 JSON을 SQL WHERE절로 변환 const { where, params } = FlowConditionParser.toSqlWhere( step.conditionJson ); const offset = (page - 1) * pageSize; const isExternalDb = flowDef.dbSourceType === "external" && flowDef.dbConnectionId; // 5. 전체 카운트 const countQuery = `SELECT COUNT(*) as count FROM ${tableName} WHERE ${where}`; let countResult: any; let total: number; if (isExternalDb) { const externalCountResult = await executeExternalQuery( flowDef.dbConnectionId!, countQuery, params ); countResult = externalCountResult.rows; total = parseInt(countResult[0].count || countResult[0].COUNT); } else { countResult = await db.query(countQuery, params); total = parseInt(countResult[0].count); } // 6. 데이터 조회 (DB 타입별 페이징 처리) let dataQuery: string; let dataParams: any[]; if (isExternalDb) { // 외부 DB는 id 컬럼으로 정렬 (가정) // DB 타입에 따른 페이징 절은 빌더에서 처리하지 않고 직접 작성 // PostgreSQL, MySQL, MSSQL, Oracle 모두 지원하도록 단순화 dataQuery = ` SELECT * FROM ${tableName} WHERE ${where} ORDER BY id DESC LIMIT ${pageSize} OFFSET ${offset} `; dataParams = params; const externalDataResult = await executeExternalQuery( flowDef.dbConnectionId!, dataQuery, dataParams ); return { records: externalDataResult.rows, total, page, pageSize, }; } else { // 내부 DB (PostgreSQL) // Primary Key 컬럼 찾기 let orderByColumn = ""; try { const pkQuery = ` SELECT a.attname FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = $1::regclass AND i.indisprimary LIMIT 1 `; const pkResult = await db.query(pkQuery, [tableName]); if (pkResult.length > 0) { orderByColumn = pkResult[0].attname; } } catch (err) { console.warn(`Could not find primary key for table ${tableName}:`, err); } const orderByClause = orderByColumn ? `ORDER BY ${orderByColumn} DESC` : ""; dataQuery = ` SELECT * FROM ${tableName} WHERE ${where} ${orderByClause} LIMIT $${params.length + 1} OFFSET $${params.length + 2} `; const dataResult = await db.query(dataQuery, [ ...params, pageSize, offset, ]); return { records: dataResult, total, page, pageSize, }; } } /** * 플로우의 모든 단계별 데이터 카운트 */ async getAllStepCounts(flowId: number): Promise { const steps = await this.flowStepService.findByFlowId(flowId); const counts: FlowStepDataCount[] = []; for (const step of steps) { const count = await this.getStepDataCount(flowId, step.id); counts.push({ stepId: step.id, count, }); } return counts; } /** * 특정 레코드의 현재 플로우 상태 조회 */ async getCurrentStatus( flowId: number, recordId: string ): Promise<{ currentStepId: number | null; tableName: string } | null> { const query = ` SELECT current_step_id, table_name FROM flow_data_status WHERE flow_definition_id = $1 AND record_id = $2 `; const result = await db.query(query, [flowId, recordId]); if (result.length === 0) { return null; } return { currentStepId: result[0].current_step_id, tableName: result[0].table_name, }; } }