/**
 * Node-based data flow API.
 *
 * Express router exposing list / detail / create / update / delete,
 * source-table lookup and execution endpoints backed by the `node_flows` table.
 */

import { Router, Request, Response } from "express";
import { query, queryOne } from "../../database/db";
import { logger } from "../../utils/logger";
import { NodeFlowExecutionService } from "../../services/nodeFlowExecutionService";
import { AuthenticatedRequest } from "../../types/auth";
import { authenticateToken } from "../../middleware/authMiddleware";
import { auditLogService, getClientIp } from "../../services/auditLogService";

const router = Router();

/**
 * List flows.
 * GET /
 *
 * When the caller has a company code other than "*", only that company's
 * flows are returned; otherwise every flow is returned. Results are ordered
 * by `updated_at` descending.
 */
router.get("/", async (req: AuthenticatedRequest, res: Response) => {
  try {
    const userCompanyCode = req.user?.companyCode;

    let sqlQuery = `
      SELECT
        flow_id as "flowId",
        flow_name as "flowName",
        flow_description as "flowDescription",
        company_code as "companyCode",
        created_at as "createdAt",
        updated_at as "updatedAt"
      FROM node_flows
    `;

    const params: any[] = [];

    // Filter by company unless the caller is a super admin ("*").
    if (userCompanyCode && userCompanyCode !== "*") {
      sqlQuery += ` WHERE company_code = $1`;
      params.push(userCompanyCode);
    }

    sqlQuery += ` ORDER BY updated_at DESC`;

    const flows = await query(sqlQuery, params);

    return res.json({
      success: true,
      data: flows,
    });
  } catch (error) {
    logger.error("플로우 목록 조회 실패:", error);
    return res.status(500).json({
      success: false,
      message: "플로우 목록을 조회하지 못했습니다.",
    });
  }
});

/**
 * Get a single flow, including its `flow_data`.
 * GET /:flowId
 *
 * Responds with 404 when no row matches `flowId`.
 *
 * NOTE(review): unlike the list endpoint, no company filter is applied here —
 * confirm that cross-company reads are intended.
 */
router.get("/:flowId", async (req: Request, res: Response) => {
  try {
    const { flowId } = req.params;

    const flow = await queryOne(
      `
      SELECT
        flow_id as "flowId",
        flow_name as "flowName",
        flow_description as "flowDescription",
        flow_data as "flowData",
        created_at as "createdAt",
        updated_at as "updatedAt"
      FROM node_flows
      WHERE flow_id = $1
      `,
      [flowId]
    );

    if (!flow) {
      return res.status(404).json({
        success: false,
        message: "플로우를 찾을 수 없습니다.",
      });
    }

    return res.json({
      success: true,
      data: flow,
    });
  } catch (error) {
    logger.error("플로우 조회 실패:", error);
    return res.status(500).json({
      success: false,
      message: "플로우를 조회하지 못했습니다.",
    });
  }
});

/**
 * Create a new flow.
 * POST /
 *
 * Body: `{ flowName, flowDescription?, flowData }`. `flowName` and `flowData`
 * are required (400 otherwise). The flow is stored under the caller's company
 * code, falling back to "*" when none is available. Writes a CREATE audit log
 * entry and returns the generated `flowId`.
 */
router.post("/", async (req: AuthenticatedRequest, res: Response) => {
  try {
    // A missing body must produce the 400 below, not a destructuring TypeError.
    const { flowName, flowDescription, flowData } = req.body ?? {};
    const userCompanyCode = req.user?.companyCode || "*";

    if (!flowName || !flowData) {
      return res.status(400).json({
        success: false,
        message: "플로우 이름과 데이터는 필수입니다.",
      });
    }

    const result = await queryOne(
      `
      INSERT INTO node_flows (flow_name, flow_description, flow_data, company_code)
      VALUES ($1, $2, $3, $4)
      RETURNING flow_id as "flowId"
      `,
      [flowName, flowDescription || "", flowData, userCompanyCode]
    );

    // Fail with an explicit error instead of a null dereference; the catch
    // block below still answers with the same 500 response.
    if (!result) {
      throw new Error("INSERT INTO node_flows returned no row");
    }

    logger.info(
      `플로우 저장 성공: ${result.flowId} (회사: ${userCompanyCode})`
    );

    auditLogService.log({
      companyCode: userCompanyCode,
      userId: req.user?.userId || "",
      userName: req.user?.userName,
      action: "CREATE",
      resourceType: "NODE_FLOW",
      resourceId: String(result.flowId),
      resourceName: flowName,
      tableName: "node_flows",
      summary: `노드 플로우 "${flowName}" 생성`,
      changes: { after: { flowName, flowDescription } },
      ipAddress: getClientIp(req as any),
      requestPath: req.originalUrl,
    });

    return res.json({
      success: true,
      message: "플로우가 저장되었습니다.",
      data: {
        flowId: result.flowId,
      },
    });
  } catch (error) {
    logger.error("플로우 저장 실패:", error);
    return res.status(500).json({
      success: false,
      message: "플로우를 저장하지 못했습니다.",
    });
  }
});

/**
 * Update an existing flow.
 * PUT /
 *
 * Body: `{ flowId, flowName, flowDescription?, flowData }`. `flowId`,
 * `flowName` and `flowData` are required (400 otherwise). Responds with 404
 * when the flow does not exist, so a no-op UPDATE is never reported (or
 * audited) as a successful modification. Writes an UPDATE audit log entry
 * with the previous and new name/description.
 *
 * NOTE(review): no company filter is applied to the lookup or the UPDATE —
 * confirm that cross-company writes are intended.
 */
router.put("/", async (req: AuthenticatedRequest, res: Response) => {
  try {
    // A missing body must produce the 400 below, not a destructuring TypeError.
    const { flowId, flowName, flowDescription, flowData } = req.body ?? {};

    if (!flowId || !flowName || !flowData) {
      return res.status(400).json({
        success: false,
        message: "플로우 ID, 이름, 데이터는 필수입니다.",
      });
    }

    // Previous values are needed for the audit trail and double as the
    // existence check.
    const oldFlow = await queryOne<{
      flow_name: string;
      flow_description: string | null;
    }>(
      `SELECT flow_name, flow_description FROM node_flows WHERE flow_id = $1`,
      [flowId]
    );

    if (!oldFlow) {
      return res.status(404).json({
        success: false,
        message: "플로우를 찾을 수 없습니다.",
      });
    }

    await query(
      `
      UPDATE node_flows
      SET flow_name = $1,
          flow_description = $2,
          flow_data = $3,
          updated_at = NOW()
      WHERE flow_id = $4
      `,
      [flowName, flowDescription || "", flowData, flowId]
    );

    logger.info(`플로우 수정 성공: ${flowId}`);

    const userCompanyCode = req.user?.companyCode || "*";
    auditLogService.log({
      companyCode: userCompanyCode,
      userId: req.user?.userId || "",
      userName: req.user?.userName,
      action: "UPDATE",
      resourceType: "NODE_FLOW",
      resourceId: String(flowId),
      resourceName: flowName,
      tableName: "node_flows",
      summary: `노드 플로우 "${flowName}" 수정`,
      changes: {
        before: {
          flowName: oldFlow.flow_name,
          flowDescription: oldFlow.flow_description,
        },
        after: { flowName, flowDescription },
      },
      ipAddress: getClientIp(req as any),
      requestPath: req.originalUrl,
    });

    return res.json({
      success: true,
      message: "플로우가 수정되었습니다.",
      data: {
        flowId,
      },
    });
  } catch (error) {
    logger.error("플로우 수정 실패:", error);
    return res.status(500).json({
      success: false,
      message: "플로우를 수정하지 못했습니다.",
    });
  }
});

/**
 * Delete a flow.
 * DELETE /:flowId
 *
 * Reads the flow's name/description first so they can be recorded in the
 * DELETE audit log entry, then removes the row.
 *
 * NOTE(review): there is no existence or company check — deleting an unknown
 * `flowId` still reports success and writes an audit entry named `ID:<flowId>`.
 * Confirm this idempotent behavior is intended.
 */
router.delete("/:flowId", async (req: AuthenticatedRequest, res: Response) => {
  try {
    const { flowId } = req.params;

    const oldFlow = await queryOne<{
      flow_name: string;
      flow_description: string | null;
      company_code: string;
    }>(
      `SELECT flow_name, flow_description, company_code FROM node_flows WHERE flow_id = $1`,
      [flowId]
    );

    await query(
      `
      DELETE FROM node_flows
      WHERE flow_id = $1
      `,
      [flowId]
    );

    logger.info(`플로우 삭제 성공: ${flowId}`);

    const userCompanyCode = req.user?.companyCode || "*";
    // Fall back to the id when the flow row (and therefore its name) is missing.
    const flowName = oldFlow?.flow_name || `ID:${flowId}`;
    auditLogService.log({
      companyCode: userCompanyCode,
      userId: req.user?.userId || "",
      userName: req.user?.userName,
      action: "DELETE",
      resourceType: "NODE_FLOW",
      resourceId: String(flowId),
      resourceName: flowName,
      tableName: "node_flows",
      summary: `노드 플로우 "${flowName}" 삭제`,
      changes: {
        before: oldFlow
          ? {
              flowName: oldFlow.flow_name,
              flowDescription: oldFlow.flow_description,
            }
          : undefined,
      },
      ipAddress: getClientIp(req as any),
      requestPath: req.originalUrl,
    });

    return res.json({
      success: true,
      message: "플로우가 삭제되었습니다.",
    });
  } catch (error) {
    logger.error("플로우 삭제 실패:", error);
    return res.status(500).json({
      success: false,
      message: "플로우를 삭제하지 못했습니다.",
    });
  }
});

/**
 * Look up a flow's source table.
 * GET /api/dataflow/node-flows/:flowId/source-table
 *
 * Returns the table name of the first node in `flow_data.nodes` whose type is
 * `tableSource` or `externalDBSource`. Responds with 404 when the flow does
 * not exist, and with a null `sourceTable` when there is no such node or it
 * has no table name configured.
 */
router.get("/:flowId/source-table", async (req: Request, res: Response) => {
  try {
    const { flowId } = req.params;

    const flow = await queryOne<{ flow_data: any }>(
      `SELECT flow_data FROM node_flows WHERE flow_id = $1`,
      [flowId]
    );

    if (!flow) {
      return res.status(404).json({
        success: false,
        message: "플로우를 찾을 수 없습니다.",
      });
    }

    // flow_data may come back either as a JSON string or as an already-parsed value.
    const flowData =
      typeof flow.flow_data === "string"
        ? JSON.parse(flow.flow_data)
        : flow.flow_data;

    // Treat null flow data or a non-array `nodes` as "no nodes" instead of
    // throwing and answering 500.
    const nodes: any[] = Array.isArray(flowData?.nodes) ? flowData.nodes : [];

    // Find the first source node (tableSource or externalDBSource).
    const sourceNode = nodes.find(
      (node: any) =>
        node?.type === "tableSource" || node?.type === "externalDBSource"
    );

    if (!sourceNode || !sourceNode.data?.tableName) {
      return res.json({
        success: true,
        data: {
          sourceTable: null,
          sourceNodeType: null,
          message: "소스 노드가 없거나 테이블명이 설정되지 않았습니다.",
        },
      });
    }

    logger.info(
      `플로우 소스 테이블 조회: flowId=${flowId}, table=${sourceNode.data.tableName}`
    );

    return res.json({
      success: true,
      data: {
        sourceTable: sourceNode.data.tableName,
        sourceNodeType: sourceNode.type,
        sourceNodeId: sourceNode.id,
        displayName: sourceNode.data.displayName,
      },
    });
  } catch (error) {
    logger.error("플로우 소스 테이블 조회 실패:", error);
    return res.status(500).json({
      success: false,
      message: "플로우 소스 테이블을 조회하지 못했습니다.",
    });
  }
});

/**
 * Execute a flow.
 * POST /api/dataflow/node-flows/:flowId/execute
 *
 * Requires `authenticateToken`. The request body is used as the execution
 * context; the authenticated user's `userId`, `userName` and `companyCode`
 * are merged in last, so they override same-named keys from the body.
 * Responds with 400 when `flowId` is not numeric. On failure the error's
 * message is returned to the client with status 500.
 */
router.post(
  "/:flowId/execute",
  authenticateToken,
  async (req: AuthenticatedRequest, res: Response) => {
    try {
      const { flowId } = req.params;

      // Reject ids that parseInt cannot read instead of passing NaN to the
      // execution service.
      const numericFlowId = parseInt(flowId, 10);
      if (Number.isNaN(numericFlowId)) {
        return res.status(400).json({
          success: false,
          message: "유효하지 않은 플로우 ID입니다.",
        });
      }

      // Requests without a body execute with an empty context rather than
      // crashing on Object.keys(undefined).
      const contextData = req.body ?? {};

      logger.info(`🚀 플로우 실행 요청: flowId=${flowId}`, {
        contextDataKeys: Object.keys(contextData),
        userId: req.user?.userId,
        companyCode: req.user?.companyCode,
      });

      // Debug: dump the whole req.user object.
      // NOTE(review): this logs the entire user object at info level — confirm
      // it carries nothing sensitive, or trim the payload.
      logger.info(`🔍 req.user 전체 정보:`, {
        user: req.user,
        hasUser: !!req.user,
      });

      // Add the authenticated user's identity to the execution context.
      const enrichedContextData = {
        ...contextData,
        userId: req.user?.userId,
        userName: req.user?.userName,
        companyCode: req.user?.companyCode,
      };

      // Debug: confirm the identity fields that were merged in.
      logger.info(`🔍 enrichedContextData:`, {
        userId: enrichedContextData.userId,
        companyCode: enrichedContextData.companyCode,
      });

      // Run the flow.
      const result = await NodeFlowExecutionService.executeFlow(
        numericFlowId,
        enrichedContextData
      );

      return res.json({
        success: result.success,
        message: result.message,
        data: result,
      });
    } catch (error) {
      logger.error("플로우 실행 실패:", error);
      return res.status(500).json({
        success: false,
        message:
          error instanceof Error
            ? error.message
            : "플로우 실행 중 오류가 발생했습니다.",
      });
    }
  }
);

export default router;