package org.leolo.rail.job; import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.URL; import java.net.URLConnection; import java.sql.SQLException; import java.util.Base64; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONObject; import org.leolo.rail.AssoicationProcessor; import org.leolo.rail.ConfigurationManager; import org.leolo.rail.Constants; import org.leolo.rail.ScheduleProcessor; import org.leolo.rail.TiplocProcessor; import org.leolo.rail.Constants.Metadata; import org.leolo.rail.Constants.Scheduler; import org.leolo.rail.dao.MetadataDao; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.SchedulerException; import org.quartz.Trigger; import org.quartz.TriggerKey; import org.quartz.impl.matchers.GroupMatcher; public class LongTermScheduleJob implements Job{ private Logger log = LogManager.getLogger(LongTermScheduleJob.class); @Override public void execute(JobExecutionContext context) throws JobExecutionException { MetadataDao mDao = new MetadataDao(); log.atInfo().log("LongTermScheduleJob started."); try { TriggerKey triggerKey = TriggerKey.triggerKey(Constants.Scheduler.LTSJ_CRON_TRIGGER, Constants.Scheduler.DEFAULT_GROUP_NAME); Trigger trigger = context.getScheduler().getTrigger(triggerKey); Date nextFireTime = trigger.getNextFireTime(); log.info("Next fire at "+nextFireTime); long timeToNextFire = nextFireTime.getTime() - System.currentTimeMillis(); if(timeToNextFire > Constants.Scheduler.LTR_SKIP_THRESHOLD && !ConfigurationManager.getInstance().getBoolean("general.debug", false)) { log.always().log("Too close to next fire time. Skipping"); return; } } catch (SchedulerException e) { log.atError().log(e.getMessage(),e); } InputStream fis = null; try { URL url = new URL(ConfigurationManager.getInstance().getProperty("network.schedule.path")); URLConnection conn = url.openConnection(); String userpwd = ConfigurationManager.getInstance().getProperty("network.user")+":"+ConfigurationManager.getInstance().getProperty("network.pwd"); conn.addRequestProperty("Authorization", "Basic "+Base64.getEncoder().encodeToString(userpwd.getBytes())); conn.connect(); fis = conn.getInputStream(); }catch(IOException e) { log.error(e.getMessage(), e); System.exit(4); } log.info("Connected!"); File tempDir = new File(ConfigurationManager.getInstance().getProperty("file.temp_dir", ".")+"/nrdg"); if(!tempDir.exists()) { tempDir.mkdirs(); } int countA=0, countT=0, countS=0; int batchA=0, batchT=0, batchS=0; try( GZIPInputStream gis = new GZIPInputStream(fis); BufferedReader br = new BufferedReader(new InputStreamReader(gis)); ){ PrintWriter ass = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "ass_0")))); PrintWriter tip = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "tip_0")))); PrintWriter sch = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "sch_0")))); int count = 0; while(true) { String line = br.readLine(); if(line==null) { break; } count++; JSONObject obj = new JSONObject(line); String objectType = obj.keys().next(); if("JsonTimetableV1".equals(objectType)) { }else if("JsonAssociationV1".equals(objectType)){ ass.println(obj.getJSONObject("JsonAssociationV1")); countA++; if(countA%Constants.BATCH_SIZE==0) { ass.close(); if(batchA%50==0) log.info("Created batch for assoication"); ass = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "ass_"+(++batchA))))); } }else if("TiplocV1".equals(objectType)){ tip.println(obj.getJSONObject("TiplocV1")); countT++; if(countT%Constants.BATCH_SIZE==0) { tip.close(); if(batchT%50==0) log.info("Created batch for TIPLOC"); tip = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "tip_"+(++batchT))))); } }else if("JsonScheduleV1".equals(objectType)){ sch.println(obj.getJSONObject("JsonScheduleV1")); countS++; if(countS%Constants.BATCH_SIZE==0) { sch.close(); if(batchS%50==0) log.info("Created batch for schedule"); sch = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "sch_"+(++batchS))))); } }else if("EOF".equals(objectType)){ //Nothing to do }else { log.fatal("Unhandled type {}", objectType); System.exit(2); } } ass.close(); tip.close(); sch.close(); log.info("Created batch for assoication"); log.info("Created batch for TIPLOC"); log.info("Created batch for schedule"); log.info("Total count : {}", count); }catch(ZipException e) { log.error(e.getMessage(), e); }catch(IOException e) { log.error(e.getMessage(), e); System.exit(1); } try { fis.close(); }catch(IOException e) { log.fatal(e.getMessage(), e); System.exit(1); } log.info("Done reading. Dispatching now. {}/{}/{}[{}/{}/{}]", countA, countT, countS, batchA, batchT, batchS); ExecutorService threadPool = java.util.concurrent.Executors.newFixedThreadPool(1); for(int i=0;i<=batchA;i++) { if(i%50==0) log.info("Queued Assoication - {}", i); threadPool.execute(new AssoicationProcessor(new File(tempDir, "ass_"+i))); } for(int i=0;i<=batchT;i++) { if(i%50==0) log.info("Queued Tiploc - {}", i); threadPool.execute(new TiplocProcessor(new File(tempDir, "tip_"+i))); } for(int i=0;i<=batchS;i++) { if(i%50==0) log.info("Queued Schedule - {}", i); threadPool.execute(new ScheduleProcessor(new File(tempDir, "sch_"+i))); } log.info("All entry queued."); threadPool.shutdown(); try { threadPool.awaitTermination(3, TimeUnit.HOURS); } catch (InterruptedException e) { log.error(e.getMessage(), e); } if(!threadPool.isTerminated()) { log.error("Some job cannot be finished!"); System.exit(50); } try { mDao.setDate(Constants.Metadata.LAST_NETOWRK_RAIL_SCHEDULE_UPD, new Date()); } catch (SQLException e) { log.atError().log(e.getMessage(), e); } log.atInfo().log("LTS job done!"); } }