/** * 확장된 데이터플로우 제어 서비스 * 다중 커넥션 지원 및 외부 DB 연동 기능 포함 */ import { DataflowControlService, ControlAction, ControlCondition, } from "./dataflowControlService"; import { MultiConnectionQueryService } from "./multiConnectionQueryService"; import { logger } from "../utils/logger"; export interface EnhancedControlAction extends ControlAction { // 🆕 커넥션 정보 추가 fromConnection?: { connectionId?: number; connectionName?: string; dbType?: string; }; toConnection?: { connectionId?: number; connectionName?: string; dbType?: string; }; // 🆕 명시적 테이블 정보 fromTable?: string; targetTable: string; // 🆕 UPDATE 액션 관련 필드 updateConditions?: UpdateCondition[]; updateFields?: UpdateFieldMapping[]; // 🆕 DELETE 액션 관련 필드 deleteConditions?: DeleteCondition[]; deleteWhereConditions?: DeleteWhereCondition[]; maxDeleteCount?: number; requireConfirmation?: boolean; dryRunFirst?: boolean; logAllDeletes?: boolean; } export interface UpdateCondition { id: string; fromColumn: string; operator: "=" | "!=" | ">" | "<" | ">=" | "<=" | "LIKE" | "IN" | "NOT IN"; value: string | string[]; logicalOperator?: "AND" | "OR"; } export interface UpdateFieldMapping { id: string; fromColumn: string; toColumn: string; transformFunction?: string; defaultValue?: string; } export interface WhereCondition { id: string; toColumn: string; operator: "=" | "!=" | ">" | "<" | ">=" | "<=" | "LIKE" | "IN" | "NOT IN"; valueSource: "from_column" | "static" | "current_timestamp"; fromColumn?: string; // valueSource가 "from_column"인 경우 staticValue?: string; // valueSource가 "static"인 경우 logicalOperator?: "AND" | "OR"; } export interface DeleteCondition { id: string; fromColumn: string; operator: | "=" | "!=" | ">" | "<" | ">=" | "<=" | "LIKE" | "IN" | "NOT IN" | "EXISTS" | "NOT EXISTS"; value: string | string[]; logicalOperator?: "AND" | "OR"; } export interface DeleteWhereCondition { id: string; toColumn: string; operator: "=" | "!=" | ">" | "<" | ">=" | "<=" | "LIKE" | "IN" | "NOT IN"; valueSource: "from_column" | "static" | "condition_result"; fromColumn?: string; staticValue?: string; logicalOperator?: "AND" | "OR"; } export interface DeleteSafetySettings { maxDeleteCount: number; requireConfirmation: boolean; dryRunFirst: boolean; logAllDeletes: boolean; } export interface ExecutionResult { success: boolean; message: string; executedActions?: any[]; errors?: string[]; warnings?: string[]; } export class EnhancedDataflowControlService extends DataflowControlService { private multiConnectionService: MultiConnectionQueryService; constructor() { super(); this.multiConnectionService = new MultiConnectionQueryService(); } /** * 확장된 데이터플로우 제어 실행 */ async executeDataflowControl( diagramId: number, relationshipId: string, triggerType: "insert" | "update" | "delete", sourceData: Record, tableName: string, // 🆕 추가 매개변수 sourceConnectionId?: number, targetConnectionId?: number ): Promise { try { logger.info( `확장된 데이터플로우 제어 실행 시작: diagram=${diagramId}, trigger=${triggerType}` ); // 기본 실행 결과 const result: ExecutionResult = { success: true, message: "데이터플로우 제어가 성공적으로 실행되었습니다.", executedActions: [], errors: [], warnings: [], }; // 다이어그램 설정 조회 const diagram = await this.getDiagramById(diagramId); if (!diagram) { return { success: false, message: "다이어그램을 찾을 수 없습니다.", errors: [`다이어그램 ID ${diagramId}를 찾을 수 없습니다.`], }; } // 제어 계획 파싱 const plan = this.parsePlan(diagram.plan); if (!plan.actions || plan.actions.length === 0) { return { success: true, message: "실행할 액션이 없습니다.", executedActions: [], }; } // 각 액션 실행 for (const action of plan.actions) { try { const enhancedAction = action as EnhancedControlAction; let actionResult: any; switch (enhancedAction.actionType) { case "insert": actionResult = await this.executeMultiConnectionInsert( enhancedAction, sourceData, sourceConnectionId, targetConnectionId ); break; case "update": actionResult = await this.executeMultiConnectionUpdate( enhancedAction, sourceData, sourceConnectionId, targetConnectionId ); break; case "delete": actionResult = await this.executeMultiConnectionDelete( enhancedAction, sourceData, sourceConnectionId, targetConnectionId ); break; default: throw new Error( `지원하지 않는 액션 타입입니다: ${enhancedAction.actionType}` ); } result.executedActions!.push({ actionId: enhancedAction.id, actionType: enhancedAction.actionType, result: actionResult, }); } catch (actionError) { const errorMessage = `액션 ${action.id} 실행 실패: ${actionError instanceof Error ? actionError.message : actionError}`; logger.error(errorMessage); result.errors!.push(errorMessage); } } // 실행 결과 판정 if (result.errors!.length > 0) { result.success = false; result.message = `일부 액션 실행에 실패했습니다. 성공: ${result.executedActions!.length}, 실패: ${result.errors!.length}`; } logger.info( `확장된 데이터플로우 제어 실행 완료: success=${result.success}` ); return result; } catch (error) { logger.error(`확장된 데이터플로우 제어 실행 실패: ${error}`); return { success: false, message: "데이터플로우 제어 실행 중 오류가 발생했습니다.", errors: [error instanceof Error ? error.message : String(error)], }; } } /** * 🆕 다중 커넥션 INSERT 실행 */ private async executeMultiConnectionInsert( action: EnhancedControlAction, sourceData: Record, sourceConnectionId?: number, targetConnectionId?: number ): Promise { try { logger.info(`다중 커넥션 INSERT 실행: action=${action.id}`); // 커넥션 ID 결정 const fromConnId = sourceConnectionId || action.fromConnection?.connectionId || 0; const toConnId = targetConnectionId || action.toConnection?.connectionId || 0; // FROM 테이블에서 소스 데이터 조회 (조건이 있는 경우) let fromData = sourceData; if ( action.fromTable && action.conditions && action.conditions.length > 0 ) { const queryConditions = this.buildQueryConditions( action.conditions, sourceData ); const fromResults = await this.multiConnectionService.fetchDataFromConnection( fromConnId, action.fromTable, queryConditions ); if (fromResults.length === 0) { logger.info(`FROM 테이블에서 조건에 맞는 데이터가 없습니다.`); return { inserted: 0, message: "조건에 맞는 소스 데이터가 없습니다.", }; } fromData = fromResults[0]; // 첫 번째 결과 사용 } // 필드 매핑 적용 const mappedData = this.applyFieldMappings( action.fieldMappings, fromData ); // TO 테이블에 데이터 삽입 const insertResult = await this.multiConnectionService.insertDataToConnection( toConnId, action.targetTable, mappedData ); logger.info(`다중 커넥션 INSERT 완료`); return insertResult; } catch (error) { logger.error(`다중 커넥션 INSERT 실패: ${error}`); throw error; } } /** * 🆕 다중 커넥션 UPDATE 실행 */ private async executeMultiConnectionUpdate( action: EnhancedControlAction, sourceData: Record, sourceConnectionId?: number, targetConnectionId?: number ): Promise { try { logger.info(`다중 커넥션 UPDATE 실행: action=${action.id}`); // 커넥션 ID 결정 const fromConnId = sourceConnectionId || action.fromConnection?.connectionId || 0; const toConnId = targetConnectionId || action.toConnection?.connectionId || 0; // UPDATE 조건 확인 if (!action.updateConditions || action.updateConditions.length === 0) { throw new Error("UPDATE 작업에는 업데이트 조건이 필요합니다."); } // FROM 테이블에서 업데이트 조건 확인 const updateConditions = this.buildUpdateConditions( action.updateConditions, sourceData ); const fromResults = await this.multiConnectionService.fetchDataFromConnection( fromConnId, action.fromTable || action.targetTable, updateConditions ); if (fromResults.length === 0) { logger.info(`업데이트 조건에 맞는 데이터가 없습니다.`); return { updated: 0, message: "업데이트 조건에 맞는 데이터가 없습니다.", }; } // 업데이트 필드 매핑 적용 const updateData = this.applyUpdateFieldMappings( action.updateFields || [], fromResults[0] ); // WHERE 조건 구성 (TO 테이블 대상) const whereConditions = this.buildWhereConditions( action.updateFields || [], fromResults[0] ); // TO 테이블 데이터 업데이트 const updateResult = await this.multiConnectionService.updateDataToConnection( toConnId, action.targetTable, updateData, whereConditions ); logger.info(`다중 커넥션 UPDATE 완료`); return updateResult; } catch (error) { logger.error(`다중 커넥션 UPDATE 실패: ${error}`); throw error; } } /** * 🆕 다중 커넥션 DELETE 실행 */ private async executeMultiConnectionDelete( action: EnhancedControlAction, sourceData: Record, sourceConnectionId?: number, targetConnectionId?: number ): Promise { try { logger.info(`다중 커넥션 DELETE 실행: action=${action.id}`); // 커넥션 ID 결정 const fromConnId = sourceConnectionId || action.fromConnection?.connectionId || 0; const toConnId = targetConnectionId || action.toConnection?.connectionId || 0; // DELETE 조건 확인 if (!action.deleteConditions || action.deleteConditions.length === 0) { throw new Error("DELETE 작업에는 삭제 조건이 필요합니다."); } // FROM 테이블에서 삭제 트리거 조건 확인 const deleteConditions = this.buildDeleteConditions( action.deleteConditions, sourceData ); const fromResults = await this.multiConnectionService.fetchDataFromConnection( fromConnId, action.fromTable || action.targetTable, deleteConditions ); if (fromResults.length === 0) { logger.info(`삭제 조건에 맞는 데이터가 없습니다.`); return { deleted: 0, message: "삭제 조건에 맞는 데이터가 없습니다." }; } // WHERE 조건 구성 (TO 테이블 대상) const whereConditions = this.buildDeleteWhereConditions( action.deleteWhereConditions || [], fromResults[0] ); if (!whereConditions || Object.keys(whereConditions).length === 0) { throw new Error("DELETE 작업에는 WHERE 조건이 필수입니다."); } // 안전장치 적용 const maxDeleteCount = action.maxDeleteCount || 100; // Dry Run 실행 (선택사항) if (action.dryRunFirst) { const countResult = await this.multiConnectionService.fetchDataFromConnection( toConnId, action.targetTable, whereConditions ); logger.info(`삭제 예상 개수: ${countResult.length}건`); if (countResult.length > maxDeleteCount) { throw new Error( `삭제 대상이 ${countResult.length}건으로 최대 허용 개수(${maxDeleteCount})를 초과합니다.` ); } } // TO 테이블에서 데이터 삭제 const deleteResult = await this.multiConnectionService.deleteDataFromConnection( toConnId, action.targetTable, whereConditions, maxDeleteCount ); // 삭제 로그 기록 (선택사항) if (action.logAllDeletes) { logger.info( `삭제 실행 로그: ${JSON.stringify({ action: action.id, deletedCount: deleteResult.length, conditions: whereConditions, })}` ); } logger.info(`다중 커넥션 DELETE 완료`); return deleteResult; } catch (error) { logger.error(`다중 커넥션 DELETE 실패: ${error}`); throw error; } } /** * 쿼리 조건 구성 */ private buildQueryConditions( conditions: ControlCondition[], sourceData: Record ): Record { const queryConditions: Record = {}; conditions.forEach((condition) => { if (condition.type === "condition" && condition.field) { let value = condition.value; // 소스 데이터에서 값 참조 if ( typeof value === "string" && value.startsWith("${") && value.endsWith("}") ) { const fieldName = value.slice(2, -1); value = sourceData[fieldName]; } queryConditions[condition.field] = value; } }); return queryConditions; } /** * UPDATE 조건 구성 */ private buildUpdateConditions( updateConditions: UpdateCondition[], sourceData: Record ): Record { const conditions: Record = {}; updateConditions.forEach((condition) => { let value = condition.value; // 소스 데이터에서 값 참조 if ( typeof value === "string" && value.startsWith("${") && value.endsWith("}") ) { const fieldName = value.slice(2, -1); value = sourceData[fieldName]; } conditions[condition.fromColumn] = value; }); return conditions; } /** * UPDATE 필드 매핑 적용 */ private applyUpdateFieldMappings( updateFields: UpdateFieldMapping[], fromData: Record ): Record { const updateData: Record = {}; updateFields.forEach((mapping) => { let value = fromData[mapping.fromColumn]; // 기본값 사용 if (value === undefined || value === null) { value = mapping.defaultValue; } // 변환 함수 적용 (추후 구현 가능) if (mapping.transformFunction) { // TODO: 변환 함수 로직 구현 } updateData[mapping.toColumn] = value; }); return updateData; } /** * WHERE 조건 구성 (UPDATE용) */ private buildWhereConditions( updateFields: UpdateFieldMapping[], fromData: Record ): Record { const whereConditions: Record = {}; // 기본적으로 ID 필드로 WHERE 조건 구성 if (fromData.id) { whereConditions.id = fromData.id; } return whereConditions; } /** * DELETE 조건 구성 */ private buildDeleteConditions( deleteConditions: DeleteCondition[], sourceData: Record ): Record { const conditions: Record = {}; deleteConditions.forEach((condition) => { let value = condition.value; // 소스 데이터에서 값 참조 if ( typeof value === "string" && value.startsWith("${") && value.endsWith("}") ) { const fieldName = value.slice(2, -1); value = sourceData[fieldName]; } conditions[condition.fromColumn] = value; }); return conditions; } /** * DELETE WHERE 조건 구성 */ private buildDeleteWhereConditions( whereConditions: DeleteWhereCondition[], fromData: Record ): Record { const conditions: Record = {}; whereConditions.forEach((condition) => { let value: any; switch (condition.valueSource) { case "from_column": if (condition.fromColumn) { value = fromData[condition.fromColumn]; } break; case "static": value = condition.staticValue; break; case "condition_result": // 조건 결과를 사용 (추후 구현) break; } if (value !== undefined && value !== null) { conditions[condition.toColumn] = value; } }); return conditions; } /** * 필드 매핑 적용 */ private applyFieldMappings( fieldMappings: any[], sourceData: Record ): Record { const mappedData: Record = {}; fieldMappings.forEach((mapping) => { let value: any; if (mapping.sourceField) { value = sourceData[mapping.sourceField]; } else if (mapping.defaultValue !== undefined) { value = mapping.defaultValue; } if (value !== undefined) { mappedData[mapping.targetField] = value; } }); return mappedData; } /** * 다이어그램 조회 (부모 클래스에서 가져오기) */ private async getDiagramById(diagramId: number): Promise { // 부모 클래스의 메서드 호출 또는 직접 구현 // 임시로 간단한 구현 return { id: diagramId, plan: "{}" }; } /** * 계획 파싱 (부모 클래스에서 가져오기) */ private parsePlan(planJson: string): any { try { return JSON.parse(planJson); } catch (error) { logger.error(`계획 파싱 실패: ${error}`); return { actions: [] }; } } }