diff --git a/.gitignore b/.gitignore index f16ab4c..270e972 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /bin/ /target/ configuration.properties +tmpd/ diff --git a/src/org/leolo/rail/BaseProcessingThread.java b/src/org/leolo/rail/BaseProcessingThread.java index 759152b..4911815 100644 --- a/src/org/leolo/rail/BaseProcessingThread.java +++ b/src/org/leolo/rail/BaseProcessingThread.java @@ -15,7 +15,7 @@ public abstract class BaseProcessingThread extends Thread implements AutoCloseab this.connection = getConnection(); } - public abstract void _init(); + protected abstract void _init(); @Override public abstract void run(); @@ -39,7 +39,7 @@ public abstract class BaseProcessingThread extends Thread implements AutoCloseab } @Override - public void start() { + public synchronized void start() { if(isInit) { super.start(); }else { diff --git a/src/org/leolo/rail/Constants.java b/src/org/leolo/rail/Constants.java index 830191b..4612ace 100644 --- a/src/org/leolo/rail/Constants.java +++ b/src/org/leolo/rail/Constants.java @@ -4,11 +4,18 @@ public class Constants { public static class NetworkRail{ - public static final String TOPIC_BASE_PATH = "/topic/"; - public static final String TOPIC_NAME_VTSP = "VSTP_ALL"; + public static final String TOPIC_NAME_VTSP = "/topic/VSTP_ALL"; + public static final String TOPIC_NAME_MVT = "/topic/TRAIN_MVT_ALL_TOC"; + + } + + public static class Generic{ + public static final long DEFAULT_SLEEP_TIME = 1000; + public static final long INCRESE_RATIO = 2; + public static final boolean DEBUG_MODE = true; } } diff --git a/src/org/leolo/rail/CurrentTrainStatus.java b/src/org/leolo/rail/CurrentTrainStatus.java new file mode 100644 index 0000000..14452b9 --- /dev/null +++ b/src/org/leolo/rail/CurrentTrainStatus.java @@ -0,0 +1,28 @@ +package org.leolo.rail; + +public enum CurrentTrainStatus { + + ACTIVATED("A"), + CANCELLED("C"), + TERMINATED("T"); + + private String code; + + private CurrentTrainStatus(String code) { + this.code = code; + } + + public String getCode() { + return code; + } + + public static CurrentTrainStatus getCurrentTrainStatus(String code) { + for(CurrentTrainStatus cts:values()) { + if(cts.code.equals(code)) { + return cts; + } + } + return null; + } + +} diff --git a/src/org/leolo/rail/NRDataDamon.java b/src/org/leolo/rail/NRDataDamon.java index 051c14a..e252c17 100644 --- a/src/org/leolo/rail/NRDataDamon.java +++ b/src/org/leolo/rail/NRDataDamon.java @@ -17,6 +17,8 @@ public class NRDataDamon { private Logger log = LogManager.getLogger(getClass()); + NetowrkRailProcessingThread nrpt; + public static void main(String [] args) { Logger log = LogManager.getLogger(NRDataDamon.class); ConfigurationManager.getInstance().forEach((k,v)->{ @@ -37,8 +39,8 @@ public class NRDataDamon { e.printStackTrace(); } NRDataDamon ndd = new NRDataDamon(); - ndd.init(); try { + ndd.init(); ndd.run(); } catch (Exception e) { log.error(e.getMessage(), e); @@ -46,11 +48,25 @@ public class NRDataDamon { } } - public void init() { - + public void init() throws Exception { + nrpt = new NetowrkRailProcessingThread(); + nrpt.init(); + nrpt.start(); } public void run() throws Exception{ - + while(true) { + if(!nrpt.isAlive()) { + log.warn("Network Rail processing thread died. Restarting..."); + nrpt = new NetowrkRailProcessingThread(); + nrpt.init(); + nrpt.start(); + } + try { + Thread.sleep(2500); + }catch(InterruptedException e) { + log.error(e.getMessage(), e); + } + } } } diff --git a/src/org/leolo/rail/NetowrkRailProcessingThread.java b/src/org/leolo/rail/NetowrkRailProcessingThread.java new file mode 100644 index 0000000..d5fa2a6 --- /dev/null +++ b/src/org/leolo/rail/NetowrkRailProcessingThread.java @@ -0,0 +1,70 @@ +package org.leolo.rail; + +import java.io.IOException; + +import org.apache.activemq.command.Endpoint; +import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class NetowrkRailProcessingThread extends BaseProcessingThread { + + private static Logger log = LogManager.getLogger(NetowrkRailProcessingThread.class); + private TrainMovementProcessor procTrainMvt = new TrainMovementProcessor(); + + public NetowrkRailProcessingThread() throws Exception { + super("network"); + } + + @Override + protected void _init() { + try { + connection.subscribe(Constants.NetworkRail.TOPIC_NAME_MVT); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + + procTrainMvt.start(); + } + + private volatile long sleepTime = Constants.Generic.DEFAULT_SLEEP_TIME; + + private void sleep() { + try { + sleep(sleepTime); + } catch (InterruptedException e1) { + log.error(e1.getMessage(), e1); + } + sleepTime*=Constants.Generic.INCRESE_RATIO; + } + + @Override + public void run() { + while(true) { + try { + StompFrame frm = connection.receive(); + if("ERROR".equals(frm.getAction())) { + log.error("Error message received. {}", frm.getBody()); + sleep(); + continue; + } + log.info("HDRs: {}", frm.getHeaders()); + String topic = frm.getHeaders().get("destination"); + if(Constants.NetworkRail.TOPIC_NAME_MVT.equals(topic)) { + procTrainMvt.process(frm); + } + sleepTime = Constants.Generic.DEFAULT_SLEEP_TIME; + } catch (Exception e) { + log.error(e.getMessage(), e); + sleep(); + } + } + } + + @Override + public void close() throws IOException { + procTrainMvt.terminate(); + super.close(); + } + +} diff --git a/src/org/leolo/rail/ThreadPoolManager.java b/src/org/leolo/rail/ThreadPoolManager.java new file mode 100644 index 0000000..1ff7b47 --- /dev/null +++ b/src/org/leolo/rail/ThreadPoolManager.java @@ -0,0 +1,113 @@ +package org.leolo.rail; + +import java.util.Collection; +import java.util.List; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ThreadPoolManager { + private static Logger log = LogManager.getLogger(ThreadPoolManager.class); + + private static ThreadPoolManager instance; + + private final Object SYNC_TOKEN = new Object(); + + public static synchronized ThreadPoolManager getInstance() { + if(instance==null) { + instance = new ThreadPoolManager(); + } + return instance; + } + + private ExecutorService pool = Executors.newFixedThreadPool( + ConfigurationManager.getInstance().getInt("pool.size", 30)); + private TreeMap pendingJob = new TreeMap<>(); + private ThreadPoolManager() { + new Thread(()-> { + while(true) { + if(pendingJob.isEmpty()) { + try { + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.wait(100); + } + }catch(InterruptedException e) { + log.error(e.getMessage(), e); + } + continue; + } + Long key = pendingJob.firstKey(); + if(key==null || System.currentTimeMillis() < key) { + try { + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.wait(100); + } + }catch(InterruptedException e) { + log.error(e.getMessage(), e); + } + continue; + } + log.info("Running job#{}", key); + Runnable r = pendingJob.get(key); + pendingJob.remove(key); + pool.execute(r); + } + }).start(); + } + + public void execute(Runnable arg0) { + pool.execute(arg0); + } + + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return pool.invokeAll(tasks, timeout, unit); + } + + public List> invokeAll(Collection> tasks) throws InterruptedException { + return pool.invokeAll(tasks); + } + + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return pool.invokeAny(tasks, timeout, unit); + } + + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return pool.invokeAny(tasks); + } + + public boolean isShutdown() { + return pool.isShutdown(); + } + + public boolean isTerminated() { + return pool.isTerminated(); + } + + public Future submit(Runnable task) { + return pool.submit(task); + } + + public synchronized void schedule(Runnable task, long notBefore) { + while(true) { + if(!pendingJob.containsKey(notBefore)) { + break; + } + notBefore++; + } + pendingJob.put(notBefore, task); + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + +} diff --git a/src/org/leolo/rail/TrainMovementProcessor.java b/src/org/leolo/rail/TrainMovementProcessor.java new file mode 100644 index 0000000..f57316b --- /dev/null +++ b/src/org/leolo/rail/TrainMovementProcessor.java @@ -0,0 +1,289 @@ +package org.leolo.rail; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Vector; + +import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.leolo.rail.util.TUIDDateFormat; + +public class TrainMovementProcessor extends Thread{ + + private Logger log = LogManager.getLogger(TrainMovementProcessor.class); + + private final Object SYNC_TOKEN = new Object(); + + private Queue procQueue = new LinkedList<>(); + private boolean terminated = false; + + public void terminate() { + terminated = true; + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + public void process(StompFrame data) { + procQueue.add(data); + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + @Override + public void run() { + while(true) { + if(terminated) { + return; + } + StompFrame data = procQueue.poll(); + if(data==null) { + synchronized(SYNC_TOKEN) { + try { +// log.debug("No more data. Sleep."); + SYNC_TOKEN.wait(1000); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + continue; + } + //Actually handle the data + String msgId = data.getHeaders().get("message-id"); + log.info("Processing message {}", msgId); + if(Constants.Generic.DEBUG_MODE) { + new File("tmpd").mkdirs(); + try(PrintWriter out = new PrintWriter("tmpd/msg_"+msgId.replace(":", "-")+".json")){ + out.println(data.getBody()); + } catch (FileNotFoundException e) { + log.error(e.getMessage(), e); + } + } + HashMap> procMap = new HashMap<>(); + JSONArray procData = new JSONArray(data.getBody()); + log.info("{} entries expected", procData.length()); + for(int i=0;i()); + } + procMap.get(type).add(obj); + } + //[TA]0001: Train Activation + if(procMap.containsKey("0001")) { + ThreadPoolManager.getInstance().execute(()->{ + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + TUIDDateFormat tdf = new TUIDDateFormat(); + int batchSize = 0; + try( + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement pstmtTA = conn.prepareStatement("REPLACE INTO current_train VALUES (?,?,?,?,?,?)"); + ){ + for(JSONObject obj:procMap.get("0001")) { + String trainId = obj.getJSONObject("body").optString("train_id"); + String trainUid = obj.getJSONObject("body").optString("train_uid"); + Date startDate; + Date endDate; + Date opDate; + try { + startDate = sdf.parse(obj.getJSONObject("body").optString("schedule_start_date")); + endDate = sdf.parse(obj.getJSONObject("body").optString("schedule_end_date")); + opDate = sdf.parse(obj.getJSONObject("body").optString("tp_origin_timestamp")); + }catch(ParseException e) { + log.error(e.getMessage(), e); + continue; + } + String serviceCode = obj.getJSONObject("body").optString("train_service_code"); + long activationTime = obj.getJSONObject("body").optLong("creation_timestamp"); + String schSrc = obj.getJSONObject("body").optString("schedule_source"); + String tuid; + if("C".equals(schSrc)) { + tuid = trainUid + tdf.format(startDate) + tdf.format(endDate) + "0"; + }else { + tuid = trainUid + tdf.format(startDate) + tdf.format(endDate) + "V"; + } +// log.debug("[TA] {}({}) TSC:{}", trainId, tuid, serviceCode); + pstmtTA.setString(1, trainId); + pstmtTA.setString(2, tuid); + pstmtTA.setDate(3, new java.sql.Date(opDate.getTime())); + pstmtTA.setString(4, serviceCode); + pstmtTA.setTimestamp(5, new Timestamp(activationTime)); + pstmtTA.setString(6, CurrentTrainStatus.ACTIVATED.getCode()); + pstmtTA.addBatch(); + batchSize++; + } + pstmtTA.executeBatch(); + conn.commit(); + log.info("[TA] Record Count : {}", batchSize); + }catch(SQLException e){ + log.error(e.getMessage(), e); + } + }); + }//TA + //[TC]0002: Train Cancellation + if(procMap.containsKey("0002")) { + ThreadPoolManager.getInstance().execute(()->{ + int batchSize = 0; + try( + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement pstmtTC = conn.prepareStatement("REPLACE train_cancellation VALUES (?,?,?,?,?,?,?)"); + PreparedStatement pstmtUTA = conn.prepareStatement("UPDATE current_train SET `status` = ? WHERE train_id = ?"); + ){ + pstmtUTA.setString(1, CurrentTrainStatus.CANCELLED.getCode()); + for(JSONObject obj:procMap.get("0002")) { + String train_id = obj.getJSONObject("body").optString("train_id"); + String canx_loc = obj.getJSONObject("body").optString("loc_stanox"); + String canx_reason = obj.getJSONObject("body").optString("canx_reason_code"); + String canx_type = obj.getJSONObject("body").optString("canx_type"); + String canx_dev = obj.getJSONObject("header").optString("source_dev_id"); + String canx_usr = obj.getJSONObject("header").optString("user_id"); + long canx_time = obj.getJSONObject("body").optLong("canx_timestamp"); +// log.debug("[TC] {}@{} because {} by {}@{}", train_id, canx_loc, canx_reason, canx_usr, canx_dev); + pstmtTC.setString(1, train_id); + pstmtTC.setString(2, canx_loc); + pstmtTC.setTimestamp(3, new Timestamp(canx_time)); + pstmtTC.setString(4, canx_reason); + pstmtTC.setString(5, canx_type); + setString(pstmtTC,6, canx_dev); + setString(pstmtTC,7, canx_usr); + pstmtTC.addBatch(); + pstmtUTA.setString(2, train_id); + int rowCount = pstmtUTA.executeUpdate(); + Runnable r = new Runnable() { + + int runCount = 0; + + @Override + public void run() { + runCount++; + try( + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement pstmtUTA = conn.prepareStatement("UPDATE current_train SET `status` = ? WHERE train_id = ?"); + ){ + pstmtUTA.setString(1, CurrentTrainStatus.CANCELLED.getCode()); + pstmtUTA.setString(2, train_id); + int rowCount = pstmtUTA.executeUpdate(); + if(rowCount==0) { + if(runCount > 5) { + log.warn("[TC] Cannot update {} [LAST]", train_id); + return; + } + log.warn("[TC] Cannot update {} (ROUND {})", train_id, runCount); + ThreadPoolManager.getInstance().schedule(this, System.currentTimeMillis()+(long)(1000*Math.pow(2, runCount))); + }else { + log.info("[TC] Successfully update {} (ROUND {})", train_id, runCount); + } + conn.commit(); + }catch(SQLException e) { + log.error(e.getMessage(), e); + } + } + + }; + if(rowCount==0) { + log.warn("[TC] Cannot update {}", train_id); + ThreadPoolManager.getInstance().schedule(r, System.currentTimeMillis()+1000); + } + batchSize++; + } + pstmtTC.executeBatch(); + conn.commit(); + log.info("[TC] Record Count : {}", batchSize); + }catch(SQLException e) { + log.error(e.getMessage(), e); + } + + }); + }//TC + //[TM]0003: Train Movement + if(procMap.containsKey("0003")) { + ThreadPoolManager.getInstance().execute(()->{ + try( + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement pstmtTM = conn.prepareStatement("REPLACE INTO current_train_movement VALUES (?,?,?,?,?,?,?,?,?,?,?,?)"); + PreparedStatement pstmtUTA = conn.prepareStatement("UPDATE current_train SET `status` = ? WHERE train_id = ?"); + ){ + int batchSize = 0; + for(JSONObject obj:procMap.get("0003")) { + String trainId = obj.getJSONObject("body" ).optString ("train_id"); + long movtTime = obj.getJSONObject("body" ).optLong ("actual_timestamp"); + long gbttTime = obj.getJSONObject("body" ).optLong ("gbtt_timestamp", -1); + long planTime = obj.getJSONObject("body" ).optLong ("planned_timestamp", -1); + String stanox = obj.getJSONObject("body" ).optString ("loc_stanox"); + String eventType = obj.getJSONObject("body" ).optString ("event_type"); + boolean correction= obj.getJSONObject("body" ).optBoolean("correction_ind"); + String platform = obj.getJSONObject("body" ).optString ("platform"); + String line = obj.getJSONObject("body" ).optString ("line_ind"); + String route = obj.getJSONObject("body" ).optString ("route"); + String dev = obj.getJSONObject("header").optString ("source_dev_id"); + String usr = obj.getJSONObject("header").optString ("user_id"); + boolean termInd = obj.getJSONObject("body" ).optBoolean("train_terminated", false); +// log.debug("[TM] {}@{} PLAT {}/{}{} by {}@{}{}", trainId, stanox, platform, line, route, usr, dev,termInd?"[T]":""); + pstmtTM.setString(1, trainId); + pstmtTM.setTimestamp(2, new Timestamp(movtTime)); + if(gbttTime==-1) { + pstmtTM.setNull(3, Types.TIMESTAMP); + }else { + pstmtTM.setTimestamp(3, new Timestamp(gbttTime)); + } + if(planTime==-1) { + pstmtTM.setNull(4, Types.TIMESTAMP); + }else { + pstmtTM.setTimestamp(4, new Timestamp(planTime)); + } + pstmtTM.setString(5, stanox); + pstmtTM.setString(6, eventType); + pstmtTM.setString(7, correction?"Y":"N"); + setString(pstmtTM,8, platform); + setString(pstmtTM,9, line); + setString(pstmtTM,10, route); + setString(pstmtTM,11, dev); + setString(pstmtTM,12, usr); + pstmtTM.addBatch(); + if(termInd) { + pstmtUTA.setString(1, CurrentTrainStatus.TERMINATED.getCode()); + pstmtUTA.setString(2, trainId); + pstmtUTA.addBatch(); + } + batchSize++; + } + pstmtTM.executeBatch(); + pstmtUTA.executeBatch(); + conn.commit(); + log.info("[TM] Record Count : {}", batchSize); + }catch(SQLException e) { + log.error(e.getMessage(), e); + } + }); + } + } + } + + protected void setString(PreparedStatement stmt, int col, String val) throws SQLException{ + if(val==null||"".equals(val)) { + stmt.setNull(col, Types.CHAR); + }else { + stmt.setString(col, val); + } + } + +} diff --git a/src/org/leolo/rail/util/TUIDDateFormat.java b/src/org/leolo/rail/util/TUIDDateFormat.java new file mode 100644 index 0000000..8febd88 --- /dev/null +++ b/src/org/leolo/rail/util/TUIDDateFormat.java @@ -0,0 +1,32 @@ +package org.leolo.rail.util; + +import java.text.DateFormat; +import java.text.FieldPosition; +import java.text.ParsePosition; +import java.util.Date; + +public class TUIDDateFormat extends DateFormat{ + + /** + * + */ + private static final long serialVersionUID = 7274736087586430881L; + + public static final String MONTH_ID = "MBTQPHSONDUE"; + public static final String DAY_ID = "0123456789ABCDEFGHJKLMNPRSTUVWX"; + + + @Override + public StringBuffer format(Date arg0, StringBuffer arg1, FieldPosition arg2) { + // TODO Auto-generated method stub + arg1.append(MONTH_ID.charAt(arg0.getMonth())); + arg1.append(DAY_ID.charAt(arg0.getDate()-1)); + return arg1; + } + + @Override + public Date parse(String arg0, ParsePosition arg1) { + throw new UnsupportedOperationException(); + } + +}