ERP-node/backend-node/src/services/batchService.ts

1074 lines
36 KiB
TypeScript

// 배치관리 서비스
// 작성일: 2024-12-24
import { query, queryOne, transaction } from "../database/db";
import {
BatchConfig,
BatchMapping,
BatchConfigFilter,
BatchMappingRequest,
BatchValidationResult,
ApiResponse,
ConnectionInfo,
TableInfo,
ColumnInfo,
CreateBatchConfigRequest,
UpdateBatchConfigRequest,
} from "../types/batchTypes";
import { BatchExternalDbService } from "./batchExternalDbService";
import { DbConnectionManager } from "./dbConnectionManager";
export class BatchService {
/**
* 배치 설정 목록 조회
*/
static async getBatchConfigs(
filter: BatchConfigFilter
): Promise<ApiResponse<BatchConfig[]>> {
try {
const whereConditions: string[] = [];
const values: any[] = [];
let paramIndex = 1;
// 필터 조건 적용
if (filter.is_active) {
whereConditions.push(`bc.is_active = $${paramIndex++}`);
values.push(filter.is_active);
}
if (filter.company_code) {
whereConditions.push(`bc.company_code = $${paramIndex++}`);
values.push(filter.company_code);
}
// 검색 조건 적용 (OR)
if (filter.search && filter.search.trim()) {
whereConditions.push(
`(bc.batch_name ILIKE $${paramIndex} OR bc.description ILIKE $${paramIndex})`
);
values.push(`%${filter.search.trim()}%`);
paramIndex++;
}
const whereClause =
whereConditions.length > 0
? `WHERE ${whereConditions.join(" AND ")}`
: "";
const page = filter.page || 1;
const limit = filter.limit || 10;
const offset = (page - 1) * limit;
// 배치 설정 조회 (매핑 포함 - 서브쿼리 사용)
const batchConfigs = await query<any>(
`SELECT bc.*,
COALESCE(
json_agg(
json_build_object(
'mapping_id', bm.mapping_id,
'batch_id', bm.batch_id,
'source_column', bm.source_column,
'target_column', bm.target_column,
'transformation_rule', bm.transformation_rule
)
) FILTER (WHERE bm.mapping_id IS NOT NULL),
'[]'
) as batch_mappings
FROM batch_configs bc
LEFT JOIN batch_mappings bm ON bc.batch_id = bm.batch_id
${whereClause}
GROUP BY bc.batch_id
ORDER BY bc.is_active DESC, bc.batch_name ASC
LIMIT $${paramIndex} OFFSET $${paramIndex + 1}`,
[...values, limit, offset]
);
// 전체 개수 조회
const countResult = await queryOne<{ count: string }>(
`SELECT COUNT(DISTINCT bc.batch_id) as count
FROM batch_configs bc
${whereClause}`,
values
);
const total = parseInt(countResult?.count || "0");
return {
success: true,
data: batchConfigs as BatchConfig[],
pagination: {
page,
limit,
total,
totalPages: Math.ceil(total / limit),
},
};
} catch (error) {
console.error("배치 설정 목록 조회 오류:", error);
return {
success: false,
message: "배치 설정 목록 조회에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 특정 배치 설정 조회
*/
static async getBatchConfigById(
id: number
): Promise<ApiResponse<BatchConfig>> {
try {
const batchConfig = await queryOne<any>(
`SELECT bc.*,
COALESCE(
json_agg(
json_build_object(
'mapping_id', bm.mapping_id,
'batch_id', bm.batch_id,
'from_table_name', bm.from_table_name,
'from_column_name', bm.from_column_name,
'to_table_name', bm.to_table_name,
'to_column_name', bm.to_column_name,
'mapping_order', bm.mapping_order,
'source_column', bm.source_column,
'target_column', bm.target_column,
'transformation_rule', bm.transformation_rule
)
ORDER BY bm.from_table_name ASC, bm.from_column_name ASC, bm.mapping_order ASC
) FILTER (WHERE bm.mapping_id IS NOT NULL),
'[]'
) as batch_mappings
FROM batch_configs bc
LEFT JOIN batch_mappings bm ON bc.batch_id = bm.batch_id
WHERE bc.id = $1
GROUP BY bc.batch_id`,
[id]
);
if (!batchConfig) {
return {
success: false,
message: "배치 설정을 찾을 수 없습니다.",
};
}
return {
success: true,
data: batchConfig as BatchConfig,
};
} catch (error) {
console.error("배치 설정 조회 오류:", error);
return {
success: false,
message: "배치 설정 조회에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 배치 설정 생성
*/
static async createBatchConfig(
data: CreateBatchConfigRequest,
userId?: string
): Promise<ApiResponse<BatchConfig>> {
try {
// 트랜잭션으로 배치 설정과 매핑 생성
const result = await transaction(async (client) => {
// 배치 설정 생성
const batchConfigResult = await client.query(
`INSERT INTO batch_configs
(batch_name, description, cron_schedule, created_by, updated_by, created_date, updated_date)
VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
RETURNING *`,
[data.batchName, data.description, data.cronSchedule, userId, userId]
);
const batchConfig = batchConfigResult.rows[0];
// 배치 매핑 생성
const mappings = [];
for (let index = 0; index < data.mappings.length; index++) {
const mapping = data.mappings[index];
const mappingResult = await client.query(
`INSERT INTO batch_mappings
(batch_config_id, from_connection_type, from_connection_id, from_table_name, from_column_name,
from_column_type, from_api_url, from_api_key, from_api_method, from_api_param_type,
from_api_param_name, from_api_param_value, from_api_param_source,
to_connection_type, to_connection_id, to_table_name, to_column_name, to_column_type,
to_api_url, to_api_key, to_api_method, to_api_body, mapping_order, created_by, created_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, NOW())
RETURNING *`,
[
batchConfig.id,
mapping.from_connection_type,
mapping.from_connection_id,
mapping.from_table_name,
mapping.from_column_name,
mapping.from_column_type,
mapping.from_api_url,
mapping.from_api_key,
mapping.from_api_method,
mapping.from_api_param_type,
mapping.from_api_param_name,
mapping.from_api_param_value,
mapping.from_api_param_source,
mapping.to_connection_type,
mapping.to_connection_id,
mapping.to_table_name,
mapping.to_column_name,
mapping.to_column_type,
mapping.to_api_url,
mapping.to_api_key,
mapping.to_api_method,
mapping.to_api_body,
mapping.mapping_order || index + 1,
userId,
]
);
mappings.push(mappingResult.rows[0]);
}
return {
...batchConfig,
batch_mappings: mappings,
};
});
return {
success: true,
data: result as BatchConfig,
message: "배치 설정이 성공적으로 생성되었습니다.",
};
} catch (error) {
console.error("배치 설정 생성 오류:", error);
return {
success: false,
message: "배치 설정 생성에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 배치 설정 수정
*/
static async updateBatchConfig(
id: number,
data: UpdateBatchConfigRequest,
userId?: string
): Promise<ApiResponse<BatchConfig>> {
try {
// 기존 배치 설정 확인
const existingConfig = await queryOne<any>(
`SELECT bc.*,
COALESCE(
json_agg(
json_build_object(
'mapping_id', bm.mapping_id,
'batch_id', bm.batch_id
)
) FILTER (WHERE bm.mapping_id IS NOT NULL),
'[]'
) as batch_mappings
FROM batch_configs bc
LEFT JOIN batch_mappings bm ON bc.batch_id = bm.batch_id
WHERE bc.id = $1
GROUP BY bc.batch_id`,
[id]
);
if (!existingConfig) {
return {
success: false,
message: "배치 설정을 찾을 수 없습니다.",
};
}
// 트랜잭션으로 업데이트
const result = await transaction(async (client) => {
// 동적 UPDATE 쿼리 생성
const updateFields: string[] = [
"updated_by = $1",
"updated_date = NOW()",
];
const updateValues: any[] = [userId];
let paramIndex = 2;
if (data.batchName) {
updateFields.push(`batch_name = $${paramIndex++}`);
updateValues.push(data.batchName);
}
if (data.description !== undefined) {
updateFields.push(`description = $${paramIndex++}`);
updateValues.push(data.description);
}
if (data.cronSchedule) {
updateFields.push(`cron_schedule = $${paramIndex++}`);
updateValues.push(data.cronSchedule);
}
if (data.isActive !== undefined) {
updateFields.push(`is_active = $${paramIndex++}`);
updateValues.push(data.isActive);
}
// 배치 설정 업데이트
const batchConfigResult = await client.query(
`UPDATE batch_configs
SET ${updateFields.join(", ")}
WHERE id = $${paramIndex}
RETURNING *`,
[...updateValues, id]
);
const batchConfig = batchConfigResult.rows[0];
// 매핑이 제공된 경우 기존 매핑 삭제 후 새로 생성
if (data.mappings) {
await client.query(
`DELETE FROM batch_mappings WHERE batch_config_id = $1`,
[id]
);
const mappings = [];
for (let index = 0; index < data.mappings.length; index++) {
const mapping = data.mappings[index];
const mappingResult = await client.query(
`INSERT INTO batch_mappings
(batch_config_id, from_connection_type, from_connection_id, from_table_name, from_column_name,
from_column_type, from_api_url, from_api_key, from_api_method, from_api_param_type,
from_api_param_name, from_api_param_value, from_api_param_source,
to_connection_type, to_connection_id, to_table_name, to_column_name, to_column_type,
to_api_url, to_api_key, to_api_method, to_api_body, mapping_order, created_by, created_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, NOW())
RETURNING *`,
[
id,
mapping.from_connection_type,
mapping.from_connection_id,
mapping.from_table_name,
mapping.from_column_name,
mapping.from_column_type,
mapping.from_api_url,
mapping.from_api_key,
mapping.from_api_method,
mapping.from_api_param_type,
mapping.from_api_param_name,
mapping.from_api_param_value,
mapping.from_api_param_source,
mapping.to_connection_type,
mapping.to_connection_id,
mapping.to_table_name,
mapping.to_column_name,
mapping.to_column_type,
mapping.to_api_url,
mapping.to_api_key,
mapping.to_api_method,
mapping.to_api_body,
mapping.mapping_order || index + 1,
userId,
]
);
mappings.push(mappingResult.rows[0]);
}
return {
...batchConfig,
batch_mappings: mappings,
};
} else {
return {
...batchConfig,
batch_mappings: existingConfig.batch_mappings,
};
}
});
return {
success: true,
data: result as BatchConfig,
message: "배치 설정이 성공적으로 수정되었습니다.",
};
} catch (error) {
console.error("배치 설정 수정 오류:", error);
return {
success: false,
message: "배치 설정 수정에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 배치 설정 삭제 (논리 삭제)
*/
static async deleteBatchConfig(
id: number,
userId?: string
): Promise<ApiResponse<void>> {
try {
const existingConfig = await queryOne<any>(
`SELECT * FROM batch_configs WHERE id = $1`,
[id]
);
if (!existingConfig) {
return {
success: false,
message: "배치 설정을 찾을 수 없습니다.",
};
}
// 트랜잭션으로 삭제
await transaction(async (client) => {
// 배치 매핑 먼저 삭제 (외래키 제약)
await client.query(
`DELETE FROM batch_mappings WHERE batch_config_id = $1`,
[id]
);
// 배치 설정 삭제
await client.query(`DELETE FROM batch_configs WHERE id = $1`, [id]);
});
return {
success: true,
message: "배치 설정이 성공적으로 삭제되었습니다.",
};
} catch (error) {
console.error("배치 설정 삭제 오류:", error);
return {
success: false,
message: "배치 설정 삭제에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 사용 가능한 커넥션 목록 조회
*/
static async getAvailableConnections(): Promise<
ApiResponse<ConnectionInfo[]>
> {
try {
const connections: ConnectionInfo[] = [];
// 내부 DB 추가
connections.push({
type: "internal",
name: "Internal Database",
db_type: "postgresql",
});
// 외부 DB 연결 조회
const externalConnections =
await BatchExternalDbService.getAvailableConnections();
if (externalConnections.success && externalConnections.data) {
externalConnections.data.forEach((conn) => {
connections.push({
type: "external",
id: conn.id,
name: conn.name,
db_type: conn.db_type,
});
});
}
return {
success: true,
data: connections,
};
} catch (error) {
console.error("커넥션 목록 조회 오류:", error);
return {
success: false,
message: "커넥션 목록 조회에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 특정 커넥션의 테이블 목록 조회
*/
static async getTablesFromConnection(
connectionType: "internal" | "external",
connectionId?: number
): Promise<ApiResponse<TableInfo[]>> {
try {
let tables: TableInfo[] = [];
if (connectionType === "internal") {
// 내부 DB 테이블 조회
const result = await query<{ table_name: string }>(
`SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
ORDER BY table_name`
);
tables = result.map((row) => ({
table_name: row.table_name,
columns: [],
}));
} else if (connectionType === "external" && connectionId) {
// 외부 DB 테이블 조회
const tablesResult =
await BatchExternalDbService.getTablesFromConnection(
connectionType,
connectionId
);
if (tablesResult.success && tablesResult.data) {
tables = tablesResult.data;
}
}
return {
success: true,
data: tables,
};
} catch (error) {
console.error("테이블 목록 조회 오류:", error);
return {
success: false,
message: "테이블 목록 조회에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 특정 테이블의 컬럼 정보 조회
*/
static async getTableColumns(
connectionType: "internal" | "external",
connectionId: number | undefined,
tableName: string
): Promise<ApiResponse<ColumnInfo[]>> {
try {
console.log(`[BatchService] getTableColumns 호출:`, {
connectionType,
connectionId,
tableName,
});
let columns: ColumnInfo[] = [];
if (connectionType === "internal") {
// 내부 DB 컬럼 조회
console.log(`[BatchService] 내부 DB 컬럼 조회 시작: ${tableName}`);
const result = await query<{
column_name: string;
data_type: string;
is_nullable: string;
column_default: string | null;
}>(
`SELECT
column_name,
data_type,
is_nullable,
column_default
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = $1
ORDER BY ordinal_position`,
[tableName]
);
console.log(`[BatchService] 내부 DB 컬럼 조회 결과:`, result);
columns = result.map((row) => ({
column_name: row.column_name,
data_type: row.data_type,
is_nullable: row.is_nullable,
column_default: row.column_default,
}));
} else if (connectionType === "external" && connectionId) {
// 외부 DB 컬럼 조회
console.log(
`[BatchService] 외부 DB 컬럼 조회 시작: connectionId=${connectionId}, tableName=${tableName}`
);
const columnsResult = await BatchExternalDbService.getTableColumns(
connectionType,
connectionId,
tableName
);
console.log(`[BatchService] 외부 DB 컬럼 조회 결과:`, columnsResult);
if (columnsResult.success && columnsResult.data) {
columns = columnsResult.data;
}
console.log(`[BatchService] 외부 DB 컬럼:`, columns);
}
return {
success: true,
data: columns,
};
} catch (error) {
console.error("컬럼 정보 조회 오류:", error);
return {
success: false,
message: "컬럼 정보 조회에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 배치 실행 로그 생성
*/
static async createExecutionLog(data: {
batch_config_id: number;
execution_status: string;
start_time: Date;
total_records: number;
success_records: number;
failed_records: number;
}): Promise<any> {
try {
const executionLog = await queryOne<any>(
`INSERT INTO batch_execution_logs
(batch_config_id, execution_status, start_time, total_records, success_records, failed_records)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *`,
[
data.batch_config_id,
data.execution_status,
data.start_time,
data.total_records,
data.success_records,
data.failed_records,
]
);
return executionLog;
} catch (error) {
console.error("배치 실행 로그 생성 오류:", error);
throw error;
}
}
/**
* 배치 실행 로그 업데이트
*/
static async updateExecutionLog(
id: number,
data: {
execution_status?: string;
end_time?: Date;
duration_ms?: number;
total_records?: number;
success_records?: number;
failed_records?: number;
error_message?: string;
}
): Promise<void> {
try {
// 동적 UPDATE 쿼리 생성
const updateFields: string[] = [];
const values: any[] = [];
let paramIndex = 1;
if (data.execution_status !== undefined) {
updateFields.push(`execution_status = $${paramIndex++}`);
values.push(data.execution_status);
}
if (data.end_time !== undefined) {
updateFields.push(`end_time = $${paramIndex++}`);
values.push(data.end_time);
}
if (data.duration_ms !== undefined) {
updateFields.push(`duration_ms = $${paramIndex++}`);
values.push(data.duration_ms);
}
if (data.total_records !== undefined) {
updateFields.push(`total_records = $${paramIndex++}`);
values.push(data.total_records);
}
if (data.success_records !== undefined) {
updateFields.push(`success_records = $${paramIndex++}`);
values.push(data.success_records);
}
if (data.failed_records !== undefined) {
updateFields.push(`failed_records = $${paramIndex++}`);
values.push(data.failed_records);
}
if (data.error_message !== undefined) {
updateFields.push(`error_message = $${paramIndex++}`);
values.push(data.error_message);
}
if (updateFields.length > 0) {
await query(
`UPDATE batch_execution_logs
SET ${updateFields.join(", ")}
WHERE id = $${paramIndex}`,
[...values, id]
);
}
} catch (error) {
console.error("배치 실행 로그 업데이트 오류:", error);
throw error;
}
}
/**
* 테이블에서 데이터 조회 (연결 타입에 따라 내부/외부 DB 구분)
*/
static async getDataFromTable(
tableName: string,
connectionType: "internal" | "external" = "internal",
connectionId?: number
): Promise<any[]> {
try {
console.log(
`[BatchService] 테이블에서 데이터 조회: ${tableName} (${connectionType}${connectionId ? `:${connectionId}` : ""})`
);
if (connectionType === "internal") {
// 내부 DB에서 데이터 조회 (주의: SQL 인젝션 위험 - 실제 프로덕션에서는 테이블명 검증 필요)
const result = await query<any>(`SELECT * FROM ${tableName} LIMIT 100`);
console.log(
`[BatchService] 내부 DB 데이터 조회 결과: ${result.length}개 레코드`
);
return result;
} else if (connectionType === "external" && connectionId) {
// 외부 DB에서 데이터 조회
const result = await BatchExternalDbService.getDataFromTable(
connectionId,
tableName
);
if (result.success && result.data) {
console.log(
`[BatchService] 외부 DB 데이터 조회 결과: ${result.data.length}개 레코드`
);
return result.data;
} else {
console.error(`외부 DB 데이터 조회 실패: ${result.message}`);
return [];
}
} else {
throw new Error(
`잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}`
);
}
} catch (error) {
console.error(`테이블 데이터 조회 오류 (${tableName}):`, error);
throw error;
}
}
/**
* 테이블에서 특정 컬럼들만 조회 (연결 타입에 따라 내부/외부 DB 구분)
*/
static async getDataFromTableWithColumns(
tableName: string,
columns: string[],
connectionType: "internal" | "external" = "internal",
connectionId?: number
): Promise<any[]> {
try {
console.log(
`[BatchService] 테이블에서 특정 컬럼 데이터 조회: ${tableName} (${columns.join(", ")}) (${connectionType}${connectionId ? `:${connectionId}` : ""})`
);
if (connectionType === "internal") {
// 내부 DB에서 특정 컬럼만 조회 (주의: SQL 인젝션 위험 - 실제 프로덕션에서는 테이블명/컬럼명 검증 필요)
const columnList = columns.join(", ");
const result = await query<any>(
`SELECT ${columnList} FROM ${tableName} LIMIT 100`
);
console.log(
`[BatchService] 내부 DB 특정 컬럼 조회 결과: ${result.length}개 레코드`
);
return result;
} else if (connectionType === "external" && connectionId) {
// 외부 DB에서 특정 컬럼만 조회
const result = await BatchExternalDbService.getDataFromTableWithColumns(
connectionId,
tableName,
columns
);
if (result.success && result.data) {
console.log(
`[BatchService] 외부 DB 특정 컬럼 조회 결과: ${result.data.length}개 레코드`
);
return result.data;
} else {
console.error(`외부 DB 특정 컬럼 조회 실패: ${result.message}`);
return [];
}
} else {
throw new Error(
`잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}`
);
}
} catch (error) {
console.error(`테이블 특정 컬럼 조회 오류 (${tableName}):`, error);
throw error;
}
}
/**
* 테이블에 데이터 삽입 (연결 타입에 따라 내부/외부 DB 구분)
*/
static async insertDataToTable(
tableName: string,
data: any[],
connectionType: "internal" | "external" = "internal",
connectionId?: number
): Promise<{
successCount: number;
failedCount: number;
}> {
try {
console.log(
`[BatchService] 테이블에 데이터 삽입: ${tableName} (${connectionType}${connectionId ? `:${connectionId}` : ""}), ${data.length}개 레코드`
);
if (!data || data.length === 0) {
return { successCount: 0, failedCount: 0 };
}
if (connectionType === "internal") {
// 내부 DB에 데이터 삽입
let successCount = 0;
let failedCount = 0;
// 각 레코드를 개별적으로 삽입 (UPSERT 방식으로 중복 처리)
for (const record of data) {
try {
// 동적 UPSERT 쿼리 생성 (PostgreSQL ON CONFLICT 사용)
const columns = Object.keys(record);
const values = Object.values(record).map((value) => {
// Date 객체를 ISO 문자열로 변환 (PostgreSQL이 자동으로 파싱)
if (value instanceof Date) {
return value.toISOString();
}
// JavaScript Date 문자열을 Date 객체로 변환 후 ISO 문자열로
if (typeof value === "string") {
const dateRegex =
/^(Mon|Tue|Wed|Thu|Fri|Sat|Sun)\s+(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{2}\s+\d{4}\s+\d{2}:\d{2}:\d{2}/;
if (dateRegex.test(value)) {
return new Date(value).toISOString();
}
// ISO 날짜 문자열 형식 체크 (2025-09-24T06:29:01.351Z)
const isoDateRegex =
/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{3})?Z?$/;
if (isoDateRegex.test(value)) {
return new Date(value).toISOString();
}
}
return value;
});
// PostgreSQL 타입 캐스팅을 위한 placeholder 생성
const placeholders = columns
.map((col, index) => {
// 날짜/시간 관련 컬럼명 패턴 체크
if (
col.toLowerCase().includes("date") ||
col.toLowerCase().includes("time") ||
col.toLowerCase().includes("created") ||
col.toLowerCase().includes("updated") ||
col.toLowerCase().includes("reg")
) {
return `$${index + 1}::timestamp`;
}
return `$${index + 1}`;
})
.join(", ");
// Primary Key 컬럼 추정 (일반적으로 id 또는 첫 번째 컬럼)
const primaryKeyColumn = columns.includes("id")
? "id"
: columns.includes("user_id")
? "user_id"
: columns[0];
// UPDATE SET 절 생성 (Primary Key 제외)
const updateColumns = columns.filter(
(col) => col !== primaryKeyColumn
);
const updateSet = updateColumns
.map((col) => `${col} = EXCLUDED.${col}`)
.join(", ");
// 트랜잭션 내에서 처리하여 연결 관리 최적화
const result = await transaction(async (client) => {
// 먼저 해당 레코드가 존재하는지 확인
const checkQuery = `SELECT COUNT(*) as count FROM ${tableName} WHERE ${primaryKeyColumn} = $1`;
const existsResult = await client.query(checkQuery, [
record[primaryKeyColumn],
]);
const exists = parseInt(existsResult.rows[0]?.count || "0") > 0;
let operationResult = "no_change";
if (exists && updateSet) {
// 기존 레코드가 있으면 UPDATE (값이 다른 경우에만)
const whereConditions = updateColumns
.map((col, index) => {
// 날짜/시간 컬럼에 대한 타입 캐스팅 처리
if (
col.toLowerCase().includes("date") ||
col.toLowerCase().includes("time") ||
col.toLowerCase().includes("created") ||
col.toLowerCase().includes("updated") ||
col.toLowerCase().includes("reg")
) {
return `${col} IS DISTINCT FROM $${index + 2}::timestamp`;
}
return `${col} IS DISTINCT FROM $${index + 2}`;
})
.join(" OR ");
const query = `UPDATE ${tableName} SET ${updateSet.replace(/EXCLUDED\./g, "")}
WHERE ${primaryKeyColumn} = $1 AND (${whereConditions})`;
// 파라미터: [primaryKeyValue, ...updateValues]
const updateValues = [
record[primaryKeyColumn],
...updateColumns.map((col) => record[col]),
];
const updateResult = await client.query(query, updateValues);
if (updateResult.rowCount && updateResult.rowCount > 0) {
console.log(
`[BatchService] 레코드 업데이트: ${primaryKeyColumn}=${record[primaryKeyColumn]}`
);
operationResult = "updated";
} else {
console.log(
`[BatchService] 레코드 변경사항 없음: ${primaryKeyColumn}=${record[primaryKeyColumn]}`
);
operationResult = "no_change";
}
} else if (!exists) {
// 새 레코드 삽입
const query = `INSERT INTO ${tableName} (${columns.join(", ")}) VALUES (${placeholders})`;
await client.query(query, values);
console.log(
`[BatchService] 새 레코드 삽입: ${primaryKeyColumn}=${record[primaryKeyColumn]}`
);
operationResult = "inserted";
} else {
console.log(
`[BatchService] 레코드 이미 존재 (변경사항 없음): ${primaryKeyColumn}=${record[primaryKeyColumn]}`
);
operationResult = "no_change";
}
return operationResult;
});
successCount++;
} catch (error) {
console.error(`레코드 UPSERT 실패:`, error);
failedCount++;
}
}
console.log(
`[BatchService] 내부 DB 데이터 삽입 완료: 성공 ${successCount}개, 실패 ${failedCount}`
);
return { successCount, failedCount };
} else if (connectionType === "external" && connectionId) {
// 외부 DB에 데이터 삽입
const result = await BatchExternalDbService.insertDataToTable(
connectionId,
tableName,
data
);
if (result.success && result.data) {
console.log(
`[BatchService] 외부 DB 데이터 삽입 완료: 성공 ${result.data.successCount}개, 실패 ${result.data.failedCount}`
);
return result.data;
} else {
console.error(`외부 DB 데이터 삽입 실패: ${result.message}`);
return { successCount: 0, failedCount: data.length };
}
} else {
console.log(`[BatchService] 연결 정보 디버그:`, {
connectionType,
connectionId,
});
throw new Error(
`잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}`
);
}
} catch (error) {
console.error(`테이블 데이터 삽입 오류 (${tableName}):`, error);
throw error;
}
}
/**
* 배치 매핑 유효성 검사
*/
private static async validateBatchMappings(
mappings: BatchMapping[]
): Promise<BatchValidationResult> {
const errors: string[] = [];
const warnings: string[] = [];
if (!mappings || mappings.length === 0) {
errors.push("최소 하나 이상의 매핑이 필요합니다.");
return { isValid: false, errors, warnings };
}
// n:1 매핑 검사 (여러 FROM이 같은 TO로 매핑되는 것 방지)
const toMappings = new Map<string, number>();
mappings.forEach((mapping, index) => {
const toKey = `${mapping.to_connection_type}:${mapping.to_connection_id || "internal"}:${mapping.to_table_name}:${mapping.to_column_name}`;
if (toMappings.has(toKey)) {
errors.push(
`매핑 ${index + 1}: TO 컬럼 '${mapping.to_table_name}.${mapping.to_column_name}'에 중복 매핑이 있습니다. n:1 매핑은 허용되지 않습니다.`
);
} else {
toMappings.set(toKey, index);
}
});
// 1:n 매핑 경고 (같은 FROM에서 여러 TO로 매핑)
const fromMappings = new Map<string, number[]>();
mappings.forEach((mapping, index) => {
const fromKey = `${mapping.from_connection_type}:${mapping.from_connection_id || "internal"}:${mapping.from_table_name}:${mapping.from_column_name}`;
if (!fromMappings.has(fromKey)) {
fromMappings.set(fromKey, []);
}
fromMappings.get(fromKey)!.push(index);
});
fromMappings.forEach((indices, fromKey) => {
if (indices.length > 1) {
const [, , tableName, columnName] = fromKey.split(":");
warnings.push(
`FROM 컬럼 '${tableName}.${columnName}'에서 ${indices.length}개의 TO 컬럼으로 매핑됩니다. (1:n 매핑)`
);
}
});
return {
isValid: errors.length === 0,
errors,
warnings,
};
}
}