2025-10-27 16:40:59 +09:00
// @ts-nocheck
2025-10-20 10:55:33 +09:00
/ * *
* 플 로 우 실 행 서 비 스
* 단 계 별 데 이 터 카 운 트 및 리 스 트 조 회
* /
import db from "../database/db" ;
import { FlowStepDataCount , FlowStepDataList } from "../types/flow" ;
import { FlowDefinitionService } from "./flowDefinitionService" ;
import { FlowStepService } from "./flowStepService" ;
import { FlowConditionParser } from "./flowConditionParser" ;
2025-10-21 13:19:18 +09:00
import { executeExternalQuery } from "./externalDbHelper" ;
import { getPlaceholder , buildPaginationClause } from "./dbQueryBuilder" ;
2025-10-20 10:55:33 +09:00
export class FlowExecutionService {
private flowDefinitionService : FlowDefinitionService ;
private flowStepService : FlowStepService ;
constructor ( ) {
this . flowDefinitionService = new FlowDefinitionService ( ) ;
this . flowStepService = new FlowStepService ( ) ;
}
/ * *
* 특 정 플 로 우 단 계 에 해 당 하 는 데 이 터 카 운 트
* /
async getStepDataCount ( flowId : number , stepId : number ) : Promise < number > {
// 1. 플로우 정의 조회
const flowDef = await this . flowDefinitionService . findById ( flowId ) ;
if ( ! flowDef ) {
throw new Error ( ` Flow definition not found: ${ flowId } ` ) ;
}
2025-10-21 13:19:18 +09:00
console . log ( "🔍 [getStepDataCount] Flow Definition:" , {
flowId ,
dbSourceType : flowDef.dbSourceType ,
dbConnectionId : flowDef.dbConnectionId ,
tableName : flowDef.tableName ,
} ) ;
2025-10-20 10:55:33 +09:00
// 2. 플로우 단계 조회
const step = await this . flowStepService . findById ( stepId ) ;
if ( ! step ) {
throw new Error ( ` Flow step not found: ${ stepId } ` ) ;
}
if ( step . flowDefinitionId !== flowId ) {
throw new Error ( ` Step ${ stepId } does not belong to flow ${ flowId } ` ) ;
}
// 3. 테이블명 결정: 단계에 지정된 테이블이 있으면 사용, 없으면 플로우의 기본 테이블 사용
const tableName = step . tableName || flowDef . tableName ;
// 4. 조건 JSON을 SQL WHERE절로 변환
const { where , params } = FlowConditionParser . toSqlWhere (
step . conditionJson
) ;
2025-10-21 13:19:18 +09:00
// 5. 카운트 쿼리 실행 (내부 또는 외부 DB)
2025-10-20 10:55:33 +09:00
const query = ` SELECT COUNT(*) as count FROM ${ tableName } WHERE ${ where } ` ;
2025-10-21 13:19:18 +09:00
console . log ( "🔍 [getStepDataCount] Query Info:" , {
tableName ,
query ,
params ,
isExternal : flowDef.dbSourceType === "external" ,
connectionId : flowDef.dbConnectionId ,
} ) ;
let result : any ;
if ( flowDef . dbSourceType === "external" && flowDef . dbConnectionId ) {
// 외부 DB 조회
console . log (
"✅ [getStepDataCount] Using EXTERNAL DB:" ,
flowDef . dbConnectionId
) ;
const externalResult = await executeExternalQuery (
flowDef . dbConnectionId ,
query ,
params
) ;
console . log ( "📦 [getStepDataCount] External result:" , externalResult ) ;
result = externalResult . rows ;
} else {
// 내부 DB 조회
console . log ( "✅ [getStepDataCount] Using INTERNAL DB" ) ;
result = await db . query ( query , params ) ;
}
const count = parseInt ( result [ 0 ] . count || result [ 0 ] . COUNT ) ;
console . log ( "✅ [getStepDataCount] Final count:" , count ) ;
return count ;
2025-10-20 10:55:33 +09:00
}
/ * *
* 특 정 플 로 우 단 계 에 해 당 하 는 데 이 터 리 스 트
* /
async getStepDataList (
flowId : number ,
stepId : number ,
page : number = 1 ,
pageSize : number = 20
) : Promise < FlowStepDataList > {
// 1. 플로우 정의 조회
const flowDef = await this . flowDefinitionService . findById ( flowId ) ;
if ( ! flowDef ) {
throw new Error ( ` Flow definition not found: ${ flowId } ` ) ;
}
// 2. 플로우 단계 조회
const step = await this . flowStepService . findById ( stepId ) ;
if ( ! step ) {
throw new Error ( ` Flow step not found: ${ stepId } ` ) ;
}
if ( step . flowDefinitionId !== flowId ) {
throw new Error ( ` Step ${ stepId } does not belong to flow ${ flowId } ` ) ;
}
// 3. 테이블명 결정: 단계에 지정된 테이블이 있으면 사용, 없으면 플로우의 기본 테이블 사용
const tableName = step . tableName || flowDef . tableName ;
// 4. 조건 JSON을 SQL WHERE절로 변환
const { where , params } = FlowConditionParser . toSqlWhere (
step . conditionJson
) ;
const offset = ( page - 1 ) * pageSize ;
2025-10-21 13:19:18 +09:00
const isExternalDb =
flowDef . dbSourceType === "external" && flowDef . dbConnectionId ;
2025-10-20 10:55:33 +09:00
// 5. 전체 카운트
const countQuery = ` SELECT COUNT(*) as count FROM ${ tableName } WHERE ${ where } ` ;
2025-10-21 13:19:18 +09:00
let countResult : any ;
let total : number ;
if ( isExternalDb ) {
const externalCountResult = await executeExternalQuery (
flowDef . dbConnectionId ! ,
countQuery ,
params
) ;
countResult = externalCountResult . rows ;
total = parseInt ( countResult [ 0 ] . count || countResult [ 0 ] . COUNT ) ;
} else {
countResult = await db . query ( countQuery , params ) ;
total = parseInt ( countResult [ 0 ] . count ) ;
}
// 6. 데이터 조회 (DB 타입별 페이징 처리)
let dataQuery : string ;
let dataParams : any [ ] ;
if ( isExternalDb ) {
// 외부 DB는 id 컬럼으로 정렬 (가정)
// DB 타입에 따른 페이징 절은 빌더에서 처리하지 않고 직접 작성
// PostgreSQL, MySQL, MSSQL, Oracle 모두 지원하도록 단순화
dataQuery = `
SELECT * FROM $ { tableName }
WHERE $ { where }
ORDER BY id DESC
LIMIT $ { pageSize } OFFSET $ { offset }
2025-10-20 10:55:33 +09:00
` ;
2025-10-21 13:19:18 +09:00
dataParams = params ;
const externalDataResult = await executeExternalQuery (
flowDef . dbConnectionId ! ,
dataQuery ,
dataParams
) ;
return {
records : externalDataResult.rows ,
total ,
page ,
pageSize ,
} ;
} else {
// 내부 DB (PostgreSQL)
// Primary Key 컬럼 찾기
let orderByColumn = "" ;
try {
const pkQuery = `
SELECT a . attname
FROM pg_index i
JOIN pg_attribute a ON a . attrelid = i . indrelid AND a . attnum = ANY ( i . indkey )
WHERE i . indrelid = $1 : : regclass
AND i . indisprimary
LIMIT 1
` ;
const pkResult = await db . query ( pkQuery , [ tableName ] ) ;
if ( pkResult . length > 0 ) {
orderByColumn = pkResult [ 0 ] . attname ;
}
} catch ( err ) {
console . warn ( ` Could not find primary key for table ${ tableName } : ` , err ) ;
2025-10-20 10:55:33 +09:00
}
2025-10-21 13:19:18 +09:00
const orderByClause = orderByColumn
? ` ORDER BY ${ orderByColumn } DESC `
: "" ;
dataQuery = `
SELECT * FROM $ { tableName }
WHERE $ { where }
$ { orderByClause }
LIMIT $ $ { params . length + 1 } OFFSET $ $ { params . length + 2 }
` ;
const dataResult = await db . query ( dataQuery , [
. . . params ,
pageSize ,
offset ,
] ) ;
return {
records : dataResult ,
total ,
page ,
pageSize ,
} ;
}
2025-10-20 10:55:33 +09:00
}
/ * *
* 플 로 우 의 모 든 단 계 별 데 이 터 카 운 트
* /
async getAllStepCounts ( flowId : number ) : Promise < FlowStepDataCount [ ] > {
const steps = await this . flowStepService . findByFlowId ( flowId ) ;
const counts : FlowStepDataCount [ ] = [ ] ;
for ( const step of steps ) {
const count = await this . getStepDataCount ( flowId , step . id ) ;
counts . push ( {
stepId : step.id ,
count ,
} ) ;
}
return counts ;
}
/ * *
* 특 정 레 코 드 의 현 재 플 로 우 상 태 조 회
* /
async getCurrentStatus (
flowId : number ,
recordId : string
) : Promise < { currentStepId : number | null ; tableName : string } | null > {
const query = `
SELECT current_step_id , table_name
FROM flow_data_status
WHERE flow_definition_id = $1 AND record_id = $2
` ;
const result = await db . query ( query , [ flowId , recordId ] ) ;
if ( result . length === 0 ) {
return null ;
}
return {
currentStepId : result [ 0 ] . current_step_id ,
tableName : result [ 0 ] . table_name ,
} ;
}
2025-12-08 16:06:43 +09:00
/ * *
* 스 텝 데 이 터 업 데 이 트 ( 인 라 인 편 집 )
* 원 본 테 이 블 의 데 이 터 를 직 접 업 데 이 트 합 니 다 .
* /
async updateStepData (
flowId : number ,
stepId : number ,
recordId : string ,
updateData : Record < string , any > ,
userId : string ,
companyCode? : string
) : Promise < { success : boolean } > {
try {
// 1. 플로우 정의 조회
const flowDef = await this . flowDefinitionService . findById ( flowId ) ;
if ( ! flowDef ) {
throw new Error ( ` Flow definition not found: ${ flowId } ` ) ;
}
// 2. 스텝 조회
const step = await this . flowStepService . findById ( stepId ) ;
if ( ! step ) {
throw new Error ( ` Flow step not found: ${ stepId } ` ) ;
}
// 3. 테이블명 결정
const tableName = step . tableName || flowDef . tableName ;
if ( ! tableName ) {
throw new Error ( "Table name not found" ) ;
}
// 4. Primary Key 컬럼 결정 (기본값: id)
const primaryKeyColumn = flowDef . primaryKey || "id" ;
console . log ( ` 🔍 [updateStepData] Updating table: ${ tableName } , PK: ${ primaryKeyColumn } = ${ recordId } ` ) ;
// 5. SET 절 생성
const updateColumns = Object . keys ( updateData ) ;
if ( updateColumns . length === 0 ) {
throw new Error ( "No columns to update" ) ;
}
// 6. 외부 DB vs 내부 DB 구분
if ( flowDef . dbSourceType === "external" && flowDef . dbConnectionId ) {
// 외부 DB 업데이트
console . log ( "✅ [updateStepData] Using EXTERNAL DB:" , flowDef . dbConnectionId ) ;
// 외부 DB 연결 정보 조회
const connectionResult = await db . query (
"SELECT * FROM external_db_connection WHERE id = $1" ,
[ flowDef . dbConnectionId ]
) ;
if ( connectionResult . length === 0 ) {
throw new Error ( ` External DB connection not found: ${ flowDef . dbConnectionId } ` ) ;
}
const connection = connectionResult [ 0 ] ;
const dbType = connection . db_type ? . toLowerCase ( ) ;
// DB 타입에 따른 placeholder 및 쿼리 생성
let setClause : string ;
let params : any [ ] ;
if ( dbType === "mysql" || dbType === "mariadb" ) {
// MySQL/MariaDB: ? placeholder
setClause = updateColumns . map ( ( col ) = > ` \` ${ col } \` = ? ` ) . join ( ", " ) ;
params = [ . . . Object . values ( updateData ) , recordId ] ;
} else if ( dbType === "mssql" ) {
// MSSQL: @p1, @p2 placeholder
setClause = updateColumns . map ( ( col , idx ) = > ` [ ${ col } ] = @p ${ idx + 1 } ` ) . join ( ", " ) ;
params = [ . . . Object . values ( updateData ) , recordId ] ;
} else {
// PostgreSQL: $1, $2 placeholder
setClause = updateColumns . map ( ( col , idx ) = > ` " ${ col } " = $ ${ idx + 1 } ` ) . join ( ", " ) ;
params = [ . . . Object . values ( updateData ) , recordId ] ;
}
const updateQuery = ` UPDATE ${ tableName } SET ${ setClause } WHERE ${ primaryKeyColumn } = ${ dbType === "mysql" || dbType === "mariadb" ? "?" : dbType === "mssql" ? ` @p ${ params . length } ` : ` $ ${ params . length } ` } ` ;
console . log ( ` 📝 [updateStepData] Query: ${ updateQuery } ` ) ;
console . log ( ` 📝 [updateStepData] Params: ` , params ) ;
await executeExternalQuery ( flowDef . dbConnectionId , updateQuery , params ) ;
} else {
// 내부 DB 업데이트
console . log ( "✅ [updateStepData] Using INTERNAL DB" ) ;
const setClause = updateColumns . map ( ( col , idx ) = > ` " ${ col } " = $ ${ idx + 1 } ` ) . join ( ", " ) ;
const params = [ . . . Object . values ( updateData ) , recordId ] ;
const updateQuery = ` UPDATE " ${ tableName } " SET ${ setClause } WHERE " ${ primaryKeyColumn } " = $ ${ params . length } ` ;
console . log ( ` 📝 [updateStepData] Query: ${ updateQuery } ` ) ;
console . log ( ` 📝 [updateStepData] Params: ` , params ) ;
2025-12-08 16:49:28 +09:00
// 트랜잭션으로 감싸서 사용자 ID 세션 변수 설정 후 업데이트 실행
// (트리거에서 changed_by를 기록하기 위함)
await db . query ( "BEGIN" ) ;
try {
await db . query ( ` SET LOCAL app.user_id = ' ${ userId } ' ` ) ;
await db . query ( updateQuery , params ) ;
await db . query ( "COMMIT" ) ;
} catch ( txError ) {
await db . query ( "ROLLBACK" ) ;
throw txError ;
}
2025-12-08 16:06:43 +09:00
}
console . log ( ` ✅ [updateStepData] Data updated successfully: ${ tableName } . ${ primaryKeyColumn } = ${ recordId } ` , {
updatedFields : updateColumns ,
userId ,
} ) ;
return { success : true } ;
} catch ( error : any ) {
console . error ( "❌ [updateStepData] Error:" , error ) ;
throw error ;
}
}
2025-10-20 10:55:33 +09:00
}