diff --git a/pom.xml b/pom.xml index 2f7935c..087d344 100644 --- a/pom.xml +++ b/pom.xml @@ -28,6 +28,16 @@ json 20210307 + + org.apache.logging.log4j + log4j-core + 2.13.3 + + + org.apache.logging.log4j + log4j-api + 2.13.3 + \ No newline at end of file diff --git a/src/log4j2.xml b/src/log4j2.xml new file mode 100644 index 0000000..6e6f5f9 --- /dev/null +++ b/src/log4j2.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/org/leolo/rail/nrd/AssoicationHandler.java b/src/org/leolo/rail/nrd/AssoicationHandler.java new file mode 100644 index 0000000..0124501 --- /dev/null +++ b/src/org/leolo/rail/nrd/AssoicationHandler.java @@ -0,0 +1,139 @@ +package org.leolo.rail.nrd; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; + +public class AssoicationHandler implements Runnable{ + private static Logger log = LogManager.getLogger(AssoicationHandler.class); + + private final Object SYNC_TOKEN = new Object(); + private boolean shutdown = false; + private boolean terminated = false; + private Queue queue = new LinkedList<>(); + + + public AssoicationHandler() { + new Thread(this).start(); + } + + public void add(JSONObject obj) { + queue.add(obj); + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + public void shutdown() { + shutdown = true; + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + public void shutdownAndWait() { + shutdown(); + while(true) { + if(terminated) { + break; + } + synchronized(SYNC_TOKEN) { + log.debug("Waiting for termination."); + try { + SYNC_TOKEN.notifyAll(); + SYNC_TOKEN.wait(1000); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + } + + @Override + public void run() { + try( + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement pstmt = conn.prepareStatement("INSERT INTO train_assoc VALUES(null,?,?,?,?,?,?,?,?,?,?,?,?)") + ){ + int currentSize = 0; + int commitCount = 0; + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + while(true) { + JSONObject obj = queue.poll(); + if(shutdown && (queue.isEmpty()||obj==null)) { + pstmt.executeBatch(); + conn.commit(); + log.info("Train assoc committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount); + break; + }else if(obj==null) { + continue; + } + synchronized(FileLoader.SYNC_TOKEN) { + FileLoader.SYNC_TOKEN.notifyAll(); + } +// log.info("Assoc {}-{}@{}",obj.optString("main_train_uid"),obj.optString("assoc_train_uid"), obj.optString("location")); + pstmt.setString(1, obj.optString("main_train_uid")); + pstmt.setString(2, obj.optString("assoc_train_uid")); + try { + pstmt.setDate(3, new java.sql.Date(sdf.parse(obj.optString("assoc_start_date")).getTime())); + pstmt.setDate(4, new java.sql.Date(sdf.parse(obj.optString("assoc_end_date")).getTime())); + } catch (ParseException e1) { + log.warn("Unable to parse date! {}", e1.getMessage(), e1); + continue; + } + pstmt.setString(5, obj.optString("assoc_days")); + pstmt.setString(6, obj.optString("category")); + pstmt.setString(7, obj.optString("date_indicator")); + pstmt.setString(8, obj.optString("location")); + if(obj.optString("base_location_suffix")!=null) + pstmt.setString(9, obj.optString("base_location_suffix")); + else + pstmt.setNull(9, Types.VARCHAR); + if(obj.optString("assoc_location_suffix")!=null) + pstmt.setString(10, obj.optString("assoc_location_suffix")); + else + pstmt.setNull(10, Types.VARCHAR); + pstmt.setString(11, obj.optString("diagram_type")); + pstmt.setString(12, obj.optString("CIF_stp_indicator")); + pstmt.addBatch(); + if(++currentSize>=Constants.BATCH_SIZE) { + pstmt.executeBatch(); + conn.commit(); + log.info("Train assoc committed {} entry. Commit #{}", currentSize, ++commitCount); + currentSize = 0; + } + if(queue.isEmpty()) { + log.debug("Empty queue. Wait for more entries"); + synchronized(SYNC_TOKEN) { + try { + SYNC_TOKEN.wait(); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + } + }catch(SQLException e) { + log.error(e.getMessage(),e); + } + terminated = true; + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + + } + + public int getQueueSize() { + return queue.size(); + } + +} diff --git a/src/org/leolo/rail/nrd/ConfigurationManager.java b/src/org/leolo/rail/nrd/ConfigurationManager.java new file mode 100644 index 0000000..9708189 --- /dev/null +++ b/src/org/leolo/rail/nrd/ConfigurationManager.java @@ -0,0 +1,63 @@ +package org.leolo.rail.nrd; + +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; +import java.util.function.BiConsumer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ConfigurationManager { + private static Logger log = LogManager.getLogger(ConfigurationManager.class); + + private static ConfigurationManager instance; + private Properties prop = new Properties(); + + public synchronized static ConfigurationManager getInstance() { + if(instance==null) { + instance = new ConfigurationManager(); + } + return instance; + } + + private ConfigurationManager() { + try(FileReader fr = new FileReader("configuration.properties")){ + log.debug("Loading properties file"); + prop.load(fr); + log.info("{} entries loaded", prop.size()); + }catch(IOException e) { + log.fatal(e.getMessage(), e); + System.exit(1); + } + + } + + public void forEach(BiConsumer action) { + prop.forEach(action); + } + + public Object get(Object key) { + return prop.get(key); + } + + public Object getOrDefault(Object key, Object defaultValue) { + return prop.getOrDefault(key, defaultValue); + } + + public String getProperty(String key, String defaultValue) { + return prop.getProperty(key, defaultValue); + } + + public String getProperty(String key) { + return prop.getProperty(key); + } + + public int size() { + return prop.size(); + } + + public boolean containsKey(Object key) { + return prop.containsKey(key); + } +} diff --git a/src/org/leolo/rail/nrd/Constants.java b/src/org/leolo/rail/nrd/Constants.java new file mode 100644 index 0000000..5861841 --- /dev/null +++ b/src/org/leolo/rail/nrd/Constants.java @@ -0,0 +1,6 @@ +package org.leolo.rail.nrd; + +public class Constants { + public static final int BATCH_SIZE = 200; + public static final int MAX_QUEUE_SIZE = 2000; +} diff --git a/src/org/leolo/rail/nrd/DatabaseManager.java b/src/org/leolo/rail/nrd/DatabaseManager.java new file mode 100644 index 0000000..9c2a47c --- /dev/null +++ b/src/org/leolo/rail/nrd/DatabaseManager.java @@ -0,0 +1,99 @@ +package org.leolo.rail.nrd; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import javax.sql.XAConnection; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.mariadb.jdbc.MariaDbPoolDataSource; + +public class DatabaseManager { + + private static Logger log = LogManager.getLogger(DatabaseManager.class); + + private static DatabaseManager instance; + + private MariaDbPoolDataSource ds; + + public synchronized static DatabaseManager getInstance() { + if(instance==null) { + instance = new DatabaseManager(); + } + return instance; + } + + private DatabaseManager() { + ConfigurationManager cm = ConfigurationManager.getInstance(); + if( + !cm.containsKey("db.host")|| + !cm.containsKey("db.user")|| + !cm.containsKey("db.pwd")|| + !cm.containsKey("db.name") + ) { + log.fatal("Missing required property"); + System.exit(1); + } + String url = "jdbc:mariadb://"+cm.getProperty("db.host")+ + ":"+cm.getProperty("db.port", "3306")+ + "/"+cm.getProperty("db.name"); + log.info("Connecting to DB {} as {}", url, cm.get("db.user")); + try { + ds = new MariaDbPoolDataSource(url); + ds.setMaxPoolSize(Integer.parseInt(cm.getOrDefault("db.poolsize", "20").toString())); + ds.setUser(cm.getProperty("db.user").toString()); + ds.setPassword(cm.getProperty("db.pwd").toString()); + } catch (SQLException e) { + log.fatal("Cannot connect to DB",e); + System.exit(-2); + } + } + + public boolean testPool() { + try(Connection conn = ds.getConnection()){ + try(Statement stmt = conn.createStatement()){ + try (ResultSet rs = stmt.executeQuery("SELECT CONNECTION_ID()")){ + if(rs.next()) { + log.debug("Connection ID: {}", rs.getString(1)); + } + } + } + } catch (SQLException e) { + log.warn("Exception when testing the connection., e"); + return false; + } + return true; + } + + public Connection getConnection() throws SQLException { + Connection conn = ds.getConnection(); + conn.setAutoCommit(false); + return conn; + } + + public XAConnection getXAConnection() throws SQLException { + XAConnection conn = ds.getXAConnection(); + return conn; + } + + public void clear() { + try( + Connection conn = getConnection(); + Statement stmt = conn.createStatement(); + ){ + stmt.execute("TRUNCATE TABLE train_assoc"); + stmt.execute("TRUNCATE TABLE tiploc"); + stmt.execute("TRUNCATE TABLE train_schedule"); + stmt.execute("TRUNCATE TABLE train_segment"); + }catch(SQLException e) { + log.error(e.getMessage(), e); + } + } + + public void shutdown() { + ds.close(); + } +} diff --git a/src/org/leolo/rail/nrd/FileLoader.java b/src/org/leolo/rail/nrd/FileLoader.java new file mode 100644 index 0000000..0a943fe --- /dev/null +++ b/src/org/leolo/rail/nrd/FileLoader.java @@ -0,0 +1,118 @@ +package org.leolo.rail.nrd; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Hashtable; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPInputStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; + +public class FileLoader { + + private static Logger log = LogManager.getLogger(FileLoader.class); + + public static final Object SYNC_TOKEN = new Object(); + + public static void main(String [] args) { + new FileLoader().run(); + } + + public void run() { + log.info("Process started"); + ConfigurationManager.getInstance().forEach((k,v)->{ + if(!((String)k).endsWith(".pwd")) { + log.debug("{} -> {}", k, v); + }else { + log.debug("{} -> ****", k); + } + }); + if(DatabaseManager.getInstance().testPool()) { + log.info("Successfully connected to the database"); + } + if(!ConfigurationManager.getInstance().containsKey("file.path")) { + log.fatal("Cannot find file path"); + System.exit(1); + } + InputStream fis = null; + //TODO: get the file from Network Rail + + try { + fis = new FileInputStream(ConfigurationManager.getInstance().getProperty("file.path")); + }catch(IOException e) { + log.fatal(e.getMessage(), e); + System.exit(1); + } + DatabaseManager.getInstance().clear(); + try( + GZIPInputStream gis = new GZIPInputStream(fis); + BufferedReader br = new BufferedReader(new InputStreamReader(gis)) + ){ + int count = 0; + AssoicationHandler asso = new AssoicationHandler(); + TiplocHandler tiploc = new TiplocHandler(); + ScheduleHandler schedule = new ScheduleHandler(); + while(true) { + String line = br.readLine(); + if(line==null) { + break; + } + count++; + if(asso.getQueueSize()+tiploc.getQueueSize()+schedule.getQueueSize() > Constants.MAX_QUEUE_SIZE) { + synchronized(SYNC_TOKEN) { + try { + SYNC_TOKEN.wait(); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + JSONObject obj = new JSONObject(line); + String objectType = obj.keys().next(); + if("JsonTimetableV1".equals(objectType)) { + + }else if("JsonAssociationV1".equals(objectType)){ + asso.add(obj.getJSONObject("JsonAssociationV1")); + }else if("TiplocV1".equals(objectType)){ + tiploc.add(obj.getJSONObject("TiplocV1")); + }else if("JsonScheduleV1".equals(objectType)){ + schedule.add(obj.getJSONObject("JsonScheduleV1")); + }else if("EOF".equals(objectType)){ + //Nothing to do + }else { + log.fatal("Unhandled type {}", objectType); + System.exit(2); + } + + } + log.info("Total count : {}", count); + asso.shutdownAndWait(); + }catch(IOException e) { + log.error(e.getMessage(), e); + System.exit(1); + } + try { + fis.close(); + }catch(IOException e) { + log.fatal(e.getMessage(), e); + System.exit(1); + } + DatabaseManager.getInstance().shutdown(); + log.info("Job finished!"); + } + +} diff --git a/src/org/leolo/rail/nrd/ScheduleHandler.java b/src/org/leolo/rail/nrd/ScheduleHandler.java new file mode 100644 index 0000000..45ab0c8 --- /dev/null +++ b/src/org/leolo/rail/nrd/ScheduleHandler.java @@ -0,0 +1,141 @@ +package org.leolo.rail.nrd; + +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Hashtable; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; +import java.util.Random; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; + +public class ScheduleHandler implements Runnable{ + private static Logger log = LogManager.getLogger(ScheduleHandler.class); + + private final Object SYNC_TOKEN = new Object(); + private boolean shutdown = false; + private boolean terminated = false; + private Queue queue = new LinkedList<>(); + + + public ScheduleHandler() { + new Thread(this).start(); + } + + public void add(JSONObject obj) { + queue.add(obj); + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + public void shutdown() { + shutdown = true; + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + public void shutdownAndWait() { + shutdown(); + while(true) { + if(terminated) { + break; + } + synchronized(SYNC_TOKEN) { + log.debug("Waiting for termination."); + try { + SYNC_TOKEN.notifyAll(); + SYNC_TOKEN.wait(1000); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + } + + @Override + public void run() { + try( + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement sMain = conn.prepareStatement("INSERT INTO train_schedule VALUES (?,?,?,?,?,?)"); + PreparedStatement sSeg = conn.prepareStatement("INSERT INTO train_segment VALUES (?,?)") + ){ + int currentSize = 0; + int commitCount = 0; + int trainCount = 0; + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + while(true) { + JSONObject obj = queue.poll(); + if(shutdown && (queue.isEmpty()||obj==null)) { + sMain.executeBatch(); + sSeg.executeBatch(); + conn.commit(); + log.info("Train schedule committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount); + break; + }else if(obj==null) { + continue; + } + synchronized(FileLoader.SYNC_TOKEN) { + FileLoader.SYNC_TOKEN.notifyAll(); + } + //Parse data + String trainUID = Integer.toHexString(++trainCount); + sMain.setString(1, trainUID); + sMain.setString(2, obj.optString("CIF_train_uid")); + try { + sMain.setDate(3, new java.sql.Date(sdf.parse(obj.optString("schedule_start_date")).getTime())); + sMain.setDate(4, new java.sql.Date(sdf.parse(obj.optString("schedule_end_date")).getTime())); + } catch (ParseException e1) { + log.error(e1.getMessage(), e1); + continue; + } + sMain.setString(5, obj.optString("schedule_days_runs")); + sMain.setString(6, obj.optString("train_status")); + sMain.addBatch(); + currentSize++; +// sSeg.setString(1, trainUID); +// sSeg.setString(2, obj.optJSONObject("schedule_segment").toString()); +// sSeg.addBatch(); +// currentSize++; + if(currentSize>=Constants.BATCH_SIZE) { + sMain.executeBatch(); + sSeg.executeBatch(); + conn.commit(); + log.info("Train schedule committed {} entry. Commit #{}", currentSize, ++commitCount); + currentSize = 0; + } + if(queue.isEmpty()) { + log.debug("Empty queue. Wait for more entries"); + synchronized(SYNC_TOKEN) { + try { + SYNC_TOKEN.wait(); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + } + }catch(SQLException e) { + log.error(e.getMessage(),e); + } + terminated = true; + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + + } + + public int getQueueSize() { + return queue.size(); + } + + +} diff --git a/src/org/leolo/rail/nrd/TimetableHandler.java b/src/org/leolo/rail/nrd/TimetableHandler.java new file mode 100644 index 0000000..19d301b --- /dev/null +++ b/src/org/leolo/rail/nrd/TimetableHandler.java @@ -0,0 +1,77 @@ +package org.leolo.rail.nrd; + +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; + +public class TimetableHandler implements Runnable{ + private static Logger log = LogManager.getLogger(TimetableHandler.class); + + private final Object SYNC_TOKEN = new Object(); + private boolean shutdown = false; + private boolean terminated = false; + private Queue queue = new LinkedList<>(); + + + public TimetableHandler() { + + } + + public void add(JSONObject obj) { + queue.add(obj); + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + public void shutdown() { + shutdown = true; + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + public void shutdownAndWait() { + shutdown(); + while(true) { + if(terminated) { + break; + } + synchronized(SYNC_TOKEN) { + try { + SYNC_TOKEN.wait(); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + } + + @Override + public void run() { + while(true) { + if(shutdown && queue.isEmpty()) { + break; + } + //TODO: actual process + if(!queue.isEmpty()) { + synchronized(SYNC_TOKEN) { + try { + SYNC_TOKEN.wait(); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + } + terminated = true; + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + + } + +} diff --git a/src/org/leolo/rail/nrd/TimetableHandler2.java b/src/org/leolo/rail/nrd/TimetableHandler2.java new file mode 100644 index 0000000..9facdb3 --- /dev/null +++ b/src/org/leolo/rail/nrd/TimetableHandler2.java @@ -0,0 +1,108 @@ +package org.leolo.rail.nrd; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; + +public class TimetableHandler2 implements Runnable{ + private static Logger log = LogManager.getLogger(TimetableHandler2.class); + + private final Object SYNC_TOKEN = new Object(); + private boolean shutdown = false; + private boolean terminated = false; + private Queue queue = new LinkedList<>(); + + + public TimetableHandler2() { + new Thread(this).start(); + } + + public void add(JSONObject obj) { + queue.add(obj); + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + public void shutdown() { + shutdown = true; + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + public void shutdownAndWait() { + shutdown(); + while(true) { + if(terminated) { + break; + } + synchronized(SYNC_TOKEN) { + log.debug("Waiting for termination."); + try { + SYNC_TOKEN.notifyAll(); + SYNC_TOKEN.wait(1000); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + } + + @Override + public void run() { + try( + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement pstmt = conn.prepareStatement("") + ){ + int currentSize = 0; + int commitCount = 0; + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + while(true) { + JSONObject obj = queue.poll(); + if(shutdown && (queue.isEmpty()||obj==null)) { + pstmt.executeBatch(); + conn.commit(); + log.info("Train assoc committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount); + break; + }else if(obj==null) { + continue; + } + //Parse data + pstmt.addBatch(); + if(++currentSize>=Constants.BATCH_SIZE) { + pstmt.executeBatch(); + conn.commit(); + log.info("Train assoc committed {} entry. Commit #{}", currentSize, ++commitCount); + currentSize = 0; + } + if(queue.isEmpty()) { + log.debug("Empty queue. Wait for more entries"); + synchronized(SYNC_TOKEN) { + try { + SYNC_TOKEN.wait(); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + } + }catch(SQLException e) { + log.error(e.getMessage(),e); + } + terminated = true; + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + + } + +} diff --git a/src/org/leolo/rail/nrd/TiplocHandler.java b/src/org/leolo/rail/nrd/TiplocHandler.java new file mode 100644 index 0000000..6fcb2c3 --- /dev/null +++ b/src/org/leolo/rail/nrd/TiplocHandler.java @@ -0,0 +1,138 @@ +package org.leolo.rail.nrd; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; + +public class TiplocHandler implements Runnable{ + private static Logger log = LogManager.getLogger(TiplocHandler.class); + + private final Object SYNC_TOKEN = new Object(); + private boolean shutdown = false; + private boolean terminated = false; + private Queue queue = new LinkedList<>(); + + + public TiplocHandler() { + new Thread(this).start(); + } + + public void add(JSONObject obj) { + queue.add(obj); + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + public void shutdown() { + shutdown = true; + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + } + + public void shutdownAndWait() { + shutdown(); + while(true) { + if(terminated) { + break; + } + synchronized(SYNC_TOKEN) { + log.debug("Waiting for termination."); + try { + SYNC_TOKEN.notifyAll(); + SYNC_TOKEN.wait(1000); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + } + + @Override + public void run() { + try( + Connection conn = DatabaseManager.getInstance().getConnection(); + PreparedStatement pstmt = conn.prepareStatement("INSERT INTO tiploc VALUES (?,?,?,?,?,?)") + ){ + int currentSize = 0; + int commitCount = 0; +// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + while(true) { + JSONObject obj = queue.poll(); + if(shutdown && (queue.isEmpty()||obj==null)) { + pstmt.executeBatch(); + conn.commit(); + log.info("TIPLOC committed {} entry. Commit #{}[last commit]", currentSize, ++commitCount); + break; + }else if(obj==null) { + continue; + } + synchronized(FileLoader.SYNC_TOKEN) { + FileLoader.SYNC_TOKEN.notifyAll(); + } + //Parse data + pstmt.setString(1, obj.optString("tiploc_code")); + if(obj.has("nalco")) + pstmt.setString(2, obj.optString("nalco")); + else + pstmt.setNull(2, Types.VARCHAR); + if(obj.has("stanox")) + pstmt.setString(3, obj.optString("stanox")); + else + pstmt.setNull(3, Types.VARCHAR); + if(obj.has("crs_code")) + pstmt.setString(4, obj.optString("crs_code")); + else + pstmt.setNull(4, Types.VARCHAR); + if(obj.has("description")) + pstmt.setString(5, obj.optString("description")); + else + pstmt.setNull(5, Types.VARCHAR); + if(obj.has("tps_description")) + pstmt.setString(6, obj.optString("tps_description")); + else + pstmt.setNull(6, Types.VARCHAR); + + pstmt.addBatch(); + if(++currentSize>=Constants.BATCH_SIZE) { + pstmt.executeBatch(); + conn.commit(); + log.info("TIPLOC committed {} entry. Commit #{}", currentSize, ++commitCount); + currentSize = 0; + } + if(queue.isEmpty()) { + log.debug("Empty queue. Wait for more entries"); + synchronized(SYNC_TOKEN) { + try { + SYNC_TOKEN.wait(); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + } + }catch(SQLException e) { + log.error(e.getMessage(),e); + } + terminated = true; + synchronized(SYNC_TOKEN) { + SYNC_TOKEN.notifyAll(); + } + + } + + public int getQueueSize() { + return queue.size(); + } + +}