package org.leolo.rail.nrd; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Types; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.LinkedList; import java.util.Queue; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONObject; public class TiplocHandler implements Runnable{ private static Logger log = LogManager.getLogger(TiplocHandler.class); private final Object SYNC_TOKEN = new Object(); private boolean shutdown = false; private boolean terminated = false; private Queue queue = new LinkedList<>(); public TiplocHandler() { new Thread(this).start(); } public void add(JSONObject obj) { queue.add(obj); synchronized(SYNC_TOKEN) { SYNC_TOKEN.notifyAll(); } } public void shutdown() { shutdown = true; synchronized(SYNC_TOKEN) { SYNC_TOKEN.notifyAll(); } } public void shutdownAndWait() { shutdown(); while(true) { if(terminated) { break; } synchronized(SYNC_TOKEN) { log.debug("Waiting for termination."); try { SYNC_TOKEN.notifyAll(); SYNC_TOKEN.wait(1000); } catch (InterruptedException e) { log.error(e.getMessage(), e); } } } } @Override public void run() { try( Connection conn = DatabaseManager.getInstance().getConnection(); PreparedStatement pstmt = conn.prepareStatement("INSERT INTO tiploc VALUES (?,?,?,?,?,?)") ){ int currentSize = 0; int commitCount = 0; // SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); while(true) { JSONObject obj = queue.poll(); if(shutdown && (queue.isEmpty()||obj==null)) { pstmt.executeBatch(); conn.commit(); log.info("TIPLOC committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount); break; }else if(obj==null) { continue; } synchronized(FileLoader.SYNC_TOKEN) { FileLoader.SYNC_TOKEN.notifyAll(); } //Parse data pstmt.setString(1, obj.optString("tiploc_code")); if(obj.has("nalco")) pstmt.setString(2, obj.optString("nalco")); else pstmt.setNull(2, Types.VARCHAR); if(obj.has("stanox")) pstmt.setString(3, obj.optString("stanox")); else pstmt.setNull(3, Types.VARCHAR); if(obj.has("crs_code")) pstmt.setString(4, obj.optString("crs_code")); else pstmt.setNull(4, Types.VARCHAR); if(obj.has("description")) pstmt.setString(5, obj.optString("description")); else pstmt.setNull(5, Types.VARCHAR); if(obj.has("tps_description")) pstmt.setString(6, obj.optString("tps_description")); else pstmt.setNull(6, Types.VARCHAR); pstmt.addBatch(); if(++currentSize>=Constants.BATCH_SIZE) { pstmt.executeBatch(); conn.commit(); log.info("TIPLOC committed {} entry. Commit #{}", currentSize, ++commitCount); currentSize = 0; } if(queue.isEmpty()) { log.debug("Empty queue. Wait for more entries"); synchronized(SYNC_TOKEN) { try { SYNC_TOKEN.wait(); } catch (InterruptedException e) { log.error(e.getMessage(), e); } } } } }catch(SQLException e) { log.error(e.getMessage(),e); } terminated = true; synchronized(SYNC_TOKEN) { SYNC_TOKEN.notifyAll(); } } public int getQueueSize() { return queue.size(); } }