diff --git a/src/org/leolo/rail/nrd/AssoicationProcessor.java b/src/org/leolo/rail/nrd/AssoicationProcessor.java index 44b0be6..f08c88c 100644 --- a/src/org/leolo/rail/nrd/AssoicationProcessor.java +++ b/src/org/leolo/rail/nrd/AssoicationProcessor.java @@ -70,7 +70,7 @@ public class AssoicationProcessor implements Runnable { } pstmt.executeBatch(); conn.commit(); - log.info("Batch committed."); + log.info("Batch {} committed.", fileName.getName()); } catch (FileNotFoundException e) { log.error(e.getMessage(), e); } catch (IOException e) { diff --git a/src/org/leolo/rail/nrd/FileLoader.java b/src/org/leolo/rail/nrd/FileLoader.java index 643ed2b..8c09ddf 100644 --- a/src/org/leolo/rail/nrd/FileLoader.java +++ b/src/org/leolo/rail/nrd/FileLoader.java @@ -94,6 +94,8 @@ public class FileLoader { countA++; if(countA%Constants.BATCH_SIZE==0) { ass.close(); + if(batchA%50==0) + log.info("Created batch for assoication"); ass = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "ass_"+(++batchA))))); } }else if("TiplocV1".equals(objectType)){ @@ -101,6 +103,8 @@ public class FileLoader { countT++; if(countT%Constants.BATCH_SIZE==0) { tip.close(); + if(batchT%50==0) + log.info("Created batch for TIPLOC"); tip = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "tip_"+(++batchT))))); } }else if("JsonScheduleV1".equals(objectType)){ @@ -108,6 +112,8 @@ public class FileLoader { countS++; if(countS%Constants.BATCH_SIZE==0) { sch.close(); + if(batchS%50==0) + log.info("Created batch for schedule"); sch = new PrintWriter(new GZIPOutputStream(new FileOutputStream(new File(tempDir, "sch_"+(++batchS))))); } }else if("EOF".equals(objectType)){ @@ -121,6 +127,10 @@ public class FileLoader { ass.close(); tip.close(); sch.close(); + + log.info("Created batch for assoication"); + log.info("Created batch for TIPLOC"); + log.info("Created batch for schedule"); log.info("Total count : {}", count); }catch(ZipException e) { log.error(e.getMessage(), e); @@ -138,17 +148,21 @@ public class FileLoader { log.info("Done reading. Dispatching now. {}/{}/{}[{}/{}/{}]", countA, countT, countS, batchA, batchT, batchS); ExecutorService threadPool = java.util.concurrent.Executors.newFixedThreadPool(Integer.parseInt(ConfigurationManager.getInstance().getProperty("thread", "20"))); for(int i=0;i<=batchA;i++) { - log.info("Queued Assoication - {}", i); + if(i%50==0) + log.info("Queued Assoication - {}", i); threadPool.execute(new AssoicationProcessor(new File(tempDir, "ass_"+i))); } for(int i=0;i<=batchT;i++) { - log.info("Queued Tiploc - {}", i); + if(i%50==0) + log.info("Queued Tiploc - {}", i); threadPool.execute(new TiplocProcessor(new File(tempDir, "tip_"+i))); } for(int i=0;i<=batchS;i++) { - log.info("Queued Schedule - {}", i); + if(i%50==0) + log.info("Queued Schedule - {}", i); threadPool.execute(new ScheduleProcessor(new File(tempDir, "sch_"+i))); } + log.info("All entry queued."); threadPool.shutdown(); try { threadPool.awaitTermination(3, TimeUnit.HOURS); diff --git a/src/org/leolo/rail/nrd/ScheduleProcessor.java b/src/org/leolo/rail/nrd/ScheduleProcessor.java index ad7c102..7063ee8 100644 --- a/src/org/leolo/rail/nrd/ScheduleProcessor.java +++ b/src/org/leolo/rail/nrd/ScheduleProcessor.java @@ -12,8 +12,14 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Time; import java.sql.Types; +import java.text.DateFormat; +import java.text.FieldPosition; import java.text.ParseException; +import java.text.ParsePosition; import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Hashtable; +import java.util.Random; import java.util.zip.GZIPInputStream; import org.apache.logging.log4j.LogManager; @@ -25,6 +31,30 @@ public class ScheduleProcessor implements Runnable { private File fileName; private Logger log = LogManager.getLogger(getClass()); + private static TUIDDateFormat tdf = new TUIDDateFormat(); + + public static final String SEQ_ID = "0123456789ABCDEFGHJKLMNPRSTUVWXY"; + private static Random r = new Random(); + private static class TUIDDateFormat extends DateFormat{ + + public static final String MONTH_ID = "MBTQPHSONDUE"; + public static final String DAY_ID = "0123456789ABCDEFGHJKLMNPRSTUVWX"; + + + @Override + public StringBuffer format(Date arg0, StringBuffer arg1, FieldPosition arg2) { + // TODO Auto-generated method stub + arg1.append(MONTH_ID.charAt(arg0.getMonth())); + arg1.append(DAY_ID.charAt(arg0.getDate()-1)); + return arg1; + } + + @Override + public Date parse(String arg0, ParsePosition arg1) { + throw new UnsupportedOperationException(); + } + + } ScheduleProcessor(File fileName){ this.fileName = fileName; @@ -41,12 +71,27 @@ public class ScheduleProcessor implements Runnable { return sb.toString(); } - private Time parseTime(String time) { + private static Hashtable uidMap = new Hashtable<>(); + + private static synchronized String getTUID(String uid, Date startDate, Date endDate) { + String buid = uid+tdf.format(startDate)+tdf.format(endDate); + int count = 0; + if(uidMap.containsKey(buid)) { + count = uidMap.get(buid); + count +=1; + uidMap.put(buid, count); + }else { + uidMap.put(buid, 0); + } + return buid+SEQ_ID.charAt(count); + } + + private String parseTime(String time) { try { int hour = Integer.parseInt(time.substring(0, 2)); int min = Integer.parseInt(time.substring(2, 4)); boolean halfMin = time.length()>4 && 'H' == time.charAt(4); - return new Time(hour*3_600_000+min*60_000+(halfMin?30_000:0)); + return hour+":"+min+(halfMin?":30":":00"); }catch(RuntimeException e) { log.error("For time \"{}\":{}", time, e.getMessage(), e); } @@ -65,7 +110,7 @@ public class ScheduleProcessor implements Runnable { if(time==null||"".equals(time)) { stmt.setNull(pos, Types.TIME); }else { - stmt.setTime(pos, parseTime(time)); + stmt.setString(pos, parseTime(time)); } } @@ -94,16 +139,20 @@ public class ScheduleProcessor implements Runnable { if(line==null) { break; } - String trainUID = getTUID(); - JSONObject obj = new JSONObject(line);sMain.setString(1, trainUID); + String trainUID; + JSONObject obj = new JSONObject(line); sMain.setString(2, obj.optString("CIF_train_uid")); try { - sMain.setDate(3, new java.sql.Date(sdf.parse(obj.optString("schedule_start_date")).getTime())); - sMain.setDate(4, new java.sql.Date(sdf.parse(obj.optString("schedule_end_date")).getTime())); + java.sql.Date startDate = new java.sql.Date(sdf.parse(obj.optString("schedule_start_date")).getTime()); + java.sql.Date endDate = new java.sql.Date(sdf.parse(obj.optString("schedule_end_date")).getTime()); + trainUID = getTUID(obj.optString("CIF_train_uid"), startDate, endDate); + sMain.setDate(3, startDate); + sMain.setDate(4, endDate); } catch (ParseException e1) { log.error(e1.getMessage(), e1); continue; } + sMain.setString(1, trainUID); sMain.setString(5, obj.optString("schedule_days_runs")); sMain.setString(6, obj.optString("train_status")); sMain.setString(7, obj.optString("atoc_code")); @@ -165,7 +214,7 @@ public class ScheduleProcessor implements Runnable { sError.executeBatch(); sLoca.executeBatch(); conn.commit(); - log.info("Batch committed."); + log.info("Batch {} committed.", fileName.getName()); } catch (FileNotFoundException e) { log.error(e.getMessage(), e); } catch (IOException e) { diff --git a/src/org/leolo/rail/nrd/TiplocProcessor.java b/src/org/leolo/rail/nrd/TiplocProcessor.java index 02ace43..bf59427 100644 --- a/src/org/leolo/rail/nrd/TiplocProcessor.java +++ b/src/org/leolo/rail/nrd/TiplocProcessor.java @@ -69,7 +69,7 @@ public class TiplocProcessor implements Runnable { } pstmt.executeBatch(); conn.commit(); - log.info("Batch committed."); + log.info("Batch {} committed.", fileName.getName()); } catch (FileNotFoundException e) { log.error(e.getMessage(), e); } catch (IOException e) {