You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

317 lines
13 KiB

package org.leolo.rail;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Vector;
import java.util.zip.GZIPInputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.leolo.rail.model.LocationRecordType;
import org.leolo.rail.model.ScheduleType;
import org.leolo.rail.model.TrainSchedule;
import org.leolo.rail.model.TrainScheduleLocation;
/**
 * Imports one gzip-compressed schedule file into the database.
 * <p>
 * The file is read line by line and every non-blank line is parsed as a JSON object
 * describing one train schedule. Each schedule is looked up in {@code ltp_schedule} by its
 * SUID: schedules that are not present yet are inserted, schedules whose stored
 * {@code hash_code} differs from the freshly computed {@link TrainSchedule#hashCode()} are
 * replaced together with their {@code ltp_location} rows, and all others are left alone.
 * The file is deleted once processing has finished.
 */
public class ScheduleProcessor extends BaseProcessor implements Runnable {

    private static final Logger log = LogManager.getLogger(ScheduleProcessor.class);

    /** Milliseconds in one hour, one minute and one half minute. */
    private static final long MS_PER_HOUR = 3_600_000L;
    private static final long MS_PER_MINUTE = 60_000L;
    private static final long MS_PER_HALF_MINUTE = 30_000L;

    /** 27 positional parameters, one per value bound by {@link #bindSchedule}. */
    private static final String SCHEDULE_PLACEHOLDERS =
            "?,?,?,?,?,?,?,?,?,"
            + "?,?,?,?,?,?,?,?,?,"
            + "?,?,?,?,?,?,?,?,?";

    /** 16 positional parameters, one per value bound by {@link #bindLocations}. */
    private static final String LOCATION_PLACEHOLDERS =
            "?,?,?,?,?,?,?,?,"
            + "?,?,?,?,?,?,?,?";

    private static final String SQL_SELECT_HASH =
            "SELECT hash_code FROM ltp_schedule WHERE suid = ?";
    private static final String SQL_INSERT_SCHEDULE =
            "INSERT INTO ltp_schedule VALUES(" + SCHEDULE_PLACEHOLDERS + ")";
    private static final String SQL_REPLACE_SCHEDULE =
            "REPLACE INTO ltp_schedule VALUES(" + SCHEDULE_PLACEHOLDERS + ")";
    private static final String SQL_DELETE_LOCATIONS =
            "DELETE FROM ltp_location WHERE suid = ?";
    private static final String SQL_INSERT_LOCATION =
            "INSERT INTO ltp_location VALUES(" + LOCATION_PLACEHOLDERS + ")";

    private File targetFile;

    /**
     * @param targetFile gzip-compressed file holding one JSON schedule object per line
     */
    public ScheduleProcessor(File targetFile) {
        this.targetFile = targetFile;
    }

    /**
     * Reads the target file, sorts its schedules into "new" and "changed", writes both groups
     * to the database and finally deletes the file. I/O and SQL failures are logged, not thrown.
     */
    @Override
    public void run() {
        List<TrainSchedule> newSchedule = new ArrayList<>();
        List<TrainSchedule> updatedSchedule = new ArrayList<>();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

        // Phase 1: parse the file and classify every schedule against the stored hash code.
        try (
            BufferedReader br = new BufferedReader(new InputStreamReader(
                    new GZIPInputStream(new FileInputStream(targetFile)), StandardCharsets.UTF_8));
            Connection conn = DatabaseManager.getInstance().getConnection();
            PreparedStatement pstmt = conn.prepareStatement(SQL_SELECT_HASH) // for checking duplication
        ) {
            String line;
            while ((line = br.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // a blank line carries no schedule and would fail JSON parsing
                }
                TrainSchedule ts = parseSchedule(new JSONObject(line), sdf);
                if (ts == null) {
                    continue; // unusable record, already logged
                }
                pstmt.setString(1, ts.getSUID());
                try (ResultSet rs = pstmt.executeQuery()) {
                    if (!rs.next()) {
                        newSchedule.add(ts);
                    } else if (rs.getInt(1) != ts.hashCode()) {
                        updatedSchedule.add(ts);
                    }
                }
            }
            log.atInfo().log("New: {}; Updated:{}", newSchedule.size(), updatedSchedule.size());
        } catch (IOException | SQLException e) {
            // FileNotFoundException is an IOException and is covered here as well.
            log.error(e.getMessage(), e);
        }

        // Phase 2: write whatever was collected, one transaction per schedule.
        try (Connection conn = DatabaseManager.getInstance().getConnection()) {
            try {
                insertSchedules(conn, newSchedule, sdf);
                replaceSchedules(conn, updatedSchedule, sdf);
            } catch (SQLException e) {
                // Do not hand the connection back with a half-written schedule pending.
                rollbackQuietly(conn);
                throw e;
            }
        } catch (SQLException e) {
            log.error(e.getMessage(), e);
        }

        // NOTE(review): the file is removed even when one of the phases above failed —
        // confirm that losing the input in that case is intended.
        if (!targetFile.delete()) {
            log.warn("Unable to delete {}", targetFile.getName());
        }
        log.atInfo().log("Done processing {}", targetFile.getName());
    }

    /**
     * Builds a {@link TrainSchedule} from one JSON schedule object.
     *
     * @param obj the parsed JSON line; must contain a {@code schedule_segment} object
     * @param sdf formatter used to parse {@code schedule_start_date} / {@code schedule_end_date}
     * @return the populated schedule, or {@code null} when the record cannot be stored
     *         (unknown STP indicator, unparseable dates, or no locations on a record whose
     *         STP indicator is not {@code "C"}); the reason is logged
     */
    private TrainSchedule parseSchedule(JSONObject obj, SimpleDateFormat sdf) {
        TrainSchedule ts = new TrainSchedule();
        ts.setTrainUId(obj.optString("CIF_train_uid"));

        String stpInd = obj.optString("CIF_stp_indicator");
        ScheduleType type = toScheduleType(stpInd);
        if (type == null) {
            // Without a type the schedule cannot be written (its name is stored), so skip it.
            log.atWarn().log("Unknow schedule type {} for train UID {}", stpInd, obj.optString("CIF_train_uid"));
            return null;
        }
        ts.setScheduleType(type);

        try {
            ts.setStartDate(sdf.parse(obj.optString("schedule_start_date")));
            ts.setEndDate(sdf.parse(obj.optString("schedule_end_date")));
        } catch (ParseException e) {
            // Both dates are formatted again when the row is written; a missing one would fail there.
            log.error(e.getMessage(), e);
            return null;
        }

        ts.setDays(obj.optString("schedule_days_runs"));
        ts.setBankHolidayInd(obj.optString("CIF_bank_holiday_running"));
        ts.setTrainStatus(obj.optString("train_status"));
        ts.setAtocCode(obj.optString("atoc_code"));

        JSONObject segment = obj.getJSONObject("schedule_segment");
        ts.setSignalId(segment.optString("signalling_id"));
        ts.setRsid(segment.optString("CIF_headcode"));
        ts.setTrainCategory(segment.optString("CIF_train_category"));
        ts.setSection(segment.optString("CIF_business_sector"));
        ts.setPowerType(segment.optString("CIF_power_type"));
        ts.setTimingLoad(segment.optString("CIF_timing_load"));
        ts.setPlannedSpeed(segment.optString("CIF_speed"));
        ts.setOperatingCharacter(segment.optString("CIF_operating_characteristics"));
        ts.setClassAvailable(segment.optString("CIF_train_class"));
        ts.setSleeper(segment.optString("CIF_sleepers"));
        ts.setReservation(segment.optString("CIF_reservations"));
        ts.setCatering(segment.optString("CIF_catering_code"));

        JSONArray locs = segment.optJSONArray("schedule_location");
        if (locs == null) {
            if (!"C".equals(stpInd)) {
                log.atWarn().log("No segments for {}", ts.getSUID());
                return null;
            }
            return ts; // records with STP indicator "C" are accepted without locations
        }
        for (int i = 0; i < locs.length(); i++) {
            ts.getLocations().add(parseLocation(locs.getJSONObject(i)));
        }
        return ts;
    }

    /**
     * Maps a {@code CIF_stp_indicator} value to a {@link ScheduleType}.
     *
     * @return the matching type, or {@code null} for an unrecognised indicator
     */
    private static ScheduleType toScheduleType(String stpInd) {
        switch (stpInd) {
            case "C":
                return ScheduleType.CAN;
            case "O":
                return ScheduleType.OVL;
            case "N":
                // NOTE(review): "N" is stored as WTT exactly like "P" — confirm it is not
                // meant to have a schedule type of its own.
            case "P":
                return ScheduleType.WTT;
            default:
                return null;
        }
    }

    /**
     * Builds one {@link TrainScheduleLocation} from an entry of {@code schedule_location}.
     * Times are converted with {@link #getScheduleTime}, allowances with
     * {@link #getAllowanceTime}. An unrecognised {@code location_type} is logged and leaves
     * the record type unset.
     */
    private TrainScheduleLocation parseLocation(JSONObject loc) {
        TrainScheduleLocation tsl = new TrainScheduleLocation();
        tsl.setTiplocCode(loc.optString("tiploc_code"));
        tsl.setTiplocInstance(loc.optInt("tiploc_instance"));
        tsl.setArrival(getScheduleTime(loc.optString("arrival")));
        tsl.setDeparture(getScheduleTime(loc.optString("departure")));
        tsl.setPass(getScheduleTime(loc.optString("pass")));
        tsl.setPubArrival(getScheduleTime(loc.optString("public_arrival")));
        tsl.setPubDeparture(getScheduleTime(loc.optString("public_departure")));
        tsl.setPlatform(loc.optString("platform"));
        tsl.setLine(loc.optString("line"));
        tsl.setPath(loc.optString("path"));
        tsl.setEngineeringAllowance(getAllowanceTime(loc.optString("engineering_allowance")));
        tsl.setPathingAllowance(getAllowanceTime(loc.optString("pathing_allowance")));
        tsl.setPerformanceAllowance(getAllowanceTime(loc.optString("performance_allowance")));

        String locType = loc.optString("location_type");
        switch (locType) {
            case "LO":
                tsl.setRecordType(LocationRecordType.ORIGIN);
                break;
            case "LI":
                tsl.setRecordType(LocationRecordType.INTERMEDIATE);
                break;
            case "LT":
                tsl.setRecordType(LocationRecordType.TERMINATE);
                break;
            default:
                log.atWarn().log("Unknown location type {} at {}", locType, tsl.getTiplocCode());
                break;
        }
        return tsl;
    }

    /**
     * Inserts every schedule in {@code schedules} with its locations, committing after each one.
     *
     * @throws SQLException on the first failing statement; later schedules are not attempted
     */
    private void insertSchedules(Connection conn, List<TrainSchedule> schedules, SimpleDateFormat sdf)
            throws SQLException {
        try (
            PreparedStatement psSch = conn.prepareStatement(SQL_INSERT_SCHEDULE);
            PreparedStatement psLoc = conn.prepareStatement(SQL_INSERT_LOCATION)
        ) {
            for (TrainSchedule ts : schedules) {
                bindSchedule(psSch, ts, sdf);
                bindLocations(psLoc, ts);
                psSch.executeUpdate();
                psLoc.executeBatch();
                conn.commit();
            }
        }
    }

    /**
     * Replaces every schedule in {@code schedules}: its old location rows are deleted, the
     * schedule row is written with {@code REPLACE INTO} and the new locations are inserted,
     * committing after each schedule.
     *
     * @throws SQLException on the first failing statement; later schedules are not attempted
     */
    private void replaceSchedules(Connection conn, List<TrainSchedule> schedules, SimpleDateFormat sdf)
            throws SQLException {
        try (
            PreparedStatement psSch = conn.prepareStatement(SQL_REPLACE_SCHEDULE);
            PreparedStatement psDel = conn.prepareStatement(SQL_DELETE_LOCATIONS);
            PreparedStatement psLoc = conn.prepareStatement(SQL_INSERT_LOCATION)
        ) {
            for (TrainSchedule ts : schedules) {
                bindSchedule(psSch, ts, sdf);
                psDel.setString(1, ts.getSUID());
                psDel.executeUpdate();
                bindLocations(psLoc, ts);
                psSch.executeUpdate();
                psLoc.executeBatch();
                conn.commit();
            }
        }
    }

    /**
     * Binds the 27 parameters of an {@code ltp_schedule} row. Parameters 23-26 (first TIPLOC
     * and its departure, last TIPLOC and its arrival) are only filled when the schedule has
     * more than one location and are SQL NULL otherwise; parameter 27 is the schedule's hash code.
     */
    private void bindSchedule(PreparedStatement psSch, TrainSchedule ts, SimpleDateFormat sdf)
            throws SQLException {
        setString(psSch, 1, ts.getSUID());
        setString(psSch, 2, ts.getTrainUId());
        setString(psSch, 3, ts.getScheduleType().name());
        setString(psSch, 4, sdf.format(ts.getStartDate()));
        setString(psSch, 5, sdf.format(ts.getEndDate()));
        setString(psSch, 6, ts.getDays());
        setString(psSch, 7, ts.getBankHolidayInd());
        setString(psSch, 8, ts.getSignalId());
        setString(psSch, 9, ts.getRsid());
        setString(psSch, 10, ts.getTrainStatus());
        setString(psSch, 11, ts.getTrainCategory());
        setString(psSch, 12, ts.getSection());
        setString(psSch, 13, ts.getPowerType());
        setString(psSch, 14, ts.getTimingLoad());
        setString(psSch, 15, ts.getPlannedSpeed());
        setString(psSch, 16, ts.getOperatingCharacter());
        setString(psSch, 17, ts.getClassAvailable());
        setString(psSch, 18, ts.getSleeper());
        setString(psSch, 19, ts.getReservation());
        setString(psSch, 20, ts.getCatering());
        setString(psSch, 21, ts.getUicCode());
        setString(psSch, 22, ts.getAtocCode());

        int locationCount = ts.getLocations().size();
        if (locationCount > 1) {
            TrainScheduleLocation first = ts.getLocations().get(0);
            TrainScheduleLocation last = ts.getLocations().get(locationCount - 1);
            setString(psSch, 23, first.getTiplocCode());
            setTime(psSch, 24, first.getDeparture());
            setString(psSch, 25, last.getTiplocCode());
            setTime(psSch, 26, last.getArrival());
        } else {
            psSch.setNull(23, Types.CHAR);
            psSch.setNull(24, Types.TIME);
            psSch.setNull(25, Types.CHAR);
            psSch.setNull(26, Types.TIME);
        }
        psSch.setInt(27, ts.hashCode());
    }

    /**
     * Adds one batch entry to {@code psLoc} per location of {@code ts}, numbering them from 1
     * in list order. A location without a record type gets SQL NULL in parameter 16.
     */
    private void bindLocations(PreparedStatement psLoc, TrainSchedule ts) throws SQLException {
        int seq = 1;
        for (TrainScheduleLocation tsl : ts.getLocations()) {
            setString(psLoc, 1, ts.getSUID());
            psLoc.setInt(2, seq++);
            setString(psLoc, 3, tsl.getTiplocCode());
            psLoc.setInt(4, tsl.getTiplocInstance());
            setTime(psLoc, 5, tsl.getArrival());
            setTime(psLoc, 6, tsl.getDeparture());
            setTime(psLoc, 7, tsl.getPass());
            setTime(psLoc, 8, tsl.getPubArrival());
            setTime(psLoc, 9, tsl.getPubDeparture());
            setString(psLoc, 10, tsl.getPlatform());
            setString(psLoc, 11, tsl.getLine());
            setString(psLoc, 12, tsl.getPath());
            setTime(psLoc, 13, tsl.getEngineeringAllowance());
            setTime(psLoc, 14, tsl.getPathingAllowance());
            setTime(psLoc, 15, tsl.getPerformanceAllowance());
            if (tsl.getRecordType() == null) {
                psLoc.setNull(16, Types.CHAR);
            } else {
                setString(psLoc, 16, tsl.getRecordType().getRecordCode());
            }
            psLoc.addBatch();
        }
    }

    /** Rolls back the open transaction, logging (not throwing) if the rollback itself fails. */
    private void rollbackQuietly(Connection conn) {
        try {
            if (!conn.getAutoCommit()) {
                conn.rollback();
            }
        } catch (SQLException rollbackFailure) {
            log.warn("Unable to roll back the current transaction", rollbackFailure);
        }
    }

    /**
     * Converts a schedule time of the form {@code HHMM} with an optional trailing {@code H}
     * (adds 30 seconds) into milliseconds since midnight.
     *
     * @param t the raw time; surrounding whitespace is ignored
     * @return milliseconds since midnight, or {@code 0} when {@code t} is blank
     * @throws NumberFormatException when the hour or minute digits are not numeric
     * @throws StringIndexOutOfBoundsException when a non-blank value has fewer than 4 characters
     */
    private long getScheduleTime(String t) {
        //HHMM'H'
        String time = t.trim();
        if (time.isEmpty()) {
            return 0;
        }
        int hour = Integer.parseInt(time.substring(0, 2));
        int min = Integer.parseInt(time.substring(2, 4));
        boolean half = time.length() >= 5 && time.charAt(4) == 'H';
        return hour * MS_PER_HOUR + min * MS_PER_MINUTE + (half ? MS_PER_HALF_MINUTE : 0);
    }

    /**
     * Converts an allowance into milliseconds. Accepted forms are a whole number of minutes
     * ({@code "2"}), minutes followed by {@code H} for an extra half minute ({@code "1H"}),
     * or a bare {@code "H"} for half a minute.
     *
     * @param t the raw allowance; surrounding whitespace is ignored
     * @return the allowance in milliseconds, or {@code 0} when {@code t} is blank
     * @throws NumberFormatException when the minutes part is not numeric
     */
    private long getAllowanceTime(String t) {
        String allowance = t.trim();
        if (allowance.isEmpty()) {
            return 0;
        }
        if ("H".equals(allowance)) {
            return MS_PER_HALF_MINUTE;
        }
        if (allowance.endsWith("H")) {
            // nH — everything before the marker is the minute count, not just one digit
            String minutes = allowance.substring(0, allowance.length() - 1).trim();
            return Integer.parseInt(minutes) * MS_PER_MINUTE + MS_PER_HALF_MINUTE;
        }
        // nn
        return Integer.parseInt(allowance) * MS_PER_MINUTE;
    }
}