Browse Source

Process train reinstatement and very short term planning schedule

develop
LO Kam Tao Leo 4 years ago
parent
commit
a9d87baa6c
  1. 1
      .gitignore
  2. 8
      src/log4j2.xml
  3. 12
      src/org/leolo/rail/NetowrkRailProcessingThread.java
  4. 76
      src/org/leolo/rail/TrainMovementProcessor.java
  5. 275
      src/org/leolo/rail/VTSPProcessor.java

1
.gitignore vendored

@ -2,3 +2,4 @@
/target/
configuration.properties
tmpd/
*_log*

8
src/log4j2.xml

@ -5,10 +5,18 @@
<PatternLayout
pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
</Console>
<File name="RIlog" fileName="ri_log">
<PatternLayout
pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
</File>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console" />
</Root>
<Logger name="org.leolo.RI">
<AppenderRef ref="Console" />
<AppenderRef ref="RIlog" />
</Logger>
</Loggers>
</Configuration>

12
src/org/leolo/rail/NetowrkRailProcessingThread.java

@ -11,6 +11,8 @@ public class NetowrkRailProcessingThread extends BaseProcessingThread {
private static Logger log = LogManager.getLogger(NetowrkRailProcessingThread.class);
private TrainMovementProcessor procTrainMvt = new TrainMovementProcessor();
private VTSPProcessor procVTSP = new VTSPProcessor();
private boolean stopThread = false;
public NetowrkRailProcessingThread() throws Exception {
super("network");
@ -20,11 +22,13 @@ public class NetowrkRailProcessingThread extends BaseProcessingThread {
protected void _init() {
try {
connection.subscribe(Constants.NetworkRail.TOPIC_NAME_MVT);
connection.subscribe(Constants.NetworkRail.TOPIC_NAME_VTSP);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
procTrainMvt.start();
procVTSP.start();
}
private volatile long sleepTime = Constants.Generic.DEFAULT_SLEEP_TIME;
@ -36,12 +40,18 @@ public class NetowrkRailProcessingThread extends BaseProcessingThread {
log.error(e1.getMessage(), e1);
}
sleepTime*=Constants.Generic.INCRESE_RATIO;
if(sleepTime>128000) {
log.atError().log("Failed too many times. Restart thread.");
}
}
@Override
public void run() {
while(true) {
try {
if(stopThread) {
break;
}
StompFrame frm = connection.receive();
if("ERROR".equals(frm.getAction())) {
log.error("Error message received. {}", frm.getBody());
@ -52,6 +62,8 @@ public class NetowrkRailProcessingThread extends BaseProcessingThread {
String topic = frm.getHeaders().get("destination");
if(Constants.NetworkRail.TOPIC_NAME_MVT.equals(topic)) {
procTrainMvt.process(frm);
}else if(Constants.NetworkRail.TOPIC_NAME_VTSP.equals(topic)) {
procVTSP.process(frm);
}
sleepTime = Constants.Generic.DEFAULT_SLEEP_TIME;
} catch (Exception e) {

76
src/org/leolo/rail/TrainMovementProcessor.java

@ -27,8 +27,9 @@ import org.json.JSONObject;
import org.leolo.rail.util.TUIDDateFormat;
public class TrainMovementProcessor extends Thread{
private Logger log = LogManager.getLogger(TrainMovementProcessor.class);
private Logger logRI = LogManager.getLogger("org.leolo.RI");
private final Object SYNC_TOKEN = new Object();
@ -51,6 +52,7 @@ public class TrainMovementProcessor extends Thread{
@Override
public void run() {
logRI.debug("Started.");
while(true) {
if(terminated) {
return;
@ -283,6 +285,78 @@ public class TrainMovementProcessor extends Thread{
}
});
}//TM
//[RI]0005: Train re-instatement
if(procMap.containsKey("0005")) {
ThreadPoolManager.getInstance().execute(()->{
int batchSize = 0;
try(
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement pstmtTC = conn.prepareStatement("REPLACE current_train_reinstatement VALUES (?,?,?,?,?)");
PreparedStatement pstmtUTA = conn.prepareStatement("UPDATE current_train SET `status` = ? WHERE train_id = ?");
){
pstmtUTA.setString(1, CurrentTrainStatus.ACTIVATED.getCode());
for(JSONObject obj:procMap.get("0005")) {
String train_id = obj.getJSONObject("body").optString("train_id");
String canx_loc = obj.getJSONObject("body").optString("loc_stanox");
String canx_dev = obj.getJSONObject("header").optString("source_dev_id");
String canx_usr = obj.getJSONObject("header").optString("user_id");
long canx_time = obj.getJSONObject("body").optLong("reinstatement_timestamp");
logRI.atInfo().log("TS: {}", canx_time);
logRI.atInfo().log(obj.getJSONObject("body").toString());
// log.debug("[TC] {}@{} because {} by {}@{}", train_id, canx_loc, canx_reason, canx_usr, canx_dev);
pstmtTC.setString(1, train_id);
pstmtTC.setString(2, canx_loc);
pstmtTC.setTimestamp(3, new Timestamp(canx_time));
setString(pstmtTC,4, canx_dev);
setString(pstmtTC,5, canx_usr);
pstmtTC.addBatch();
pstmtUTA.setString(2, train_id);
int rowCount = pstmtUTA.executeUpdate();
Runnable r = new Runnable() {
int runCount = 0;
@Override
public void run() {
runCount++;
try(
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement pstmtUTA = conn.prepareStatement("UPDATE current_train SET `status` = ? WHERE train_id = ?");
){
pstmtUTA.setString(1, CurrentTrainStatus.ACTIVATED.getCode());
pstmtUTA.setString(2, train_id);
int rowCount = pstmtUTA.executeUpdate();
if(rowCount==0) {
if(runCount > 5) {
log.warn("[RI] Cannot update {} [LAST]", train_id);
return;
}
log.warn("[RI] Cannot update {} (ROUND {})", train_id, runCount);
ThreadPoolManager.getInstance().schedule(this, System.currentTimeMillis()+(long)(1000*Math.pow(2, runCount)));
}else {
log.info("[RI] Successfully update {} (ROUND {})", train_id, runCount);
}
conn.commit();
}catch(SQLException e) {
log.error(e.getMessage(), e);
}
}
};
if(rowCount==0) {
log.warn("[RI] Cannot update {}", train_id);
ThreadPoolManager.getInstance().schedule(r, System.currentTimeMillis()+1000);
}
batchSize++;
}
pstmtTC.executeBatch();
conn.commit();
log.info("[RI] Record Count : {}", batchSize);
}catch(SQLException e) {
log.error(e.getMessage(), e);
}
});
}
}
}

275
src/org/leolo/rail/VTSPProcessor.java

@ -0,0 +1,275 @@
package org.leolo.rail;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.leolo.rail.model.LocationRecordType;
import org.leolo.rail.model.ScheduleType;
import org.leolo.rail.model.TrainSchedule;
import org.leolo.rail.model.TrainScheduleLocation;
public class VTSPProcessor extends Thread {
private Logger log = LogManager.getLogger(VTSPProcessor.class);
private Logger logRI = LogManager.getLogger("org.leolo.RI");
private final Object SYNC_TOKEN = new Object();
private Queue<StompFrame> procQueue = new LinkedList<>();
private boolean terminated = false;
public void terminate() {
terminated = true;
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
public void process(StompFrame data) {
procQueue.add(data);
synchronized(SYNC_TOKEN) {
SYNC_TOKEN.notifyAll();
}
}
@Override
public void run() {
logRI.debug("Started.");
while(true) {
if(terminated) {
return;
}
StompFrame data = procQueue.poll();
if(data==null) {
synchronized(SYNC_TOKEN) {
try {
// log.debug("No more data. Sleep.");
SYNC_TOKEN.wait(1000);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
continue;
}
//Actually handle the data
String msgId = data.getHeaders().get("message-id");
log.info("Processing message {}", msgId);
if(true||Constants.Generic.DEBUG_MODE) {
new File("tmpd").mkdirs();
try(PrintWriter out = new PrintWriter("tmpd/VTSP-msg_"+msgId.replace(":", "-")+".json")){
out.println(data.getBody());
} catch (FileNotFoundException e) {
log.error(e.getMessage(), e);
}
}
JSONObject obj = new JSONObject(data.getBody()).getJSONObject("VSTPCIFMsgV1").optJSONObject("schedule");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
String txType = obj.optString("transaction_type");
if("Delete".equalsIgnoreCase(txType)) {
//Remove the schedule
TrainSchedule ts = new TrainSchedule();
ts.setTrainUId(obj.optString("CIF_train_uid"));
ts.setScheduleType(ScheduleType.STP);
try {
ts.setStartDate(sdf.parse(obj.optString("schedule_start_date")));
ts.setEndDate(sdf.parse(obj.optString("schedule_end_date")));
}catch(ParseException e) {
log.atError().log(e.getMessage(), e);
}
try(
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement psLoc = conn.prepareStatement("DELETE FROM stp_location WHERE suid = ?");
PreparedStatement psSch = conn.prepareStatement("DELETE FROM stp_schedule WHERE suid = ?");
){
psLoc.setString(1, ts.getSUID());
psSch.setString(1, ts.getSUID());
psLoc.executeUpdate();
psSch.executeUpdate();
conn.commit();
}catch(SQLException e) {
log.error(e.getMessage(), e);
}
return;
}
try(
Connection conn = DatabaseManager.getInstance().getConnection();
PreparedStatement psSch = conn.prepareStatement("INSERT INTO stp_schedule VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
PreparedStatement psLoc = conn.prepareStatement("INSERT INTO stp_location VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
){
TrainSchedule ts = new TrainSchedule();
ts.setTrainUId(obj.optString("CIF_train_uid"));
ts.setScheduleType(ScheduleType.STP);
try {
ts.setStartDate(sdf.parse(obj.optString("schedule_start_date")));
ts.setEndDate(sdf.parse(obj.optString("schedule_end_date")));
}catch(ParseException e) {
log.atError().log(e.getMessage(), e);
}
ts.setDays(obj.optString("schedule_days_runs"));
ts.setBankHolidayInd(obj.optString("CIF_bank_holiday_running"));
ts.setSignalId(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("signalling_id"));
ts.setRsid(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_headcode"));
ts.setTrainStatus(obj.optString("train_status"));
ts.setTrainCategory(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_train_category"));
ts.setSection(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_business_sector"));
ts.setPowerType(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_power_type"));
ts.setTimingLoad(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_timing_load"));
ts.setPlannedSpeed(Integer.toString((int) (obj.getJSONArray("schedule_segment").getJSONObject(0).optInt("CIF_speed")/2.24)));
ts.setOperatingCharacter(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_operating_characteristics"));
ts.setClassAvailable(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_train_class"));
ts.setSleeper(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_sleepers"));
ts.setReservation(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_reservations"));
ts.setCatering(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("CIF_catering_code"));
ts.setAtocCode(obj.getJSONArray("schedule_segment").getJSONObject(0).optString("atoc_code"));
JSONArray locs = obj.getJSONArray("schedule_segment").getJSONObject(0).getJSONArray("schedule_location");
for(int i=0;i<locs.length();i++) {
JSONObject loc = locs.getJSONObject(i);
TrainScheduleLocation tsl = new TrainScheduleLocation();
tsl.setTiplocCode(loc.getJSONObject("location").getJSONObject("tiploc").optString("tiploc_id"));
tsl.setTiplocInstance(0);
tsl.setArrival(getScheduleTime(loc.optString("scheduled_arrival_time")));
tsl.setDeparture(getScheduleTime(loc.optString("scheduled_departure_time")));
tsl.setPass(getScheduleTime(loc.optString("scheduled_pass_time")));
tsl.setPubArrival(getScheduleTime(loc.optString("public_arrival")));
tsl.setPubDeparture(getScheduleTime(loc.optString("public_departure")));
tsl.setPlatform(loc.optString("CIF_platform"));
tsl.setLine(loc.optString("CIF_line"));
tsl.setPath(loc.optString("CIF_path"));
tsl.setEngineeringAllowance(getAllowanceTime(loc.optString("CIF_engineering_allowance")));
tsl.setPathingAllowance(getAllowanceTime(loc.optString("CIF_pathing_allowance")));
tsl.setPerformanceAllowance(getAllowanceTime(loc.optString("CIF_performance_allowance")));
// String locType = loc.optString("location_type");
if(i==0) {
tsl.setRecordType(LocationRecordType.ORIGIN);
}else if((i+1)==locs.length()) {
tsl.setRecordType(LocationRecordType.TERMINATE);
}else{
tsl.setRecordType(LocationRecordType.INTERMEDIATE);
}
ts.getLocations().add(tsl);
}
setString(psSch, 1, ts.getSUID());
setString(psSch, 2, ts.getTrainUId());
setString(psSch, 3, ts.getScheduleType().name());
setString(psSch, 4, sdf.format(ts.getStartDate()));
setString(psSch, 5, sdf.format(ts.getEndDate()));
setString(psSch, 6, ts.getDays());
setString(psSch, 7, ts.getBankHolidayInd());
setString(psSch, 8, ts.getSignalId());
setString(psSch, 9, ts.getRsid());
setString(psSch,10, ts.getTrainStatus());
setString(psSch,11, ts.getTrainCategory());
setString(psSch,12, ts.getSection());
setString(psSch,13, ts.getPowerType());
setString(psSch,14, ts.getTimingLoad());
setString(psSch,15, ts.getPlannedSpeed());
setString(psSch,16, ts.getOperatingCharacter());
setString(psSch,17, ts.getClassAvailable());
setString(psSch,18, ts.getSleeper());
setString(psSch,19, ts.getReservation());
setString(psSch,20, ts.getCatering());
setString(psSch,21, ts.getUicCode());
setString(psSch,22, ts.getAtocCode());
if(ts.getLocations().size()>1) {
setString(psSch,23, ts.getLocations().get(0).getTiplocCode());
setTime(psSch,24, ts.getLocations().get(0).getDeparture());
setString(psSch,25, ts.getLocations().get(ts.getLocations().size()-1).getTiplocCode());
setTime(psSch,26, ts.getLocations().get(ts.getLocations().size()-1).getArrival());
}else {
psSch.setNull(23, Types.CHAR);
psSch.setNull(24, Types.TIME);
psSch.setNull(25, Types.CHAR);
psSch.setNull(26, Types.TIME);
}
psSch.setInt(27, ts.hashCode());
int seq = 1;
for(TrainScheduleLocation tsl:ts.getLocations()) {
setString(psLoc, 1, ts.getSUID());
psLoc.setInt(2, seq++);
setString(psLoc, 3, tsl.getTiplocCode());
psLoc.setInt(4, tsl.getTiplocInstance());
setTime(psLoc, 5, tsl.getArrival());
setTime(psLoc, 6, tsl.getDeparture());
setTime(psLoc, 7, tsl.getPass());
setTime(psLoc, 8, tsl.getPubArrival());
setTime(psLoc, 9, tsl.getPubDeparture());
setString(psLoc,10, tsl.getPlatform());
setString(psLoc,11, tsl.getLine());
setString(psLoc,12, tsl.getPath());
setTime(psLoc,13, tsl.getEngineeringAllowance());
setTime(psLoc,14, tsl.getPathingAllowance());
setTime(psLoc,15, tsl.getPerformanceAllowance());
setString(psLoc,16, tsl.getRecordType().getRecordCode());
psLoc.addBatch();
}
psSch.executeUpdate();
psLoc.executeBatch();
conn.commit();
}catch(SQLException e) {
log.atError().log(e.getMessage(),e);
}
}
}
protected void setString(PreparedStatement stmt, int col, String val) throws SQLException{
if(val==null||"".equals(val)) {
stmt.setNull(col, Types.CHAR);
}else {
stmt.setString(col, val);
}
}
private long getScheduleTime(String t) {
//HHMMSS
t = t.trim();
if("".equals(t)) {
return 0;
}
int hour = Integer.parseInt(t.substring(0, 2));
int min = Integer.parseInt(t.substring(2, 4));
int sec = Integer.parseInt(t.substring(4, 6));
return hour*3_600_000+min*60_000+sec;
}
private long getAllowanceTime(String t) {
if("".equals(t.trim())) {
return 0;
}
if("H".equals(t.trim()))
return 30_000;
if(t.endsWith("H")) {
//nH
return Integer.parseInt(t.substring(0,1))*60_000+30_000;
}else {
//nn
return Integer.parseInt(t)*60_000;
}
}
public void setTime(PreparedStatement stmt, int col, long time) throws SQLException{
if(time==0) {
stmt.setNull(col, Types.TIME);
}else {
int h = (int)(time/3_600_000);
int m = (int)((time/60_000)%60);
int s = (int)((time/1_000)%60);
stmt.setString(col, h+":"+m+":"+s);
}
}
}
Loading…
Cancel
Save