diff --git a/src/main/java/org/leolo/nrdatad/Constants.java b/src/main/java/org/leolo/nrdatad/Constants.java index 62a507d..068a451 100644 --- a/src/main/java/org/leolo/nrdatad/Constants.java +++ b/src/main/java/org/leolo/nrdatad/Constants.java @@ -8,6 +8,8 @@ public final class Constants { public static class CronJob{ public static final String REFERENCE_DATA = "refd"; public static final String SCHEDULE_IMPORT = "scji"; + + public static final int SCHEDULE_BATCH_SIZE = 10000; } public static class Configuration { @@ -15,6 +17,7 @@ public final class Constants { public static final String ALWAYS_RUN_REF_DATA = "cron.ref.always"; public static final String NETWORK_RAIL_USER = "network.user"; public static final String NETWORK_RAIL_PASSWORD = "network.pwd"; + public static final String TEMP_DIR = "file.temp_dir"; } public static class NetworkRailURI{ diff --git a/src/main/java/org/leolo/nrdatad/cron/ScheduleImportJob.java b/src/main/java/org/leolo/nrdatad/cron/ScheduleImportJob.java index 293fcd2..a8b5018 100644 --- a/src/main/java/org/leolo/nrdatad/cron/ScheduleImportJob.java +++ b/src/main/java/org/leolo/nrdatad/cron/ScheduleImportJob.java @@ -2,14 +2,120 @@ package org.leolo.nrdatad.cron; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.json.JSONObject; +import org.leolo.nrdatad.ConfigurationManager; +import org.leolo.nrdatad.Constants; +import org.leolo.nrdatad.util.HttpUtil; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; +import java.io.*; +import java.net.URISyntaxException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPOutputStream; + public class ScheduleImportJob implements Job { Logger log = LogManager.getLogger(); + + private File baseTempDir; + private ConfigurationManager conf = ConfigurationManager.getInstance(); + + public static final String TYPE_HEADER = "JsonTimetableV1"; + public static final String TYPE_ASSOCIATION = "JsonAssociationV1"; + public static final String TYPE_TIPLOC = "TiplocV1"; + public static final String TYPE_SCHEDULE = "JsonScheduleV1"; + public static final String TYPE_EOF = "EOF"; + + @Override public void execute(JobExecutionContext context) throws JobExecutionException { log.atInfo().log("Loading schedule from SCHEDULE stream"); + final String RUN_ID = Long.toHexString(System.currentTimeMillis()&0xffffff); + log.atInfo().log("RUN_ID is {}", RUN_ID); + baseTempDir = new File(conf.getProperty(Constants.Configuration.TEMP_DIR)+"/nrdatad/SCHEDULE-"+RUN_ID); + baseTempDir.mkdirs(); + ExecutorService threadPool = Executors.newFixedThreadPool(5); + int countH = 0, countA=0, countT=0, countS=0; + int batchH = 0, batchA=0, batchT=0, batchS=0; + try(BufferedReader br = new BufferedReader(new InputStreamReader(HttpUtil.getHttpGZipStream( + Constants.NetworkRailURI.SCHEDULE_URL, + conf.getProperty(Constants.Configuration.NETWORK_RAIL_USER), + conf.getProperty(Constants.Configuration.NETWORK_RAIL_PASSWORD) + ))) + ){ + PrintWriter pa = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(baseTempDir, "A_0")))); + PrintWriter pt = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(baseTempDir, "T_0")))); + PrintWriter ps = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(baseTempDir, "S_0")))); + int count = 0; + while(true){ + String line = br.readLine(); + if(line==null){ + break; + } + JSONObject obj = new JSONObject(line); + String objectType = obj.keys().next(); + count++; + if(TYPE_HEADER.equals(objectType)){ + //TODO: Deal with the header. + countH++; + } else if (TYPE_ASSOCIATION.equals(objectType)){ + pa.println(obj.getJSONObject(TYPE_ASSOCIATION)); + if(++countA%Constants.CronJob.SCHEDULE_BATCH_SIZE==0){ + log.atInfo().log("New batch for association."); + pa.flush(); + pa.close(); + final int OLD_BATCH = batchA; + threadPool.execute(()->{processAssociation("A_"+OLD_BATCH);}); + pa = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(baseTempDir, "A_"+(++batchA))))); + } + } else if (TYPE_TIPLOC.equals(objectType)) { + pt.println(obj.getJSONObject(TYPE_TIPLOC)); + if(++countT%Constants.CronJob.SCHEDULE_BATCH_SIZE==0){ + log.atInfo().log("New batch for tiploc."); + pt.flush(); + pt.close(); + pt = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(baseTempDir, "T_"+(++batchT))))); + } + } else if (TYPE_SCHEDULE.equals(objectType)) { + ps.println(obj.getJSONObject(TYPE_SCHEDULE)); + if(++countS%Constants.CronJob.SCHEDULE_BATCH_SIZE==0){ + log.atInfo().log("New batch for schedule."); + ps.flush(); + ps.close(); + ps = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(baseTempDir, "S_"+(++batchS))))); + } + } else if (TYPE_EOF.equals(objectType)) { + log.atInfo().log("EOF found!"); + } else { + log.atInfo().log("Unmatched type {}", objectType); + } + } + pa.flush(); + pa.close(); + pt.flush(); + pt.close(); + ps.flush(); + ps.close(); + final int OLD_BATCH_A = batchA; + threadPool.execute(()->{processAssociation("A_"+OLD_BATCH_A);}); + log.atInfo().log("There are {} records on total. Including {} headers, {} association, {} TIPLOC and {} schedules", count, countH, countA, countT, countS); + }catch (IOException | URISyntaxException e){ + log.atError().withThrowable(e).log("Unable to read SCHEDULE feed"); + return; + } + threadPool.shutdown(); + try { + threadPool.awaitTermination(12, TimeUnit.HOURS); + } catch (InterruptedException e) { + log.atWarn().log("SCHEDULE import job interrupted"); + } + } + + private void processAssociation(String file){ + log.atDebug().log("Processing {}",file); + } } diff --git a/src/main/java/org/leolo/nrdatad/model/TrainSchedule.java b/src/main/java/org/leolo/nrdatad/model/TrainSchedule.java new file mode 100644 index 0000000..a648d72 --- /dev/null +++ b/src/main/java/org/leolo/nrdatad/model/TrainSchedule.java @@ -0,0 +1,98 @@ +package org.leolo.nrdatad.model; + +import java.util.Date; + +public class TrainSchedule { + + private int scheduleVersion = 0; + private String runsOnBankHoliday; + private String trainStatus;//TODO: change it into enum + private String trainUid; + private String runsOn; + private ShortTermPlanningIndicator shortTermPlanningIndicator; + private String atocCode; + private String uicCode; + private String tractionClass; + private Date startDate; + private Date endDate; + + public String getRunsOnBankHoliday() { + return runsOnBankHoliday; + } + + public void setRunsOnBankHoliday(String runsOnBankHoliday) { + this.runsOnBankHoliday = runsOnBankHoliday; + } + + public String getTrainStatus() { + return trainStatus; + } + + public void setTrainStatus(String trainStatus) { + this.trainStatus = trainStatus; + } + + public String getTrainUid() { + return trainUid; + } + + public void setTrainUid(String trainUid) { + this.trainUid = trainUid; + } + + public String getRunsOn() { + return runsOn; + } + + public void setRunsOn(String runsOn) { + this.runsOn = runsOn; + } + + public ShortTermPlanningIndicator getShortTermPlanningIndicator() { + return shortTermPlanningIndicator; + } + + public void setShortTermPlanningIndicator(ShortTermPlanningIndicator shortTermPlanningIndicator) { + this.shortTermPlanningIndicator = shortTermPlanningIndicator; + } + + public String getAtocCode() { + return atocCode; + } + + public void setAtocCode(String atocCode) { + this.atocCode = atocCode; + } + + public String getUicCode() { + return uicCode; + } + + public void setUicCode(String uicCode) { + this.uicCode = uicCode; + } + + public String getTractionClass() { + return tractionClass; + } + + public void setTractionClass(String tractionClass) { + this.tractionClass = tractionClass; + } + + public Date getStartDate() { + return startDate; + } + + public void setStartDate(Date startDate) { + this.startDate = startDate; + } + + public Date getEndDate() { + return endDate; + } + + public void setEndDate(Date endDate) { + this.endDate = endDate; + } +} diff --git a/src/main/java/org/leolo/nrdatad/model/TrainScheduleLocation.java b/src/main/java/org/leolo/nrdatad/model/TrainScheduleLocation.java index 7673887..053a623 100644 --- a/src/main/java/org/leolo/nrdatad/model/TrainScheduleLocation.java +++ b/src/main/java/org/leolo/nrdatad/model/TrainScheduleLocation.java @@ -3,6 +3,7 @@ package org.leolo.nrdatad.model; public class TrainScheduleLocation { private int recordSequence = 0; + private TrainScheduleSector recordSector; private TrainScheduleLocationRecordIdentity recordIdentity; private String tiploc; private int tiplocInstance; diff --git a/src/main/java/org/leolo/nrdatad/util/HttpUtil.java b/src/main/java/org/leolo/nrdatad/util/HttpUtil.java index a9a9fa1..d9a834e 100644 --- a/src/main/java/org/leolo/nrdatad/util/HttpUtil.java +++ b/src/main/java/org/leolo/nrdatad/util/HttpUtil.java @@ -7,6 +7,7 @@ import org.leolo.nrdatad.Constants; import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.net.URISyntaxException; import java.net.URL; @@ -56,6 +57,18 @@ public class HttpUtil { return sb.toString(); } + public static InputStream getHttpGZipStream(URL url, String userName, String password) throws IOException, URISyntaxException { + URLConnection conn = url.openConnection(); + String userpwd = userName+":"+password; + conn.addRequestProperty("Authorization", "Basic "+ Base64.getEncoder().encodeToString(userpwd.getBytes())); + conn.connect(); + return new GZIPInputStream(conn.getInputStream()); + } + + public static InputStream getHttpGZipStream(String url, String userName, String password) throws IOException, URISyntaxException { + return getHttpGZipStream(new URL(url), userName, password); + } + public static String sendHttpRequestForGZipFile(String url, String userName, String password) throws IOException, URISyntaxException { return sendHttpRequestForGZipFile(new URL(url), userName, password); } diff --git a/src/main/java/org/leolo/nrdatad/util/TimeUtil.java b/src/main/java/org/leolo/nrdatad/util/TimeUtil.java new file mode 100644 index 0000000..121fb3f --- /dev/null +++ b/src/main/java/org/leolo/nrdatad/util/TimeUtil.java @@ -0,0 +1,58 @@ +package org.leolo.nrdatad.util; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Date; + +public class TimeUtil { + public static final long ONE_SECOND = 1_000; + public static final long ONE_MINUTE = 60_000; + public static final long ONE_HOUR = 3_600_000; + public static final long ONE_DAY = 86_400_000; + + private static Logger log = LogManager.getLogger(); + + public static long extractTime(Date date){ + return date.getTime()%ONE_DAY; + } + + public static long parseTime(String time){ + return parseTime(time, 0); + } + + + public static long parseTime(String time, int dayOffSet){ + time = time.strip().toUpperCase(); + long result; + //This given string can be 1, 2, 4 or 5 characters long + int strLen = time.length(); + if (strLen == 0 ) { + result = 0; + } else if (strLen==1||strLen==2) { + if("H".equals(time)) { + result = 30 * ONE_SECOND; + } else if(time.endsWith("H")){ + result = Integer.parseInt(time.substring(0, 1))*ONE_MINUTE + 30*ONE_SECOND; + }else{ + result = Integer.parseInt(time)*ONE_MINUTE; + } + } else if (strLen==4||strLen==5) { + if(time.endsWith("H")){ + result = Integer.parseInt(time.substring(0, 2))*ONE_HOUR + + Integer.parseInt(time.substring(2, 4))*ONE_MINUTE + + 30*ONE_SECOND; + }else{ + result = Integer.parseInt(time.substring(0, 2))*ONE_HOUR + + Integer.parseInt(time.substring(2, 4))*ONE_MINUTE; + } + } else { + log.atWarn().log("String \"{}\" is not a legal value.", time); + throw new RuntimeException(time+" is Illegal value for time"); + } + + result += dayOffSet*ONE_DAY; + return result; + } + +} diff --git a/src/test/java/org/leolo/nrdatad/util/TimeUtilTest.java b/src/test/java/org/leolo/nrdatad/util/TimeUtilTest.java new file mode 100644 index 0000000..5c6647a --- /dev/null +++ b/src/test/java/org/leolo/nrdatad/util/TimeUtilTest.java @@ -0,0 +1,75 @@ +package org.leolo.nrdatad.util; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TimeUtilTest { + + @Test public void testAllowance(){ + assertEquals(60000, TimeUtil.parseTime("1")); + assertEquals(0, TimeUtil.parseTime("")); + assertEquals(0, TimeUtil.parseTime(" ")); + assertEquals(0, TimeUtil.parseTime(" ")); + assertEquals(60000, TimeUtil.parseTime("1 ")); + assertEquals(90000, TimeUtil.parseTime("1H")); + assertEquals(30000, TimeUtil.parseTime("H")); + assertEquals(30000, TimeUtil.parseTime("H ")); + assertEquals(30000, TimeUtil.parseTime(" H")); + assertEquals(600000, TimeUtil.parseTime("10")); + assertEquals(900000, TimeUtil.parseTime("15")); + assertEquals(1200000, TimeUtil.parseTime("20")); + } + + @Test public void testWTTTime(){ + assertEquals(3_660_000, TimeUtil.parseTime("0101 ")); + assertEquals(3_690_000, TimeUtil.parseTime("0101H")); + assertEquals(36_000_000, TimeUtil.parseTime("1000 ")); + assertEquals(37_800_000, TimeUtil.parseTime("1030 ")); + assertEquals(36_030_000, TimeUtil.parseTime("1000H")); + assertEquals(37_830_000, TimeUtil.parseTime("1030H")); + assertEquals(72_000_000, TimeUtil.parseTime("2000 ")); + assertEquals(73_800_000, TimeUtil.parseTime("2030 ")); + assertEquals(72_030_000, TimeUtil.parseTime("2000H")); + assertEquals(73_830_000, TimeUtil.parseTime("2030H")); + assertEquals(3_660_000, TimeUtil.parseTime("0101 ", 0)); + assertEquals(3_690_000, TimeUtil.parseTime("0101H", 0)); + assertEquals(36_000_000, TimeUtil.parseTime("1000 ", 0)); + assertEquals(37_800_000, TimeUtil.parseTime("1030 ", 0)); + assertEquals(36_030_000, TimeUtil.parseTime("1000H", 0)); + assertEquals(37_830_000, TimeUtil.parseTime("1030H", 0)); + assertEquals(72_000_000, TimeUtil.parseTime("2000 ", 0)); + assertEquals(73_800_000, TimeUtil.parseTime("2030 ", 0)); + assertEquals(72_030_000, TimeUtil.parseTime("2000H", 0)); + assertEquals(73_830_000, TimeUtil.parseTime("2030H", 0)); + // 86_400_000 + assertEquals( 90_060_000, TimeUtil.parseTime("0101 ", 1)); + assertEquals( 90_090_000, TimeUtil.parseTime("0101H", 1)); + assertEquals(122_400_000, TimeUtil.parseTime("1000 ", 1)); + assertEquals(124_200_000, TimeUtil.parseTime("1030 ", 1)); + assertEquals(122_430_000, TimeUtil.parseTime("1000H", 1)); + assertEquals(124_230_000, TimeUtil.parseTime("1030H", 1)); + assertEquals(158_400_000, TimeUtil.parseTime("2000 ", 1)); + assertEquals(160_200_000, TimeUtil.parseTime("2030 ", 1)); + assertEquals(158_430_000, TimeUtil.parseTime("2000H", 1)); + assertEquals(160_230_000, TimeUtil.parseTime("2030H", 1)); + } + + @Test public void testGBTTTime(){ + assertEquals(3_660_000, TimeUtil.parseTime("0101")); + assertEquals(36_000_000, TimeUtil.parseTime("1000")); + assertEquals(37_800_000, TimeUtil.parseTime("1030")); + assertEquals(72_000_000, TimeUtil.parseTime("2000")); + assertEquals(73_800_000, TimeUtil.parseTime("2030")); + assertEquals(3_660_000, TimeUtil.parseTime("0101", 0)); + assertEquals(36_000_000, TimeUtil.parseTime("1000", 0)); + assertEquals(37_800_000, TimeUtil.parseTime("1030", 0)); + assertEquals(72_000_000, TimeUtil.parseTime("2000", 0)); + assertEquals(73_800_000, TimeUtil.parseTime("2030", 0)); + assertEquals( 90_060_000, TimeUtil.parseTime("0101", 1)); + assertEquals(122_400_000, TimeUtil.parseTime("1000", 1)); + assertEquals(124_200_000, TimeUtil.parseTime("1030", 1)); + assertEquals(158_400_000, TimeUtil.parseTime("2000", 1)); + assertEquals(160_200_000, TimeUtil.parseTime("2030", 1)); + } +} \ No newline at end of file