package cn.net.communion.dbdatasync;

import cn.net.communion.dbdatasync.dbhelper.DbHelper;
import cn.net.communion.dbdatasync.dbhelper.Factory;
import cn.net.communion.dbdatasync.entity.DbInfo;
import cn.net.communion.dbdatasync.entity.JobInfo;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Date;
import org.apache.log4j.Logger;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * Quartz job that assembles a SQL statement using the source connection and
 * executes it on the destination connection.
 *
 * <p>The job reads its configuration from the {@link JobDataMap} of the job
 * detail under the keys {@code srcDb}, {@code destDb}, {@code jobInfo} and
 * {@code logTitle}. Both connections are opened with auto-commit disabled and
 * are always closed before {@link #execute(JobExecutionContext)} returns.
 */
public class DataTask implements Job {

    private static final Logger logger = Logger.getLogger(DataTask.class);

    /**
     * Runs one synchronization pass.
     *
     * <p>Validation or connection failures are logged and end the run without
     * throwing. A {@link SQLException} raised while assembling, executing or
     * committing is logged and the destination transaction is rolled back.
     * Unchecked exceptions also trigger a rollback and are then rethrown to
     * the caller.
     *
     * @param context Quartz execution context carrying the job data map
     * @throws JobExecutionException declared by the {@link Job} contract; not
     *         thrown directly by this implementation
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        logger.info("开始任务调度: " + new Date());

        Connection inConn = null;
        Connection outConn = null;

        JobDataMap data = context.getJobDetail().getJobDataMap();
        DbInfo srcDb = (DbInfo) data.get("srcDb");
        DbInfo destDb = (DbInfo) data.get("destDb");
        JobInfo jobInfo = (JobInfo) data.get("jobInfo");
        String logTitle = (String) data.get("logTitle");

        try {
            // Validate the job definition before opening any connection.
            if (jobInfo == null || jobInfo.getSrcSql() == null || jobInfo.getSrcSql().isEmpty()) {
                logger.error("任务 SQL 语句为空,请检查配置");
                return;
            }

            // Open both connections; the finally block closes whichever one succeeded.
            inConn = createConnection(srcDb);
            outConn = createConnection(destDb);
            if (inConn == null || outConn == null) {
                logger.error("数据库连接失败,请检查配置");
                return;
            }

            // Pick the helper matching the destination database type.
            DbHelper dbHelper = Factory.create(destDb.getDbtype());
            logger.debug("srcDb: " + srcDb);
            logger.debug("destDb: " + destDb);
            logger.debug("jobInfo: " + jobInfo);
            logger.debug("dbHelper: " + dbHelper);
            if (dbHelper == null) {
                logger.error("无法创建 DbHelper 实例,请检查数据库类型配置");
                return;
            }

            // Assemble the statement, timing only the assembly itself.
            long start = System.currentTimeMillis();
            String sql = dbHelper.assembleSQL(jobInfo.getSrcSql(), inConn, jobInfo);
            logger.info("组装SQL耗时: " + (System.currentTimeMillis() - start) + "ms");

            if (sql != null) {
                logger.debug("Generated SQL: " + sql);
                long eStart = System.currentTimeMillis();
                dbHelper.executeSQL(sql, outConn);
                outConn.commit(); // auto-commit is off, so commit explicitly
                logger.info("执行SQL语句耗时: " + (System.currentTimeMillis() - eStart) + "ms");
            }
        } catch (SQLException e) {
            // Pass the exception itself so the stack trace reaches the log.
            logger.error(logTitle + " SQL执行出错: " + e.getMessage(), e);
            rollbackQuietly(outConn);
        } catch (RuntimeException e) {
            // Closing a connection with an active transaction has
            // implementation-defined results, so roll back before the
            // finally block closes it, then let the failure propagate.
            rollbackQuietly(outConn);
            throw e;
        } finally {
            destroyConnection(inConn);  // close the source connection
            destroyConnection(outConn); // close the destination connection
        }
    }

    /**
     * Opens a JDBC connection with auto-commit disabled.
     *
     * @param db database configuration; may be {@code null}
     * @return the open connection, or {@code null} if {@code db} is
     *         {@code null} or the connection could not be set up
     */
    private Connection createConnection(DbInfo db) {
        if (db == null) {
            logger.error("数据库连接创建失败: 数据库配置为空");
            return null;
        }
        Connection conn = null;
        try {
            Class.forName(db.getDriver());
            conn = DriverManager.getConnection(db.getUrl(), db.getUsername(), db.getPassword());
            conn.setAutoCommit(false);
            return conn;
        } catch (Exception e) {
            logger.error("数据库连接创建失败: " + e.getMessage(), e);
            // If the connection was opened but configuring it failed,
            // close it here; the caller only ever sees null.
            destroyConnection(conn);
            return null;
        }
    }

    /**
     * Rolls back the current transaction, logging (not throwing) any failure.
     *
     * @param conn connection to roll back; {@code null} is ignored
     */
    private void rollbackQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.rollback();
        } catch (SQLException rollbackEx) {
            logger.error("事务回滚失败: " + rollbackEx.getMessage(), rollbackEx);
        }
    }

    /**
     * Closes a JDBC connection, logging (not throwing) any failure.
     *
     * @param conn connection to close; {@code null} is ignored
     */
    private void destroyConnection(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
            logger.info("数据库连接已关闭");
        } catch (SQLException e) {
            logger.error("关闭数据库连接失败: " + e.getMessage(), e);
        }
    }
}