package cn.net.communion.dbdatasync;
|
|
import cn.net.communion.dbdatasync.dbhelper.DbHelper;
|
import cn.net.communion.dbdatasync.dbhelper.Factory;
|
import cn.net.communion.dbdatasync.entity.DbInfo;
|
import cn.net.communion.dbdatasync.entity.JobInfo;
|
|
import java.sql.Connection;
|
import java.sql.DriverManager;
|
import java.sql.SQLException;
|
import java.util.Date;
|
|
import org.apache.log4j.Logger;
|
import org.quartz.Job;
|
import org.quartz.JobDataMap;
|
import org.quartz.JobExecutionContext;
|
import org.quartz.JobExecutionException;
|
|
public class DataTask implements Job {
|
private Logger logger = Logger.getLogger(DataTask.class);
|
|
@Override
|
public void execute(JobExecutionContext context) throws JobExecutionException {
|
logger.info("开始任务调度: " + new Date());
|
Connection inConn = null;
|
Connection outConn = null;
|
JobDataMap data = context.getJobDetail().getJobDataMap();
|
DbInfo srcDb = (DbInfo) data.get("srcDb");
|
DbInfo destDb = (DbInfo) data.get("destDb");
|
JobInfo jobInfo = (JobInfo) data.get("jobInfo");
|
String logTitle = (String) data.get("logTitle");
|
|
try {
|
// 校验任务信息
|
if (jobInfo == null || jobInfo.getSrcSql() == null || jobInfo.getSrcSql().isEmpty()) {
|
logger.error("任务 SQL 语句为空,请检查配置");
|
return;
|
}
|
|
// 创建数据库连接
|
inConn = createConnection(srcDb);
|
outConn = createConnection(destDb);
|
if (inConn == null || outConn == null) {
|
logger.error("数据库连接失败,请检查配置");
|
return;
|
}
|
|
// 拼接并执行 SQL
|
DbHelper dbHelper = Factory.create(destDb.getDbtype());
|
long start = new Date().getTime();
|
|
logger.debug("srcDb: " + srcDb);
|
logger.debug("destDb: " + destDb);
|
logger.debug("jobInfo: " + jobInfo);
|
logger.debug("dbHelper: " + dbHelper);
|
|
if (dbHelper == null) {
|
logger.error("无法创建 DbHelper 实例,请检查数据库类型配置");
|
return;
|
}
|
|
String sql = dbHelper.assembleSQL(jobInfo.getSrcSql(), inConn, jobInfo);
|
logger.info("组装SQL耗时: " + (new Date().getTime() - start) + "ms");
|
if (sql != null) {
|
logger.debug("Generated SQL: " + sql);
|
long eStart = new Date().getTime();
|
dbHelper.executeSQL(sql, outConn);
|
outConn.commit(); // 提交事务
|
logger.info("执行SQL语句耗时: " + (new Date().getTime() - eStart) + "ms");
|
}
|
} catch (SQLException e) {
|
logger.error(logTitle + " SQL执行出错: " + e.getMessage());
|
try {
|
if (outConn != null) outConn.rollback(); // 回滚事务
|
} catch (SQLException rollbackEx) {
|
logger.error("事务回滚失败: " + rollbackEx.getMessage());
|
}
|
} finally {
|
destoryConnection(inConn); // 关闭源数据库连接
|
destoryConnection(outConn); // 关闭目标数据库连接
|
}
|
}
|
|
/**
|
* 创建数据库连接
|
*
|
* @param db 数据库配置信息
|
* @return 数据库连接对象,失败返回 null
|
*/
|
private Connection createConnection(DbInfo db) {
|
try {
|
Class.forName(db.getDriver());
|
Connection conn = DriverManager.getConnection(db.getUrl(), db.getUsername(), db.getPassword());
|
conn.setAutoCommit(false);
|
return conn;
|
} catch (Exception e) {
|
logger.error("数据库连接创建失败: " + e.getMessage());
|
}
|
return null;
|
}
|
|
/**
|
* 关闭数据库连接
|
*
|
* @param conn 数据库连接对象
|
*/
|
private void destoryConnection(Connection conn) {
|
try {
|
if (conn != null) {
|
conn.close();
|
logger.info("数据库连接已关闭");
|
}
|
} catch (SQLException e) {
|
logger.error("关闭数据库连接失败: " + e.getMessage());
|
}
|
}
|
}
|