| New file |
| | |
| | | package cn.net.communion.dbdatasync; |
| | | |
| | | import cn.net.communion.dbdatasync.dbhelper.DbHelper; |
| | | import cn.net.communion.dbdatasync.dbhelper.Factory; |
| | | import cn.net.communion.dbdatasync.entity.DbInfo; |
| | | import cn.net.communion.dbdatasync.entity.JobInfo; |
| | | |
| | | import java.sql.Connection; |
| | | import java.sql.DriverManager; |
| | | import java.sql.SQLException; |
| | | import java.util.Date; |
| | | |
| | | import org.apache.log4j.Logger; |
| | | import org.quartz.Job; |
| | | import org.quartz.JobDataMap; |
| | | import org.quartz.JobExecutionContext; |
| | | import org.quartz.JobExecutionException; |
| | | |
| | | public class DataTask implements Job { |
| | | private Logger logger = Logger.getLogger(DataTask.class); |
| | | |
| | | @Override |
| | | public void execute(JobExecutionContext context) throws JobExecutionException { |
| | | logger.info("开始任务调度: " + new Date()); |
| | | Connection inConn = null; |
| | | Connection outConn = null; |
| | | JobDataMap data = context.getJobDetail().getJobDataMap(); |
| | | DbInfo srcDb = (DbInfo) data.get("srcDb"); |
| | | DbInfo destDb = (DbInfo) data.get("destDb"); |
| | | JobInfo jobInfo = (JobInfo) data.get("jobInfo"); |
| | | String logTitle = (String) data.get("logTitle"); |
| | | |
| | | try { |
| | | // 校验任务信息 |
| | | if (jobInfo == null || jobInfo.getSrcSql() == null || jobInfo.getSrcSql().isEmpty()) { |
| | | logger.error("任务 SQL 语句为空,请检查配置"); |
| | | return; |
| | | } |
| | | |
| | | // 创建数据库连接 |
| | | inConn = createConnection(srcDb); |
| | | outConn = createConnection(destDb); |
| | | if (inConn == null || outConn == null) { |
| | | logger.error("数据库连接失败,请检查配置"); |
| | | return; |
| | | } |
| | | |
| | | // 拼接并执行 SQL |
| | | DbHelper dbHelper = Factory.create(destDb.getDbtype()); |
| | | long start = new Date().getTime(); |
| | | |
| | | logger.debug("srcDb: " + srcDb); |
| | | logger.debug("destDb: " + destDb); |
| | | logger.debug("jobInfo: " + jobInfo); |
| | | logger.debug("dbHelper: " + dbHelper); |
| | | |
| | | if (dbHelper == null) { |
| | | logger.error("无法创建 DbHelper 实例,请检查数据库类型配置"); |
| | | return; |
| | | } |
| | | |
| | | String sql = dbHelper.assembleSQL(jobInfo.getSrcSql(), inConn, jobInfo); |
| | | logger.info("组装SQL耗时: " + (new Date().getTime() - start) + "ms"); |
| | | if (sql != null) { |
| | | logger.debug("Generated SQL: " + sql); |
| | | long eStart = new Date().getTime(); |
| | | dbHelper.executeSQL(sql, outConn); |
| | | outConn.commit(); // 提交事务 |
| | | logger.info("执行SQL语句耗时: " + (new Date().getTime() - eStart) + "ms"); |
| | | } |
| | | } catch (SQLException e) { |
| | | logger.error(logTitle + " SQL执行出错: " + e.getMessage()); |
| | | try { |
| | | if (outConn != null) outConn.rollback(); // 回滚事务 |
| | | } catch (SQLException rollbackEx) { |
| | | logger.error("事务回滚失败: " + rollbackEx.getMessage()); |
| | | } |
| | | } finally { |
| | | destoryConnection(inConn); // 关闭源数据库连接 |
| | | destoryConnection(outConn); // 关闭目标数据库连接 |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 创建数据库连接 |
| | | * |
| | | * @param db 数据库配置信息 |
| | | * @return 数据库连接对象,失败返回 null |
| | | */ |
| | | private Connection createConnection(DbInfo db) { |
| | | try { |
| | | Class.forName(db.getDriver()); |
| | | Connection conn = DriverManager.getConnection(db.getUrl(), db.getUsername(), db.getPassword()); |
| | | conn.setAutoCommit(false); |
| | | return conn; |
| | | } catch (Exception e) { |
| | | logger.error("数据库连接创建失败: " + e.getMessage()); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * 关闭数据库连接 |
| | | * |
| | | * @param conn 数据库连接对象 |
| | | */ |
| | | private void destoryConnection(Connection conn) { |
| | | try { |
| | | if (conn != null) { |
| | | conn.close(); |
| | | logger.info("数据库连接已关闭"); |
| | | } |
| | | } catch (SQLException e) { |
| | | logger.error("关闭数据库连接失败: " + e.getMessage()); |
| | | } |
| | | } |
| | | } |