From 73a6150bf2bc18b0a9fa9aaf846936b57bfe018e Mon Sep 17 00:00:00 2001
From: LO Kam Tao Leo
Date: Sun, 6 Mar 2022 14:54:57 +0000
Subject: [PATCH] Restructure packages

---
 src/org/leolo/rail/LongTermScheduleJob.java     | 183 -----------------------
 src/org/leolo/rail/NRDataDamon.java             |   1 +
 src/org/leolo/rail/job/DayEndJob.java           |  15 ++
 src/org/leolo/rail/job/LongTermScheduleJob.java | 190 ++++++++++++++++++++++++
 4 files changed, 206 insertions(+), 183 deletions(-)
 delete mode 100644 src/org/leolo/rail/LongTermScheduleJob.java
 create mode 100644 src/org/leolo/rail/job/DayEndJob.java
 create mode 100644 src/org/leolo/rail/job/LongTermScheduleJob.java

diff --git a/src/org/leolo/rail/LongTermScheduleJob.java b/src/org/leolo/rail/LongTermScheduleJob.java
deleted file mode 100644
index 5597468..0000000
--- a/src/org/leolo/rail/LongTermScheduleJob.java
+++ /dev/null
@@ -1,183 +0,0 @@
-package org.leolo.rail;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.net.URL;
-import java.net.URLConnection;
-import java.sql.SQLException;
-import java.util.Base64;
-import java.util.Date;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-import java.util.zip.ZipException;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.json.JSONObject;
-import org.leolo.rail.dao.MetadataDao;
-import org.quartz.Job;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.TriggerKey;
-import org.quartz.impl.matchers.GroupMatcher;
-
-public class LongTermScheduleJob implements Job{
-
-    private Logger log = LogManager.getLogger(LongTermScheduleJob.class);
-
-    @Override
-    public void execute(JobExecutionContext context) throws JobExecutionException {
-        MetadataDao mDao = new MetadataDao();
-        log.atInfo().log("LongTermScheduleJob started.");
-
-        try {
-            TriggerKey triggerKey = TriggerKey.triggerKey(Constants.Scheduler.LTSJ_CRON_TRIGGER, Constants.Scheduler.DEFAULT_GROUP_NAME);
-            Trigger trigger = context.getScheduler().getTrigger(triggerKey);
-            Date nextFireTime = trigger.getNextFireTime();
-            log.info("Next fire at "+nextFireTime);
-            long timeToNextFire = nextFireTime.getTime() - System.currentTimeMillis();
-            if(timeToNextFire > Constants.Scheduler.LTR_SKIP_THRESHOLD && !ConfigurationManager.getInstance().getBoolean("general.debug", false)) {
-                log.always().log("Too close to next fire time. Skipping");
-                return;
-            }
-        } catch (SchedulerException e) {
-            log.atError().log(e.getMessage(),e);
-        }
-        InputStream fis = null;
-        try {
-            URL url = new URL(ConfigurationManager.getInstance().getProperty("network.schedule.path"));
-            URLConnection conn = url.openConnection();
-            String userpwd = ConfigurationManager.getInstance().getProperty("network.user")+":"+ConfigurationManager.getInstance().getProperty("network.pwd");
-            conn.addRequestProperty("Authorization", "Basic "+Base64.getEncoder().encodeToString(userpwd.getBytes()));
-            conn.connect();
-            fis = conn.getInputStream();
-        }catch(IOException e) {
-            log.error(e.getMessage(), e);
-            System.exit(4);
-        }
-        log.info("Connected!");
-        File tempDir = new File(ConfigurationManager.getInstance().getProperty("file.temp_dir", ".")+"/nrdg");
-        if(!tempDir.exists()) {
-            tempDir.mkdirs();
-        }
-        int countA=0, countT=0, countS=0;
-        int batchA=0, batchT=0, batchS=0;
-        try(
-            GZIPInputStream gis = new GZIPInputStream(fis);
-            BufferedReader br = new BufferedReader(new InputStreamReader(gis));
-        ){
-            PrintWriter ass = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "ass_0"))));
-            PrintWriter tip = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "tip_0"))));
-            PrintWriter sch = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "sch_0"))));
-            int count = 0;
-            while(true) {
-                String line = br.readLine();
-                if(line==null) {
-                    break;
-                }
-                count++;
-                JSONObject obj = new JSONObject(line);
-                String objectType = obj.keys().next();
-                if("JsonTimetableV1".equals(objectType)) {
-
-                }else if("JsonAssociationV1".equals(objectType)){
-                    ass.println(obj.getJSONObject("JsonAssociationV1"));
-                    countA++;
-                    if(countA%Constants.BATCH_SIZE==0) {
-                        ass.close();
-                        if(batchA%50==0)
-                            log.info("Created batch for assoication");
-                        ass = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "ass_"+(++batchA)))));
-                    }
-                }else if("TiplocV1".equals(objectType)){
-                    tip.println(obj.getJSONObject("TiplocV1"));
-                    countT++;
-                    if(countT%Constants.BATCH_SIZE==0) {
-                        tip.close();
-                        if(batchT%50==0)
-                            log.info("Created batch for TIPLOC");
-                        tip = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "tip_"+(++batchT)))));
-                    }
-                }else if("JsonScheduleV1".equals(objectType)){
-                    sch.println(obj.getJSONObject("JsonScheduleV1"));
-                    countS++;
-                    if(countS%Constants.BATCH_SIZE==0) {
-                        sch.close();
-                        if(batchS%50==0)
-                            log.info("Created batch for schedule");
-                        sch = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "sch_"+(++batchS)))));
-                    }
-                }else if("EOF".equals(objectType)){
-                    //Nothing to do
-                }else {
-                    log.fatal("Unhandled type {}", objectType);
-                    System.exit(2);
-                }
-
-            }
-            ass.close();
-            tip.close();
-            sch.close();
-
-            log.info("Created batch for assoication");
-            log.info("Created batch for TIPLOC");
-            log.info("Created batch for schedule");
-            log.info("Total count : {}", count);
-        }catch(ZipException e) {
-            log.error(e.getMessage(), e);
-        }catch(IOException e) {
-            log.error(e.getMessage(), e);
-            System.exit(1);
-        }
-        try {
-            fis.close();
-        }catch(IOException e) {
-            log.fatal(e.getMessage(), e);
-            System.exit(1);
-        }
-        log.info("Done reading. Dispatching now. {}/{}/{}[{}/{}/{}]", countA, countT, countS, batchA, batchT, batchS);
-        ExecutorService threadPool = java.util.concurrent.Executors.newFixedThreadPool(1);
-        for(int i=0;i<=batchA;i++) {
-            if(i%50==0)
-                log.info("Queued Assoication - {}", i);
-            threadPool.execute(new AssoicationProcessor(new File(tempDir, "ass_"+i)));
-        }
-        for(int i=0;i<=batchT;i++) {
-            if(i%50==0)
-                log.info("Queued Tiploc - {}", i);
-            threadPool.execute(new TiplocProcessor(new File(tempDir, "tip_"+i)));
-        }
-        for(int i=0;i<=batchS;i++) {
-            if(i%50==0)
-                log.info("Queued Schedule - {}", i);
-            threadPool.execute(new ScheduleProcessor(new File(tempDir, "sch_"+i)));
-        }
-        log.info("All entry queued.");
-        threadPool.shutdown();
-        try {
-            threadPool.awaitTermination(3, TimeUnit.HOURS);
-        } catch (InterruptedException e) {
-            log.error(e.getMessage(), e);
-        }
-        if(!threadPool.isTerminated()) {
-            log.error("Some job cannot be finished!");
-            System.exit(50);
-        }
-        try {
-            mDao.setDate(Constants.Metadata.LAST_NETOWRK_RAIL_SCHEDULE_UPD, new Date());
-        } catch (SQLException e) {
-            log.atError().log(e.getMessage(), e);
-        }
-        log.atInfo().log("LTS job done!");
-    }
-
-}
diff --git a/src/org/leolo/rail/NRDataDamon.java b/src/org/leolo/rail/NRDataDamon.java
index ad60022..8482b97 100644
--- a/src/org/leolo/rail/NRDataDamon.java
+++ b/src/org/leolo/rail/NRDataDamon.java
@@ -12,6 +12,7 @@ import org.apache.activemq.transport.stomp.StompFrame;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.leolo.rail.dao.MetadataDao;
+import org.leolo.rail.job.LongTermScheduleJob;
 import org.quartz.CronScheduleBuilder;
 import org.quartz.DateBuilder;
 import org.quartz.JobBuilder;
diff --git a/src/org/leolo/rail/job/DayEndJob.java b/src/org/leolo/rail/job/DayEndJob.java
new file mode 100644
index 0000000..c3bf3af
--- /dev/null
+++ b/src/org/leolo/rail/job/DayEndJob.java
@@ -0,0 +1,15 @@
+package org.leolo.rail.job;
+
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+public class DayEndJob implements Job {
+
+    @Override
+    public void execute(JobExecutionContext context) throws JobExecutionException {
+        // TODO Auto-generated method stub
+
+    }
+
+}
diff --git a/src/org/leolo/rail/job/LongTermScheduleJob.java b/src/org/leolo/rail/job/LongTermScheduleJob.java
new file mode 100644
index 0000000..dc31e7d
--- /dev/null
+++ b/src/org/leolo/rail/job/LongTermScheduleJob.java
@@ -0,0 +1,190 @@
+package org.leolo.rail.job;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.net.URLConnection;
+import java.sql.SQLException;
+import java.util.Base64;
+import java.util.Date;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipException;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.json.JSONObject;
+import org.leolo.rail.AssoicationProcessor;
+import org.leolo.rail.ConfigurationManager;
+import org.leolo.rail.Constants;
+import org.leolo.rail.ScheduleProcessor;
+import org.leolo.rail.TiplocProcessor;
+import org.leolo.rail.Constants.Metadata;
+import org.leolo.rail.Constants.Scheduler;
+import org.leolo.rail.dao.MetadataDao;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerKey;
+import org.quartz.impl.matchers.GroupMatcher;
+
+public class LongTermScheduleJob implements Job{
+
+    private Logger log = LogManager.getLogger(LongTermScheduleJob.class);
+
+    @Override
+    public void execute(JobExecutionContext context) throws JobExecutionException {
+        MetadataDao mDao = new MetadataDao();
+        log.atInfo().log("LongTermScheduleJob started.");
+
+        try {
+            TriggerKey triggerKey = TriggerKey.triggerKey(Constants.Scheduler.LTSJ_CRON_TRIGGER, Constants.Scheduler.DEFAULT_GROUP_NAME);
+            Trigger trigger = context.getScheduler().getTrigger(triggerKey);
+            Date nextFireTime = trigger.getNextFireTime();
+            log.info("Next fire at "+nextFireTime);
+            long timeToNextFire = nextFireTime.getTime() - System.currentTimeMillis();
+            if(timeToNextFire > Constants.Scheduler.LTR_SKIP_THRESHOLD && !ConfigurationManager.getInstance().getBoolean("general.debug", false)) {
+                log.always().log("Too close to next fire time. Skipping");
+                return;
+            }
+        } catch (SchedulerException e) {
+            log.atError().log(e.getMessage(),e);
+        }
+        InputStream fis = null;
+        try {
+            URL url = new URL(ConfigurationManager.getInstance().getProperty("network.schedule.path"));
+            URLConnection conn = url.openConnection();
+            String userpwd = ConfigurationManager.getInstance().getProperty("network.user")+":"+ConfigurationManager.getInstance().getProperty("network.pwd");
+            conn.addRequestProperty("Authorization", "Basic "+Base64.getEncoder().encodeToString(userpwd.getBytes()));
+            conn.connect();
+            fis = conn.getInputStream();
+        }catch(IOException e) {
+            log.error(e.getMessage(), e);
+            System.exit(4);
+        }
+        log.info("Connected!");
+        File tempDir = new File(ConfigurationManager.getInstance().getProperty("file.temp_dir", ".")+"/nrdg");
+        if(!tempDir.exists()) {
+            tempDir.mkdirs();
+        }
+        int countA=0, countT=0, countS=0;
+        int batchA=0, batchT=0, batchS=0;
+        try(
+            GZIPInputStream gis = new GZIPInputStream(fis);
+            BufferedReader br = new BufferedReader(new InputStreamReader(gis));
+        ){
+            PrintWriter ass = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "ass_0"))));
+            PrintWriter tip = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "tip_0"))));
+            PrintWriter sch = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "sch_0"))));
+            int count = 0;
+            while(true) {
+                String line = br.readLine();
+                if(line==null) {
+                    break;
+                }
+                count++;
+                JSONObject obj = new JSONObject(line);
+                String objectType = obj.keys().next();
+                if("JsonTimetableV1".equals(objectType)) {
+
+                }else if("JsonAssociationV1".equals(objectType)){
+                    ass.println(obj.getJSONObject("JsonAssociationV1"));
+                    countA++;
+                    if(countA%Constants.BATCH_SIZE==0) {
+                        ass.close();
+                        if(batchA%50==0)
+                            log.info("Created batch for assoication");
+                        ass = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "ass_"+(++batchA)))));
+                    }
+                }else if("TiplocV1".equals(objectType)){
+                    tip.println(obj.getJSONObject("TiplocV1"));
+                    countT++;
+                    if(countT%Constants.BATCH_SIZE==0) {
+                        tip.close();
+                        if(batchT%50==0)
+                            log.info("Created batch for TIPLOC");
+                        tip = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "tip_"+(++batchT)))));
+                    }
+                }else if("JsonScheduleV1".equals(objectType)){
+                    sch.println(obj.getJSONObject("JsonScheduleV1"));
+                    countS++;
+                    if(countS%Constants.BATCH_SIZE==0) {
+                        sch.close();
+                        if(batchS%50==0)
+                            log.info("Created batch for schedule");
+                        sch = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "sch_"+(++batchS)))));
+                    }
+                }else if("EOF".equals(objectType)){
+                    //Nothing to do
+                }else {
+                    log.fatal("Unhandled type {}", objectType);
+                    System.exit(2);
+                }
+
+            }
+            ass.close();
+            tip.close();
+            sch.close();
+
+            log.info("Created batch for assoication");
+            log.info("Created batch for TIPLOC");
+            log.info("Created batch for schedule");
+            log.info("Total count : {}", count);
+        }catch(ZipException e) {
+            log.error(e.getMessage(), e);
+        }catch(IOException e) {
+            log.error(e.getMessage(), e);
+            System.exit(1);
+        }
+        try {
+            fis.close();
+        }catch(IOException e) {
+            log.fatal(e.getMessage(), e);
+            System.exit(1);
+        }
+        log.info("Done reading. Dispatching now. {}/{}/{}[{}/{}/{}]", countA, countT, countS, batchA, batchT, batchS);
+        ExecutorService threadPool = java.util.concurrent.Executors.newFixedThreadPool(1);
+        for(int i=0;i<=batchA;i++) {
+            if(i%50==0)
+                log.info("Queued Assoication - {}", i);
+            threadPool.execute(new AssoicationProcessor(new File(tempDir, "ass_"+i)));
+        }
+        for(int i=0;i<=batchT;i++) {
+            if(i%50==0)
+                log.info("Queued Tiploc - {}", i);
+            threadPool.execute(new TiplocProcessor(new File(tempDir, "tip_"+i)));
+        }
+        for(int i=0;i<=batchS;i++) {
+            if(i%50==0)
+                log.info("Queued Schedule - {}", i);
+            threadPool.execute(new ScheduleProcessor(new File(tempDir, "sch_"+i)));
+        }
+        log.info("All entry queued.");
+        threadPool.shutdown();
+        try {
+            threadPool.awaitTermination(3, TimeUnit.HOURS);
+        } catch (InterruptedException e) {
+            log.error(e.getMessage(), e);
+        }
+        if(!threadPool.isTerminated()) {
+            log.error("Some job cannot be finished!");
+            System.exit(50);
+        }
+        try {
+            mDao.setDate(Constants.Metadata.LAST_NETOWRK_RAIL_SCHEDULE_UPD, new Date());
+        } catch (SQLException e) {
+            log.atError().log(e.getMessage(), e);
+        }
+        log.atInfo().log("LTS job done!");
+    }
+
+}