package org.leolo.rail;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Vector;

import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.leolo.rail.util.TUIDDateFormat;

/**
 * Worker thread that drains a queue of STOMP frames, parses each frame body as a JSON
 * array, groups the entries by message type and hands each group to
 * {@link ThreadPoolManager} for persistence:
 * <ul>
 *   <li>{@code 0001} (train activation) is written to {@code current_train};</li>
 *   <li>{@code 0002} (train cancellation) is written to {@code train_cancellation} and
 *       marks the train as cancelled in {@code current_train};</li>
 *   <li>{@code 0003} (train movement) is written to {@code current_train_movement} and
 *       marks terminated trains in {@code current_train}.</li>
 * </ul>
 * Entries with any other message type are grouped but not processed.
 *
 * <p>{@link #process(StompFrame)} and {@link #terminate()} may be called from threads
 * other than the worker itself; the queue and the termination flag are guarded by a
 * single monitor.
 */
public class TrainMovementProcessor extends Thread {

    private static final Logger log = LogManager.getLogger(TrainMovementProcessor.class);

    /** Monitor guarding {@link #procQueue} and used to wake the worker. */
    private final Object SYNC_TOKEN = new Object();

    /** Pending frames. Not thread-safe by itself: only touch while holding SYNC_TOKEN. */
    private final Queue<StompFrame> procQueue = new LinkedList<>();

    /** Set once by {@link #terminate()}; volatile so the worker loop sees it promptly. */
    private volatile boolean terminated = false;

    /**
     * Asks the worker to stop. The worker exits the next time it checks the flag;
     * frames still queued at that point are not processed.
     */
    public void terminate() {
        synchronized (SYNC_TOKEN) {
            terminated = true;
            SYNC_TOKEN.notifyAll();
        }
    }

    /**
     * Queues a frame for processing and wakes the worker.
     *
     * @param data frame whose body is expected to be a JSON array of messages
     */
    public void process(StompFrame data) {
        // Enqueue and notify under the same lock the worker polls under, so the
        // notification cannot slip in between the worker's poll() and wait().
        synchronized (SYNC_TOKEN) {
            procQueue.add(data);
            SYNC_TOKEN.notifyAll();
        }
    }

    /**
     * Main loop: takes frames off the queue until {@link #terminate()} is called or the
     * thread is interrupted.
     */
    @Override
    public void run() {
        while (!terminated) {
            StompFrame data;
            try {
                data = awaitNextFrame();
            } catch (InterruptedException e) {
                log.error(e.getMessage(), e);
                // Treat interruption as a stop request. Restoring the flag without
                // returning would make every following wait() throw immediately.
                Thread.currentThread().interrupt();
                return;
            }
            if (data == null) {
                continue;
            }
            handleFrame(data);
        }
    }

    /**
     * Returns the next queued frame, or {@code null} if none was available. When the
     * queue is empty the call blocks for up to one second (or until notified) before
     * returning {@code null}, so the caller can re-check the termination flag.
     */
    private StompFrame awaitNextFrame() throws InterruptedException {
        synchronized (SYNC_TOKEN) {
            StompFrame next = procQueue.poll();
            if (next == null && !terminated) {
                SYNC_TOKEN.wait(1000);
            }
            return next;
        }
    }

    /** Parses one frame, groups its entries by type and dispatches the known types. */
    private void handleFrame(StompFrame data) {
        String msgId = data.getHeaders().get("message-id");
        log.info("Processing message {}", msgId);
        String body = data.getBody();
        if (Constants.Generic.DEBUG_MODE) {
            dumpFrame(msgId, body);
        }

        final Map<String, List<JSONObject>> procMap;
        try {
            JSONArray procData = new JSONArray(body);
            log.info("{} entries expected", procData.length());
            procMap = groupByType(procData);
        } catch (JSONException e) {
            // A malformed frame must not take the whole processor thread down.
            log.error(e.getMessage(), e);
            return;
        }

        //[TA]0001: Train Activation
        final List<JSONObject> activations = procMap.get("0001");
        if (activations != null) {
            ThreadPoolManager.getInstance().execute(() -> processActivations(activations));
        }
        //[TC]0002: Train Cancellation
        final List<JSONObject> cancellations = procMap.get("0002");
        if (cancellations != null) {
            ThreadPoolManager.getInstance().execute(() -> processCancellations(cancellations));
        }
        //[TM]0003: Train Movement
        final List<JSONObject> movements = procMap.get("0003");
        if (movements != null) {
            ThreadPoolManager.getInstance().execute(() -> processMovements(movements));
        }
    }

    /** Writes the raw frame body to {@code tmpd/msg_<id>.json}; used in debug mode only. */
    private void dumpFrame(String msgId, String body) {
        new File("tmpd").mkdirs();
        // ':' is replaced so the id can be used as a file name; a missing id becomes "null".
        String safeId = String.valueOf(msgId).replace(":", "-");
        try (PrintWriter out = new PrintWriter("tmpd/msg_" + safeId + ".json")) {
            out.println(body);
        } catch (FileNotFoundException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Buckets the entries of a frame by message type, preserving arrival order within
     * each bucket.
     */
    private Map<String, List<JSONObject>> groupByType(JSONArray procData) {
        Map<String, List<JSONObject>> procMap = new HashMap<>();
        for (int i = 0; i < procData.length(); i++) {
            JSONObject obj = procData.getJSONObject(i);
            // NOTE(review): this loop header and the type lookup were unreadable in the
            // reviewed copy of the file; header.msg_type is assumed to carry the
            // "0001"/"0002"/"0003" codes used by the dispatcher - confirm.
            String type = obj.getJSONObject("header").optString("msg_type");
            procMap.computeIfAbsent(type, k -> new ArrayList<>()).add(obj);
        }
        return procMap;
    }

    /**
     * Stores train activations in {@code current_train} as one batch. Records whose
     * schedule dates cannot be parsed are logged and skipped.
     */
    private void processActivations(List<JSONObject> records) {
        // Both formatters are created per task, so they are never shared between threads.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        TUIDDateFormat tdf = new TUIDDateFormat();
        int batchSize = 0;
        try (
            Connection conn = DatabaseManager.getInstance().getConnection();
            PreparedStatement pstmtTA = conn.prepareStatement("REPLACE INTO current_train VALUES (?,?,?,?,?,?)");
        ) {
            for (JSONObject obj : records) {
                JSONObject body = obj.getJSONObject("body");
                String trainId = body.optString("train_id");
                String trainUid = body.optString("train_uid");
                Date startDate;
                Date endDate;
                Date opDate;
                try {
                    startDate = sdf.parse(body.optString("schedule_start_date"));
                    endDate = sdf.parse(body.optString("schedule_end_date"));
                    opDate = sdf.parse(body.optString("tp_origin_timestamp"));
                } catch (ParseException e) {
                    log.error(e.getMessage(), e);
                    continue;
                }
                String serviceCode = body.optString("train_service_code");
                long activationTime = body.optLong("creation_timestamp");
                String schSrc = body.optString("schedule_source");
                // Schedule source "C" gets suffix "0"; every other source gets "V".
                String suffix = "C".equals(schSrc) ? "0" : "V";
                String tuid = trainUid + tdf.format(startDate) + tdf.format(endDate) + suffix;

                pstmtTA.setString(1, trainId);
                pstmtTA.setString(2, tuid);
                pstmtTA.setDate(3, new java.sql.Date(opDate.getTime()));
                pstmtTA.setString(4, serviceCode);
                pstmtTA.setTimestamp(5, new Timestamp(activationTime));
                pstmtTA.setString(6, CurrentTrainStatus.ACTIVATED.getCode());
                pstmtTA.addBatch();
                batchSize++;
            }
            pstmtTA.executeBatch();
            conn.commit();
            log.info("[TA] Record Count : {}", batchSize);
        } catch (SQLException | JSONException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Stores train cancellations in {@code train_cancellation} as one batch and marks
     * each train as cancelled in {@code current_train}. When the status update matches
     * no row, a retry task is scheduled for that train.
     */
    private void processCancellations(List<JSONObject> records) {
        int batchSize = 0;
        try (
            Connection conn = DatabaseManager.getInstance().getConnection();
            PreparedStatement pstmtTC = conn.prepareStatement("REPLACE train_cancellation VALUES (?,?,?,?,?,?,?)");
            PreparedStatement pstmtUTA = conn.prepareStatement("UPDATE current_train SET `status` = ? WHERE train_id = ?");
        ) {
            pstmtUTA.setString(1, CurrentTrainStatus.CANCELLED.getCode());
            for (JSONObject obj : records) {
                JSONObject body = obj.getJSONObject("body");
                String trainId = body.optString("train_id");
                String canxLoc = body.optString("loc_stanox");
                String canxReason = body.optString("canx_reason_code");
                String canxType = body.optString("canx_type");
                JSONObject header = obj.getJSONObject("header");
                String canxDev = header.optString("source_dev_id");
                String canxUsr = header.optString("user_id");
                long canxTime = body.optLong("canx_timestamp");

                pstmtTC.setString(1, trainId);
                pstmtTC.setString(2, canxLoc);
                pstmtTC.setTimestamp(3, new Timestamp(canxTime));
                pstmtTC.setString(4, canxReason);
                pstmtTC.setString(5, canxType);
                setString(pstmtTC, 6, canxDev);
                setString(pstmtTC, 7, canxUsr);
                pstmtTC.addBatch();

                pstmtUTA.setString(2, trainId);
                if (pstmtUTA.executeUpdate() == 0) {
                    log.warn("[TC] Cannot update {}", trainId);
                    // The second argument is built from the current time plus a delay;
                    // NOTE(review): presumably schedule() takes an absolute time in ms - confirm.
                    ThreadPoolManager.getInstance().schedule(
                            newCancellationRetry(trainId), System.currentTimeMillis() + 1000);
                }
                batchSize++;
            }
            pstmtTC.executeBatch();
            conn.commit();
            log.info("[TC] Record Count : {}", batchSize);
        } catch (SQLException | JSONException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Builds a task that retries marking {@code trainId} as cancelled on its own
     * connection. While the update keeps matching no row, the task reschedules itself
     * with a doubling delay (2s, 4s, ... 32s) and gives up on the sixth attempt.
     * NOTE(review): presumably this covers a cancellation arriving before the matching
     * activation has been stored - confirm.
     */
    private Runnable newCancellationRetry(final String trainId) {
        return new Runnable() {
            private int runCount = 0;

            @Override
            public void run() {
                runCount++;
                try (
                    Connection conn = DatabaseManager.getInstance().getConnection();
                    PreparedStatement pstmtUTA = conn.prepareStatement("UPDATE current_train SET `status` = ? WHERE train_id = ?");
                ) {
                    pstmtUTA.setString(1, CurrentTrainStatus.CANCELLED.getCode());
                    pstmtUTA.setString(2, trainId);
                    int rowCount = pstmtUTA.executeUpdate();
                    if (rowCount == 0) {
                        if (runCount > 5) {
                            log.warn("[TC] Cannot update {} [LAST]", trainId);
                            return;
                        }
                        log.warn("[TC] Cannot update {} (ROUND {})", trainId, runCount);
                        ThreadPoolManager.getInstance().schedule(this,
                                System.currentTimeMillis() + (long) (1000 * Math.pow(2, runCount)));
                    } else {
                        log.info("[TC] Successfully update {} (ROUND {})", trainId, runCount);
                    }
                    conn.commit();
                } catch (SQLException e) {
                    log.error(e.getMessage(), e);
                }
            }
        };
    }

    /**
     * Stores train movements in {@code current_train_movement} as one batch and marks
     * trains flagged {@code train_terminated} as terminated in {@code current_train}.
     */
    private void processMovements(List<JSONObject> records) {
        try (
            Connection conn = DatabaseManager.getInstance().getConnection();
            PreparedStatement pstmtTM = conn.prepareStatement("REPLACE INTO current_train_movement VALUES (?,?,?,?,?,?,?,?,?,?,?,?)");
            PreparedStatement pstmtUTA = conn.prepareStatement("UPDATE current_train SET `status` = ? WHERE train_id = ?");
        ) {
            int batchSize = 0;
            for (JSONObject obj : records) {
                JSONObject body = obj.getJSONObject("body");
                String trainId = body.optString("train_id");
                long movtTime = body.optLong("actual_timestamp");
                // -1 marks "no value"; it is stored as SQL NULL below.
                long gbttTime = body.optLong("gbtt_timestamp", -1);
                long planTime = body.optLong("planned_timestamp", -1);
                String stanox = body.optString("loc_stanox");
                String eventType = body.optString("event_type");
                boolean correction = body.optBoolean("correction_ind");
                String platform = body.optString("platform");
                String line = body.optString("line_ind");
                String route = body.optString("route");
                JSONObject header = obj.getJSONObject("header");
                String dev = header.optString("source_dev_id");
                String usr = header.optString("user_id");
                boolean termInd = body.optBoolean("train_terminated", false);

                pstmtTM.setString(1, trainId);
                pstmtTM.setTimestamp(2, new Timestamp(movtTime));
                setTimestampOrNull(pstmtTM, 3, gbttTime);
                setTimestampOrNull(pstmtTM, 4, planTime);
                pstmtTM.setString(5, stanox);
                pstmtTM.setString(6, eventType);
                pstmtTM.setString(7, correction ? "Y" : "N");
                setString(pstmtTM, 8, platform);
                setString(pstmtTM, 9, line);
                setString(pstmtTM, 10, route);
                setString(pstmtTM, 11, dev);
                setString(pstmtTM, 12, usr);
                pstmtTM.addBatch();

                if (termInd) {
                    pstmtUTA.setString(1, CurrentTrainStatus.TERMINATED.getCode());
                    pstmtUTA.setString(2, trainId);
                    pstmtUTA.addBatch();
                }
                batchSize++;
            }
            pstmtTM.executeBatch();
            pstmtUTA.executeBatch();
            conn.commit();
            log.info("[TM] Record Count : {}", batchSize);
        } catch (SQLException | JSONException e) {
            log.error(e.getMessage(), e);
        }
    }

    /** Binds {@code millis} as a timestamp, or SQL NULL when it is the -1 sentinel. */
    private void setTimestampOrNull(PreparedStatement stmt, int col, long millis) throws SQLException {
        if (millis == -1) {
            stmt.setNull(col, Types.TIMESTAMP);
        } else {
            stmt.setTimestamp(col, new Timestamp(millis));
        }
    }

    /**
     * Binds a string parameter, storing SQL NULL instead of a null or empty string.
     *
     * @param stmt statement to bind on
     * @param col  1-based parameter index
     * @param val  value to bind; {@code null} and {@code ""} are both bound as NULL
     * @throws SQLException if the driver rejects the parameter
     */
    protected void setString(PreparedStatement stmt, int col, String val) throws SQLException {
        if (val == null || val.isEmpty()) {
            stmt.setNull(col, Types.CHAR);
        } else {
            stmt.setString(col, val);
        }
    }
}