diff --git a/src/org/leolo/rail/nrd/AssoicationProcessor.java b/src/org/leolo/rail/nrd/AssoicationProcessor.java new file mode 100644 index 0000000..7d9e285 --- /dev/null +++ b/src/org/leolo/rail/nrd/AssoicationProcessor.java @@ -0,0 +1,83 @@ +package org.leolo.rail.nrd; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; + +public class AssoicationProcessor implements Runnable { + + private File fileName; + private Logger log = LogManager.getLogger(getClass()); + + AssoicationProcessor(File fileName){ + this.fileName = fileName; + } + + @Override + public void run() { + log.info("Processing {}", fileName.getName()); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + try( + BufferedReader br = new BufferedReader(new FileReader(fileName)); + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement pstmt = conn.prepareStatement("INSERT INTO train_assoc VALUES(null,?,?,?,?,?,?,?,?,?,?,?,?)") + ){ + while(true) { + String line = br.readLine(); + if(line==null) { + break; + } + JSONObject obj = new JSONObject(line); + pstmt.setString(1, obj.optString("main_train_uid")); + pstmt.setString(2, obj.optString("assoc_train_uid")); + try { + pstmt.setDate(3, new java.sql.Date(sdf.parse(obj.optString("assoc_start_date")).getTime())); + pstmt.setDate(4, new java.sql.Date(sdf.parse(obj.optString("assoc_end_date")).getTime())); + } catch (ParseException e1) { + log.warn("Unable to parse date! {}", e1.getMessage(), e1); + continue; + } + pstmt.setString(5, obj.optString("assoc_days")); + pstmt.setString(6, obj.optString("category")); + pstmt.setString(7, obj.optString("date_indicator")); + pstmt.setString(8, obj.optString("location")); + if(obj.optString("base_location_suffix")!=null) + pstmt.setString(9, obj.optString("base_location_suffix")); + else + pstmt.setNull(9, Types.VARCHAR); + if(obj.optString("assoc_location_suffix")!=null) + pstmt.setString(10, obj.optString("assoc_location_suffix")); + else + pstmt.setNull(10, Types.VARCHAR); + pstmt.setString(11, obj.optString("diagram_type")); + pstmt.setString(12, obj.optString("CIF_stp_indicator")); + pstmt.addBatch(); + } + pstmt.executeBatch(); + conn.commit(); + log.info("Batch committed."); + } catch (FileNotFoundException e) { + log.error(e.getMessage(), e); + } catch (IOException e) { + log.error(e.getMessage(), e); + } catch (SQLException e) { + log.error(e.getMessage(), e); + } + if(!fileName.delete()) { + log.warn("Unable to delete {}", fileName.getName()); + } + } + +} diff --git a/src/org/leolo/rail/nrd/Constants.java b/src/org/leolo/rail/nrd/Constants.java index 4f1902b..7d22adc 100644 --- a/src/org/leolo/rail/nrd/Constants.java +++ b/src/org/leolo/rail/nrd/Constants.java @@ -1,6 +1,8 @@ package org.leolo.rail.nrd; public class Constants { - public static final int BATCH_SIZE = 10000; + public static final int BATCH_SIZE = 1000; + + @Deprecated public static final int MAX_QUEUE_SIZE = 40000; } diff --git a/src/org/leolo/rail/nrd/FileLoader.java b/src/org/leolo/rail/nrd/FileLoader.java index 7720c99..f893ddf 100644 --- a/src/org/leolo/rail/nrd/FileLoader.java +++ b/src/org/leolo/rail/nrd/FileLoader.java @@ -1,26 +1,17 @@ package org.leolo.rail.nrd; import java.io.BufferedReader; -import java.io.FileInputStream; +import java.io.File; +import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintWriter; -import java.net.Authenticator; -import java.net.PasswordAuthentication; import java.net.URL; import java.net.URLConnection; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; import java.util.Base64; -import java.util.Hashtable; -import java.util.Map; -import java.util.Properties; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; import java.util.zip.ZipException; @@ -69,40 +60,52 @@ public class FileLoader { System.exit(4); } log.info("Connected!"); + File tempDir = new File(ConfigurationManager.getInstance().getProperty("file.temp_dir", ".")+"/nrdg"); + if(!tempDir.exists()) { + tempDir.mkdirs(); + } DatabaseManager.getInstance().clear(); + int countA=0, countT=0, countS=0; + int batchA=0, batchT=0, batchS=0; try( GZIPInputStream gis = new GZIPInputStream(fis); - BufferedReader br = new BufferedReader(new InputStreamReader(gis)) + BufferedReader br = new BufferedReader(new InputStreamReader(gis)); ){ + PrintWriter ass = new PrintWriter(new File(tempDir, "ass_0")); + PrintWriter tip = new PrintWriter(new File(tempDir, "tip_0")); + PrintWriter sch = new PrintWriter(new File(tempDir, "sch_0")); int count = 0; - AssoicationHandler asso = new AssoicationHandler(); - TiplocHandler tiploc = new TiplocHandler(); - ScheduleHandler schedule = new ScheduleHandler(); while(true) { String line = br.readLine(); if(line==null) { break; } count++; - if(asso.getQueueSize()+tiploc.getQueueSize()+schedule.getQueueSize() > Constants.MAX_QUEUE_SIZE) { - synchronized(SYNC_TOKEN) { - try { - SYNC_TOKEN.wait(); - } catch (InterruptedException e) { - log.error(e.getMessage(), e); - } - } - } JSONObject obj = new JSONObject(line); String objectType = obj.keys().next(); if("JsonTimetableV1".equals(objectType)) { }else if("JsonAssociationV1".equals(objectType)){ - asso.add(obj.getJSONObject("JsonAssociationV1")); + ass.println(obj.getJSONObject("JsonAssociationV1")); + countA++; + if(countA%Constants.BATCH_SIZE==0) { + ass.close(); + ass = new PrintWriter(new File(tempDir, "ass_"+(++batchA))); + } }else if("TiplocV1".equals(objectType)){ - tiploc.add(obj.getJSONObject("TiplocV1")); + tip.println(obj.getJSONObject("TiplocV1")); + countT++; + if(countT%Constants.BATCH_SIZE==0) { + tip.close(); + tip = new PrintWriter(new File(tempDir, "tip_"+(++batchT))); + } }else if("JsonScheduleV1".equals(objectType)){ - schedule.add(obj.getJSONObject("JsonScheduleV1")); + sch.println(obj.getJSONObject("JsonScheduleV1")); + countS++; + if(countS%Constants.BATCH_SIZE==0) { + sch.close(); + sch = new PrintWriter(new File(tempDir, "sch_"+(++batchS))); + } }else if("EOF".equals(objectType)){ //Nothing to do }else { @@ -111,16 +114,15 @@ public class FileLoader { } } + ass.close(); + tip.close(); + sch.close(); log.info("Total count : {}", count); - asso.shutdownAndWait(); - tiploc.shutdownAndWait(); - schedule.shutdownAndWait(); }catch(ZipException e) { log.error(e.getMessage(), e); captureFile(); }catch(IOException e) { log.error(e.getMessage(), e); -// log.error(e.) System.exit(1); } try { @@ -129,7 +131,34 @@ public class FileLoader { log.fatal(e.getMessage(), e); System.exit(1); } + log.info("Done reading. Dispatching now. {}/{}/{}[{}/{}/{}]", countA, countT, countS, batchA, batchT, batchS); + ExecutorService threadPool = java.util.concurrent.Executors.newFixedThreadPool(Integer.parseInt(ConfigurationManager.getInstance().getProperty("thread", "20"))); + for(int i=0;i<=batchA;i++) { + log.info("Queued Assoication - {}", i); + threadPool.execute(new AssoicationProcessor(new File(tempDir, "ass_"+i))); + } + for(int i=0;i<=batchT;i++) { + log.info("Queued Tiploc - {}", i); + threadPool.execute(new TiplocProcessor(new File(tempDir, "tip_"+i))); + } + for(int i=0;i<=batchS;i++) { + log.info("Queued Schedule - {}", i); + threadPool.execute(new ScheduleProcessor(new File(tempDir, "sch_"+i))); + } + threadPool.shutdown(); + try { + threadPool.awaitTermination(3, TimeUnit.HOURS); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + if(!threadPool.isTerminated()) { + log.error("Some job cannot be finished!"); + System.exit(50); + } DatabaseManager.getInstance().shutdown(); + if(!tempDir.delete()) { + log.warn("Unable to remove temp dir!"); + } log.info("Job finished!"); } diff --git a/src/org/leolo/rail/nrd/ScheduleProcessor.java b/src/org/leolo/rail/nrd/ScheduleProcessor.java new file mode 100644 index 0000000..28b450d --- /dev/null +++ b/src/org/leolo/rail/nrd/ScheduleProcessor.java @@ -0,0 +1,192 @@ +package org.leolo.rail.nrd; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONArray; +import org.json.JSONObject; + +public class ScheduleProcessor implements Runnable { + + private File fileName; + private Logger log = LogManager.getLogger(getClass()); + + ScheduleProcessor(File fileName){ + this.fileName = fileName; + } + private static int trainCount = 0; + + private static synchronized String getTUID() { + String id = Integer.toString(++trainCount, 36); + StringBuffer sb = new StringBuffer(); + for(int i = id.length();i<8;i++) { + sb.append("0"); + } + sb.append(id); + return sb.toString(); + } + + private Time parseTime(String time) { + try { + int hour = Integer.parseInt(time.substring(0, 2)); + int min = Integer.parseInt(time.substring(2, 4)); + boolean halfMin = time.length()>4 && 'H' == time.charAt(4); + return new Time(hour*3_600_000+min*60_000+(halfMin?30_000:0)); + }catch(RuntimeException e) { + log.error("For time \"{}\":{}", time, e.getMessage(), e); + } + return null; + } + + private String parseSTime(String time) { + if("H".equals(time)) + return "00:00:30"; + int min = Integer.parseInt(time.substring(0,1)); + boolean half = time.length()>1 && 'H' == time.charAt(1); + return "00:0"+min+(half?":30":":00"); + } + + private void setTime(PreparedStatement stmt, int pos, String time) throws SQLException{ + if(time==null||"".equals(time)) { + stmt.setNull(pos, Types.TIME); + }else { + stmt.setTime(pos, parseTime(time)); + } + } + + private void setSTime(PreparedStatement stmt, int pos, String time) throws SQLException{ + if(time==null||"".equals(time)) { + stmt.setNull(pos, Types.TIME); + }else { + stmt.setString(pos, parseSTime(time)); + } + } + + @Override + public void run() { + log.info("Processing {}", fileName.getName()); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + try( + BufferedReader br = new BufferedReader(new FileReader(fileName)); + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement sMain = conn.prepareStatement("INSERT INTO train_schedule VALUES (?,?,?,?,?,?,?)"); + PreparedStatement sDetail = conn.prepareStatement("INSERT INTO train_schedule_detail VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); + PreparedStatement sError = conn.prepareStatement("INSERT INTO train_error VALUES (?,?)"); + PreparedStatement sLoca = conn.prepareStatement("INSERT INTO train_schedule_location VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + ){ + while(true) { + String line = br.readLine(); + if(line==null) { + break; + } + String trainUID = getTUID(); + JSONObject obj = new JSONObject(line);sMain.setString(1, trainUID); + sMain.setString(2, obj.optString("CIF_train_uid")); + try { + sMain.setDate(3, new java.sql.Date(sdf.parse(obj.optString("schedule_start_date")).getTime())); + sMain.setDate(4, new java.sql.Date(sdf.parse(obj.optString("schedule_end_date")).getTime())); + } catch (ParseException e1) { + log.error(e1.getMessage(), e1); + continue; + } + sMain.setString(5, obj.optString("schedule_days_runs")); + sMain.setString(6, obj.optString("train_status")); + sMain.setString(7, obj.optString("atoc_code")); + sMain.addBatch(); + JSONObject detail = obj.optJSONObject("schedule_segment"); + setString(sDetail, 1, trainUID); + setString(sDetail, 2, detail.optString("CIF_train_category")); + setString(sDetail, 3, detail.optString("signalling_id")); + setString(sDetail, 4, detail.optString("CIF_headcode")); + setString(sDetail, 5, detail.optString("CIF_course_indicator")); + setString(sDetail, 6, detail.optString("CIF_train_service_code")); + setString(sDetail, 7, detail.optString("CIF_business_sector")); + setString(sDetail, 8, detail.optString("CIF_power_type")); + setString(sDetail, 9, detail.optString("CIF_timing_load")); + setInt (sDetail,10, detail.optString("CIF_speed")); + setString(sDetail,11, detail.optString("CIF_operating_characteristics")); + setString(sDetail,12, detail.optString("CIF_sleepers")); + setString(sDetail,13, detail.optString("CIF_reservations")); + setString(sDetail,14, detail.optString("CIF_connection_indicator")); + setString(sDetail,15, detail.optString("CIF_catering_code")); + setString(sDetail,16, detail.optString("CIF_service_branding")); + + sDetail.addBatch(); + JSONArray locations = detail.optJSONArray("schedule_location"); + if(locations!=null) { + for(int i=0;i