ERP-node/backend-node/src/services/batchExternalDbService.ts

744 lines
22 KiB
TypeScript

// 배치관리 전용 외부 DB 서비스
// 기존 ExternalDbConnectionService와 분리하여 배치관리 시스템에 특화된 기능 제공
// 작성일: 2024-12-24
import { query, queryOne } from "../database/db";
import { PasswordEncryption } from "../utils/passwordEncryption";
import { DatabaseConnectorFactory } from "../database/DatabaseConnectorFactory";
import { RestApiConnector } from "../database/RestApiConnector";
import { ApiResponse, ColumnInfo, TableInfo } from "../types/batchTypes";
export class BatchExternalDbService {
/**
* 배치관리용 외부 DB 연결 목록 조회
*/
static async getAvailableConnections(): Promise<
ApiResponse<
Array<{
type: "internal" | "external";
id?: number;
name: string;
db_type?: string;
}>
>
> {
try {
const connections: Array<{
type: "internal" | "external";
id?: number;
name: string;
db_type?: string;
}> = [];
// 내부 DB 추가
connections.push({
type: "internal",
name: "내부 데이터베이스 (PostgreSQL)",
db_type: "postgresql",
});
// 활성화된 외부 DB 연결 조회
const externalConnections = await query<{
id: number;
connection_name: string;
db_type: string;
description: string;
}>(
`SELECT id, connection_name, db_type, description
FROM external_db_connections
WHERE is_active = 'Y'
ORDER BY connection_name ASC`,
[]
);
// 외부 DB 연결 추가
externalConnections.forEach((conn) => {
connections.push({
type: "external",
id: conn.id,
name: `${conn.connection_name} (${conn.db_type?.toUpperCase()})`,
db_type: conn.db_type || undefined,
});
});
return {
success: true,
data: connections,
message: `${connections.length}개의 연결을 조회했습니다.`,
};
} catch (error) {
console.error("배치관리 연결 목록 조회 실패:", error);
return {
success: false,
message: "연결 목록 조회 중 오류가 발생했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 테이블 목록 조회
*/
static async getTables(
connectionId: number
): Promise<ApiResponse<TableInfo[]>> {
try {
// 연결 정보 조회
const connection = await this.getConnectionById(connectionId);
if (!connection) {
throw new Error("외부 DB 연결 정보를 찾을 수 없습니다.");
}
// 커넥터 생성
const connector = await DatabaseConnectorFactory.createConnector(
connection.db_type,
{
host: connection.host,
port: connection.port,
database: connection.database_name,
user: connection.username,
password: connection.password,
},
connectionId
);
// 연결
await connector.connect();
// 테이블 목록 조회
const tables = await connector.getTables();
// 연결 종료
await connector.disconnect();
return {
success: true,
data: tables,
message: `${tables.length}개의 테이블을 조회했습니다.`,
};
} catch (error) {
console.error("테이블 목록 조회 실패:", error);
return {
success: false,
message: "테이블 목록 조회 중 오류가 발생했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 컬럼 목록 조회
*/
static async getColumns(
connectionId: number,
tableName: string
): Promise<ApiResponse<ColumnInfo[]>> {
try {
// 연결 정보 조회
const connection = await this.getConnectionById(connectionId);
if (!connection) {
throw new Error("외부 DB 연결 정보를 찾을 수 없습니다.");
}
// 커넥터 생성
const connector = await DatabaseConnectorFactory.createConnector(
connection.db_type,
{
host: connection.host,
port: connection.port,
database: connection.database_name,
user: connection.username,
password: connection.password,
},
connectionId
);
// 연결
await connector.connect();
// 컬럼 목록 조회
const columns = await connector.getColumns(tableName);
// 연결 종료
await connector.disconnect();
// BatchColumnInfo 형식으로 변환
const batchColumns: ColumnInfo[] = columns.map((col) => ({
column_name: col.column_name,
data_type: col.data_type,
is_nullable: col.is_nullable,
column_default: col.column_default,
}));
return {
success: true,
data: batchColumns,
message: `${batchColumns.length}개의 컬럼을 조회했습니다.`,
};
} catch (error) {
console.error("컬럼 목록 조회 실패:", error);
return {
success: false,
message: "컬럼 목록 조회 중 오류가 발생했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 연결 정보 조회 (내부 메서드)
*/
private static async getConnectionById(id: number) {
const connections = await query<any>(
`SELECT * FROM external_db_connections WHERE id = $1`,
[id]
);
if (connections.length === 0) {
return null;
}
const connection = connections[0];
// 비밀번호 복호화
if (connection.password) {
try {
const passwordEncryption = new PasswordEncryption();
connection.password = passwordEncryption.decrypt(connection.password);
} catch (error) {
console.error("비밀번호 복호화 실패:", error);
// 복호화 실패 시 원본 사용 (또는 에러 처리)
}
}
return connection;
}
/**
* REST API 데이터 미리보기
*/
static async previewRestApiData(
apiUrl: string,
apiKey: string,
endpoint: string,
method: "GET" | "POST" | "PUT" | "DELETE" = "GET",
paramInfo?: {
paramType: "url" | "query";
paramName: string;
paramValue: string;
paramSource: "static" | "dynamic";
},
// 👇 body 파라미터 추가
body?: string
): Promise<ApiResponse<any>> {
try {
// REST API 커넥터 생성
const connector = new RestApiConnector({
baseUrl: apiUrl,
apiKey: apiKey,
timeout: 10000, // 미리보기는 짧은 타임아웃
});
// 파라미터 적용
let finalEndpoint = endpoint;
if (
paramInfo &&
paramInfo.paramName &&
paramInfo.paramValue &&
paramInfo.paramSource === "static"
) {
if (paramInfo.paramType === "url") {
finalEndpoint = endpoint.replace(
`{${paramInfo.paramName}}`,
paramInfo.paramValue
);
} else if (paramInfo.paramType === "query") {
const separator = endpoint.includes("?") ? "&" : "?";
finalEndpoint = `${endpoint}${separator}${paramInfo.paramName}=${paramInfo.paramValue}`;
}
}
// JSON body 파싱
let requestData;
if (body) {
try {
requestData = JSON.parse(body);
} catch (e) {
console.warn("JSON 파싱 실패, 원본 문자열 전송");
requestData = body;
}
}
// 데이터 조회 (직접 RestApiConnector 메서드 호출)
// 타입 단언을 사용하여 private/protected 메서드 우회 또는 인터페이스 확장 필요
// 여기서는 executeRequest가 public이라고 가정
const result = await (connector as any).executeRequest(
finalEndpoint,
method,
requestData
);
return {
success: true,
data: result.data || result, // 데이터가 없으면 전체 결과 반환
message: "데이터 미리보기 성공",
};
} catch (error) {
return {
success: false,
message: "데이터 미리보기 실패",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 외부 DB 테이블 데이터 조회
*/
static async getDataFromTable(
connectionId: number,
tableName: string
): Promise<ApiResponse<any[]>> {
try {
// 연결 정보 조회
const connection = await this.getConnectionById(connectionId);
if (!connection) {
throw new Error("외부 DB 연결 정보를 찾을 수 없습니다.");
}
// 커넥터 생성
const connector = await DatabaseConnectorFactory.createConnector(
connection.db_type,
{
host: connection.host,
port: connection.port,
database: connection.database_name,
user: connection.username,
password: connection.password,
},
connectionId
);
// 연결
await connector.connect();
// 데이터 조회 (기본 100건)
const result = await connector.executeQuery(
`SELECT * FROM ${tableName} LIMIT 100`
);
// 연결 종료
await connector.disconnect();
return {
success: true,
data: result.rows,
message: `${result.rows.length}개의 데이터를 조회했습니다.`,
};
} catch (error) {
console.error("테이블 데이터 조회 실패:", error);
return {
success: false,
message: "테이블 데이터 조회 중 오류가 발생했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 외부 DB 테이블 데이터 조회 (컬럼 지정)
*/
static async getDataFromTableWithColumns(
connectionId: number,
tableName: string,
columns: string[]
): Promise<ApiResponse<any[]>> {
try {
// 연결 정보 조회
const connection = await this.getConnectionById(connectionId);
if (!connection) {
throw new Error("외부 DB 연결 정보를 찾을 수 없습니다.");
}
// 커넥터 생성
const connector = await DatabaseConnectorFactory.createConnector(
connection.db_type,
{
host: connection.host,
port: connection.port,
database: connection.database_name,
user: connection.username,
password: connection.password,
},
connectionId
);
// 연결
await connector.connect();
// 컬럼 목록 쿼리 구성
const columnString = columns.join(", ");
// 데이터 조회 (기본 100건)
const result = await connector.executeQuery(
`SELECT ${columnString} FROM ${tableName} LIMIT 100`
);
// 연결 종료
await connector.disconnect();
return {
success: true,
data: result.rows,
message: `${result.rows.length}개의 데이터를 조회했습니다.`,
};
} catch (error) {
console.error("테이블 데이터 조회 실패:", error);
return {
success: false,
message: "테이블 데이터 조회 중 오류가 발생했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 테이블에 데이터 삽입
*/
static async insertDataToTable(
connectionId: number,
tableName: string,
data: any[]
): Promise<ApiResponse<{ successCount: number; failedCount: number }>> {
try {
// 연결 정보 조회
const connection = await this.getConnectionById(connectionId);
if (!connection) {
throw new Error("외부 DB 연결 정보를 찾을 수 없습니다.");
}
// 커넥터 생성
const connector = await DatabaseConnectorFactory.createConnector(
connection.db_type,
{
host: connection.host,
port: connection.port,
database: connection.database_name,
user: connection.username,
password: connection.password,
},
connectionId
);
// 연결
await connector.connect();
let successCount = 0;
let failedCount = 0;
// 트랜잭션 시작 (지원하는 경우)
// await connector.beginTransaction();
try {
// 각 레코드를 개별적으로 삽입
for (const record of data) {
try {
// 쿼리 빌더 사용 (간단한 구현)
const columns = Object.keys(record);
const values = Object.values(record);
const placeholders = values
.map((_, i) => (connection.db_type === "postgresql" ? `$${i + 1}` : "?"))
.join(", ");
const query = `INSERT INTO ${tableName} (${columns.join(
", "
)}) VALUES (${placeholders})`;
// 파라미터 매핑 (PostgreSQL은 $1, $2..., MySQL은 ?)
await connector.executeQuery(query, values);
successCount++;
} catch (insertError) {
console.error("레코드 삽입 실패:", insertError);
failedCount++;
}
}
// 트랜잭션 커밋
// await connector.commit();
} catch (txError) {
// 트랜잭션 롤백
// await connector.rollback();
throw txError;
} finally {
// 연결 종료
await connector.disconnect();
}
return {
success: true,
data: { successCount, failedCount },
message: `${successCount}건 성공, ${failedCount}건 실패`,
};
} catch (error) {
console.error("데이터 삽입 실패:", error);
return {
success: false,
message: "데이터 삽입 중 오류가 발생했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* REST API에서 데이터 조회
*/
static async getDataFromRestApi(
apiUrl: string,
apiKey: string,
endpoint: string,
method: "GET" | "POST" | "PUT" | "DELETE" = "GET",
columns?: string[],
limit: number = 100,
// 파라미터 정보 추가
paramType?: "url" | "query",
paramName?: string,
paramValue?: string,
paramSource?: "static" | "dynamic",
// 👇 body 파라미터 추가
body?: string
): Promise<ApiResponse<any[]>> {
try {
console.log(
`[BatchExternalDbService] REST API 데이터 조회: ${apiUrl}${endpoint}`
);
// REST API 커넥터 생성
const connector = new RestApiConnector({
baseUrl: apiUrl,
apiKey: apiKey,
timeout: 30000,
});
// 연결 테스트
await connector.connect();
// 파라미터가 있는 경우 엔드포인트 수정
const { logger } = await import("../utils/logger");
logger.info(`[BatchExternalDbService] 파라미터 정보`, {
paramType,
paramName,
paramValue,
paramSource,
});
let finalEndpoint = endpoint;
if (paramType && paramName && paramValue) {
if (paramType === "url") {
// URL 파라미터: /api/users/{userId} → /api/users/123
if (endpoint.includes(`{${paramName}}`)) {
finalEndpoint = endpoint.replace(`{${paramName}}`, paramValue);
} else {
// 엔드포인트에 {paramName}이 없으면 뒤에 추가
finalEndpoint = `${endpoint}/${paramValue}`;
}
} else if (paramType === "query") {
// 쿼리 파라미터: /api/users?userId=123
const separator = endpoint.includes("?") ? "&" : "?";
finalEndpoint = `${endpoint}${separator}${paramName}=${paramValue}`;
}
logger.info(
`[BatchExternalDbService] 파라미터 적용된 엔드포인트: ${finalEndpoint}`
);
}
// 👇 Body 파싱 (POST/PUT 요청 시)
let requestData;
if (body && (method === 'POST' || method === 'PUT')) {
try {
// 템플릿 변수가 있을 수 있으므로 여기서는 원본 문자열을 사용하거나
// 정적 값만 파싱. 여기서는 일단 정적 JSON으로 가정하고 파싱 시도.
// (BatchScheduler에서 템플릿 처리 후 전달하는 것이 이상적이나,
// 현재 구조상 여기서 파싱 시도하고 실패하면 문자열 그대로 전송)
requestData = JSON.parse(body);
} catch (e) {
console.warn("JSON 파싱 실패, 원본 문자열 전송");
requestData = body;
}
}
// 데이터 조회 (REST API는 executeRequest 사용)
let result;
if ((connector as any).executeRequest) {
// executeRequest(endpoint, method, data)
result = await (connector as any).executeRequest(
finalEndpoint,
method,
requestData // body 전달
);
} else {
// Fallback (GET only)
result = await connector.executeQuery(finalEndpoint);
}
let data = result.rows || result.data || result;
// 👇 단일 객체 응답(토큰 등)인 경우 배열로 래핑하여 리스트처럼 처리
if (!Array.isArray(data)) {
data = [data];
}
return {
success: true,
data: data,
message: `${data.length}개의 데이터를 조회했습니다.`,
};
} catch (error) {
console.error("REST API 데이터 조회 실패:", error);
return {
success: false,
message: "REST API 데이터 조회 중 오류가 발생했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 템플릿 기반 REST API로 데이터 전송 (DB → REST API 배치용)
*/
static async sendDataToRestApiWithTemplate(
apiUrl: string,
apiKey: string,
endpoint: string,
method: "POST" | "PUT" | "DELETE" = "POST",
templateBody: string,
data: any[],
urlPathColumn?: string // URL 경로에 사용할 컬럼명 (PUT/DELETE용)
): Promise<ApiResponse<{ successCount: number; failedCount: number }>> {
try {
console.log(
`[BatchExternalDbService] 템플릿 기반 REST API 데이터 전송: ${apiUrl}${endpoint}, ${data.length}개 레코드`
);
console.log(
`[BatchExternalDbService] Request Body 템플릿:`,
templateBody
);
// REST API 커넥터 생성
const connector = new RestApiConnector({
baseUrl: apiUrl,
apiKey: apiKey,
timeout: 30000,
});
// 연결 테스트
await connector.connect();
let successCount = 0;
let failedCount = 0;
// 각 레코드를 개별적으로 전송
for (const record of data) {
try {
// 템플릿 처리: {{컬럼명}} → 실제 값으로 치환
let processedBody = templateBody;
for (const [key, value] of Object.entries(record)) {
const placeholder = `{{${key}}}`;
let stringValue = "";
if (value !== null && value !== undefined) {
// Date 객체인 경우 다양한 포맷으로 변환
if (value instanceof Date) {
// ISO 형식: 2025-09-25T07:22:52.000Z
stringValue = value.toISOString();
// 다른 포맷이 필요한 경우 여기서 처리
// 예: YYYY-MM-DD 형식
// stringValue = value.toISOString().split('T')[0];
// 예: YYYY-MM-DD HH:mm:ss 형식
// stringValue = value.toISOString().replace('T', ' ').replace(/\.\d{3}Z$/, '');
} else {
stringValue = String(value);
}
}
processedBody = processedBody.replace(
new RegExp(placeholder.replace(/[{}]/g, "\\$&"), "g"),
stringValue
);
}
console.log(`[BatchExternalDbService] 원본 레코드:`, record);
console.log(
`[BatchExternalDbService] 처리된 Request Body:`,
processedBody
);
// JSON 파싱하여 객체로 변환
let requestData;
try {
requestData = JSON.parse(processedBody);
} catch (parseError) {
console.error(
`[BatchExternalDbService] JSON 파싱 오류:`,
parseError
);
throw new Error(`Request Body JSON 파싱 실패: ${parseError}`);
}
// URL 경로 파라미터 처리 (PUT/DELETE용)
let finalEndpoint = endpoint;
if (
(method === "PUT" || method === "DELETE") &&
urlPathColumn &&
record[urlPathColumn]
) {
// endpoint 마지막에 ID 추가 (예: /api/users -> /api/users/123)
// 이미 /로 끝나는지 확인
const separator = finalEndpoint.endsWith("/") ? "" : "/";
finalEndpoint = `${finalEndpoint}${separator}${record[urlPathColumn]}`;
console.log(`[BatchExternalDbService] 동적 엔드포인트: ${finalEndpoint}`);
}
// 데이터 전송 (REST API는 executeRequest 사용)
if ((connector as any).executeRequest) {
await (connector as any).executeRequest(
finalEndpoint,
method,
requestData
);
} else {
// Fallback
// @ts-ignore
await connector.httpClient.request({
method: method,
url: finalEndpoint,
data: requestData
});
}
successCount++;
} catch (sendError) {
console.error("데이터 전송 실패:", sendError);
failedCount++;
}
}
return {
success: true,
data: { successCount, failedCount },
message: `${successCount}건 성공, ${failedCount}건 실패`,
};
} catch (error) {
console.error("데이터 전송 실패:", error);
return {
success: false,
message: "데이터 전송 중 오류가 발생했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
}