diff --git a/.gitignore b/.gitignore index 270e972..e71fc3a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ /target/ configuration.properties tmpd/ +*_log* diff --git a/src/log4j2.xml b/src/log4j2.xml index 6e6f5f9..36b41b5 100644 --- a/src/log4j2.xml +++ b/src/log4j2.xml @@ -5,10 +5,18 @@ + + + + + + + \ No newline at end of file diff --git a/src/org/leolo/rail/NetowrkRailProcessingThread.java b/src/org/leolo/rail/NetowrkRailProcessingThread.java index d5fa2a6..2a63682 100644 --- a/src/org/leolo/rail/NetowrkRailProcessingThread.java +++ b/src/org/leolo/rail/NetowrkRailProcessingThread.java @@ -11,6 +11,8 @@ public class NetowrkRailProcessingThread extends BaseProcessingThread { private static Logger log = LogManager.getLogger(NetowrkRailProcessingThread.class); private TrainMovementProcessor procTrainMvt = new TrainMovementProcessor(); + private VTSPProcessor procVTSP = new VTSPProcessor(); + private boolean stopThread = false; public NetowrkRailProcessingThread() throws Exception { super("network"); @@ -20,11 +22,13 @@ public class NetowrkRailProcessingThread extends BaseProcessingThread { protected void _init() { try { connection.subscribe(Constants.NetworkRail.TOPIC_NAME_MVT); + connection.subscribe(Constants.NetworkRail.TOPIC_NAME_VTSP); } catch (Exception e) { log.error(e.getMessage(), e); } procTrainMvt.start(); + procVTSP.start(); } private volatile long sleepTime = Constants.Generic.DEFAULT_SLEEP_TIME; @@ -36,12 +40,18 @@ public class NetowrkRailProcessingThread extends BaseProcessingThread { log.error(e1.getMessage(), e1); } sleepTime*=Constants.Generic.INCRESE_RATIO; + if(sleepTime>128000) { + log.atError().log("Failed too many times. Restart thread."); + } } @Override public void run() { while(true) { try { + if(stopThread) { + break; + } StompFrame frm = connection.receive(); if("ERROR".equals(frm.getAction())) { log.error("Error message received. {}", frm.getBody()); @@ -52,6 +62,8 @@ public class NetowrkRailProcessingThread extends BaseProcessingThread { String topic = frm.getHeaders().get("destination"); if(Constants.NetworkRail.TOPIC_NAME_MVT.equals(topic)) { procTrainMvt.process(frm); + }else if(Constants.NetworkRail.TOPIC_NAME_VTSP.equals(topic)) { + procVTSP.process(frm); } sleepTime = Constants.Generic.DEFAULT_SLEEP_TIME; } catch (Exception e) { diff --git a/src/org/leolo/rail/TrainMovementProcessor.java b/src/org/leolo/rail/TrainMovementProcessor.java index c73bcdd..1a4b3a0 100644 --- a/src/org/leolo/rail/TrainMovementProcessor.java +++ b/src/org/leolo/rail/TrainMovementProcessor.java @@ -27,8 +27,9 @@ import org.json.JSONObject; import org.leolo.rail.util.TUIDDateFormat; public class TrainMovementProcessor extends Thread{ - + private Logger log = LogManager.getLogger(TrainMovementProcessor.class); + private Logger logRI = LogManager.getLogger("org.leolo.RI"); private final Object SYNC_TOKEN = new Object(); @@ -51,6 +52,7 @@ public class TrainMovementProcessor extends Thread{ @Override public void run() { + logRI.debug("Started."); while(true) { if(terminated) { return; @@ -283,6 +285,78 @@ public class TrainMovementProcessor extends Thread{ } }); }//TM + //[RI]0005: Train re-instatement + if(procMap.containsKey("0005")) { + ThreadPoolManager.getInstance().execute(()->{ + int batchSize = 0; + try( + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement pstmtTC = conn.prepareStatement("REPLACE current_train_reinstatement VALUES (?,?,?,?,?)"); + PreparedStatement pstmtUTA = conn.prepareStatement("UPDATE current_train SET `status` = ? WHERE train_id = ?"); + ){ + pstmtUTA.setString(1, CurrentTrainStatus.ACTIVATED.getCode()); + for(JSONObject obj:procMap.get("0005")) { + String train_id = obj.getJSONObject("body").optString("train_id"); + String canx_loc = obj.getJSONObject("body").optString("loc_stanox"); + String canx_dev = obj.getJSONObject("header").optString("source_dev_id"); + String canx_usr = obj.getJSONObject("header").optString("user_id"); + long canx_time = obj.getJSONObject("body").optLong("reinstatement_timestamp"); + logRI.atInfo().log("TS: {}", canx_time); + logRI.atInfo().log(obj.getJSONObject("body").toString()); +// log.debug("[TC] {}@{} because {} by {}@{}", train_id, canx_loc, canx_reason, canx_usr, canx_dev); + pstmtTC.setString(1, train_id); + pstmtTC.setString(2, canx_loc); + pstmtTC.setTimestamp(3, new Timestamp(canx_time)); + setString(pstmtTC,4, canx_dev); + setString(pstmtTC,5, canx_usr); + pstmtTC.addBatch(); + pstmtUTA.setString(2, train_id); + int rowCount = pstmtUTA.executeUpdate(); + Runnable r = new Runnable() { + + int runCount = 0; + + @Override + public void run() { + runCount++; + try( + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement pstmtUTA = conn.prepareStatement("UPDATE current_train SET `status` = ? WHERE train_id = ?"); + ){ + pstmtUTA.setString(1, CurrentTrainStatus.ACTIVATED.getCode()); + pstmtUTA.setString(2, train_id); + int rowCount = pstmtUTA.executeUpdate(); + if(rowCount==0) { + if(runCount > 5) { + log.warn("[RI] Cannot update {} [LAST]", train_id); + return; + } + log.warn("[RI] Cannot update {} (ROUND {})", train_id, runCount); + ThreadPoolManager.getInstance().schedule(this, System.currentTimeMillis()+(long)(1000*Math.pow(2, runCount))); + }else { + log.info("[RI] Successfully update {} (ROUND {})", train_id, runCount); + } + conn.commit(); + }catch(SQLException e) { + log.error(e.getMessage(), e); + } + } + + }; + if(rowCount==0) { + log.warn("[RI] Cannot update {}", train_id); + ThreadPoolManager.getInstance().schedule(r, System.currentTimeMillis()+1000); + } + batchSize++; + } + pstmtTC.executeBatch(); + conn.commit(); + log.info("[RI] Record Count : {}", batchSize); + }catch(SQLException e) { + log.error(e.getMessage(), e); + } + }); + } } } diff --git a/src/org/leolo/rail/VTSPProcessor.java b/src/org/leolo/rail/VTSPProcessor.java new file mode 100644 index 0000000..412ab9b --- /dev/null +++ b/src/org/leolo/rail/VTSPProcessor.java @@ -0,0 +1,275 @@ +package org.leolo.rail; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONArray; +import org.json.JSONObject; +import org.leolo.rail.model.LocationRecordType; +import org.leolo.rail.model.ScheduleType; +import org.leolo.rail.model.TrainSchedule; +import org.leolo.rail.model.TrainScheduleLocation; + +public class VTSPProcessor extends Thread { + private Logger log = LogManager.getLogger(VTSPProcessor.class); + private Logger logRI = LogManager.getLogger("org.leolo.RI"); + + private final Object SYNC_TOKEN = new Object(); + + private Queue procQueue = new LinkedList<>(); + private boolean terminated = false; + + public void terminate() { + terminated = true; + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + public void process(StompFrame data) { + procQueue.add(data); + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + @Override + public void run() { + logRI.debug("Started."); + while(true) { + if(terminated) { + return; + } + StompFrame data = procQueue.poll(); + if(data==null) { + synchronized(SYNC_TOKEN) { + try { +// log.debug("No more data. Sleep."); + SYNC_TOKEN.wait(1000); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + continue; + } + //Actually handle the data + String msgId = data.getHeaders().get("message-id"); + log.info("Processing message {}", msgId); + if(true||Constants.Generic.DEBUG_MODE) { + new File("tmpd").mkdirs(); + try(PrintWriter out = new PrintWriter("tmpd/VTSP-msg_"+msgId.replace(":", "-")+".json")){ + out.println(data.getBody()); + } catch (FileNotFoundException e) { + log.error(e.getMessage(), e); + } + } + JSONObject obj = new JSONObject(data.getBody()).getJSONObject("VSTPCIFMsgV1").optJSONObject("schedule"); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + String txType = obj.optString("transaction_type"); + if("Delete".equalsIgnoreCase(txType)) { + //Remove the schedule + TrainSchedule ts = new TrainSchedule(); + ts.setTrainUId(obj.optString("CIF_train_uid")); + ts.setScheduleType(ScheduleType.STP); + try { + ts.setStartDate(sdf.parse(obj.optString("schedule_start_date"))); + ts.setEndDate(sdf.parse(obj.optString("schedule_end_date"))); + }catch(ParseException e) { + log.atError().log(e.getMessage(), e); + } + try( + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement psLoc = conn.prepareStatement("DELETE FROM stp_location WHERE suid = ?"); + PreparedStatement psSch = conn.prepareStatement("DELETE FROM stp_schedule WHERE suid = ?"); + ){ + psLoc.setString(1, ts.getSUID()); + psSch.setString(1, ts.getSUID()); + psLoc.executeUpdate(); + psSch.executeUpdate(); + conn.commit(); + }catch(SQLException e) { + log.error(e.getMessage(), e); + } + return; + } + try( + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement psSch = conn.prepareStatement("INSERT INTO stp_schedule VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); + PreparedStatement psLoc = conn.prepareStatement("INSERT INTO stp_location VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + ){ + + TrainSchedule ts = new TrainSchedule(); + ts.setTrainUId(obj.optString("CIF_train_uid")); + ts.setScheduleType(ScheduleType.STP); + try { + ts.setStartDate(sdf.parse(obj.optString("schedule_start_date"))); + ts.setEndDate(sdf.parse(obj.optString("schedule_end_date"))); + }catch(ParseException e) { + log.atError().log(e.getMessage(), e); + } + ts.setDays(obj.optString("schedule_days_runs")); + ts.setBankHolidayInd(obj.optString("CIF_bank_holiday_running")); + ts.setSignalId(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("signalling_id")); + ts.setRsid(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_headcode")); + ts.setTrainStatus(obj.optString("train_status")); + ts.setTrainCategory(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_train_category")); + ts.setSection(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_business_sector")); + ts.setPowerType(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_power_type")); + ts.setTimingLoad(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_timing_load")); + ts.setPlannedSpeed(Integer.toString((int) (obj.getJSONArray("schedule_segment").getJSONObject(0).optInt("CIF_speed")/2.24))); + ts.setOperatingCharacter(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_operating_characteristics")); + ts.setClassAvailable(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_train_class")); + ts.setSleeper(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_sleepers")); + ts.setReservation(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_reservations")); + ts.setCatering(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_catering_code")); + ts.setAtocCode(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("atoc_code")); + JSONArray locs = obj.getJSONArray("schedule_segment").getJSONObject(0).getJSONArray("schedule_location"); + for(int i=0;i1) { + setString(psSch,23, ts.getLocations().get(0).getTiplocCode()); + setTime(psSch,24, ts.getLocations().get(0).getDeparture()); + setString(psSch,25, ts.getLocations().get(ts.getLocations().size()-1).getTiplocCode()); + setTime(psSch,26, ts.getLocations().get(ts.getLocations().size()-1).getArrival()); + }else { + psSch.setNull(23, Types.CHAR); + psSch.setNull(24, Types.TIME); + psSch.setNull(25, Types.CHAR); + psSch.setNull(26, Types.TIME); + } + psSch.setInt(27, ts.hashCode()); + int seq = 1; + for(TrainScheduleLocation tsl:ts.getLocations()) { + setString(psLoc, 1, ts.getSUID()); + psLoc.setInt(2, seq++); + setString(psLoc, 3, tsl.getTiplocCode()); + psLoc.setInt(4, tsl.getTiplocInstance()); + setTime(psLoc, 5, tsl.getArrival()); + setTime(psLoc, 6, tsl.getDeparture()); + setTime(psLoc, 7, tsl.getPass()); + setTime(psLoc, 8, tsl.getPubArrival()); + setTime(psLoc, 9, tsl.getPubDeparture()); + setString(psLoc,10, tsl.getPlatform()); + setString(psLoc,11, tsl.getLine()); + setString(psLoc,12, tsl.getPath()); + setTime(psLoc,13, tsl.getEngineeringAllowance()); + setTime(psLoc,14, tsl.getPathingAllowance()); + setTime(psLoc,15, tsl.getPerformanceAllowance()); + setString(psLoc,16, tsl.getRecordType().getRecordCode()); + psLoc.addBatch(); + } + psSch.executeUpdate(); + psLoc.executeBatch(); + conn.commit(); + }catch(SQLException e) { + log.atError().log(e.getMessage(),e); + } + + } + } + + protected void setString(PreparedStatement stmt, int col, String val) throws SQLException{ + if(val==null||"".equals(val)) { + stmt.setNull(col, Types.CHAR); + }else { + stmt.setString(col, val); + } + } + + private long getScheduleTime(String t) { + //HHMMSS + t = t.trim(); + if("".equals(t)) { + return 0; + } + int hour = Integer.parseInt(t.substring(0, 2)); + int min = Integer.parseInt(t.substring(2, 4)); + int sec = Integer.parseInt(t.substring(4, 6)); + return hour*3_600_000+min*60_000+sec; + } + + private long getAllowanceTime(String t) { + if("".equals(t.trim())) { + return 0; + } + if("H".equals(t.trim())) + return 30_000; + if(t.endsWith("H")) { + //nH + return Integer.parseInt(t.substring(0,1))*60_000+30_000; + }else { + //nn + return Integer.parseInt(t)*60_000; + } + } + + public void setTime(PreparedStatement stmt, int col, long time) throws SQLException{ + if(time==0) { + stmt.setNull(col, Types.TIME); + }else { + int h = (int)(time/3_600_000); + int m = (int)((time/60_000)%60); + int s = (int)((time/1_000)%60); + stmt.setString(col, h+":"+m+":"+s); + } + } +}