From 35c7e72c499e09bcf506545e484d47ee4a0e9a31 Mon Sep 17 00:00:00 2001 From: LO Kam Tao Leo Date: Wed, 9 Feb 2022 10:34:40 +0000 Subject: [PATCH] Handle a predownload file --- src/org/leolo/rail/nrd/AssoicationHandler.java | 15 ++- src/org/leolo/rail/nrd/Constants.java | 4 +- src/org/leolo/rail/nrd/DatabaseManager.java | 4 +- src/org/leolo/rail/nrd/FileLoader.java | 2 + src/org/leolo/rail/nrd/ScheduleHandler.java | 164 ++++++++++++++++++++++--- src/org/leolo/rail/nrd/TiplocHandler.java | 25 +++- 6 files changed, 186 insertions(+), 28 deletions(-) diff --git a/src/org/leolo/rail/nrd/AssoicationHandler.java b/src/org/leolo/rail/nrd/AssoicationHandler.java index 0124501..78534d1 100644 --- a/src/org/leolo/rail/nrd/AssoicationHandler.java +++ b/src/org/leolo/rail/nrd/AssoicationHandler.java @@ -27,7 +27,11 @@ public class AssoicationHandler implements Runnable{ } public void add(JSONObject obj) { - queue.add(obj); + if(obj!=null) { + queue.add(obj); + }else { + log.warn("Trying to add a null object!"); + } synchronized(SYNC_TOKEN) { SYNC_TOKEN.notifyAll(); } @@ -43,11 +47,12 @@ public class AssoicationHandler implements Runnable{ public void shutdownAndWait() { shutdown(); while(true) { + log.debug("Waiting for termination. cur {}/{}; com {}; queue {}[{}]", currentSize, Constants.BATCH_SIZE, commitCount, queue.size(), queue.peek()==null); + if(terminated) { break; } synchronized(SYNC_TOKEN) { - log.debug("Waiting for termination."); try { SYNC_TOKEN.notifyAll(); SYNC_TOKEN.wait(1000); @@ -57,15 +62,15 @@ public class AssoicationHandler implements Runnable{ } } } - + + private int currentSize = 0; + private int commitCount = 0; @Override public void run() { try( Connection conn = DatabaseManager.getInstance().getConnection(); PreparedStatement pstmt = conn.prepareStatement("INSERT INTO train_assoc VALUES(null,?,?,?,?,?,?,?,?,?,?,?,?)") ){ - int currentSize = 0; - int commitCount = 0; SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); while(true) { JSONObject obj = queue.poll(); diff --git a/src/org/leolo/rail/nrd/Constants.java b/src/org/leolo/rail/nrd/Constants.java index 5861841..e26fc4b 100644 --- a/src/org/leolo/rail/nrd/Constants.java +++ b/src/org/leolo/rail/nrd/Constants.java @@ -1,6 +1,6 @@ package org.leolo.rail.nrd; public class Constants { - public static final int BATCH_SIZE = 200; - public static final int MAX_QUEUE_SIZE = 2000; + public static final int BATCH_SIZE = 500; + public static final int MAX_QUEUE_SIZE = 20000; } diff --git a/src/org/leolo/rail/nrd/DatabaseManager.java b/src/org/leolo/rail/nrd/DatabaseManager.java index 9c2a47c..f05054d 100644 --- a/src/org/leolo/rail/nrd/DatabaseManager.java +++ b/src/org/leolo/rail/nrd/DatabaseManager.java @@ -87,7 +87,9 @@ public class DatabaseManager { stmt.execute("TRUNCATE TABLE train_assoc"); stmt.execute("TRUNCATE TABLE tiploc"); stmt.execute("TRUNCATE TABLE train_schedule"); - stmt.execute("TRUNCATE TABLE train_segment"); + stmt.execute("TRUNCATE TABLE train_schedule_detail"); + stmt.execute("TRUNCATE TABLE train_schedule_location"); + stmt.execute("TRUNCATE TABLE train_error"); }catch(SQLException e) { log.error(e.getMessage(), e); } diff --git a/src/org/leolo/rail/nrd/FileLoader.java b/src/org/leolo/rail/nrd/FileLoader.java index 0a943fe..64c3bda 100644 --- a/src/org/leolo/rail/nrd/FileLoader.java +++ b/src/org/leolo/rail/nrd/FileLoader.java @@ -101,6 +101,8 @@ public class FileLoader { } log.info("Total count : {}", count); asso.shutdownAndWait(); + tiploc.shutdownAndWait(); + schedule.shutdownAndWait(); }catch(IOException e) { log.error(e.getMessage(), e); System.exit(1); diff --git a/src/org/leolo/rail/nrd/ScheduleHandler.java b/src/org/leolo/rail/nrd/ScheduleHandler.java index 45ab0c8..e08d177 100644 --- a/src/org/leolo/rail/nrd/ScheduleHandler.java +++ b/src/org/leolo/rail/nrd/ScheduleHandler.java @@ -4,6 +4,8 @@ import java.sql.Connection; import java.sql.Date; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Time; +import java.sql.Types; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Hashtable; @@ -14,6 +16,7 @@ import java.util.Random; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.json.JSONArray; import org.json.JSONObject; public class ScheduleHandler implements Runnable{ @@ -30,7 +33,11 @@ public class ScheduleHandler implements Runnable{ } public void add(JSONObject obj) { - queue.add(obj); + if(obj!=null) { + queue.add(obj); + }else { + log.warn("Trying to add a null object!"); + } synchronized(SYNC_TOKEN) { SYNC_TOKEN.notifyAll(); } @@ -50,7 +57,7 @@ public class ScheduleHandler implements Runnable{ break; } synchronized(SYNC_TOKEN) { - log.debug("Waiting for termination."); + log.debug("Waiting for termination. cur {}/{}; com {}; queue {}[{}]", currentSize, Constants.BATCH_SIZE, commitCount, queue.size(), queue.peek()==null); try { SYNC_TOKEN.notifyAll(); SYNC_TOKEN.wait(1000); @@ -60,34 +67,94 @@ public class ScheduleHandler implements Runnable{ } } } + + private int currentSize = 0; + private int commitCount = 0; + private int trainCount = 0; + + private synchronized String getTUID() { + String id = Integer.toString(++trainCount, 36); + StringBuffer sb = new StringBuffer(); + for(int i = id.length();i<8;i++) { + sb.append("0"); + } + sb.append(id); + return sb.toString(); + } + + private Time parseTime(String time) { + try { + int hour = Integer.parseInt(time.substring(0, 2)); + int min = Integer.parseInt(time.substring(2, 4)); + boolean halfMin = time.length()>4 && 'H' == time.charAt(4); + return new Time(hour*3_600_000+min*60_000+(halfMin?30_000:0)); + }catch(RuntimeException e) { + log.error("For time \"{}\":{}", time, e.getMessage(), e); + } + return null; + } + + private String parseSTime(String time) { + if("H".equals(time)) + return "00:00:30"; + int min = Integer.parseInt(time.substring(0,1)); + boolean half = time.length()>1 && 'H' == time.charAt(1); + return "00:0"+min+(half?":30":":00"); + } + + private void setTime(PreparedStatement stmt, int pos, String time) throws SQLException{ + if(time==null||"".equals(time)) { + stmt.setNull(pos, Types.TIME); + }else { + stmt.setTime(pos, parseTime(time)); + } + } + + private void setSTime(PreparedStatement stmt, int pos, String time) throws SQLException{ + if(time==null||"".equals(time)) { + stmt.setNull(pos, Types.TIME); + }else { + stmt.setString(pos, parseSTime(time)); + } + } @Override public void run() { try( Connection conn = DatabaseManager.getInstance().getConnection(); - PreparedStatement sMain = conn.prepareStatement("INSERT INTO train_schedule VALUES (?,?,?,?,?,?)"); - PreparedStatement sSeg = conn.prepareStatement("INSERT INTO train_segment VALUES (?,?)") + PreparedStatement sMain = conn.prepareStatement("INSERT INTO train_schedule VALUES (?,?,?,?,?,?,?)"); + PreparedStatement sDetail = conn.prepareStatement("INSERT INTO train_schedule_detail VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); + PreparedStatement sError = conn.prepareStatement("INSERT INTO train_error VALUES (?,?)"); + PreparedStatement sLoca = conn.prepareStatement("INSERT INTO train_schedule_location VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)") ){ - int currentSize = 0; - int commitCount = 0; - int trainCount = 0; SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); while(true) { JSONObject obj = queue.poll(); if(shutdown && (queue.isEmpty()||obj==null)) { sMain.executeBatch(); - sSeg.executeBatch(); + sDetail.executeBatch(); + sError.executeBatch(); + sLoca.executeBatch(); conn.commit(); log.info("Train schedule committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount); break; }else if(obj==null) { + log.debug("Empty queue. Wait for more entries"); + synchronized(SYNC_TOKEN) { + try { + SYNC_TOKEN.wait(); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + log.debug("Woke up."); continue; } synchronized(FileLoader.SYNC_TOKEN) { FileLoader.SYNC_TOKEN.notifyAll(); } //Parse data - String trainUID = Integer.toHexString(++trainCount); + String trainUID = getTUID(); sMain.setString(1, trainUID); sMain.setString(2, obj.optString("CIF_train_uid")); try { @@ -99,21 +166,73 @@ public class ScheduleHandler implements Runnable{ } sMain.setString(5, obj.optString("schedule_days_runs")); sMain.setString(6, obj.optString("train_status")); + sMain.setString(7, obj.optString("atoc_code")); sMain.addBatch(); currentSize++; -// sSeg.setString(1, trainUID); -// sSeg.setString(2, obj.optJSONObject("schedule_segment").toString()); -// sSeg.addBatch(); -// currentSize++; + JSONObject detail = obj.optJSONObject("schedule_segment"); + setString(sDetail, 1, trainUID); + setString(sDetail, 2, detail.optString("CIF_train_category")); + setString(sDetail, 3, detail.optString("signalling_id")); + setString(sDetail, 4, detail.optString("CIF_headcode")); + setString(sDetail, 5, detail.optString("CIF_course_indicator")); + setString(sDetail, 6, detail.optString("CIF_train_service_code")); + setString(sDetail, 7, detail.optString("CIF_business_sector")); + setString(sDetail, 8, detail.optString("CIF_power_type")); + setString(sDetail, 9, detail.optString("CIF_timing_load")); + setInt (sDetail,10, detail.optString("CIF_speed")); + setString(sDetail,11, detail.optString("CIF_operating_characteristics")); + setString(sDetail,12, detail.optString("CIF_sleepers")); + setString(sDetail,13, detail.optString("CIF_reservations")); + setString(sDetail,14, detail.optString("CIF_connection_indicator")); + setString(sDetail,15, detail.optString("CIF_catering_code")); + setString(sDetail,16, detail.optString("CIF_service_branding")); + + sDetail.addBatch(); + currentSize++; + JSONArray locations = detail.optJSONArray("schedule_location"); + if(locations!=null) { + for(int i=0;i=Constants.BATCH_SIZE) { sMain.executeBatch(); - sSeg.executeBatch(); + sDetail.executeBatch(); + sError.executeBatch(); + sLoca.executeBatch(); conn.commit(); log.info("Train schedule committed {} entry. Commit #{}", currentSize, ++commitCount); currentSize = 0; } if(queue.isEmpty()) { - log.debug("Empty queue. Wait for more entries"); +// log.debug("Empty queue. Wait for more entries"); synchronized(SYNC_TOKEN) { try { SYNC_TOKEN.wait(); @@ -121,6 +240,7 @@ public class ScheduleHandler implements Runnable{ log.error(e.getMessage(), e); } } +// log.debug("Woke up."); } } }catch(SQLException e) { @@ -133,6 +253,20 @@ public class ScheduleHandler implements Runnable{ } + private void setInt(PreparedStatement stmt, int i, String string) throws SQLException{ + if(string==null || "".equals(string)) + stmt.setNull(i, Types.INTEGER); + else + stmt.setInt(i, Integer.parseInt(string)); + } + + private void setString(PreparedStatement stmt, int i, String string) throws SQLException{ + if(string==null) + stmt.setNull(i, Types.VARCHAR); + else + stmt.setString(i, string); + } + public int getQueueSize() { return queue.size(); } diff --git a/src/org/leolo/rail/nrd/TiplocHandler.java b/src/org/leolo/rail/nrd/TiplocHandler.java index 6fcb2c3..47be2de 100644 --- a/src/org/leolo/rail/nrd/TiplocHandler.java +++ b/src/org/leolo/rail/nrd/TiplocHandler.java @@ -27,7 +27,12 @@ public class TiplocHandler implements Runnable{ } public void add(JSONObject obj) { - queue.add(obj); + if(obj!=null) { +// log.debug("Size {} >>", queue.size()); + queue.add(obj); + }else { + log.warn("Trying to add a null object!"); + } synchronized(SYNC_TOKEN) { SYNC_TOKEN.notifyAll(); } @@ -43,11 +48,12 @@ public class TiplocHandler implements Runnable{ public void shutdownAndWait() { shutdown(); while(true) { + log.debug("Waiting for termination. cur {}/{}; com {}; queue {}[{}]", currentSize, Constants.BATCH_SIZE, commitCount, queue.size(), queue.peek()==null); + if(terminated) { break; } synchronized(SYNC_TOKEN) { - log.debug("Waiting for termination."); try { SYNC_TOKEN.notifyAll(); SYNC_TOKEN.wait(1000); @@ -57,6 +63,8 @@ public class TiplocHandler implements Runnable{ } } } + private int currentSize = 0; + private int commitCount = 0; @Override public void run() { @@ -64,8 +72,6 @@ public class TiplocHandler implements Runnable{ Connection conn = DatabaseManager.getInstance().getConnection(); PreparedStatement pstmt = conn.prepareStatement("INSERT INTO tiploc VALUES (?,?,?,?,?,?)") ){ - int currentSize = 0; - int commitCount = 0; // SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); while(true) { JSONObject obj = queue.poll(); @@ -75,6 +81,15 @@ public class TiplocHandler implements Runnable{ log.info("TIPLOC committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount); break; }else if(obj==null) { + log.debug("Empty queue. Wait for more entries"); + synchronized(SYNC_TOKEN) { + try { + SYNC_TOKEN.wait(); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + log.debug("Woke up."); continue; } synchronized(FileLoader.SYNC_TOKEN) { @@ -111,7 +126,7 @@ public class TiplocHandler implements Runnable{ currentSize = 0; } if(queue.isEmpty()) { - log.debug("Empty queue. Wait for more entries"); +// log.debug("Empty queue. Wait for more entries"); synchronized(SYNC_TOKEN) { try { SYNC_TOKEN.wait();